Как работать с событиями в Flussonic +11


Работа с событиями в Flussonic для мониторинга


Пользователи часто обращаются с вопросом: как сделать так, что бы Flussonic прислал письмо при падении потока.

Включив зануду можно пробубнить о том, что непонятно что такое падение и и т.п.  Вопросов масса, потому что битрейт потока ненулевой, кадры идут, а там будет белый шум или черный экран. Поток вроде как работает, а по сути нет. Но рассмотрим решение оригинальной задачи с помощью новой системы событий.

Самый простой вариант будет наивным, но рабочим. В конфиг стримера добавляем:

notify no_video {
  sink /etc/flussonic/no_video.lua;
}

в файле /etc/flussonic/no_video.lua пишем:

for k,event in pairs(events) do -- события приходят в обработчик пачками, обработаем целиком группу
   if event.event == "source_lost" or event.event == "stream_stopped" then -- отфильтруем только те события, которые нужны
     mail.send({from = "flussonic@streamer1.mycdn", to = "marketing@team.mycdn", subject = "Source lost", body = "source lost on "..event.media}) -- и пошлем письмо на каждое событие
   end
end

Предфильтрация


Важно не ставить здесь личный емейл, потому что при запуске такого кода на продакшне выгребать прийдется много писем.

Внутри видеостримера постоянно генерируется огромное количество событий: на каждую открытую/закрытую сессию, на каждый записанный в архив фрагмент и т.п., так что начнем оптимизировать этот код с использования предфильтрации в конфиге:

notify no_video {
  sink /etc/flussonic/no_video.lua;
  only event=source_lost,source_ready,stream_stopped;
}

Это важно, потому что фильтрация происходит в том коде, который посылает событие, а не в обработчике, в итоге вы можете здорово разгрузить код и не нагружать одно ядро, а это очень важно на многоядерных машинах.

Конфигурирование


Свеженаписанный lua скрипт безусловно будет пользоваться популярностью, плюс может захотеться для одной группы потоков слать письма одним людям, а для других потоков другим. Можно написать скрипт один раз, а его настройки держать в конфиге флюссоника:

notify no_video {
  sink /etc/flussonic/no_video.lua;
  only event=source_lost,source_ready,stream_stopped;
  to marketing@team.mycdn;
  from flussonic@streamer1.mycdn;
}

и в скрипте используем параметр args:

for k,event in pairs(events) do
   if event.event == "source_lost" or event.event == "stream_stopped" then
     mail.send({from = args.from, to = args.to, subject = "Source lost", body = "source lost on "..event.media})
   end
end

Немножко подправим этот скрипт, что бы слать не много, а одно письмо и только если проблема была:

local body = ""
local count = 0
for k,event in pairs(events) do
   if event.event == "source_lost" or event.event == "stream_stopped" then
     count = count + 1
     body = body.."  "..event.media.."\n"
   end
end

if count > 0 then
  mail.send({from = args.from, to = args.to, subject = "Source lost", body = "source lost on: \n"..body})
end

Debounce


Скрипт, который у нас получился, пока далек от совершенства и это сразу станет понятно на более менее живом продакшне например, с кучей пользовательских камер.

Давайте найдем способ сократить почтовый трафик и сделать письма пополезнее. Получив сообщение о падении стрима мы не будем сразу слать письмо, а подождем 30 секунд, что бы собрать кто ещё упал.

Для этого воспользуемся встроенным в Flussonic механизмом таймера для обработчиков событий. Дело в том, что Flussonic сохраняет состояние обработчика между запросами и запускает обработчик с этим состоянием по очереди в один поток. 

Будьте с этим аккуратны: если вы будете накапливать все-все события в каком-нибудь буфере, то вы сможете уронить сервер.

Как только приходит первое событие о падении, мы будем взводить таймер и не слать ничего до его истечения. За это время будем накапливать информацию об упавших потоках и потом пошлем всё разом.

if not new_dead_streams then
  new_dead_streams = {}
end


local some_stream_died = false

for k,evt in pairs(events) do
  if evt.event == "stream_stopped" or evt.event == "source_lost" then
    new_dead_streams[evt.media] = true
    some_stream_died = true
  end
end

if not notify_timer and some_stream_died then
  notify_timer = flussonic.timer(30000, "handle_timer", "go")
end

function handle_timer(arg)
  local body = "Local time "..flussonic.now().."\n"

  for name,flag in pairs(new_dead_streams) do
    body = body.."  "..name.."\n"
  end
  
  new_dead_streams = {}
  mail.send({from = args.from, to = args.to, subject = "Source lost", body = body})
end

Функция flussonic.timer первым параметром принимает время вызова функции в миллисекундах, потом имя функции и потом аргумент.

Передать локально объявленную таблицу не получится, так что будьте с этим аккуратны: шлите число или строку или пользуйтесь глобальной переменной.

К черту почту, давайте слак


Давайте поменяем рассылку почты на пуш в канал в слаке. Надо зайти в https://YOURTEAM.slack.com/apps/manage/custom-integrations, оттуда перейти в incoming webhooks

Там получите урл вида:

https://hooks.slack.com/services/NB8hv62/ERBAdLVT/VW9teYkRPMp2NMbU

поменяем mail.send на:

    message = {text= body, username= "flussonic" }
    http.post(args.slack_url, {["content-type"] = "application/json"}, json.encode(message))

и поменяем конфигурацию:

notify no_video {
  sink /etc/flussonic/no_video.lua;
  only event=source_lost,source_ready,stream_stopped;
  slack_url https://hooks.slack.com/services/NB8hv62/ERBAdLVT/VW9teYkRPMp2NMbU;
}

Зачем нужны args?


Казалось бы: зачем передавать в скрипт какие-то параметры, если скрипт итак можно поправить?

Во-первых, наша практика такая, что к нам регулярно обращаются за помощью пользователи, которые даже линукс в первый раз видят, а уж редактирование lua скрипта для них просто немыслимая задача: проще дать им написанный нами скрипт из пакета и положить нужные параметры в конфиг.

Во-вторых, один и тот же скрипт может использоваться с разными параметрами для разных стримов. Тут уже и опытному пользователю гораздо проще может оказаться передать аргументы через конфиг, а не через навороченные условия в самом скрипте.

Чего там под капотом?


В Flussonic система событий была давно, чуть ли не с 2012 года, но предыдущая реализация откровенно хромала. Руководствуясь названием модуля из stdlib, был выбран модуль gen_event для реализации этого механизма. 

Если вкратце: не пользуйтесь gen_event никогда, он нигде не нужен. Одна из важнейших причин: если у gen_event по какой-то причине сбойнул обработчик, он тихо молча (или с небольшой руганью в логи) будет удален и никакого перезапуска не будет.

Мы поменяли внутреннюю структуру обработки событий. Каждый обработчик, описанный в конфиге, запускается в отдельном gen_server и при аварийном сбое будет перезапущен. Каждый такой процесс обработчик старается на каждом сообщении собрать в пачку максимальное количество уже совершившихся событий (но не больше лимита, около тысячи) и передать их все вместе обработчику. Такой подход должен снижать накладные расходы на передаче по HTTP или при записи в файл.

Эта переделка позволила перенести фильтрацию событий в посылающий процесс: таким образом если у вас на сервере под тысячу стримов, а следить надо только за тремя, то фильтрацией будет заниматься не одно ядро, а все 48 или скольких там у вас. Это существенный момент для масштабируемости системы.

Тонкий, но важный ньюанс в том, что обработчики событий умеют следить за перегрузкой и в принципе даже сбрасывать события. Если дела обстоят очень плохо, то флюссоник не будет бесконечно накапливать события в очередях, а начнет их выбрасывать. Проверка на перегруженность делается очень нетривиально, потому что в таких случаях нельзя поллить длину очереди сообщений принимающего процесса, так что мы делаем это через самостоятельную проверку процессом, отправляющим события, собственной живости.

Заключение


В этой статье я немножко рассказал о том, как: 

# сделать свой обработчик событий в Flussonic на lua
# послать почту из lua: mail.send# сделать HTTP запрос с json телом:  http.post и json.encode# воспользоваться таймером: flussonic.timer.
-->


К сожалению, не доступен сервер mySQL