Дао интеграции Сбербанка: от локальных сетей к Kafka и потоковой разработке +41


Привет, Хабр! Меня зовут Михаил Голованов, в Сбертехе я занимаюсь технической архитектурой и перспективными разработками. У нас, как и у любого современного банка, есть множество систем, которые поддерживают разные стороны работы банка: вклады, счета, зачисление денег, кредитование, финансовые рынки, акции и т.д. Всякий раз, когда появляется какая-то новая система, мы начинаем следующий уровень увлекательной игры под названием «Интеграция». И каждый следующий уровень сложнее предыдущего — ведь систем нужно охватывать все больше и больше. Этот пост — то, что в геймерских кругах именуется walkthrough: сначала мы пробежимся по локальным сетям и затем через очереди сообщений перейдем к масштабному этапу потоковых вычислений посредством Apache Kafka в широко распределенных сетях.  



Сначала немного теории — перечислим, что мы оцениваем в интеграции с учетом банковской специфики:

  • Производительность. Все просто: чем выше, тем лучше.
  • Латентность — задержки передачи информации. Допустимая латентность зависит от того, с какими системами мы имеем дело. Если вы придете в банкомат снять деньги с карты, то лишняя секунда погоды не сделает. А если ждать заставят 30 секунд, то вряд ли вам это понравится. Бывают операции, в которых латентность не так важна. Если вы подаете заявку на кредит, то вполне можете подождать решения десять минут — и тут 30 секунд не принципиальны. Но в принципе, чем ниже, тем лучше.
  • Масштабирование. Масштабирование бывает двух видов. При вертикальном масштабировании вы добавляете мощностей на одной машине, и у вас  увеличивается производительность. При горизонтальном — ставите рядом с машиной еще n-ное количество таких же.
  • Отказоустойчивость. Вот это для нас очень важно. Если в банке что-то отказывает и клиенты не обслуживаются — это очень плохо для всех. Сюда можно приписать еще один важный показатель — время восстановления.
  • Консистентность. Предположим, зачисление денег прошло, а списание нет. А вам нужно подбивать баланс. Второй пример: вы отправляете перевод, и у вас деньги с карточки списались, а тому человеку, которому вы их перечисляете, они не зачислились. Это значит, что система находится в неконсистентном состоянии. И вызывает массу неудобств. Очень желательно, чтобы все данные находилась в консистентном состоянии.

Начало истории


Первым был этап локальных сетей — формирования классической двухзвенной архитектуры и рассвет серверов баз данных (MS SQL, Oracle и других). В Сбербанке был большой, мощный сервер базы данных, который обслуживал всю организацию. Клиентские машины по локальной сети подключались к нему, получали и записывали информацию.

Затем началось активное распространение интернет-технологий. Количество пользователей бизнес-приложений стало очень быстро расти. Мы уперлись в возможности сервера базы данных и перешли к трехзвенной схеме. База данных выполняла роль хранилища. В сервере приложений находилась бизнес-логика — правила манипуляции информацией. Тонкий клиент — браузер — подключался к серверу приложений и взаимодействовал с конечным пользователем.



У этой архитектуры есть много преимуществ:

  • Не нужно ставить на машины клиентское ПО и обновлять программы — достаточно обновить сервер приложений и базу данных, и сразу всем клиентам становилась доступна новая версия.
  • База данных реляционно хранит все данные организации —  за счет проверки ключей и поддержки транзакций мы автоматически получаем консистентность.
  • JEE-сервера приложений хорошо кластеризуются, масштабируются и берут на себя большую часть работы по бизнес-логике.
  • Интерфейсы веб-приложений на основе JavaScript по насыщенности и возможностям приближаются к нативным.

Лет 10-15 назад это было очень круто и сильно облегчило жизнь.

Следующим камнем преткновения стало синхронное взаимодействие составляющих системы. Клиент запрашивал информацию с сервера приложений и блокировался — ждал, пока не получит ответ. И серверные компоненты также ожидали ответов друг от друга. Здесь производительность серьезно проседала.

Для решения проблем с синхронным обменом используется промежуточное ПО для создания очередей сообщений. После записи в такую очередь вызывающий компонент не ждет ответа и может выполнять другую полезную работу. Обрабатывающий компонент читает поступившие в очередь сообщения и формирует ответы. Которые вызывающий компонент слушает в отдельном потоке.



Достоинства такой архитектуры:

  • Асинхронный обмен значительно повышает производительность систем.
  • Если ваш сервер на некоторое время остановился, клиент об этом не узнает. Он будет просто закидывать запросы на обработку, пока очередь доступна. В это время можно быстро и незаметно поднять серверную часть, вычитать из очереди и обработать то, что поступило в течение последних нескольких минут. В итоге клиент практически ничего не заметит.
  • Если централизовать очереди в брокере сообщений, на предприятии можно получить единую точку управления информационными потоками.

Таким образом мы построили нашу внутреннюю архитектуру обработки информации лет семь назад. Все стало замечательно.

Встречаем Kafka


Два года назад мы решили переходить с  платных продуктов крупных вендоров на более функциональный open source. Посмотрели, что можно сделать с очередями сообщений, решили оставить архитектуру интеграции без изменений, а на open source перевести очереди сообщений. Просканировали рынок и наткнулись на Apache Kafka — распределенный программный брокер сообщений с открытым кодом, написанный на Scala и Java (в банке это наш основной технологический стек). Тогда была актуальна Kafka версий 0.8–0.9.

Быстро развернули пилот: производительность Kafka оказалась как минимум в несколько раз выше нашего решения, десятки тысяч сообщений в секунду, а то и больше, около ста. Существующие очереди на том же оборудовании вытягивали в лучшем случае 5-7 тысяч.

В нашем предыдущем Message Queue (MQ) построение кластера требовало много нетривиальных действий. Топология получилась сложная: возникали шлюзы, которые распределяли нагрузку, обеспечивали работу кластера Message Queue и т.д. С Kafka все оказалось проще: ставим новую машину, поднимаем на ней Kafka, прописываем номер узла Kafka в кластере, и узел подключается к кластеру сам. Если нужно отключить машину, то достаточно только остановить Kafka на узле — брокер сам выйдет из кластера. Таким образом легко можно масштабироваться в обе стороны. Масштабирование при этом получаем близкое к линейному: поставите второй брокер — будет обрабатывать в два раза больше, если третий — то в три. На пилоте ставили десять узлов, и эта зависимость сохранялась.

Kafka поддерживает сразу два унифицированных стиля взаимодействия.

  • Point-to-point — кто-то выкладывает обработчику информацию, обработчик ее забирает, и друг с другом взаимодействуют только эти две стороны.  
  • Publish-subscribe — когда кто-то выкладывает информацию, и ее читает сразу много потребителей.

В старых парадигмах JMS это были два разных интерфейса, а в Kafka все унифицировано, и разницы между Point-to-point и Publish-subscribe нет, в том числе и в API для программиста. Это большое упрощение.

В Kafka все сообщения являются персистентными, то есть записываются на диск. В очередях это работает иначе. Когда все сообщения в очереди хранятся в оперативной памяти, все работает достаточно быстро — несколько тысяч сообщений в секунду. Как только мы включаем режим персистивности — запись сообщений на диск — производительность падает в несколько раз. Однако без этого режима не обойтись, ведь информация в оперативной памяти стирается, как только выключается машина. А у нас немало данных, которые не хочется терять — например, данные о переводе денег. В Kafka сообщения персистентные «из коробки», и при этом все работает быстро.

Переезжаем с JMS на Kafka


Быстрее, удобней, да еще и бесплатно. Ура, бросаем JMS, переезжаем на Kafka! Раньше у нас были обычные очереди, а теперь Kafka-топики. Суть та же: записали в топик и забыли, а кто-то с другой стороны асинхронно читает.


Как это все устроено внутри? Кафка, по сути дела, это append-only distributed log, то есть лог, запись в который всегда идет в конец. Для обеспечения масштабирования топик разделен на партиции. В каждой партиции всегда есть start offset (номер первого записанного сообщения) и end offset (номер последнего записанного сообщения). Запись всегда происходит в конец лога, и номера сообщений непрерывно монотонно увеличиваются. Последовательная запись на диск осуществляется с хорошей скоростью и обеспечивает персистентность — в отличие от записи в произвольный участок файла, особенно медленной на HDD.

Что происходит на стороне читателя? Экземпляру читателя при создании присваивается группа, в соответствии с которой читатель  подписываются на топики — наборы партиций (логов).

Читатель бесконечно осуществляет вызов (poll), то есть запрашивает у Кафки данные. Если что-то было записано, Kafka эти данные отдает. Читатель их обрабатывает, в ответ сообщает commit, и указатель перемещается вперед на одно сообщение. Так в топике можно сделать много партиций и посадить много читателей. Одна партиция в одной группе читается одним читателем, и это достаточно простая и понятная схема масштабирования. Если мы хотим, чтобы все работало быстрее, мы увеличиваем количество партиций в топике и читателей в группе, и за счет распараллеливания все работает быстрее. В Kafka писатели называются продьюсерами, а читатели консьюмерами.

Проблемы с пачками сообщений


Опытный стенд на Kafka радовал всем, кроме кое-какого неприятного момента: одни и те же сообщения начали дублироваться. Сразу подумали, что дело в том, кто записывает. Но нет, запись шла один раз, а чтение — два, три, порой даже четыре раза. В результате в Кафке падала производительность и появлялось большое количество дублей.


Оказывается, группы читателей работают немного по-своему. В рамках одного цикла опроса обработчик получает сразу пачку сообщений. Дальше он начинает ее обрабатывать и должен коммитить обработку сообщений. В нашем случае в обработке были не только одиночные сообщения — некоторые собирали в себе пачку информации, много бизнес-событий. Обработчик получает такую пачку, например, из 200 событий и последовательно начинает обрабатывать каждое из них.

Тем временем брокер начинает отсчет тайм-аута — некоторого интервала времени, после которого, не получив коммит обработки сообщения, он начинает считать обработчика мертвым, выбрасывает его из группы и ставит взамен одного из живых. Если за один poll приходило две-три большие пачки сообщений, время тайм-аута часто истекало и консьюмер выкидывало. На Кафке начинался ребаланс — перестроение группы консьюмеров, когда присоединяли нового или, как в нашем случае, выбрасывали старого. Вместо якобы мертвого Kafka подставляла соседнего, якобы живого консьюмера. Пачки сообщений начинали по кругу убивать всю группу. Через некоторое время в ней, по мнению сервера, вообще не оставалось живых консьюмеров, и чтение прекращалось.

Что делать? Первый вариант: давайте не будем передавать пачками. Но система, которая их передавала, иначе работать не могла, потому что была не очень онлайновой. Может, создадим отдельный пул сообщений? Но тогда мы разорвем чтение и обработку сообщений, и схема станет хрупкой.

В этот момент как нельзя кстати вышла десятая версия Kafka с несколькими очень полезными для нас фичами:

  • KIP-62 – heartbeat в отдельном потоке. Раньше подтверждение того, что обработчик живой и сама обработка шли в одном потоке. В десятой Kafka ввели отдельное сообщение «я живой», которое можно кидать не в основном а в отдельном фоновом потоке. И идут эти сообщения гораздо быстрее, чем обработка основной пачки. Ребаланса не происходит, и мы можем довольно долго прокручивать большие сообщения.
  • KIP-41 – максимальное число сообщений в одном poll. Раньше оно было ограничено только доступной памятью. Если кто-то много записал, то обработчик мог сразу могли брать 10, 30, 50 сообщений. С десятой Кафки можно установить точное число: вычитываемых сообщений за один poll.
  • Выставление настроек таймаутов.

С новыми настройками система стала работать стабильно, и массовые дубли прекратились. Но все равно не до конца. В этот момент мы поняли: Kafka — это не совсем очередь. Это другая структура данных, партиционированный лог.

В чем разница? В очереди все читатели конкурентно читают сообщения, и поэтому она не всегда упорядочена. В Кафке в рамках партиций чтение идет последовательно, и партиция всегда упорядочена. В очереди сообщения после прочтения удаляются, а в Кафке нет — просто двигается указатель прочитанных сообщений. Через некоторое время (тайм-аут) в Кафке удаляется весь файл, и начинается запись в новый файл (сегмент). Сообщения удаляются пачками, как файлы из файловой системы — получается гораздо менее затратно, чем с очередями, которые удаляют каждое сообщение. И, как мы описали выше, включение/отключение одного из консьюмеров в режиме чтения влияет на остальных. Происходит ребаланс, и в течение некоторого времени брокер не отдает читателям никаких сообщений, пока ребаланс не произойдет.

Добиваем дубли


Урок выучили, систему стабилизировали, дубли свелись к долям процента, но полностью от них мы не избавились. Дубли возникали из-за ребалансов, избежать которых не получалось — как минимум, они происходили при вводе нового консьюмера в топологию, или если Кафка посчитает, что нужно провести какую-нибудь оптимизацию.

Мы стали думать, что делать с оставшимися дублями. Было три варианта:

  • Ничего не делать. Бывают такие задачи, где дубли совсем не страшны. Например, вы мониторите какой-то бизнес-процесс на предмет прохождения определенной стадии. Если вы два раза получили информацию об этом, ничего страшного. Или, например, если клиент запрашивает баланс по своей карте и два раза его получает.
  • Бороться с последствиями. Тут существует два основных подхода: дедупликация и компенсационная логика.  
  • Устранить причину — сделать такую систему, где дубли не будут возникать. Это самый правильный, но самый сложный путь.

Если ничего не делать, то тут ключевым понятием является идемпотентность. Идемпотентность — это если операция повторяется несколько раз, и это никак не влияет на системы с точки зрения стороннего наблюдателя, с точки зрения состояния данных.

Идемподентными могут быть операции чтения или мониторинга. Есть способы сделать бизнес-операции идемподентными, даже если они таковыми не являются. Например, при переводе денег мы генерируем уникальный ID — вводим уникальный идентификатор операции. После первой обработки ID мы поменяли баланс на счете и сохранили информацию об этом. В дальнейшем по ID мы определяем, что перевод уже сделан, и движение денег не производим. Однако для этого надо каждый раз перерабатывать логику, делать анализ задач — в общем, способ затратный.

Второй подход – создание дедупликатора. Можно поставить общее хранилище, и когда сообщение приходит во второй раз, его игнорировать. Но здесь необходимо завести дополнительное хранилище для трафика вызовов — на больших объемах оно может стать точкой отказа и причиной падения производительности. А поскольку обычно хранилище удаленное, мы получаем дополнительно один сетевой вызов и рост латентности. На небольших нагрузках дедупликатор — вполне рабочая схема, но это не наш случай.

Третий подход — сделать компенсацию на уровне бизнес-логики. Наши прикладные программисты должны будут все время помнить, что операция может повториться. А как определить, повторилась ли она из-за интеграции или пользователь действительно каждую секунду пытается перевести кому-то пять рублей? Это трудоемко и может вызывать множество ошибок, поэтому компенсационная логика — крайний вариант.

Была мысль о том, чтобы добавлять к операциям транзакции. Тогда повторные операции будут отклоняться, потому что будет повтор транзакции. В Java есть даже технология распределенной транзакции (XA-транзакция). Однако в Кафке она не поддерживается и вряд ли будет поддерживаться.


Как и ожидалось, остается бороться с причиной.

Переносим commit до обработки


Когда идет ребалансировка группы, то закоммитить сообщение старым консьюмером уже нельзя. Возникает ошибка о том, что этот консьюмер больше не работает с этой партицией. Мы всегда делали commit после обработки, но можно перенести коммит до обработки. Тогда консьюмер прочитает сообщение из Кафки и сразу подтвердит прочтение.

А что если в момент, когда мы уже закоммитили, но еще не обработали, произойдет сбой в обработчике? В этом случае мы потеряем это сообщение, ведь Кафка считает, что уже нам его отдавала, а обработчик еще не отработал до конца. Такая гарантия обработки сообщения называется at most once, то есть «максимум один раз». Ее можно использовать для некоторых, не очень важных операций. Но не для операций, связанных с деньгами,-  ведь потерять перевод никому не хочется.

Назначаем обработчикам топики


Можно не использовать механизм автобалансирующихся групп читателей, а явно назначать каждой партиции обработчик с помощью вызова метода assign. Это когда мы явно говорим: ты обработчик, вот твой топик, вот твоя партиция, работай с ней. В этом случае можно делать ранний коммит — at most once, а можно, для гарантии, поздний — at least once. За счет того, что только один обработчик производит коммиты и обработку, если сильно постараться, можно сделать и exactly once — то есть точно один раз.

Но чем плох assign? Вы прибили гвоздем обработчик к партиции. Теперь, если он умер, с ним надо что-то делать: перезапускать, смотреть, что он последнее обработал, и так далее. Для администратора системы это достаточно трудозатратно: нужно следить, чтобы обработчики были живы, вручную перезапускать их и так далее. То есть мы начинаем делать работу консьюмер-группы. И если в процессе появляется человек, вы можете забыть про быстрое время восстановления системы, поскольку у него сразу появляется желание разобраться, что было обработано, а что – нет. Люди в лучшем случае реагируют за минуты, компьютеры — за доли секунды. Мы получаем exactly once, но сильно теряем в отказоустойчивости. И должны будем очень много тратить на эксплуатацию.

Переоценка распределенных сетей


В итоге окончательное покорение Кафки мы тогда отложили. Вернулись к вопросу через год-полтора. Нас устраивала производительность, масштабируемость, отказоустойчивость. Вот только с консистентностью беда — проклятые дубли. Вряд ли опытные разработчики Кафки могли проигнорировать такую проблему. Возможно, мы неправильно ей пользовались? Нет. Решение скрывалось на еще более глубоком уровне, чем можно было предположить.

Как оказалось, в большой распределенной среде просто не работают некоторые принципы, на которых держалось раньше наше проектирование IT-систем. Этим принципам посвящен труд Л. Питера Дойча «Fallacies of distributed computing», написанный в 1994-1997 гг.

  1. Сеть больше не так надежна, как раньше. Из-за большого числа элементов она не может все время работать быстро и безотказно.
  2. Задержки передачи информации больше не равны нулю, как в локальной сети. Да, скорость доступа между памятью самая высокая. Если же мы связываемся с дисками несколько десятков раз, производительность таким же образом падает. А если еще и с сетью связываемся, замедление бывает и в сто раз. Нельзя пренебрегать задержкой взаимодействия между распределенными компонентами.
  3. Пропускная способность конечна. При больших объемах сети мы быстро упираемся в потолок, особенно при взаимодействии с удаленными серверами.
  4. Сеть больше не безопасна. При работе через интернет невозможно контролировать все, могу где-нибудь что-нибудь взломать.
  5. Топология все время меняется. Какие-то машины все время включаются или выключаются. Среди тысяч серверов Google порядка десятка всегда в неработоспособном состоянии.
  6. Администратор уже не один. Их могут быть сотни, каждый по-своему управляет своей частью системы.

Приняв эти истины, мы сформулировали три основные характеристики распределенной системы:

1. Сбои – это норма

Если на одной машине сбой был экстраординарным событием, то когда у вас много машин, постоянно где-то что-то не работает. Неработоспособность системы – это отклонение от каких-то характеристик или полный выход из строя. Нарушение работоспособности – это отказ. Сбой – это самоустраняющийся отказ. И нам нужно делать такие системы, чтобы мы переживали сбои. Чем больше система, тем отказы в ней разнообразнее и чаще. Нужно добиваться, чтобы отказы превращались в сбои, чтобы отказы самоустранялись. Потому что бегать по всей большой системе и все руками чинить – с ума сойти можно.

2. Координация сложна

Чем больше машин, тем сложнее обеспечить координацию, тем более через сеть. Координация сложна. Задержка по сети между узлами, ненадежность связи, изменчивая топология — бороться с этим нельзя, нужно просто стараться этого избегать. Чем меньше различные части системы координируются между собой, тем лучше. Если они справляются независимо друг от друга, это идеальный вариант.

3. Время неоднородно.

В разных частях системы за счет задержек разное время. И на разных компьютерах тоже разное время. Очень часто при проектировании путают три вещи: время, длительность и порядок сообщений. Например, если вы делаете чат, то важно не конкретное время, а порядок. Если вам отправили вопрос, а вы на него ответили, важно, чтобы все первым увидели вопрос, а вторым – ответ. Бывает что важна длительность, а не порядок. Например, если вы замеряете тайм-аут.


Но хуже всего то, что время плавает. Даже самые точные атомные часы тоже плавают. Обычные кварцевые часы на локальной машине могут сбиваться на пару миллисекунд из-за разогрева машины и других физических причин. Синхронизация времени между машинами дает десятки миллисекунд. С этим нужно смириться и понимать, что в разных частях вашей системы время разное.

С учетом новых условий пришлось переоценить вопрос обработки информации. До этого момента у нас было два основных варианта:

  • Batch. Вы накапливаете определенный массив информации и запускаете «молотилку», которая производит расчеты в оффлайне и выдает результат. Расчет может длиться минуты, часы, дни, но мы можем спокойно переработать большие объемы. Если что-то сломалось или мы поняли, что у нас ошибка в алгоритме, входной массив информации не меняется — это хорошо. Мы можем устранить ошибки, снова запустить и получить ответ, который нас устраивает. Это происходит не в онлайне, и результат всегда детерминирован. Если входной массив и алгоритм не менялись, вы, конечно, получите те же результаты.
  • Request-Reply. Этот онлайн-вариант используется, когда нужно получить результат быстро, как в веб-браузере, например. Вы даете какой-то базе данных запросы и быстро получаете ответ. Но поскольку эти вызовы никак не упорядочены, воспроизвести это вы больше не можете. В следующую секунду состояние базы может измениться, и, кинув тот же запрос, вы получите совершенно другой результат. То есть, результат недетерминирован, но его можно получить быстро.

В каждом случае приходится идти на жертвы. А можно ли и точно, и быстро? Для ряда случаев мы нашли способ.

Потоковая архитектура


Итак, мы живем в распределенной системе со своими особенностями. Kafka имеет ограничения, связанные с дублями. А централизованную реляционную базу нельзя раздувать вечно — есть ограничения в масштабировании. Что делать? Попробуем реализовать часть задач в потоковой архитектуре. Наше знакомство с ней началось со статьи «Introducing Kafka Streams: Stream Processing Made Simple» Джея Крепса, нынешнего CEO Confluent, разработчика Кафки.

Потоковая архитектура базируется на понятии потока. Поток – это упорядоченный по времени набор неизменяемых сообщений. Все, что происходит в нашей системе, последовательно пишется в журнал по мере того, как события происходят во времени. Если событие записано в журнал, его уже нельзя поменять. Если вы создали пользователя, то нельзя вернуться назад и отменить действие. Можно только выложить новое событие «корректировка пользователя» или что-то подобное. В общем, это сильно похоже на нашу жизнь. Когда что-то случается, мы уже не можем вернуться назад и изменить то, что произошло. Мы можем только отреагировать на произошедшее и создать новое событие.

Через потоки событий асинхронно обмениваются сообщениями модули бизнес-логики. Соответственно, если модуль хочет взаимодействовать с новыми модулями, он отправляет событие в их журналы событий. Таким образом, вся система – это модули и взаимосвязь между ними в журналах событий.

Поток событий бесконечен, они строго упорядочены по времени, и этот порядок после записи никогда не меняется. Cостояние модуля – это результат обработки определенного потока. Если мы приведем модуль в начальное состояние X и проиграем определенный поток, мы получим состояние Y. Делая это каждый раз, мы будем получать одно и то же конечное состояние, поскольку начальное состояние фиксировано, поток событий тот же, алгоритм обработки тот же.

Как такую систему масштабировать? С помощью партиций.



В примере выше создано три партиции. События распределяются по ним в соответствии с ключами, которые мы событиям присваиваем. К1–К3 в первой партиции, К4–К6 во второй и К7–К9 в третьей. События внутри одной партиции упорядочены по времени. К каждой партиции привязан обработчик, который последовательно обрабатывает события. Обработал одно — перешел к следующему. То есть он управляет своей локальной базой данных. Состояние обработчика определяется начальным состоянием и потоком. Общая скорость работы системы зависит от количества и скорости обработчиков.

В этой схеме нет центральной базы и координации, потому что каждый обработчик занимается своей партицией и ничего не знает про остальных. Мы просто бросаем в поток события. Разные события могут относиться к изменению одной сущности и быть логически связаны. Если такие события попадут в разные партиции, то из-за независимости обработчиков по времени у нас получится разбежка и результаты лягут не в те обработчики и все будет плохо.

Все логически связанные события должны оказаться в одной партиции, чтобы далее отправиться в один и тот же экземпляр разработчика. Как это сделать? Есть два подхода.

Первый подход — партиционирование потока при записи. Этот метод используется в Kafka Streams. Мы должны из записываемой информации сгенерировать некоторый ключ, по которому определяется партиция. Записав этот ключ, мы можем спокойно работать дальше и быть уверены, что логически связанные события попадут в один обработчик. Недостаток способа: если меняется топология топика, приходится осуществлять репартиционирование — перераспределение данных по партициям, что весьма ресурсоемко.

Второй подход — партиционирование потока при чтении. Его можно реализовать с помощью оператора, доступного в Apache Flink и похожих движках. В нем логически связанные события могут попадать в разные партиции. Но потом весь топик читается одним кластером, который при чтении каждого события вычисляет его ключ. Вычислив ключ, он понимает, где в топологии кластера находится нужный обработчик, и отправляет событие туда. Недостаток способа: дополнительное сетевое взаимодействие. Если мы читаем на одном узле кластера, а обработчик находится на другом узле, то отправлять приходится через сеть.

Между этими способами нельзя сделать однозначный выбор, нужно в каждом конкретном случае учитывать ресурсы и требования.

Отказоустойчивость обработчиков и хранилища


Бывает, что обработка каждого события не зависит от того, какие события приходили раньше и в каком состоянии находится обработчик. В таких задачах обработчику не нужна даже локальная база данных для хранения его состояния. Такой класс задач называется stateless processing.
Чаще всего у нас возникают другие задачи — аналитика, агрегация — которые зависят от тех событий, что были раньше. В этом случае возникает необходимость в хранилище данных. Его можно сделать в виде key-value. Тогда обработчик при обработке каждого события будет помещать необходимые данные агрегации, расчетов и другую историю в один из ключей свего key-value хранилища.

Как обеспечить отказоустойчивость такого хранилища? Можно периодически сбрасывать его данные в какое-нибудь персистентное хранилище — сохранять снэпшоты. В тот же снэпшот мы можем положить и последний offset партиций, который был удачно обработан. С помощью серии таких снэпшотов можно понять, когда в каком состоянии находилась система. Если произойдет сбой в обработке потоков, можно будет просто развернуть подходящий снэпшот и offset, запустить реплей потока, заново обработать кусок лога и снова идти вперед. Такая техника очень полезна в двух случаях:

  • Исправление ошибок в обработчике. Допустим, в какой-то момент времени алгоритмы обработчика допустили ошибку логики, и дальше все события начали обрабатываться неправильно. Тогда мы находим последний снэпшот до ошибки, откатываем систему, ставим обработчик с хорошей, проверенной логикой, проводим реплей потока — вуаля, ошибка нивелирована.
  • Аварийная остановка обработчика. При этом база данных может оставаться в консистентом состоянии. Это поможет обеспечить корректность результатов при перезапуске.

Проблемы могут возникнуть не только с обработчиками. Хорошо бы обеспечить и отказоустойчивость самого хранилища. Для этого есть два подхода:

  1. Асинхронная репликация хранилища на другие узлы. Этот подход использует Kafka Streams. Подход основан на том, что любое key-value хранилище можно превратить в поток и наоборот. Для этого необходимо, чтобы ключи и их порядок в хранилище были независимы. Ключом события является ключ в хранилище, значением — то, что находится в хранилище по ключу. Если на основном узле возникли проблемы, можно обратиться к узлу репликации, поднять там обработчик, он переедет на другой узел, и все будет надежно.
  2. Сохранить состояние хранилища в доступном всему кластеру ресурсе — например, в Hadoop или другом распределенном хранилище. Схема работы примерно такая же, только обработчик читает файловую систему по команде из кластера. Затем загружает данные локально, и все снова работает.

Мы научились делать надежные обработчики и хранилища. Kafka обеспечивает надежное хранение потока. Осталось понять, что делать с консистентностью.

Что делать с консистентностью?


Известно, что каждая партиция потока обрабатывается независимо, своим обработчиком. Какие-то обработчики чуть быстрее, какие-то отстают.


Красными флажками обозначены последние сообщения, которые прочитали обработчики: K2, K4 и K7.

Если остановить время и посмотреть на базы данных обработчиков в какой-то произвольный момент, они будут не совсем консистентны. Но если не прерывать работу обработчиков, а только остановить запись новых сообщений, то рано или поздно все обработчики справятся со всеми событиями своих потоков, и базы данных придут в консистентное состояние. Такая гарантия консистентности называется eventual consistency. Это хуже, чем в реляционных базах данных, где снимок БД будет консистентен в любой момент. Но гораздо лучше, чем полное отсутствие консистентности.

Ослабление консистентности — это то, от чего в нашей архитектуре не убежать. Насколько это критично, нужно смотреть по конкретным задачам. Если разбежки недопустимы, то нужно искать другой вариант, либо модифицировать эту архитектуру: искусственно замедлять более быстрые обработчики. Но это сделает архитектуру очень хрупкой. Те, кому удавалось подобное, расплачивались пропускной способностью и латентностью.

Как нам усилить консистентность? Люди, работавшие с реляционными базами данных, могут предложить сделать распределенные транзакции. В разных партициях берутся разные события, но их обработку мы включаем в одну транзакцию. Как только транзакция коммитится, изменения становятся видны сразу пачкой. Если транзакция отменена, изменения видны не будут.

Идея хорошая. Но проблема в том, что в сильно распределенной системе распределенные транзакции работают очень медленно и плохо. Возникает много точек синхронизации, блокировок, с ростом числа участников транзакции ситуация становится хуже и хуже. В итоге в больших распределенных системах со сложной логикой распределенные транзакции не работают или работают очень медленно.

Агрегация событий


До сих пор мы говорили об обработке единичных событий. Но есть такой класс задач — алгоритмы аналитики — где результатом обработки потока являются агрегаты событий. Например, в каждом событии платежа передается сумма. Мы, проанализировав эти суммы, хотим знать, сколько всего клиент заплатил, сколько перевел на другой счет за какое-то время. Либо хотим получить какие-то средние значения, минимальные, максимальные… вариантов много.

Как это реализовать? Мы не знаем общее число событий в потоке — оно постоянно увеличивается. Но мы можем взять события за определенный промежуток времени, зафиксированные в журнале и посчитать. Соответственно, чтобы отвечать на такие вопросы в нашей архитектуре, нужно разбить поток на окна агрегации.

Самый простой вид окно агрегации — tumbling window — когда эти окна без разрывов следуют друг за другом. Рубим весь поток, например, на часовые окна и внутри каждого можем проводить агрегацию. Как можно это сделать? Вот у вас открылось окно. Обработчик открывает в базе данных специальную область для этого окна, и в ней начинается подсчет того, что происходит — суммируются события с одинаковым ключом.



В верхнем окне у нас два события с ключом K1. Когда окно закроется, в верхнем синем квадрате у нас будет два события с ключом K1. При закрытии окна эту информацию мы можем скинуть в поток (отреплицировать наше хранилище), и поток сможет прочитать тот, кому нужна эта аналитика. Затем откроется следующее окно, где могут быть уже другие события с другим количеством. В любой момент времени есть открытое окно, в котором хранятся поступающие события.

Таким образом мы превращаем поток, где много событий, в другой агрегированный поток, где у нас идет разделение по ключам и событиям соответствует гораздо меньше записей.

Время машины и время потока


Вроде бы с агрегацией разобрались, но упустили одну важную вещь. Если мы открываем окна по текущему времени, то может получиться, что подсчет агрегации будет вестись по событиям, которые на самом деле происходили довольно давно. Например, мы накопили поток, остановили все на час-два и хотим получить аналитику, запустив и обработав этот поток через два часа. Окна начнут открываться по текущему времени, хотя события были сгенерированы гораздо раньше. Получится, что в текущем окне по текущему времени произошли какие-то события, а в окнах, которые были старше, этих событий нет. Итоговая аналитика в этой ситуации будет неверной: окажется, что в более раннее время ничего не было, и все произошло прямо во время обработки.

Чтобы избежать этого, нужно понять, что окна и время в потоке — это совсем не текущее время на нашей машине. В события нужно обязательно закладывать время, когда они случились, когда были записаны в поток. И тогда окна нужно открывать не на основании текущего времени, а синхронизировать их с временем прохождения событий.

Для корректной работы нужно понимать, как течет время в потоке. Один из популярных способов — вставлять в поток метки времени — вотермарки (watermarks).



Если вотермарки совпадают с окнами агрегации, то мы можем ориентироваться, когда закрывать старое и открывать новое окно. Это решит нашу проблему. Бывают ситуации, что вотермарки не выровнены — например, мы не знаем шаг агрегации и т.д. В этом случае нужно вставлять их чаще и учитывать время событий, чтобы понимать, куда поток продвинулся.

Вотермарки также полезны при создании time-based снэпшотов хранилищ. Снэпшоты должны быть привязаны не к физическому времени, а к времени потока. Реальное время течет равномерно, время в потоке — нет. Как только сообщения быстро обрабатываются, время ускоряется, и наоборот.

Теперь надо решить: кто и когда должен генерировать вотермарки. Есть несколько вариантов:

Кто записывает данные в поток, тот и генерирует. Неплохой вариант. Помимо системы-источника, вотермарки может вставлять сама система лога. Или можно сделать компонент, который при получении событий сам себе генерирует вотермарки. Все это называется семантикой времени потока.

Мы знаем, что после вотермарка нельзя записать событие, которое наступило до него. Но что если мы записали вотермарк и потом тут же забросили в поток события, которые произошли раньше этого вотермарка? Это может иметь неприятные последствия. По вотермарку закроется окно агрегации, и тут приходит событие, которое просто отскакивает от закрытого окна. Представьте возможные последствия, когда речь идет о финансах.



Как решить эту проблему? Можно при закрытии окна дать какое-то время на поступление поздних событий. В итоге вы получите более корректную агрегацию. Но результат сгенерируется не сразу, а спустя время, которое отведено на прибытие поздних событий. Можно отправлять уточненную аналитику с учетом опоздавших событий дополнительным сообщением в поток агрегации. Но в любом случае мы идем на компромисс между точностью и скоростью агрегации.

Вообще в нашем случае различают не два, а даже три вида времени. Processing Time — текущее время на локальной машине. Event Time — время, когда событие было сгенерировано источником. Ingestion Time – время поступления события в лог (поток при записи подставляет его).

Итак, мы научились делать надежные приложения в ненадежных распределенных системах — обрабатывать потоки с нормальным масштабированием и отказоустойчивостью. Но пришлось согласиться на eventual consistency. Можно в любой момент откатить поток в более раннее состояние и заново его прогнать, избегая ошибок. Как здорово было бы проворачивать такие трюки со временем в реальной жизни.

Доступ к данным. Разгадка тайны дублей


Потоковая архитектура — это лишь одна из возможных в нашей IT-инфраструктуре. С потоками взаимодействуют другие системы, которые построены совсем на других принципах. Оценивая потоковую архитектуру со стороны, можно заметить несколько проблем:

  1. Потоковая система может обращаться наружу по принципу ad-hoc — запрашивать баланс пользователя и т.д. В момент реплея — когда в потоковой системе произошел откат — она не контролирует внешнее окружение. Один и тот же запрос баланса пользователя до реплея и после реплея могут дать разные результаты. Это повлияет на агрегацию и корректность алгоритмов работы потоковой системы. При каждом реплее результаты могут меняться, в зависимости от того, как изменились внешние данные.
  2. Внешняя система обращается к потоковой за какими-то данными. Внешняя система может получить данные от потоковой системы по запросу или через push. А после реплея — получить второй push, возможно, с другими результатами. Из-за этого мы и получали в Кафке дубли — она была предназначена для потоковых архитектур, а мы с ней работали в классической.

Подумаем, как организовать доступ к данным потоковой архитектуры — данным в базах данных обработчиков событий. Есть несколько подходов.

1. Queryable state — запрос снаружи о состоянии потокового кластера. Знакомый всем веерный query.



Схема стандартна, за исключением одного компонента — это Query Controller. Ему поступает запрос, SQL или какой-нибудь еще. Контроллер дублирует запрос во все существующие обработчики. Каждый обработчик смотрит, что он может вернуть, и дает ответ в query controller. А он агрегирует результаты и возвращает ответ тому, кто сделал запрос. Эта схема реализует один полезный паттерн — CQRS, Command-Query Responsibility Segregation, то разделение каналов модификации и чтения сущностей. Изменения идут через потоки, а чтение — совсем по другому каналу, через Query Controller.

Не смешивать сущности полезно, но есть несколько нюансов. При запросе веером вы получите ответ не раньше чем ответит самый медленный обработчик, и по нему будет определяться латентность. Если не усиливать самый медленный обработчик, система не может стать быстрее. Нужно либо усиливать его, либо разбивать на несколько более быстрых. После этого все будет измеряться по второму самому медленному разработчику. Так что для конкретного роста мощностей, скажем, в два раза, придется проводить анализ состояния всего кластера, которое может зависеть от множества факторов. Таков недостаток схемы queryable state.

2. Copy by request



В queryable state мы не делали никаких предположений о том, в каких именно обработчиках могут храниться данные. Если такие сведения есть, то можно закидывать запросы на выборку данных не сразу все обработчикам, а адресно — только тем, где нужные данные имеются. И сразу получать ответ. То есть мы копируем все состояние обработчика или его часть по запросу. Направлять такие запросы и получать ответ можно как через queryable controller — здесь агрегация сводится к случаю, когда мы ждем один ответ. А можно подмешивать такие сообщения в поток изменений — тогда у нас получается всего один канал общения с потоковой системой. Для разных запросов используют разные способы.

Теперь у нас нет проблемы с поиском самого медленного обработчика — мы четко знаем, где лежат данные и что надо улучшать, если экземпляр медленно отработал. Но нам нужно знать принцип разделения потока и понимать, как запрос отражается на локальных состояниях обработчиков. Если это возможно — лучше использовать copy by request вместо веерного queryable state.

3. Continuous distribution



Более нативный для потоковых систем способ — не что иное как потоковая репликация онлайн-данных. К обработчикам приходят данные из партиций, они обрабатывают их и выкидывают результаты обработки в другой поток. Остальные сервисы, которые интересуются этими данными, могут подписаться на этот поток, потреблять его и делать локальные копии данных у себя. Получается, что в обработчиках у нас есть мастер-копия данных — key-value — которая выкидывается в поток. Этот поток потребляется сервисами, которые создают у себя витрины. Подобным образом в поточной архитектуре обеспечивается надежность, но здесь мы еще и бизнес-логику строим.

Преимущества метода — надежность, отказоустойчивость за счет откатов при сбоях. Недостаток — вместе с мастер-копией во всем кластере может существовать много других копий данных, что потребует много ресурсов. Но зато когда какой-нибудь сервис захочет сделать что-то сложное, и в его копии окажутся необходимые данные другого сервиса, это позволит работать в локальной парадигме, без дополнительных транзакций и синхронизаций.

Если требования к задержкам не слишком высоки и есть достаточно ресурсов для хранения копий данных, в потоковых системах рекомендуется использовать именно этот способ. Как правило, потоковые архитектуры хороши именно там, где минусы continuous distribution не играют особой роли.

К чему мы пришли?


Подведем итоги приключения. Прямо по пунктам:

  • Kafka может заменить JMS в определенных условиях.
  • Kafka и потоковая обработка хороши для больших распределенных систем.
  • Eventual consistency — это неизбежная плата за использование потоковой обработки.
  • Все наши пайплайны с Кафкой и потоковой обработкой работают при некритичных требованиях к латентности, но зато обеспечивают масштабируемую производительную обработку

В новой парадигме мы уже запустили некоторые задачи, например, генерацию оповещений пользователей и мониторинг онлайн-платежей. Все работает замечательно. Сама по себе потоковая архитектура возникла недавно, и появится еще много вариантов реализации ее механизмов. Мы продолжаем ее изучать и, если будет интересно, еще расскажем о своих новых приключениях.




К сожалению, не доступен сервер mySQL