Потоковая обработка данных при помощи Akka +41


Привет, Хабр! Все привыкли ассоциировать обработку больших данных с Hadoop (или Spark), которые реализуют парадигму MapReduce (или его расширения). В этой статье я расскажу о недостатках MapReduce, о том, почему мы приняли решение отказываться от MapReduce, и как мы приспособили Akka + Akka Cluster на замену MapReduce.



Data Management Platform


Задача, для решения которой нам понадобились инструменты работы с большими данными, – сегментация пользователей. Класс систем, которые решают задачу сегментации пользователей во всём мире принято называть Data Management Platform или сокращённо DMP. На вход DMP поступают данные о действиях пользователей (в первую очередь, это факты посещений тех или иных страниц в интернете), на выходе DMP выдает «профиль пользователя» — его пол, возраст, интересы, намерения и так далее. Этот профиль в дальнейшем используется для таргетирования рекламы, персональных рекомендаций и для персонализации контента в целом. Продробнее о DMP можно почитать тут: http://digitalmarketing-glossary.com/What-is-DMP-definition.

Поскольку DMP работает с данными большого количества пользователей, объёмы данных, которые нужно обрабатывать, могут достигать очень внушительных размеров. Например, наша DMP Facetz.DCA обрабатывает данные 600 миллионов браузеров, ежедневно обрабатывая почти полпетабайта данных.

Архитектура DMP Facetz.DCA


Для того, чтобы обрабатывать такие объёмы данных необходима хорошая масштабируемая архитектура. Изначально мы построили систему на основе стека hadoop. Подробное описание архитектуры вполне заслуживает отдельной статьи, в этом же материале я ограничусь кратким описанием:

  1. Логи действий пользователя складываются на HDFS – распределённую файловую систему, являющуюся одной из базовых компонент экосистемы hadoop

  2. Из HDFS данные складываются в хранилище сырых данных, реализованное на основе Apache HBase – распределенной масштабируемой базы данных, построенной на основе идей Big Table. По сути, HBase – очень удобная для массовой обработки key-value база данных. Все данные пользователей хранятся в одной большой таблице facts. Данные одного пользователя соответствуют одной строке HBase, что позволяет очень быстро и удобно получить всю необходимую информацию о нём.

  3. Один раз в сутки запускается Analytic Engine – большой MapReduce job, который, собственно, и выполняет сегментацию пользователей. По сути, Analytic Engine – контейнер для правил сегментации, которые отдельно готовятся аналитиками. Например, один скрипт может размечать пол пользователя, другой — его интересы и так далее.

  4. Готовые сегменты пользователей складываются в Aerospike – key-value базу данных, которая очень хорошо заточена на быструю отдачу – 99% запросов на чтение отрабатывают менее чем за 1 мс даже при больших нагрузках в десятки тысяч запросов в секунду.



Архитектура Facetz.DCA

Проблемы MapReduce


Разработанная архитектура показала себя хорошо – позволила быстро смасштабироваться до обработки профилей пользователей всего Рунета и размечать их при помощи сотен скриптов (каждый может размечать пользователя по нескольким сегментам). Однако она оказалась не лишённой недостатков. Основная проблема – отсутствие интерактивности при обработке. MapReduce, по своей природе, – парадигма offline–обработки данных. Так, например, если пользователь посмотрел билеты на футбол сегодня, в сегмент «Интересуется футболом» он может попасть только завтра. В некоторых случаях такая задержка является критичной. Типичный пример – ретаргетинг – реклама пользователю товаров, которые он уже посмотрел. На графике приведена вероятность совершения покупки пользователем после просмотра товара по прошествии времени:


График вероятности конверсии после просмотра товара. При отстутсвии real-time движка для нас доступна только зеленая часть, в то время как максимальная вероятность приходится на первые часы.

Видно, что самая большая вероятность покупки – в течение нескольких первых часов. При таком подходе система узнала бы, что пользователь хочет купить товар, только по прошествии суток – когда вероятность покупки практически вышла на плато.

Очевидно, что необходим механизм потоковой real-time обработки данных, который сводит к минимуму задержку. При этом хочется сохранить универсальность обработки – возможность строить сколь угодно сложные правила сегментации пользователей.

Модель Акторов


Поразмыслив, мы пришли к выводу, что лучше всего для решения задачи нам подходит парадигма реактивного программирования и модель акторов. Актор – это примитив параллельного программирования, который умеет:

  • Принимать сообщения
  • Посылать сообщения
  • Создавать новых актор’ов
  • Устанавливать реакцию на сообщения

Модель акторов зародилась в erlang-сообществе, сейчас реализации этой модели существуют для многих языков программирования.

Для языка scala, на котором написана наша DMP, очень хорошим тулкитом является akka. Она лежит в основе нескольких популярных фреймворков, хорошо задокументированна. Кроме того, на Coursera есть прекрасный курс принципы реактивного программирования, в котором эти самые принципы рассказываются как раз на примере akka. Отдельно стоит упомянуть модуль akka cluster, который позволяет масштабировать решения (базирующиеся на акторах) на несколько серверов.

Архитектура Real-Time DMP


Итоговая архитектура выглядит следующим образом:



Поставщик данных складывает информацию о действиях пользователей в RabbitMQ.

  1. Из RabbitMQ сообщения о действиях пользователя вычитывает Dispatcher. Dispatcher-ов может быть несколько, они работают независимо.

  2. Для каждого онлайн-пользователя в системе заводится актор. Dispatcher отправляет сообщение о новом событии (вычитанном из RabbitMQ) соответствующему актору (или заводит новый актор, если это первое действие пользователя и для него ещё нет актора).

  3. Актор, соответствующий пользователю, добавляет информацию о действии в список пользовательских действий и запускает скрипты сегментации (те же, что запускают Analytic Engine при MapReduce-обработке).

  4. Данные о размеченных сегментах складываются в Aerospike. Также данные о сегментах и действиях пользователях доступны по API, подключенным напрямую к акторам.

  5. Если о пользователе не поступало данных в течение часа, сессия считается законченной и актор уничтожается.

Шардированием акторов по кластеру, их жизнью и уничтожением управляет akka, что существенно упростило разработку.

Текущие результаты:


  • Akka-кластер из 6 нод;
  • Поток данных 3000 событий в секунду;
  • 4-6 миллионов пользователей онлайн (в зависимости от дня недели);
  • Среднее время выполнения одного скрипта сегментации пользователей меньше пяти миллисекунд;
  • Среднее время между событием и сегментацией на основе этого события – одна секунда.



Дальнейшее развитие


Наш Real-Time Engine показал себя хорошо и мы планируем развивать его дальше. Список шагов, которые мы планируем предпринять:

  • Персистентность – сейчас Real-Time Engine сегментирует пользователей только на основе последней сессии. Мы планируем добавить подтягивание более старой информации из HBase при появлении нового пользователя.
  • На текущий момент только часть наших данных переведена на realtime-обработку. Мы планируем постепенно перевести все наши источники данных на потоковую обработку, после этого поток обрабатываемых данных возрастёт до 30000 событий в секунду.
  • После завершения перевода на Realtime мы сможем отказаться от ежедневного расчёта MapReduce, что позволит сэкономить на серверах за счёт того, что обрабатываться будут только те пользователи, которые реально сегодня проявили активность в интернете.

Ссылки на похожие решения


В конце хотелось бы привести несколько ссылок на некоторые фреймворки, на основе которых также можно строить потоковую обработку данных:
Apache Storm
Spark Streaming
Apache Samza

Спасибо за внимание, готовы ответить на ваши вопросы.




К сожалению, не доступен сервер mySQL