Swoft source code analysis-Swoole Task Swoft

  php, swoole

Author:bromine
Links:https://www.jianshu.com/p/b44 …
Source: Simple Book
The copyright belongs to the author. This article has been reprinted with the authorization of the author and the original text has been rearranged.
Swoft Github:https://github.com/swoft-clou …

Swoft source code analysis series catalog:https://segmentfault.com/a/11 …

Preface

SwoftThe task function of is based onSwooleTheTask mechanismIn other wordsSwoftTheTaskThe essence of mechanism is rightSwooleTheTask mechanismThe packaging and strengthening of.

Task delivery

//Swoft\Task\Task.php
class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     *
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

        if(!App::isWorkerStatus() && !App::isCoContext()){
            return self::deliverByQueue($data);//见下文Command章节
        }

        if(!App::isWorkerStatus() && App::isCoContext()){
            throw new TaskException('Please deliver task by http!');
        }


        $server = App::$server->getServer();
        // Delier coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0]  = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

            App::profileStart($prifleKey);
            $result = $server->taskCo($tasks, $timeout);
            App::profileEnd($prifleKey);

            return $result;
        }

        // Deliver async task
        return $server->task($data);
    }
}

Task deliveryTask::deliver()According to$typeParameter passedSwooleThe$server->taskCo()Or$server->task()Interface posted toTask process.
TaskIt is always executed synchronously.$typeOnly the behavior of the delivery operation is affected,Task::TYPE_ASYNCCorresponding$server->task()It’s asynchronous delivery.Task::deliver()Return immediately after calling;Task::TYPE_COCorresponding$server->taskCo()It is coordinated delivery, which gives way to coordinated control after delivery, and when the task is completed or the execution is overtimeTask::deliver()Before returning from the trip.

Task execution

//Swoft\Task\Bootstrap\Listeners\TaskEventListener 
/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor*/
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;

            // Release system resources
            App::trigger(AppEvent::RESOURCE_RELEASE);

            App::trigger(TaskEvent::AFTER_TASK);
        }
        return $result;
    }
}

Here isswoole.onTaskThe event callback, its responsibility is only will beWorkerPacked data delivered by the process is forwarded toTaskExecutor.

SwooleTheTaskThe essence of the mechanism isWorker processDeliver time-consuming tasks to synchronizedTask process(akaTaskWorker) processing, soswoole.onTaskThe event callback for is in theTask processIn the first half of this year. As mentioned above,Worker processMost of youHTTPService code execution environment, but fromTaskEventListener.onTask()Method, the code execution environment isTask processIn other words,TaskExecutorAnd specificTaskBeanAre all implemented inTask processOf course.

//Swoft\Task\TaskExecutor
/**
 * The task executor
 *
 * @Bean()
 */
class TaskExecutor
{
    /**
     * @param string $data
     * @return mixed
    */
    public function run(string $data)
    {
        $data = TaskHelper::unpack($data);

        $name   = $data['name'];
        $type   = $data['type'];
        $method = $data['method'];
        $params = $data['params'];
        $logid  = $data['logid'] ?? uniqid('', true);
        $spanid = $data['spanid'] ?? 0;


        $collector = TaskCollector::getCollector();
        if (!isset($collector['task'][$name])) {
            return false;
        }

        list(, $coroutine) = $collector['task'][$name];
        $task = App::getBean($name);
        if ($coroutine) {
            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
        } else {
            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
        }

        return $result;
    }
}

The task execution thought is very simple, willWorker processThe data sent is unpacked and restored to the original call parameters, according to$nameParameter to find the correspondingTaskBeanAnd call its correspondingtask()Methods. among themTaskBeanUsing Class Level Annotations@Task(name="TaskName")Or ..@Task("TaskName")Statement.

It is worth mentioning that,@TaskComments exceptnameProperty, and one morecoroutineProperty, the above code will choose to use synergetic according to this parameterrunCoTask()Or synchronizedrunSyncTask()carry outTask. But because and becauseSwooleTheTask processThe execution of is completely synchronous, and coordination is not supported, so please do not configure this parameter as in the current version.true. The same is true inTaskBeanThe task code written in must be synchronously blocked or can automatically downgrade asynchronous non-blocking and coordination to synchronous blocking according to the environment

Deliver tasks from Process

We mentioned earlier:

SwooleTheTaskThe essence of the mechanism isWorker processDeliver time-consuming tasks to synchronizedTask process(akaTaskWorker) processing.

In other words,SwooleThe$server->taskCo()Or$server->task()Only inWorker processTo be used in.
This restriction greatly limits the use of scenarios. How can you be in order to be able toProcessWhat about the delivery task?SwoftIn order to bypass this restrictionTask::deliverByProcess()Methods. The implementation principle is also very simple, throughSwooleThe$server->sendMessage()Method will call information from theProcessDeliver toWorker process, and then delivered to by the Worker process on its behalfTask processAmong them, the relevant codes are as follows:

//Swoft\Task\Task.php
/**
 * Deliver task by process
 *
 * @param string $taskName
 * @param string $methodName
 * @param array  $params
 * @param string $type
 * @param int    $timeout
 * @param int    $workId
 *
 * @return bool
 */
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
    /* @var PipeMessageInterface $pipeMessage */
    $server      = App::$server->getServer();
    $pipeMessage = App::getBean(PipeMessage::class);
    $data = [
        'name'    => $taskName,
        'method'  => $methodName,
        'params'  => $params,
        'timeout' => $timeout,
        'type'    => $type,
    ];

    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
    return $server->sendMessage($message, $workId);
}

Use after Data Packaging$server->sendMessage()Deliver toWorker:

//Swoft\Bootstrap\Server\ServerTrait.php
/**
 * onPipeMessage event callback
 *
 * @param \Swoole\Server $server
 * @param int            $srcWorkerId
 * @param string         $message
 * @return void
 * @throws \InvalidArgumentException
 */
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
    /* @var PipeMessageInterface $pipeMessage */
    $pipeMessage = App::getBean(PipeMessage::class);
    list($type, $data) = $pipeMessage->unpack($message);

    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}

$server->sendMessageAfter that,Worker processA is triggered when data is receivedswoole.pipeMessageThe callback of the event,SwoftWill convert it into its ownswoft.pipeMessageEvent and trigger.

//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
 * The pipe message listener
 *
 * @Listener(event=AppEvent::PIPE_MESSAGE)
 */
class PipeMessageListener implements EventHandlerInterface
{
    /**
     * @param \Swoft\Event\EventInterface $event
     */
    public function handle(EventInterface $event)
    {
        $params = $event->getParams();
        if (count($params) < 3) {
            return;
        }

        list($type, $data, $srcWorkerId) = $params;

        if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
            return;
        }

        $type       = $data['type'];
        $taskName   = $data['name'];
        $params     = $data['params'];
        $timeout    = $data['timeout'];
        $methodName = $data['method'];

        // delever task
        Task::deliver($taskName, $methodName, $params, $type, $timeout);
    }
}

swoft.pipeMessageThe incident was eventually resolved byPipeMessageListenerHandling. In the relevant monitoring, if foundswoft.pipeMessageEvents byTask::deliverByProcess()The result is,Worker processIt will be executed once for it.Task::deliver()And finally deliver the task data to theTaskWorker processChina.

A simple retrospective exercise: fromTask::deliverByProcess()To [something]TaskBeanWhich processes did the final task go through, and which parts of the call chain were executed in which processes?

Post tasks from the Command process or its child processes

//Swoft\Task\QueueTask.php
/**
 * @param string $data
 * @param int    $taskWorkerId
 * @param int    $srcWorkerId
 *
 * @return bool
 */
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
    if ($taskWorkerId === null) {
        $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
    }

    if ($srcWorkerId === null) {
        $srcWorkerId = mt_rand(0, $this->workerNum - 1);
    }

    $this->check();
    $data   = $this->pack($data, $srcWorkerId);
    $result = \msg_send($this->queueId, $taskWorkerId, $data, false);
    if (!$result) {
        return false;
    }

    return true;
}

ForCommandThe situation will be more complicated when the task of the process is delivered.
Mentioned aboveProcessWhich is often derived fromHttp/RpcService, as the sameManagerThe descendants of the process, they can getSwoole\ServerThrough the handle variable of the$server->sendMessage(),$server->task()Such methods as task delivery.

But inSwoftThe system, there is a very passer-by role:Command.
CommandThe process fromshellOrcronbIndependent startup, andHttp/RpcService-related processes are not related. thereforeCommandProcess and fromCommandStarted inProcessThere is no way to get the process.Swoole\ServerThe call handle of the is passed directly through theUnixSocketCarrying out the task delivery.
In order to provide task delivery support for this process,SwoftUsedSwooleTheTask processA special function of-Message queue.

使用消息队列的Task进程.png

In the same projectCommandAndHttp\RpcServerBy agreeing on amessage_queue_keyGet the same message queue in the system kernel, and thenComandThe process can send a message to theTask processThe delivery task has been completed.
The mechanism does not provide an open method to the outside world and is only included inTask::deliver()In the method,SwoftThe delivery method will be switched implicitly according to the current environment. However, the implementation of this message queue depends onSemaphoreExpand, if you want to use, you need to compilePHPAdd--enable-sysvmsgParameters.

Scheduled task

In addition to common tasks performed manually,SwoftIt also provides a timed task function with a precision of seconds to replace Linux in the project.CrontabFunctions.

SwoftWith two preamblesProcess-Task Planning Process:CronTimerProcessAnd the task execution processCronExecProcess
, and two memory data tables-RunTimeTable(Task (Configuration) Table)OriginTable((Task) Execution Table) is used to manage and schedule scheduled tasks.
The structure of each row of records in the two tables is as follows:

\\Swoft\Task\Crontab\TableCrontab.php
/**
 * 任务表,记录用户配置的任务信息
 * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录
 * @var array $originStruct 
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳
];

/**
 * 执行表,记录短时间内要执行的任务列表及其执行状态
 * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录
 * @var array $runTimeStruct 
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//同上
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
    'minute'      => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi')
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳
    'runStatus'  => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行)  1(已执行)  2(执行中) 三种。 
    //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。
];

Why use Swoole’s memory Table here?

SwoftThe timing task management of is respectively composed ofTask planning processAndTask execution processThe process is responsible. The operation of the two processes jointly manages the scheduled tasks. If the independent process is usedarray()Such as structure, the two processes must need frequent inter-process communication. While using cross-processTable(of this articleTableUnless otherwise specifiedSwooleTheSwoole\TableStructure) Direct data sharing between processes, which not only has high performance and simple operation, but also decouples the two processes.

in order toTableCan be used jointly between the two processes,TableMust be inSwoole ServerCreate and allocate memory before startup. The specific code isSwoft\Task\Bootstrap\Listeners->onBeforeStart()It is relatively simple, and those who are interested can read it by themselves.

After the background introduction, let’s take a look at the behavior of these two timed task processes.

//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
 * Crontab timer process
 *
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        //code....
        /* @var \Swoft\Task\Crontab\Crontab $cron*/
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $cron->checkTask();
            $server->tick(60 * 1000, function () use ($cron) {
                $cron->checkTask();
            });
        });
    }
}
//Swoft\Task\Crontab\Crontab.php
/**
 * 初始化runTimeTable数据
 *
 * @param array $task        任务
 * @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
 * @return bool
 */
private function initRunTimeTableData(array $task, array $parseResult): bool
{
    $runTimeTableTasks = $this->getRunTimeTable()->table;

    $min = date('YmdHi');
    $sec = strtotime(date('Y-m-d H:i'));
    foreach ($parseResult as $time) {
        $this->checkTaskQueue(false);
        $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
        $runTimeTableTasks->set($key, [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $min,
            'sec'        => $time + $sec,
            'runStatus'  => self::NORMAL
        ]);
    }

    return true;
}

CronTimerProcessYesSwoftThe core method of the scheduled task scheduling process based on is as followsCrontab->initRunTimeTableData().
This process usesSwooleThe timer function of the, throughSwoole\TimerCallbacks executed at the first second of each minute,CronTimerProcessAfter each wake-up, the task list will be traversed to calculate the list of tasks to be executed for 60 seconds in the current minute, written into the execution table and marked as not executed.

//Swoft\Task\Bootstrap\Process
/**
 * Crontab process
 *
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));

        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Diliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                    $cron->finishTask($task['key']);
                }
            }
        });
    }
}

CronExecProcessAs the executor of the scheduled task, throughSwoole\TimerEvery ..0.5sWake yourself up once, and then putExecution tableGo through it once, select the tasks that need to be executed at present, and passsendMessage()Deliver and update the status in the task execution table.
The execution process is only responsible for the delivery of the task, and the actual execution of the task is still in progress.Task processByTaskExecutorHandling.

The macro implementation of scheduled tasks is as follows:

Swoft定时任务机制.png