Кто такой Analytics Engineer – E2E-решение с использованием bash + dbt + Looker +7


Привет! Меня зовут Артемий Козырь, и я Analytics Engineer в Wheely.

Мы могли бы долго и нудно обсуждать, кто такой Analytics (Data / Backend) Engineer, какими инструментами он должен владеть, какие buzzwords в тренде и ценятся в CV, однако, на мой взгляд, гораздо интереснее рассмотреть процесс и результаты его деятельности в рамках конкретной прикладной задачи.

В этой публикации:

  • Что значит решение End-to-End и в чем его ценность?

  • Организация Extract & Load данных из асинхронного API MaestroQA

  • Моделирование витрин данных с помощью dbt

  • Поставка ценности для пользователей с помощью Looker

Решение End-to-End - от идеи до создания ценности

В двух словах, End-to-End – это поставка полноценного функционального решения, включающая все детали пазла.

Предлагаю переходить к реальному сценарию - это работа с приложением MaestroQA, автоматизирующим мониторинг и оценку клиентского сервиса (Customer Support).

Одна из самых важных идей заключается в том, что заказчик, кем бы он ни был (Manager, Product Owner, CEO), почти никогда не ставит задачу в инженерных терминах:

  • Налить 100500 гигабайт в Хранилище

  • Добавить multithreading в код

  • Написать супероптимальный запрос

  • Создать 15 dbt-моделей

За любой инженерной задачей стоит решение конкретных бизнес-проблем. Для нас это:

  • Прозрачность Customer Support (фиксируем все оценки, инциденты)

  • Результативность на ладони (отслеживаем динамику показателей во времени)

  • Отчитываемся о KPI команд поддержки (агрегирующие показатели по командам, городам, странам и т.д.)

  • Получаем обратную связь и исправляем ошибки (идентификация слабых/проблемных мест и быстрый feedback)

  • Постоянно учимся и разбираем кейсы (категоризация тем, организация тренингов и разборов)

И это ключевой фокус, который отличает Analytics Engineer от, например, классических Data Engineer, Backend Engineer. Обладая всем спектром инженерных навыков и практик, Analytics Engineer создает ценность для бизнеса и решает прикладные задачи. Говорит на одном языке с заказчиком решений и мыслит в терминах бизнес-показателей.

Получим исходные данные – Extract & Load Data

Окей, теперь ближе к делу. Я выбрал MaestroQA намеренно - это источник, для которого нет готовых коннекторов в SaaS-решениях.

Поэтому нам придется реализовать эту интеграцию и установить ее на расписание самостоятельно.

Выбор инструментов для интеграции данных - во многом вопрос вкуса, но я предпочитаю использовать простые shell-скрипты и оркестрировать их с помощью Airflow.

1. Начнем с изучения документации к API сервиса:

  • Нам доступен ряд методов: request-raw-export, request-groups-export, request-audit-log-export, get-export-data

  • Методы принимают набор параметров: apiToken, startDate, endDate, exportId

  • Результирующие отчеты формируются асинхронно

Асинхронный API означает то, что в ответ на запрос той или иной выгрузки вы получите не саму выгрузку а номер в очереди. Предъявив этот номер в другое окно (метод get-export-data), вы получите выгрузку, как только она будет готова.

Использование Async API несколько усложняет задачу, а именно использованием нескольких методов и сохранением exportId, но так задача становится даже интереснее.

2. Получим API Token.

Это секретный ключ, который позволит выгружать данные, связанные с вашим аккаунтом.

3. Готовим скрипты для выгрузки.

Шаг 1. Запросить выгрузку сырых данных (total_scores.sh):

  • Сформировать запрос к API (JSON_DATA)

  • Получить exportId (талон в очереди)

  • Проверить значение exportId и если ОК, то перейти к выгрузке результата (retrieve.sh)

# 1. Prepare request
JSON_DATA=$(jq -n \
              --arg maestroqa_token "$MAESTROQA_TOKEN" \
              --arg start_date "$START_DATE" \
              --arg end_date "$END_DATE" \
              --arg single_file_export "$SINGLE_FILE_EXPORT" \
              --arg name "$FILE_NAME" \
              '{apiToken: $maestroqa_token, startDate: $start_date, endDate: $end_date, singleFileExport: $single_file_export, name: $name }' )

# 2. Get exportId
EXPORT_ID=$(curl -s -X POST $ENDPOINT \
    -H 'Content-Type: application/json' \
    -d "${JSON_DATA}" \
    | jq -r '.exportId')

# 3. Retrieve data by exportId
if [ -z "$EXPORT_ID" ]
then
      echo "EXPORT_ID is empty"
      exit 1
else
      echo "EXPORT_ID=$EXPORT_ID"
      EXPORT_ID=$EXPORT_ID bash retrieve.sh
fi

Шаг 2. Получить готовую выгрузку (retrieve.sh):

  • Определить функцию для запроса статуса готовности выгрузки (get_status())

  • Сформировать запрос к API (JSON_DATA)

  • Опрашивать API о статусе готовности каждые 10 секунд

  • По готовности (complete) получить файл с результатами и сохранить в S3

# 1. function used to poll to get current status
get_status() {
    curl -s -X GET $RETRIEVE_ENDPOINT \
    -H 'Content-Type: application/json' \
    -d "${JSON_DATA}" \
    | jq -r '.status' \
    | cat
}

# 2. prepare request
JSON_DATA=$(jq -n \
              --arg maestroqa_token "$MAESTROQA_TOKEN" \
              --arg export_id "$EXPORT_ID" \
              '{apiToken: $maestroqa_token, exportId: $export_id }' )

# 3. get current status ("in progress" / "complete")
STATUS="$(get_status)"
printf "STATUS=$STATUS\n"

# 4. poll every 10 seconds
while [ "$STATUS" != "complete" ]; do
  printf "STATUS=$STATUS\n"
  sleep 10
  STATUS="$(get_status)"
done

# 5. Store resulting file to S3
curl -s -X GET $RETRIEVE_ENDPOINT \
  -H 'Content-Type: application/json' \
  -d "${JSON_DATA}" \
  | jq -r '.dataUrl' \
  | xargs curl -s \
  | aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$FILE_NAME/$FILE_NAME-$START_DATE-to-$END_DATE.csv

echo "UPLOADED TO s3://$BUCKET/$BUCKET_PATH/$FILE_NAME/$FILE_NAME-$START_DATE-to-$END_DATE.csv"
  1. Автоматизация на Airflow.

Отлично! После успешного формирования выгрузки в ручном режиме возникает необходимость задать расписание выполнения. Удобнее всего это сделать через Airflow, с возможностью осуществлять повторные попытки, мониторинг, получения уведомлений.

Пример DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

import os
import yaml
from datetime import datetime, timedelta
from slack.notifications import failed_task_slack_notification

### INIT DAG
DAG_NAME = "maestroqa_api"
SCHEDULE_INTERVAL = '0 0 * * *'

DAG_PATH = os.path.dirname(__file__)
CONFIG_FILE_NAME = "config.yml"
CONFIG_PATH = os.path.join(DAG_PATH, CONFIG_FILE_NAME)
CONFIG = yaml.safe_load(open(CONFIG_PATH))["endpoints"]

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 2, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=3),
    "catchup": False,
    "dagrun_timeout": timedelta(minutes=5),
    'on_failure_callback': failed_task_slack_notification
}

dag = DAG(
    DAG_NAME,
    default_args=default_args, 
    schedule_interval=SCHEDULE_INTERVAL
)

os.environ["START_DATE"] = "{{ execution_date.isoformat() }}"
os.environ["END_DATE"] = "{{ next_execution_date.isoformat() }}"

groups = BashOperator(
            task_id=CONFIG['groups']['FILE_NAME'],
            bash_command=f"cd {DAG_PATH}/ && bash {CONFIG['groups']['FILE_NAME']}.sh ",
            env={ **os.environ.copy(), **CONFIG['groups'] },
            trigger_rule="all_done",
            dag=dag
    )

total_scores = BashOperator(
            task_id=CONFIG['total_scores']['FILE_NAME'],
            bash_command=f"cd {DAG_PATH}/ && bash {CONFIG['total_scores']['FILE_NAME']}.sh ",
            env={ **os.environ.copy(), **CONFIG['total_scores'] },
            trigger_rule="all_done",
            dag=dag
    )

groups >> total_scores

Смоделируем витрины данных – Transform Data

К данному этапу у нас ежедневно работает выгрузка новых данных и файлы накапливаются в S3.

Зарегистрируем файлы в S3 в качестве EXTERNAL TABLE

Чтобы иметь возможность обращаться к данным с помощью SELECT-запросов. В этом нам поможет package dbt-labs/dbt_external_tables:

version: 2

sources:

  - name: maestroqa
    database: wheely
    schema: spectrum
    tags: ["sources", "maestroqa"]
    loader: Airflow (S3 via External Tables)
    description: "MaestroQA – customer service quality assurance software"

    tables:

        - name: groups
          identifier: maestroqa_groups
          description: "Agent Groups"
          external:
            location: "s3://{{ var('s3_bucket_name') }}/maestroqa/GROUPS/"
            row_format: serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
            table_properties: "('skip.header.line.count'='1')"
          columns:
            - name: group_name
              data_type: varchar
              description: "Group / Team name: All Agents, UK Team, Ru Team, FR Team, UK Team"
            - name: group_id
              data_type: varchar
            - name: agent_name
              data_type: varchar
            - name: agent_email
              data_type: varchar
            - name: agent_ids
              data_type: varchar
              description: "List of semicolon separated Agent IDs. Used to link with MaestroQA Total Scores table"
            - name: available
              data_type: bool
              description: "Flag indicating if agent is available at the moment"

Результирующие EXTERNAL TABLES будут использоваться в dbt в качестве sources (источников данных)

Моделирование витрины данных

Предлагаю взглянуть на то, как выглядят исходные данные, полученные из ответов API. Выгружаются 2 типа файлов:

  • Groups - справочник агентов, команд

  • Scores - факты оценок и скорингов коммуникаций

Особое внимание обратим на колонку agent_ids, представляющиую собой массив идентификаторов. С этим атрибутом придется повозиться - необходимо разбить массив на элементы и придать таблице плоский вид, добавив суррогатные ключи.

Наша задача - собрать широкую витрину, на основе которой впоследствии можно находить ответы на любые вопрос. Для этого объединим таблицы:

{{
    config (
      materialized='table',
      dist='auto',
      sort=['graded_dt', 'country'],
      tags=['maestroqa']
    )
}}

SELECT

    -- IDs
	  scores.gradable_id
	, scores.agent_id 

    -- dimensions
    , scores.grader
	, scores.agent_name
	, scores.agent_email
	, groups.group_name
	, CASE groups.group_name
      WHEN 'Ru Team' THEN 'RU'
      WHEN 'FR Team' THEN 'FR'
      WHEN 'UK Team' THEN 'GB'
      WHEN 'All Agents' THEN 'All'
    END AS country
        
    -- dates
    , scores.date_graded::DATE AS graded_dt

    -- measures
    , scores.rubric_score
	, scores.max_rubric_score

FROM {{ ref('stg_maestroqa_total_scores') }} AS scores
    LEFT JOIN {{ ref('flt_maestroqa_groups') }} AS groups ON groups.agent_id = scores.agent_id 
        AND groups.group_name IN ('All Agents', 'Ru Team', 'FR Team', 'UK Team')

Итоговый граф зависимостей моделей dbt (DAG) выглядит следующим образом:

Обеспечим доступ к данным через BI – Deliver value

Отлично, к этому этапу помимо набора файлов в S3 у нас есть постоянно обновляющаяся широкая таблица в СУБД (витрина), обращаясь к которой мы можем получить ответы на любые вопросы.

Однако не все пользователи одинаково способны формулировать свои вопросы на чистом SQL. Этот барьер призваны устранить BI-инструменты, основные задачи которых сводятся к:

  • Предоставлению визуального конструктора запросов к данным

  • Формированию набора измерений, метрик и фильтров для последующего использования бизнес-пользователями

  • Группировке ряда визуализаций и ответов в пользовательские дашборды

  • Настройке рассылок данных и уведомлений по определенным правилам

В Looker каждой колонке исходной таблицы можно присвоить статус измерения или метрики, задать правила агрегации, добавить комментарий. Это делается на языке LookML, напоминающем Javascript:

Далее любой пользователь может воспользоваться конструктором для визуального формирования ответов на свои вопросы – в Looker это называется Explore:

Готовые плиточки (tiles) можно группировать в дашборды, рассылать всем заинтересованным пользователям, устанавливать уведомления при достижении пороговых значений метрик:

В целом - это вершина айсберга, относительно проделанной работы, но именно на этом этапе создается основная ценность и польза для компании.

Умение строить комплексные решения, отвечающие на запросы бизнеса

Это то, что хотят видеть нанимающие менеджеры. Специалисты широкого профиля, мультиинструменталисты, обладающие автономностью и способные самостоятельно решать задачи и создавать ценность для бизнеса нужны на рынке как никогда.

Именно эти аспекты я держал в уме, когда работал над программами курсов Analytics Engineer и Data Engineer в OTUS.

Это не просто набор занятий по темам, а единая, связная история, в которой акцент делается на понимание потребностей заказчиков. На live-сессиях я и мои коллеги делимся своим опытом и реальными кейсами:

  • Продвинутое моделирование в dbt

  • Развертывание и особенности работы c BI-инструментами

  • Аналитические паттерны и SQL

  • Кейсы: Сквозная аналитика, Company’s KPI, Timeseries analysis

Также своими наблюдениями, опытом и практиками я делюсь в ТГ-канале Technology Enthusiast.

Напишите комментарий, если сталкивались с потребностью строить подобные решения, и какой подход применяли?

Спасибо!




Комментарии (1):

  1. dyadyaSerezha
    /#24342432 / +1

    Организация Extract & Load данных из асинхронного API MaestroQA

    Для преподавателя непростительно выражаться так неграмотно. Данные выгружаются не из API, а из MaestroQA с помощью его API.

    Далее, не очень понятно, зачем тут bash на нижнем уровне, если его вызывает скрипт на Python. Тот же самый нижний уровень прекрасно реализуется на Python и все получается гораздо проще и логичнее.