В моем текущем проекте много задач, которые выполняются в фоне. Из внешнего сервиса прилетают данные и проходят несколько стадий обработки. Обработка реализована через механизм очередей. Это удобно, можно варьировать количество воркеров на каждый тип процессов. Да и в случае, если что-то упадет, очередь будет копиться, и данные не потеряются — обработаются, как только проблема будет устранена.
Чтобы из одного процесса создать задачу для следующей стадии обработки, мы просто вызывали в конце обработки dispatch()
, примерно так:
class MyFirstJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
$this->dispatch(new MySecondJob($this->data)); // Second task
}
}
class MySecondJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
if ($this->someCondition($this->data)) {
$this->dispatch(new MyThirdJob($this->data)); // Third task
}
}
}
dispatch()
) в отдельный код. Тогда все будет логично и наглядно — вот у нас бизнес процесс (управляющий код, менеджер очередей), вот у нас отдельные его кусочки (очереди).<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use Illuminate\Foundation\Bus\DispatchesJobs;
abstract class PipelineAbstract
{
use DispatchesJobs;
/**
* @param array $params
* @return PipelineAbstract
*/
public function start(array $params)
{
$this->next(null, $params);
return $this;
}
/**
* @param Job $currentJob
* @param array $params Set of parameters for starting new jobs
*/
abstract public function next(Job $currentJob = null, array $params);
/**
* @param Job $job
*/
protected function startJob(Job $job)
{
$this->dispatch($job);
}
}
next()
у нас как раз и будет реализован бизнес процесс. startJob()
— просто обертка над dispatch()
на всякий случай. А start()
будем использовать в том месте, где надо инициировать весь процесс обработки данных (там, где прилетают данные из внешнего сервиса).<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use App\Jobs\MyFirstJob;
use App\Jobs\MySecondJob;
use App\Jobs\MyThirdJob;
class ProcessDataPipeline extends PipelineAbstract
{
/**
* @inheritdoc
*/
public function next(Job $currentJob = null, array $params)
{
// Start first job
if ($currentJob === null)
{
$this->startJob(new MyFirstJob($params, $this));
}
if ($currentJob instanceof MyFirstJob)
{
$this->startJob(new MySecondJob($params, $this));
}
if ($currentJob instanceof MySecondJob)
{
if ($this->someCondition($params))
{
$this->startJob(new MyThirdJob($params, $this));
}
}
}
}
MyFirstJob
.$this->dispatch(new MyFirstJob($data));
(new ProcessDataPipeline())->start($data);
next()
.$this->dispatch(new MySecondJob($data));
$this->next($data);
<?php
namespace App\Jobs;
use App\Jobs\Pipeline\PipelineAbstract;
abstract class Job
{
/**
* @param array $params
*/
public function next(array $params)
{
if ($this->pipeline)
{
$this->pipeline->next($this, $params);
}
}
}
next()
) обрабатывались нужной реализацией пайплайна.class MyFirstJob extends Job
{
/**
* @param mixed data
* @param PipelineAbstract|null $pipeline
*/
public function __construct($data, PipelineAbstract $pipeline = null)
{
$this->data = $data;
$this->pipeline = $pipeline;
}
}
withChain()
, он гарантирует выполнение задач в строгой последовательности. В простых случаях этого будет достаточно. Но в случаях, когда есть условия запуска того или иного процесса, когда данные для следующего процесса рождаются в предыдущем, все же нужен более универсальный механизм. Например, такой, о котором я и рассказал в этой статье.
К сожалению, не доступен сервер mySQL