Apache Ignite + Apache Spark Data Frames: вместе веселее +19


Привет, Хабр! Меня зовут Николай Ижиков, я работаю в компании «Сбербанк Технологии» в команде развития Open Source решений. За плечами 15 лет коммерческой разработки на Java. Я коммитер Apache Ignite и контрибьютор Apache Kafka.

Под катом вас ожидает видео и текстовая версия моего доклада на Apache Ignite Meetup о том, как использовать Apache Ignite вместе с Apache Spark и какие возможности мы для этого реализовали.



Что умеет Apache Spark


Что такое Apache Spark? Это продукт, который позволяет быстро выполнять распределенные вычисления и аналитические запросы. В основном, Apache Spark написан на Scala.

У Apache Spark богатый API для подключения к различным системам хранения или получения данных. Одна из особенностей продукта — универсальный SQL-like движок запросов к данным, получаемым из различных источников. Если у вас несколько источников информации, вы хотите их объединить и получить какие-то результаты, Apache Spark — это то, что вам нужно.

Одной из ключевых абстракций, которую предоставляет Spark, являются Data Frame, DataSet. В терминах реляционной базы — это таблица, некий источник, который предоставляет данные в структурированном виде. Известна структура, тип каждого столбца, его название и т.п. Data Frame'ы могут быть созданы из различных источников. В качестве примеров можно привести json-файлы, реляционные базы данных, различные hadoop-системы, а также Apache Ignite.

Spark поддерживает join'ы в SQL-запросах. Можно объединять данные из различных источников и получать результаты, выполнять аналитические запросы. Кроме того, есть API для сохранения данных. Когда вы выполнили запросы, провели исследование, то Spark предоставляет возможность сохранить результаты в тот приёмник, который поддерживает такую возможность, и, соответственно, решить задачу по обработке данных.

Какие возможности интеграции Apache Spark с Apache Ignite мы реализовали


  1. Чтение данных из SQL-таблиц Apache Ignite.
  2. Запись данных в SQL-таблицы Apache Ignite.
  3. IgniteCatalog внутри IgniteSparkSession — возможность использовать все существующие SQL-таблицы Ignite'а без регистрации «руками».
  4. SQL Optimization — возможность выполнять SQL-операторы внутри Ignite.

Apache Spark умеет читать данные из Apache Ignite SQL-таблиц и записывать их в виде такой таблицы. Любой DataFrame, который сформирован в Spark, можно сохранить в виде SQL-таблицы Apache Ignite.

Apache Ignite позволяет использовать все существующие SQL-таблицы Ignite в Spark Session без регистрации «руками» — с помощью IgniteCatalog внутри расширения стандартной SparkSession — IgniteSparkSession.

Тут надо немного углубиться в устройство Spark. В терминах обычной базы данных каталог — это место, в котором хранится мета-информация: какие таблицы доступны, какие в них столбцы и т.п. Когда поступает запрос, из каталога подтягивается мета-информация и SQL-движок что-то делает с таблицами, данными. По умолчанию в Spark все прочитанные таблицы (не важно, из реляционной базы данных, Ignite, Hadoop) приходится вручную регистрировать в сессии. В результате вы получаете возможность сделать SQL-запрос над этими таблицами. Spark про них узнает.

Чтобы  работать с данными, которые мы загрузили в Ignite, нам нужно зарегистрировать таблицы. Но вместо регистрации каждой таблицы «руками» мы реализовали возможность автоматически получать доступ ко всем таблицам Ignite.

В чем здесь особенность? По непонятной мне причине, каталог в Spark – это internal API, т.е. сторонний человек не может прийти и создать свою имплементацию каталога. И, поскольку Spark вышел из Hadoop, он поддерживает только Hive. А все остальное вы должны регистрировать руками. Пользователи часто спрашивают, как можно это обойти и сразу делать SQL-запросы. Я реализовал каталог, который позволяет обозревать и обращаться к таблицам Ignite без регистрации ~и sms~, и первоначально предложил этот патч в Spark community, на что получил ответ: такой патч не интересен по каким-то внутренним причинам. И пробросить наружу internal API они тоже не дали.

Сейчас Ignite-каталог — это интересная фича, реализованная с использованием внутреннего API Spark'а. Чтобы использовать этот каталог, у нас есть своя имплементация сессии, Это обычная SparkSession, внутри которой можно делать запросы, обрабатывать данные. Отличия состоят в том, что мы встроили в неё ExternalCatalog для работы с таблицами Ignite, а также IgniteOptimization, о котором будет рассказано ниже.

SQL Optimization — возможность выполнять SQL-операторы внутри Ignite. По умолчанию при выполнении join, группировки, расчёта агрегатов, других сложных SQL-запросов Spark читает данные в режиме row by row. Единственное, что «умеет» источник данных, это эффективно отфильтровать строки.

Если используется join или группировка, Spark вытаскивает все данные из таблицы к себе в память на worker, применяя заданные фильтры, и только потом группирует их или выполняет другие SQL-операции. В случае Ignite это неоптимально, потому что сам Ignite имеет распределенную архитектуру и обладает знаниями о данных, которые в нём хранятся. Поэтому сам Ignite может эффективно подсчитать агрегаты, провести группировку. Кроме того, данных может быть много, и для их группировки нужно будет вычитать всё, поднять все данные в Spark, что довольно затратно.

В Spark предусмотрен API, при помощи которого можно изменять первоначальный план SQL запроса, выполнить оптимизацию и пробросить внутрь Ignite ту часть SQL-запроса, которая может быть там выполнена. Это будет эффективно с точки зрения скорости, а также расхода памяти, потому что мы не будем использовать ее, чтобы вытянуть данные, которые будут тут же сгруппированы.

Как все работает




У нас есть кластер Ignite — это нижняя половина картинки. Zookeeper нет, поскольку нод всего пять. Есть спарковские worker'ы, внутри каждого worker'а поднимается клиентская нода Ignite. Через нее мы можем сделать запрос и прочитать данные, взаимодействовать с кластером. Также клиентская нода поднимается внутри IgniteSparkSession для работы каталога.

Ignite Data Frame


Переходим к коду: как прочитать данные из SQL-таблицы? В случае Spark все достаточно просто и хорошо: говорим, что хотим посчитать какие-то данные, указываем формат – это определенная константа. Дальше у нас есть несколько опций — путь к конфигурационному файлу для клиентской ноды, которая запускается при чтении данных. Мы указываем, какую таблицу хотим прочитать, и говорим Spark загрузить. Получаем данные и можем с ними делать, что хотим.

spark.read
	.format(FORMAT_IGNITE)
	.option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
	.option(OPTION_TABLE, "person")
	.load()

После того, как мы сформировали данные — необязательно из Ignite, можно из любого источника – мы можем так же просто все сохранить, указав формат и соответствующую таблицу. Командуем Spark записать, указываем формат. В конфиге прописываем, к какому кластеру коннектиться. Указываем таблицу, в которую хотим сохранить. Дополнительно можем прописать служебные опции — указать primary key, который мы на этой таблице создаем. Если данные просто апендятся без создания таблицы, то этот параметр не нужен. В конце жмем save и данные записываются.

tbl.write.
	format(FORMAT_IGNITE).
	option(OPTION_CONFIG_FILE, CFG_PATH).
	option(OPTION_TABLE, tableName).
	option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, pk).
	save

Теперь давайте посмотрим, как это все работает.


LoadDataExample.scala

Это очевидное приложение сначала продемонстрирует возможности записи. Я выбрал для примера данные по футбольным матчам, скачал статистику с известного ресурса. Тут содержится информация по турнирам: лиги, матчи, игроки, команды, атрибуты игроков, атрибуты команд — данные, которые описывают футбольные матчи в лигах европейских стран (Англия, Франция, Испании и т.п.).

Я хочу загрузить их в Ignite. Мы создаем сессию Spark, указываем адрес мастера и вызываем загрузку этих таблиц, передавая параметры. Пример на Scala, а не на Java, потому что Scala менее многословна и так лучше для примера.

Передаем имя файла, читаем его, указываем, что он мультилайн, это стандартный json-файл. Потом записываем в Ignite. Структуру нашего файла нигде не описываем — Spark сам определяет, какие данные у нас лежат и какая у них структура. Если все проходит гладко, создается таблица, в которой есть все нужные поля нужных типов данных. Вот так мы можем загрузить все внутрь Ignite.

Когда данные загрузятся, мы их сможем увидеть в Ignite и сразу использовать. В качестве простого примера — запрос, который позволяет узнать, какая команда сыграла больше всех матчей. У нас есть две колонки: hometeam и awayteam, хозяева и гости. Выбираем, группируем, считаем count, суммируем и джойним с данными по команде – чтобы ввести имя команды. Та-дам – и данные с json-чиков у нас попали в Ignite. Видим Пари Сен-Жермен, Тулузу — по французским командам у нас оказалось много данных.



Резюмируем. Мы сейчас загрузили данные из источника, json-файла, в Ignite, причем достаточно быстро. Возможно, с точки зрения big data это не слишком большой объем, но для локального компьютера прилично. Схема таблицы взята из json-файла в исходном виде. Таблица создалась, названия столбцов скопировались из исходного файла, создан первичный ключ. ID везде есть, и первичный ключ является ID. Эти данные попали в Ignite, мы можем их использовать.

IgniteSparkSession и IgniteCatalog


Посмотрим, как это работает.


CatalogExample.scala

Достаточно простым способом вы можете получить доступ и делать запросы ко всем вашим данным. В прошлом примере мы запускали стандартную спарковскую сессию. И никакой специфики Ignite там не было — кроме того, что вы должны подложить jar с нужным источником данных — совершенно стандартная работа через public API.  Но, если вы хотите получить доступ к Ignite-таблицам автоматически, можно использовать наше расширение. Отличие состоит в том, что вместо SparkSession мы пишем IgniteSparkSession.

Как только вы создаете обьект IgniteSparkSession, то видите в каталоге все таблицы, которые только что загрузили в Ignite. Можете посмотреть их схему и всю информацию. Spark уже знает про таблицы, которые есть в Ignite, и вы можете легко получить все данные.



IgniteOptimization


Когда вы делаете сложные запросы в Ignite с использованием JOIN, сначала Spark вытаскивает данные, и только потом JOIN их группирует. Чтобы оптимизировать процесс, мы сделали фичу IgniteOptimization — она оптимизирует план запросов Spark и позволяет пробросить внутрь Ignite те части запроса, которые могут быть выполнены внутри Ignite. Покажем оптимизацию на конкретном запросе.

SQL Query:

  SELECT 
    city_id,
    count(*) 
  FROM 
    person p 
  GROUP BY city_id 
  HAVING count(*) > 1

Выполняем запрос. У нас есть таблица person — какие-то сотрудники, люди. У каждого сотрудника известен ID города, в котором он живет. Мы хотим узнать, сколько человек живет в каждом городе. Фильтруем – в каком городе живет больше одного человека. Вот изначальный план, который строит Spark:

== Analyzed Logical Plan ==
city_id: bigint, count(1): bigint
Project [city_id#19L, count(1)#52L]
+- Filter (count(1)#54L > cast(1 as bigint))
   +- Aggregate [city_id#19L], [city_id#19L, count(1) AS count(1)#52L, count(1) AS count(1)#54L]
  	+- SubqueryAlias p
     	+- SubqueryAlias person
        +- Relation[NAME#11,BIRTH_DATE#12,IS_RESIDENT#13,SALARY#14,PENSION#15,ACCOUNT#16,AGE#17,ID#18L,CITY_ID#19L]
          IgniteSQLRelation[table=PERSON]

Relation – это как раз Ignite-таблица. Нет никаких фильтров — мы просто выкачиваем по сети из кластера все данные из таблицы Person. Потом Spark все это агрегирует — в соответствии с запросом и вернет результат запроса.

Легко видеть, что все это поддерево с фильтром и агрегацией может быть исполнено внутри Ignite. Это будет гораздо эффективней, чем вытягивать все данные из потенциально большой таблицы в Spark — этим и занимается наша фича IgniteOptimization. После анализа и оптимизации дерева мы получаем следующий план:

== Optimized Logical Plan ==
Relation[CITY_ID#19L,COUNT(1)#52L]
    IgniteSQLAccumulatorRelation(
      columns=[CITY_ID, COUNT(1)], qry=SELECT CITY_ID, COUNT(1) FROM PERSON GROUP BY city_id HAVING count(1) > 1)

В итоге, у нас получается всего один relation, так как мы оптимизировали все дерево. И внутри уже видно, что на Ignite пойдет запрос, который достаточно близок к изначальному запросу.

Предположим, мы джойнимся с разными источниками данных: например, один DataFrame у нас из Ignite, второй из json, третий опять из Ignite, а четвертый — из какой-то  реляционной базы. В этом случае в плане будет оптимизировано только поддерево. Мы оптимизируем, что можем, закидываем это в Ignite, а все остальное уже будет делать Spark. За счет этого мы получаем выигрыш по скорости.

Другой пример с JOIN:

SQL Query - 
SELECT
  jt1.id as id1,
  jt1.val1,
  jt2.id as id2,
  jt2.val2
FROM
  jt1 JOIN
  jt2 ON jt1.val1 = jt2.val2

У нас есть две таблицы. Мы джойнимся по значению и выбираем из них все — ID, значения. Spark предлагает вот такой вот план:

== Analyzed Logical Plan ==
id1: bigint, val1: string, id2: bigint, val2: string
Project [id#4L AS id1#84L, val1#3, id#6L AS id2#85L, val2#5]
+- Join Inner, (val1#3 = val2#5)
  :- SubqueryAlias jt1
  :  +- Relation[VAL1#3,ID#4L] IgniteSQLRelation[table=JT1]
  +- SubqueryAlias jt2
     +- Relation[VAL2#5,ID#6L] IgniteSQLRelation[table=JT2]

Видим, что он будет вытаскивать все данные из одной таблицы, все данные из второй, джойнить их внутри себя и выдавать результаты. После обработки и оптимизации мы получаем ровно такой же запрос, который уходит в Ignite, где сравнительно быстро исполняется.

== Optimized Logical Plan ==
Relation[ID#84L,VAL1#3,ID#85L,VAL2#5]
IgniteSQLAccumulatorRelation(columns=[ID, VAL1, ID, VAL2],
qry=
  SELECT JT1.ID AS id1, JT1.VAL1, JT2.ID AS id2, JT2.VAL2
  FROM JT1 JOIN JT2 ON JT1.val1 = JT2.val2
  WHERE JT1.val1 IS NOT NULL AND JT2.val2 IS NOT NULL)

Покажу еще пример.


OptimizationExample.scala

Мы создаем IgniteSpark-сессию, в которую уже автоматически включены все наши возможности оптимизации. Здесь запрос такой: найти игроков с наибольшим рейтингом и вывести их имена. В таблице player — их атрибуты и данные. Мы джойнимся, фильтруем мусорные данные и выводим игроков с наибольшим рейтингом. Посмотрим, какой план у нас получился после оптимизации, и покажем результаты этого запроса.



Запускаем. Видим знакомые фамилии: Месси, Буффон, Роналду и т.д. Кстати, некоторые почему-то в двух ипостасях встречаются — и Месси, и Роналду. Любителям футбола может показаться странным, что в списке фигурируют неизвестные игроки. Это вратари, игроки с довольно высокими характеристиками — на фоне других игроков. Теперь смотрим на план запроса, который был выполнен. В Spark почти ничего не выполнялось, то есть мы весь запрос отправили опять же в Ignite.

Развитие Apache Ignite


Наш проект – это open source-продукт, поэтому мы всегда рады патчам и обратной связи от разработчиков. Ваша помощь, обратная связь, патчи очень приветствуются. Мы ждем их. На 90% Ignite-комьюнити – русскоязычно. Например, для меня, пока я не начал рабоать над Apache Ignite, не самое лучшее знание английского языка было сдерживающим фактором. По-русски вряд ли стоит на dev-лист писать, но даже если вы напишите что-то не так, вам ответят и помогут.

Что можно улучшить по этой интеграции? Чем можно помочь, если у вас есть такое желание? Список ниже. Звездочки обозначают сложность.


Для проверки оптимизации нужно писать тесты со сложными запросами. Выше я показал несколько очевидных запросов. Понятно, что если написать много группировок и много джойнов, то что-то может упасть.  Это совсем простой таск — приходите и делайте. Если мы найдем какие-то баги по итогам тестов, их надо будет исправить. Там уже будет сложнее.

Еще одна понятная и интересная задача – интеграция Spark с тонким клиентом. Он изначально способен указать какие-то наборы IP-адресов, и этого достаточно, чтобы присоединиться к Ignite-кластеру, что удобно в случае интеграции с внешней системой. Если вдруг захотите присоединиться к решению этой задачи, я лично буду по ней помогать.

Если вы захотели присоединиться к сообществу Apache Ignite, ниже полезные ссылки:


У нас отзывчивый дев-лист, на котором вам обязательно помогут. Он еще далек от идеала, но в сравнении с другими проектами действительно живой.

Если вы знаете Java или С++, ищете работу и хотите разрабатывать Open Source (Apache Ignite, Apache Kafka, Tarantool и т.п.) пишите сюда: join-open-source@sberbank.ru.




К сожалению, не доступен сервер mySQL