Как настроить мультинодовый кластер Airflow с помощью Celery и RabbitMQ +7


Что такое Airflow?


Apache Airflow — это продвинутый workflow менеджер и незаменимый инструмент в арсенале современного дата инженера.


Airflow позволяет создавать рабочие процессы в виде направленных ациклических графов (DAG) задач. Разнообразные служебные программы командной строки выполняют сложные операции на DAG. Пользовательский интерфейс легко визуализирует конвейеры, работающие в производственной среде, отслеживает ход выполнения и при необходимости устраняет неполадки.


Программно создавайте, планируйте и контролируйте рабочий процесс. Он предоставляет функциональную абстракцию в виде идемпотентного DAG (направленного ациклического графа). Функция как служба абстракции для выполнения задач с заданными интервалами.


Кластер с одним узлом Airflow


В одноузловом кластере Airflow все компоненты (рабочий, планировщик, веб-сервер) установлены на одном узле, известном как "Master нода". Чтобы масштабировать кластер с одним узлом, Airflow должен быть настроен в режиме LocalExecutor. Worker берет (pull) задачу из очереди IPC (межпроцессное взаимодействие), это очень хорошо масштабируется до тех пор, пока ресурсы доступны на Master нода. Чтобы масштабировать Airflow на много нод, необходимо включить Celery Executor.



Архитектура с одной нодой Airflow


Мультинодовый кластер Airflow


В мультинодовой архитектуре Airflow его демоны распределены по всем рабочим нодам. Поскольку веб-сервер и планировщик будут установлены на главной ноде, а рабочие будут установлены на каждом отдельном рабочей ноде, поэтому он может хорошо масштабироваться как по горизонтали, так и по вертикали. Чтобы использовать этот режим архитектуры, необходимо настроить Airflow с помощью CeleryExecutor.


Серверную часть Celery необходимо настроить для включения режима CeleryExecutor в архитектуре Airflow. Популярными фреймворками / приложениями для бэкэнда Celery являются Redis и RabbitMQ. RabbitMQ — это брокер сообщений. Его задача — управлять обменом данными между несколькими службами задач путем управления очередями сообщений. Вместо канала связи IPC, который был бы в архитектуре с одной нодой, RabbitMQ предоставляет модель механизма публикации — подписки для обмена сообщениями в разных очередях. Каждая очередь в RabbitMQ опубликована с событиями / сообщениями в виде команд задач, работники Celery будут извлекать команды задач из каждой очереди и выполнять их как действительно распределенные и параллельные способы. Что действительно может ускорить действительно мощное одновременное и параллельное выполнение задач в кластере.



Мультинодовая архитектура Airflow


Celery:


Celery — это асинхронная очередь задач, основанная на распределенной передаче сообщений. Он ориентирован на работу в реальном времени, но также поддерживает планирование. Airflow использует его для выполнения нескольких параллельных операций на уровне задач на нескольких рабочих узлах с использованием многопроцессорности и многозадачности. Мультинодовая архитектура Airflow позволяет масштабировать Airflow, легко добавляя новые воркеры.


Устновка мультинодового кластера Airflow и настройка Celery:


Примечание. Мы используем операционную систему CentOS 7 Linux.


  1. Установка RabbitMQ

yum install epel-release
yum install rabbitmq-server

  1. Включение и запуск RabbitMQ Server

systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service

  1. Включение интерфейса веб-консоли управления RabbitMQ

rabbitmq-plugins enable rabbitmq_management


Номер порта сервера rabbitmq по умолчанию — 15672, имя пользователя и пароль по умолчанию для веб-консоли управления — admin/admin.



  1. Установка протокола транспорта pyamqp для RabbitMQ и адаптера PostGreSQL

pip install pyamqp

amqp:// — это псевдоним, который использует librabbitmq, если он доступен, или py-amqp, если его нет.


Вы должны использовать pyamqp:// или librabbitmq://, если хотите точно указать, какой протокол передачи данных использовать. Протокол pyamqp:// использует библиотеку amqp (http://github.com/celery/py-amqp)


Установка адаптера PostGreSQL: psycopg2


Psycopg — это адаптер PostgreSQL для языка программирования Python.


pip install psycopg2

  1. Установка Airflow.

pip install 'apache-airflow[all]'

Проверьте версию airflow


airflow version


Мы используем версию Airflow v1.10.0, рекомендованную и стабильную в настоящее время.


  1. Инициализация базы данных

airflow initdb

После установки и настройки вам необходимо инициализировать базу данных, прежде чем вы сможете запустить группы обеспечения доступности баз данных и ее задачу. Поэтому последние изменения будут отражены в метаданных Airflow из конфигурации.


  1. Установка Celery

Celery должен быть установлен на главной ноде и на всех рабочих нодах.


pip install celery==4.3.0

Проверка версии Celery


celery --version
4.3.0 (rhubarb)

  1. Изменение файла airflow.cfg для Celery Executor.

executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow 
broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow 
dags_are_paused_at_creation = True
load_examples = False

После внесения этих изменений в файл конфигурации airflow.cfg необходимо обновить метаданные airflow с помощью команды airflow initdb, а затем перезапустить airflow.


Теперь вы можете запустить веб-сервер airflow с помощью следующей команды


# default port is 8080
airflow webserver -p 8000

Вы можете запустить планировщик


# start the scheduler
airflow scheduler

Вы также должны запустить airflow на каждом рабочем узле.


airflow worker

Как только вы закончите запускать различные службы airflow, вы можете проверить фантастический интерфейс airflow при помощи команды:


http://<IP-ADDRESS/HOSTNAME>:8000

поскольку мы указали порт 8000 в нашей команде запуска службы веб-сервера, в противном случае номер порта по умолчанию — 8080.


Да! Мы закончили создание кластера с мультинодовый архитектурой Airflow. :)




К сожалению, не доступен сервер mySQL