Управление очередями в Laravel +5


image

В моем текущем проекте много задач, которые выполняются в фоне. Из внешнего сервиса прилетают данные и проходят несколько стадий обработки. Обработка реализована через механизм очередей. Это удобно, можно варьировать количество воркеров на каждый тип процессов. Да и в случае, если что-то упадет, очередь будет копиться, и данные не потеряются — обработаются, как только проблема будет устранена.

Чтобы из одного процесса создать задачу для следующей стадии обработки, мы просто вызывали в конце обработки dispatch(), примерно так:

class MyFirstJob extends Job
{
    use DispatchesJobs;

    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function handle()
    {
        $this->doSomething($this->data);
        $this->dispatch(new MySecondJob($this->data));  // Second task
    }
}

А следующая стадия обработки инициировалась совершенно аналогично:

class MySecondJob extends Job
{
    use DispatchesJobs;

    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function handle()
    {
        $this->doSomething($this->data);
        if ($this->someCondition($this->data)) {
            $this->dispatch(new MyThirdJob($this->data));  // Third task
        }
    }
}

Поначалу было хорошо, но добавлялись новые стадии обработки и цепочка росла. В очередной раз, когда надо было добавить еще одну стадию обработки (новая очередь), я поймал себя на мысли, что уже не помню точно, что и в какой последовательности обрабатывается. И по коду понять это уже не так-то и просто. Там появились элементы бизнес логики: в таком-то случае запускается такая-то обработка, в другом случае создается сразу набор задач. В общем, все то, что мы так “любим” видеть в больших системах.

Ох-ох, подумал я, пора что-то предпринять. И решил, что будет очень удобно вынести управление порядком обработки (порядок вызовов dispatch()) в отдельный код. Тогда все будет логично и наглядно — вот у нас бизнес процесс (управляющий код, менеджер очередей), вот у нас отдельные его кусочки (очереди).

Я так и сделал и до сих пор доволен. Сейчас расскажу, что именно сделал. Буду рад, если и вам пригодится этот подход.

Управление очередями


У нас несколько независимых процессов обработки данных. Чтобы описать каждый алгоритм отдельно, делаем абстрактный класс для менеджера очередей.

<?php

namespace App\Jobs\Pipeline;

use App\Jobs\Job;
use Illuminate\Foundation\Bus\DispatchesJobs;

abstract class PipelineAbstract
{
    use DispatchesJobs;

    /**
     * @param array $params
     * @return PipelineAbstract
     */
    public function start(array $params)
    {
        $this->next(null, $params);
        return $this;
    }

    /**
     * @param Job $currentJob
     * @param array $params Set of parameters for starting new jobs
     */
    abstract public function next(Job $currentJob = null, array $params);

    /**
     * @param Job $job
     */
    protected function startJob(Job $job)
    {
        $this->dispatch($job);
    }

}

В методе next() у нас как раз и будет реализован бизнес процесс. startJob() — просто обертка над dispatch() на всякий случай. А start() будем использовать в том месте, где надо инициировать весь процесс обработки данных (там, где прилетают данные из внешнего сервиса).

Пример реализации бизнес логики:

<?php

namespace App\Jobs\Pipeline;

use App\Jobs\Job;
use App\Jobs\MyFirstJob;
use App\Jobs\MySecondJob;
use App\Jobs\MyThirdJob;

class ProcessDataPipeline extends PipelineAbstract
{

    /**
     * @inheritdoc
     */
    public function next(Job $currentJob = null, array $params)
    {
        // Start first job
        if ($currentJob === null)
        {
            $this->startJob(new MyFirstJob($params, $this));
        }

        if ($currentJob instanceof MyFirstJob)
        {
            $this->startJob(new MySecondJob($params, $this));
        }

        if ($currentJob instanceof MySecondJob)
        {
            if ($this->someCondition($params))
            {
                $this->startJob(new MyThirdJob($params, $this));
            }
        }
    }
}

Вот и все. Остается только заменить запуск MyFirstJob.

Было

$this->dispatch(new MyFirstJob($data));

Стало

(new ProcessDataPipeline())->start($data);

А вместо добавления заданий в остальные очереди вызовем метод next().

Было

$this->dispatch(new MySecondJob($data));

Стало

$this->next($data);

Чуть не забыл. Нам еще придется доработать для этого базовый класс очереди. В коде выше видно, что мы при инстанцировании объекта очереди теперь еще передаем туда помимо данных объект пайплайна.

<?php

namespace App\Jobs;

use App\Jobs\Pipeline\PipelineAbstract;

abstract class Job
{
    /**
     * @param array $params
     */
    public function next(array $params)
    {
        if ($this->pipeline)
        {
            $this->pipeline->next($this, $params);
        }
    }
}

И в конструкторах конкретных джобов принимаем экземпляр пайплайна, чтобы шаги бизнес логики (вызов метода next()) обрабатывались нужной реализацией пайплайна.

class MyFirstJob extends Job
{

    /**
     * @param mixed data
     * @param PipelineAbstract|null $pipeline
     */
    public function __construct($data, PipelineAbstract $pipeline = null)
    {
        $this->data = $data;
        $this->pipeline = $pipeline;
    }
}

Вот теперь всё. Получилось похоже на цепочку ответственности. Я постарался объяснить идею простым языком. Если вам вдруг тоже захотелось так сделать, то тут я опубликовал рабочий пример реализации, возможно так кому то будет удобнее, чем на словах:

Что хорошего


  • Описание процесса обработки данных теперь не размазано по коду, а сосредоточено в одном методе.
  • Появилась возможность аккуратно добавить новое поведение в механизм управления очередью. Например, логирование, хранение в базе состояний обработки каждого шага.
  • Стало легче добавлять новые стадии обработки и менять порядок выполнения задач.

Кстати, в свежей версии Laravel появился похожий инструмент withChain(), он гарантирует выполнение задач в строгой последовательности. В простых случаях этого будет достаточно. Но в случаях, когда есть условия запуска того или иного процесса, когда данные для следующего процесса рождаются в предыдущем, все же нужен более универсальный механизм. Например, такой, о котором я и рассказал в этой статье.




К сожалению, не доступен сервер mySQL