Потоковая аналитика: быстрый запуск с SAS ESP +12


Применение аналитических алгоритмов на потоке данных сейчас одна из самых актуальных задач в области построения аналитических систем. Множество высокоточных предиктивных моделей, например, разработанных на показаниях с датчиков промышленных установок, уже готовы предупреждать серьезные аварии на производстве, но для этого их нужно выполнять на конечных устройствах («edge devices»), там, где показания с сенсоров поступают в реальном времени. Решить эту проблему и перенести аналитику в «онлайн» призван продукт SAS Event Stream Processing. В этой публикации хотелось поделится опытом его настройки на примере прикладной задачи – анализа изображений с видеокамер.



Технология


В продуктовой линейке SAS уже много лет присутствует и развивается собственное решение потоковой обработки, SAS Event Stream Processing (далее ESP). Основной задачей перед разработчиками была добиться высокой производительности, на порядки превышающей существующие решения на рынке. Для этого при разработке ESP полностью отказались от хранения промежуточных расчетов и индексов на диске. Все преобразования потока выполняются в ОЗУ, притом сохраняется пособытийный пересчет.


Работа с памятью в SAS ESP. Репозиторий событий в ОЗУ хранит промежуточные результаты и не блокируется в случае параллельных обращений внутренних или внешних процессов благодаря дополнительным индексам.

Движок умеет обрабатывать большие потоки данных — в несколько миллионов событий в секунду. При этом сохраняя низкую латентность, из-за чего он быстро занял свое место в решениях для онлайн маркетинга и противодействия мошенничеству в банках. Там он является интеллектуальным фильтром операций клиентов и мгновенно выявляет нестандартную активность по счетам, выделяя из потока данных мошеннические операции.

Но все же это второстепенная функция ESP. Движок разработан как механизм непрерывного применения аналитических моделей (SAS, С, Python и других) на потоке данных.


Принцип потоковой обработки событий.

В качестве наглядного примера, к форуму SAS Viya Business Breakfast мы в SAS решили настроить демонстрационный стенд, взяв за основу задачу анализа изображений. И вот как мы его настроили и с чем столкнулись в процессе.

Анализ видео на потоке


В нашем распоряжении оказалась аналитическая модель, построенная коллегами из отдела углубленной аналитики на платформе SAS Viya. Суть ее была в том, что, получая на вход изображение водителя в салоне автомобиля, алгоритм (в данном случае — обученная свёрточная нейронная сеть) классифицировал поведение водителя по классам: «нормальная езда», «отвлекается на разговор с пассажиром», «разговаривает по телефону», «пишет SMS» и др.
Пример работы модели: входное изображение отнесено к классу «пишет SMS», основываясь на части изображения, куда попала рука с телефоном.

Возник вопрос, как запустить данный алгоритм в реальном времени, чтобы иметь возможность реагировать на нежелательное поведение. По сценарию снимки с камеры в режиме онлайн уже попадали в виде файлов в сетевую директорию на диске.

Оставалось только: подключится к данным, преобразовать изображения в нужный формат, применить модель, и по вероятности принадлежности к классу поведения выводить в реальном времени предупреждения на онлайн дашбордах. И оказалось это все можно настроить в GUI SAS ESP, без единой строчки программного кода(!).

SAS Event Stream Processing


Проект обнаружения опасных ситуаций в графическом интерфейсе SAS ESP Studio.
В графическом редакторе ESP мы добавили следующие трансформации потока данных:

  1. В качестве входных данных у нас два узла типа source: model_source и input_image. В model_source мы публикуем нашу модель по классификации. Для этого настраиваем файловый коннектор, который считывает команду из текстового файла, содержащую три параметра: название операции, тип формата модели и физический путь к бинарному файлу. ESP отставляет за каждым источником данных возможность быть как статическим так и потоковым, тем самым, мы в любой момент времени можем опубликовать в этот узел новые команды по загрузке моделей, что удобно в промышленной эксплуатации – нам не нужно останавливать проект, чтобы обновить версии алгоритмов.


    Настройка публикации команд в model_source

    Содержание входного файла модели:

    I,N,1,action,load
    I,N,2,type,astore
    I,N,3,reference,/opt/sas/demo/image_processing/d.astore
  2. Второй источник, input_image — это картинки с камеры. Для публикации данных мы используем штатный адаптер — программу, публикующую в ESP данные из источника. В данном случае мы использовали файловый адаптер, но можно и подключится к камере напрямую, например, через адаптер UVC, идущий в составе решения.


    Настройка UVC коннектора к распространенным типам видеокамер.
  3. Изображения могут приходить в формате, отличающемся от того, на котором обучали модель, и это может повлиять на качество классификации. Поэтому после input_image мы сразу добавляем узел resize_image и прописываем в его настройках нужный формат. В данном случае — сжимание до квадрата 100/100 пикселей.

    Настройка обработки изображений
  4. Выполнение модели настраиваем в CNN_score_code. Интерфейс автоматически вытащит из мета-описания модели выходные атрибуты — P__label_c0, P__label_c1, P__label_c2 и P__label_c9 — вероятности каждого класса поведения. Далее в узле max_class мы зададим новое вычисляемое поле, которое выберет класс с максимальной вероятностью.
  5. Теперь, запустив проект, мы сможем получать в реальном времени классификацию для каждого входного изображения. В нашей истории недобросовестный водитель периодически отвлекается от дороги. Но важно отделять те случаи, когда он отвлекался на секунду, от случаев длительного опасного вождения. Для этого добавляем 3 узла: хранение событий за последнюю минуту (copy_1_min), агрегацию по классам поведения(aggr_for_alert) и фильтр фактов «опасного вождения» (filter_alert).

    Настройка параметров хранения событий – за 1 последнюю минуту.

    Настройка подсчета событий в разрезе каждого класса поведения.

    Настройка правил фильтрации.

Индексы


Важно отметить, что у каждой трансформации в ESP есть настройка – “Состояние” (State). Она характеризует, является ли трансформация данных потоковой, т.е. при обработке входного события результат сразу передается далее по диаграмме и удаляется из памяти данного узла. Либо мы можем хранить результат обработки каждого события и построить по нему индекс для возможности доступа к нему в любой момент времени. Например, окно агрегации всегда хранит состояние, и непрерывно пособытийно пересчитывается на основе входного потока.

Запуск


Вернемся, к проекту. Нам остается его запустить .
Для удобства просмотра потоков данных вместе с ESP поставляется конструктор онлайн отчетов – SAS ESP Stream Viewer. В режиме настройки мы видим все запущенные проекты на серверах, и можем подключаться к ним и выбирать удобную визуализацию.


Онлайн дашборды в SAS ESP Stream Viewer

Вот и все. Мы получили возможность автоматизировать реакцию в реальном времени на видеосигнал. В данном случае мы настроили онлайн мониторинг, но с помощью тех же адаптеров/коннекторов мы можем по каждому случаю выявленных нарушений отправлять сообщение, или формировать управляющее действие во внешнюю систему.

Или Python?


Технически кейс был завершен, но для целей демонстрации на форуме он казался недостаточно интерактивным. Его было неудобно, да и совсем небезопасно(!) показывать в реальном времени, поэтому фотографии были отсняты заранее и машина находилась на парковке.

SAS ESP поддерживает потоковое выполнение моделей, сделанных на Python. Для подтверждения мы настроили анализ изображений, а именно поиска объектов в изображении, используя библиотеки OpenCV.

Модель ESP для поиска строительной каски на входном потоке изображений
В диаграмме ESP у нас 2 источника данных: фреймы с онлайн камеры ноутбука и фотография объекта, который ищем. В данном случае мы определяем, не забыл ли потенциальный рабочий на строительном объекте о правилах безопасности: присутствует ли на нем каска. В узле proc_detection мы выполняем следующий код Python:

def compute_total(Image,image_template):
  "Output: score_point"
  import sys
  import cv2
  import numpy as np
  import os
  import base64
  import io
  from imageio import imread
  MIN_MATCH_COUNT = 10 # порог минимального количества совпадений ключевых точек
  DIST_COEFF = 0.55
  img_big = imread(io.BytesIO(base64.b64decode(image_template)))
  sift = cv2.xfeatures2d.SIFT_create() # Initiate SIFT detector
  matcher = cv2.BFMatcher() # BFMatcher with default params
  kp_big, des_big = sift.detectAndCompute(img_big,None)
  img_tpl = imread(io.BytesIO(base64.b64decode(Image)))
  kp_tpl, des_tpl = sift.detectAndCompute(img_tpl,None)
  matches = matcher.knnMatch(des_tpl,des_big,k=2)
  good = []
  for m,n in matches:
      if m.distance < n.distance * DIST_COEFF:
          good.append(m)
  score_point = len(good)
  return score_point


Идентификация наличия каски в изображении с веб-камеры, на форуме SAS Viya Business Breakfast (коллега, maxxts тестирует стенд)

Machine Learning


Если говорить о дальнейшем развитии потоковой обработки, то вероятно акцент будет смещаться на обучение аналитических моделей на потоке. Т.е. в отличие от предыдущих примеров, модели будут не только выполнять, но и обучать внутри SAS ESP. Для этого потребуется дополнительный поток данных, на котором пособытийно выполнять операцию train выбранного алгоритма (например, для начала, простую кластеризацию методом k-средних). Тогда машина с ESP на борту, получит возможность анализировать ситуацию по новым датчикам, которые только подключили. Это даст возможность быстро включать и автоматизировать новые устройства в концепции Индустрии 4.0.

Пожалуйста, пишите в комментариях, с какими задачами сталкивались в потоковой аналитике, и конечно, с удовольствием отвечу на вопросы о SAS Event Stream Processing.




К сожалению, не доступен сервер mySQL