Red Architecture — красная кнопка помощи для сложных и запутанных систем — часть 3 (многопоточность нам в помощь) +2


Заключительную часть описания Red Architecture посвятим многопоточности. Ради справедливости стоит сказать, что начальный вариант класса v нельзя считать оптимальным, так как в нём ничего нет для решения одной из главных проблем к которой неминуемо приходят разработчики real world приложений. Для полного понимания текущей статьи необходимо познакомиться с концепцией Red Architecture здесь.

Red Architecture

Забегая вперёд скажу, что нам удастся решить все проблемы многопоточности не выходя за пределы класса v. Причём изменений будет гораздо меньше чем могло показаться, и в итоге код класса v с полностью решёнными проблемами многопоточности будет состоять из немногим более 50 строк! Причём эти 50 с небольшим строк будут более оптимальны, чем вариант класса v, описанный в первой части. При этом конкретный код, решающий проблемы синхронизации потоков займёт всего лишь 20 строк!

По ходу текста мы будем разбирать отдельные строки из листинга законченных классов v и Tests, которые приведены в конце данной статьи.

Где можно применить Red Architecture?


Хочу подчеркнуть, что приведённые здесь примеры, как и вся концепция Red Architecture, предлагается для использования на всех возможных языках и платформах. С#/Xamarin и платформа .NET выбраны для демонстрации Red Architecture исходя из моих личных предпочтений, не более того.

Два варианта класса v


У нас будет два варианта класса v. Второй вариант, идентичный по функционалу и способу использования первому, будет устроен несколько сложнее. Зато его можно будет использовать не только в “стандартном” C# .NET окружении, но и в PCL окружении Xamarin, а значит — для мобильной разработки сразу под три платформы: iOS, Android, Windows 10 Mobile. Дело в том, что в PCL окружении фреймворка Xamarin не доступны потокобезопасные (thread safe) коллекции, поэтому вариант класса v для Xamarin/PCL будет содержать больше кода для синхронизации потоков. Именно его мы и рассмотрим в данной статье, поскольку упрощённый вариант класса v (тоже есть в конце данной статьи) представляет меньшую ценность с точки зрения понимания проблем многопоточности и способов их решения.

Чуть-чуть оптимизации


Прежде всего мы избавимся от базового класса и сделаем класс v самодостаточным. Нам не нужен механизм нотификации базового класса, который мы использовали до текущего момента. Унаследованный механизм не позволяет решить проблемы многопоточности оптимальным способом. Поэтому мы теперь “сами” будем рассылать события функциям-обработчикам:

static Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>> handlersMap = new Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>>(); 
// ...
foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
    lock(handlr)
        try
        {
            handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, new List<KeyValuePair<k, object>>(){ new KeyValuePair<k, object>(key, o) })); 

В методе Add() в цикле foreach мы копируем элементы из HashSet'a в List и итерация происходит уже по листу, а не хешсету. Нам необходимо это сделать, поскольку значение, возвращаемое выражением handlersMap[key] является глобальной переменной, доступной из публичных мутирующих состояние класса методов, таких как m() и h(), следовательно, возможна ситуация, когда HashMap возвращаемый выражением handlersMap[key] будет модифицирован другим потоком во время итерации по нему в методе Add(), а это вызовет эксепшн рантайма, поскольку пока не закончена итерация по коллекции внутри foreach, её (коллекции) модификация запрещена. Именно поэтому мы «подставляем» для итерации не глобальную переменную, а List в который скопированы элементы глобального HashSet'a.

Но этой защиты недостаточно. В выражении

new List<NotifyCollectionChangedEventHandler>(handlersMap[key]) 

к значению (хешсету) handlersMap[key] неявно применяется операция копирования. Это определённо вызовет проблемы если в период между началом и окончанием операции копирования какой-нибудь другой поток попытается добавить или удалить элемент в копируемом хешсете. Поэтому мы ставим лок (Monitor.Enter(handlersMap[key])) на данный хешсет непосредственно перед началом foreach

Monitor.Enter(handlersMap[key]);
foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
{ 
// ...

и “релизим” (Monitor.Exit(handlersMap[key])) лок сразу после входа в цикл foreach

            
foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
{
    if (Monitor.IsEntered(handlersMap[key]))
    {
        Monitor.PulseAll(handlersMap[key]);
        Monitor.Exit(handlersMap[key]);
    }
    // ...

По правилам объекта Monitor количество вызовов Enter() должно соответствовать количеству вызовов Exit(), поэтому у нас есть проверка if (Monitor.IsEntered(handlersMap[key])) которая гарантирует, что если лок был установлен, то мы из него выйдем только один раз, в начале первой итерации цикла foreach. Сразу после строки Monitor.Exit(handlersMap[key]) хешсет handlersMap[key] станет снова доступен для использования другими потоками. Таким образом мы ограничиваем блокировку хешсета минимально возможным временем, можно сказать, что в данном случае хешсет будет заблокирован буквально на мгновение.

Сразу после цикла foreach мы видим повторение кода освобождения лока.

// ...
if (Monitor.IsEntered(handlersMap[key]))
{
     Monitor.PulseAll(handlersMap[key]);
     Monitor.Exit(handlersMap[key]);
}
// ...

Этот код необходим на тот случай, если в foreach не произошло ни одной итерации, что возможно, когда для какого-то из ключей в соответствующем ему хешсете не будет ни одного обработчика.

Следующий код требует развёрнутого пояснения:

 lock(handlr)
    try {
        // ...

Дело в том, что в концепции Red Architecture единственным объектом созданным за пределами класса v и требующим синхронизации потоков являются функции-обработчики. Если бы мы не могли управлять кодом, который вызывает наши функции обработчики, нам бы пришлось в каждом обработчике “городить” что-нибудь вроде

void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
{
    lock(OnEvent);
    
    // полезный код метода
    
    unlock(OnEvent);
}

Обратите внимание на строки lock() unlock() между которыми находится полезный код метода. Если внутри обработчика модифицируются данные, являющиеся по отношению к нему внешними, то lock() и unlock() было бы необходимо добавить. Потому что одновременно входящие в эту функцию потоки будут менять значения внешних переменных в хаотичном порядке.

Но вместо этого мы добавили всего одну строку на всю программу — lock(handlr), причём сделали это внутри класса v не трогая ничего за его пределами! Теперь мы можем писать сколько угодно функций обработчиков не задумываясь об их потокобезопасности, поскольку реализация класса v нам гарантирует, что только один поток может войти в данный конкретный обработчик, другие потоки будут “стоять” на lock(handlr) и ждать окончания выполнения работы в данном обработчике предыдущим вошедшим в него потоком.

foreach, for(;;) и многопоточность


В листинге Tests (в конце статьи) есть метод foreachTest(string[] a) проверяющий работу цикла for(;;) во время одновременного входа в этот метод и, следовательно, в цикл for(;;) двух потоков. Ниже приведена возможная часть вывода этого метода:

//…
~: string20
~: string21
~: string22
~: astring38
~: astring39
~: string23
~: string24
~: astring40
~: astring41
~: string25
~: astring42
~: string26
~: astring43
~: astring44
~: string27
~: astring45
~: string28
//…

Мы видим, что несмотря на перемешанный вывод строк “string” и “astring”, числовой суффикс у каждой из строк идёт по порядку, т.е. для вывода каждой из строк локальная переменная i берётся верная. Такой вывод говорит о том, что одновременный вход двух потоков в for(;;) является безопасным. Вероятно, все переменные, объявляемые в рамках конструкции for(;;), например, пременная int i, создаются на стеке того потока, который вошёл в for(;;). Именно поэтому доступ к переменным, создаваемым внутри for(;;) не нуждается в “ручной” синхронизации, поскольку они и так доступны только тому потоку, в стеке которого были созданы. Так дела обстоят на С# и платформе .NET. На других языках, хоть и маловероятно, может быть иное поведение, поэтому такой тест не будет лишним.

try… catch — норма, а не исключение


try… catch На первый взгляд эта конструкция кажется не нужной, однако она имеет важное значение. Она призвана защитить нас от ситуации, когда на момент вызова handlr.Invoke() объект в котором определён handlr был уничтожен. Уничтожение объекта может быть произведено другим потоком либо сборщиком мусора в любой момент между строками

foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))

и

handlr.Invoke();

В обработке исключения — блоке catch мы проверяем, если хендлер сслыается на нулевой (удалённый) объект, мы просто его удаляем из списков обработчиков.
?
lock (handlr)
try
{
    handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, new List<KeyValuePair<k, object>>(){ new KeyValuePair<k, object>(key, o) }));
    
#if __tests__
    /* check modification of global collection of handlers for a key while iteration through its copy */
    handlersMap[key].Add((object sender, NotifyCollectionChangedEventArgs e) => { });
#endif
}
catch (Exception e)
{
    if (e is NullReferenceException)
        // handler invalid, remove it
        m(handlr);
}

Инициализация класса v


Статический конструктор — одна из отличительных особенностей C#. Его нельзя вызвать напрямую. Он вызывается автоматически только однажды, перед созданием первого объекта данного класса. Мы используем его для инициализации handlersMap — для всех ключей из k подготавливаем для использования пустые HashSet'ы, предназначенные для хранения функций-обработчиков каждого из ключей. В отсутствие статического конструктора в других языках подойдёт любой метод инициализирующий объект.

   
static v()
{
    foreach (k e in Enum.GetValues(typeof(k)))
    handlersMap[e] = new HashSet<NotifyCollectionChangedEventHandler>();
    
    new Tests().run();
}

Как быть с thread unsafe коллекцией?


C# класс HashSet не обеспечивает синхронизацию при модификации из нескольких потоков (не thread safe), поэтому мы должны синхронизировать модификацию данного объекта, а именно удаление и добавление элементов. В нашем случае для этого достаточно добавить одну строку lock(handlersMap[key]) непосредственно перед операцией удаления/добавления элемента в методах m(), h() класса v.? В этом случае блокирующим поток объектом у нас будет объект HashMap ассоциированный с данным конкретным ключом key. Это обеспечит возможность модификации данного конкретного хешсета только одним потоком.

«Побочные эффекты» многопоточности


Стоит упомянуть о некоторых «побочных эффектах» мультипоточности. В частности, код функций-обработчиков должен быть готов к тому, что в некоторых случаях он будет вызван уже после «отписки» функции-обработчика от получения событий. То есть после вызова m(key, handler) handler в течение некоторого времени (вероятно, считанных долей секунды) всё ещё может быть вызван. Это возможно из-за того, что на момент вызова handlersMap[key].Remove(handler) в методе m(), данный хендлер может быть уже скопирован другим потоком в строке foreach (var handlr in new List(handlersMap[key]))?, и будет вызван в методе Add() класса v уже после своего удаления в методе m().

Простые правила для решения сложных проблем


Напоследок хочу обратить внимание на то, что мы, будучи прилежными разработчиками, не нарушаем соглашений по использованию локов. В частности, такие соглашения указаны на этой странице в разделе Remarks docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement. Они общие для всех языков, не только для C#. Суть этих соглашений в следующем:

  • Не использовать публичные типы в качестве объекта для лока.

Мы используем для локов 2 типа объектов и оба приватные. Первый тип — объект HashSet, который является приватным для класса v. Второй тип — объект типа функция-обработчик. Функции обработчики объявляются приватными во всех объектах, которые их объявляют и используют для получения событий. В случае с Red Architecture только класс v должен вызывать функции обработчики напрямую, и ничто другое.

Листинги


Ниже приведён законченный код классов v и Tests. В C# вы можете использовать их прямо скопировав отсюда. “Перевод” этого кода на другие языки будет для вас небольшой и занимательной задачей.

Ниже приведён код “универсального” класса v, который может быть использован в том числе в проектах мобильных приложений на платформе Xamarin/C#.

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;

namespace Common
{
    public enum k {OnMessageEdit, MessageEdit, MessageReply, Unused, MessageSendProgress, OnMessageSendProgress, OnIsTyping, IsTyping, MessageSend, JoinRoom, OnMessageReceived, OnlineStatus, OnUpdateUserOnlineStatus }
    
    public class v
    {
        static Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>> handlersMap = new Dictionary<k, HashSet<NotifyCollectionChangedEventHandler>>();
        
        public static void h(k[] keys, NotifyCollectionChangedEventHandler handler)
        {
            foreach (var key in keys)
            lock(handlersMap[key])
            handlersMap[key].Add(handler);
        }
        
        public static void m(NotifyCollectionChangedEventHandler handler)
        {
            foreach (k key in Enum.GetValues(typeof(k)))
            lock(handlersMap[key])
            handlersMap[key].Remove(handler);
        }
        
        public static void Add(k key, object o)
        {
            Monitor.Enter(handlersMap[key]);
            foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
            {
                if (Monitor.IsEntered(handlersMap[key]))
                {
                    Monitor.PulseAll(handlersMap[key]);
                    Monitor.Exit(handlersMap[key]);
                }
                
                lock (handlr)
                try
                {
                    handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, new List<KeyValuePair<k, object>>(){ new KeyValuePair<k, object>(key, o) }));
                    
#if __tests__
                    /* check modification of global collection of handlers for a key while iteration through its copy */
                    handlersMap[key].Add((object sender, NotifyCollectionChangedEventArgs e) => { });
#endif
                }
                catch (Exception e)
                {
                    if (e is NullReferenceException)
                        // handler invalid, remove it
                        m(handlr);
                }
            }
            
            if (Monitor.IsEntered(handlersMap[key]))
            {
                Monitor.PulseAll(handlersMap[key]);
                Monitor.Exit(handlersMap[key]);
            }
            
        }
        
        static v()
        {
            foreach (k e in Enum.GetValues(typeof(k)))
            handlersMap[e] = new HashSet<NotifyCollectionChangedEventHandler>();
            
            new Tests().run();
        }
    }
}

Ниже приведён код “упрощённого” класса v, который может быть использован на “стандартной” платформе С# .NET. Единственное его отличие от “универсального” собрата — это использование вместо HashMap коллекции ConcurrentBag — типа, предоставляющего “из коробки” синхронизацию потоков при доступе к себе. Использование ConcurrentBag вместо HashSet позволило удалить из класса v большую часть синхронизирующего потоки кода.

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Collections.Concurrent;
using System.Threading;

namespace Common
{
    public enum k { OnMessageEdit, MessageEdit, MessageReply, Unused, MessageSendProgress, OnMessageSendProgress, OnIsTyping, IsTyping, MessageSend, JoinRoom, OnMessageReceived, OnlineStatus, OnUpdateUserOnlineStatus }
    
    public class v
    {
        static Dictionary<k, ConcurrentBag<NotifyCollectionChangedEventHandler>> handlersMap = new Dictionary<k, ConcurrentBag<NotifyCollectionChangedEventHandler>>();
        
        public static void h(k[] keys, NotifyCollectionChangedEventHandler handler)
        {
            foreach (var key in keys)
            handlersMap[key].Add(handler);
        }
        
        public static void m(NotifyCollectionChangedEventHandler handler)
        {
            foreach (k key in Enum.GetValues(typeof(k)))
            handlersMap[key].Remove(handler);
        }
        
        public static void Add(k key, object o)
        {
            foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
            {
                lock (handlr)
                try
                {
                    handlr.Invoke(key, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, new List<KeyValuePair<k, object>>(){ new KeyValuePair<k, object>(key, o) }));
                    
#if __tests__
                    /* check modification of global collection of handlers for a key while iteration through its copy */
                    handlersMap[key].Add((object sender, NotifyCollectionChangedEventArgs e) => { });
#endif
                }
                catch (Exception e)
                {
                    if (e is NullReferenceException)
                        // handler invalid, remove it
                        m(handlr);
                }
            }
        }
        
        static v()
        {
            foreach (k e in Enum.GetValues(typeof(k)))
            handlersMap[e] = new ConcurrentBag<NotifyCollectionChangedEventHandler>();
            
            new Tests().run();
        }
    }
}

Ниже приведён код класса Tests который тестирует многопоточное использование класса v, а также функций обработчиков. Обратите внимание на комментарии. В них много полезной информации по поводу того как работает тестирующий и тестируемый код.

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;

namespace ChatClient.Core.Common
{
    class DeadObject
    {
        void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ OnEvent() of dead object: key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
        }
        
        public DeadObject()
        {
            v.h(new k[] { k.OnlineStatus }, OnEvent);
        }
        
        ~DeadObject()
        {
            // Accidentally we forgot to call v.m(OnEvent) here, and now v.handlersMap contains reference to "dead" handler
        }
    }
    
    public class Tests
    {
        
        void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ OnEvent(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
            
            if (newItem.Key == k.Unused)
            {
                // v.Add(k.Unused, "stack overflow crash"); // reentrant call in current thread causes stack overflow crash. Deadlock doesn't happen, because lock mechanism allows reentrancy for a thread that already has a lock on a particular object
                // Task.Run(() => v.Add(k.Unused, "deadlock")); // the same call in a separate thread don't overflow, but causes infinite recursive loop
            }
        }
        
        void OnEvent2(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ OnEvent2(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
        }
        
        void foreachTest(string[] a)
        {
            for (int i = 0; i < a.Length; i++)
            {
                Debug.WriteLine(String.Format("~ : {0}{1}", a[i], i));
            }
        }
        
        async void HandlersLockTester1(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ HandlersLockTester1(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
            await Task.Delay(300);
        }
        
        async void HandlersLockTester2(object sender, NotifyCollectionChangedEventArgs e)
        {
            var newItem = (KeyValuePair<k, object>)e.NewItems[0];
            Debug.WriteLine(String.Format("~ HandlersLockTester2(): key: {0} value: {1}", newItem.Key.ToString(), newItem.Value));
        }
        
        public async void run()
        {
            // Direct call for garbage collector - should be called for testing purposes only, not recommended for a business logic of an application
            GC.Collect();
            
            /*
             * == test v.Add()::foreach (var handlr in new List<NotifyCollectionChangedEventHandler>(handlersMap[key]))
             * for two threads entering the foreach loop at the same time and iterating handlers only of its key
             */
            Task t1 = Task.Run(() => { v.Add(k.OnMessageReceived, "this key"); });
            Task t2 = Task.Run(() => { v.Add(k.MessageEdit, "that key"); });
            
            // wait for both threads to complete before executing next test
            await Task.WhenAll(new Task[] { t1, t2 });
            
            
            
            /* For now DeadObject may be already destroyed, so we may test catch block in v class */
            v.Add(k.OnlineStatus, "for dead object");
            
            
            /* test reentrant calls - causes stack overflow or infinite loop, depending on code at OnEvent::if(newItem.Key == k.Unused) clause */
            v.Add(k.Unused, 'a');
            
            
            /* testing foreach loop entering multiple threads */
            var s = Enumerable.Repeat("string", 200).ToArray();
            var n = Enumerable.Repeat("astring", 200).ToArray();
            t1 = Task.Run(() => { foreachTest(s); });
            t2 = Task.Run(() => { foreachTest(n); });
            
            // wait for both threads to complete before executing next test
            await Task.WhenAll(new Task[] { t1, t2 });
            
            
            /* testing lock(handlr) in Add() method of class v */
            v.h(new k[] { k.IsTyping }, HandlersLockTester1);
            v.h(new k[] { k.JoinRoom }, HandlersLockTester2);
            
            // line 1
            Task.Run(() => { v.Add(k.IsTyping, "first thread for the same handler"); });
            // line 2
            Task.Run(() => { v.Add(k.IsTyping, "second thread for the same handler"); });
            // line below will MOST OF THE TIMES complete executing before the line 2 above, because line 2 will wait completion of line 1
            // since both previous lines 1 and 2 are calling the same handler, access to which is synchronized by lock(handlr) in Add() method of class v
            Task.Run(() => { v.Add(k.JoinRoom, "third thread for other handler"); });
        }
        
        
        public Tests()
        {
            // add OnEvent for each key
            v.h(new k[] { k.OnMessageReceived, k.MessageEdit, k.Unused }, OnEvent);
            
            // add OnEvent2 for each key
            v.h(new k[] { k.Unused, k.OnMessageReceived, k.MessageEdit }, OnEvent2);
            
            /* == test try catch blocks in v class, when handler is destroyed before handlr.Invoke() called */
            var ddo = new DeadObject();
            // then try to delete object, setting its value to null. We are in a managed environment, so we can't directly manage life cicle of an object.
            ddo = null;
        }
    }
}

Код, регистрирующий функцию-обработчик, а также сама функция-обработчик для такого класса v могли бы выглядеть следующим образом:

код регистрации функции обработчика

// add OnEvent for each key
v.h(new k[] { k.OnMessageReceived, k.MessageEdit, k.Unused }, OnEvent); 

код функции-обработчика

void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
{
    var newItem = (KeyValuePair<k, object>)e.NewItems[0];
    Debug.Write("~ OnEvent(): key {0} value {1}", newItem.Key.ToString(), newItem.Value);
}

Общее описание Red Architecture находится здесь.




К сожалению, не доступен сервер mySQL