Author:bromine
Links:https://www.jianshu.com/p/b44 …
Source: Jianshu (简书)
The copyright belongs to the author. This article has been reprinted with the authorization of the author and the original text has been rearranged.
Swoft Github:https://github.com/swoft-clou …
Swoft source code analysis series catalog:https://segmentfault.com/a/11 …
Preface
Swoft's task feature is built on Swoole's Task mechanism. In other words, Swoft's Task mechanism is essentially a wrapper around Swoole's Task mechanism that also extends it.
Task delivery
//Swoft\Task\Task.php
class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     *
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data = TaskHelper::pack($taskName, $methodName, $params, $type);
        if (!App::isWorkerStatus() && !App::isCoContext()) {
            return self::deliverByQueue($data); // see the Command section below
        }
        if (!App::isWorkerStatus() && App::isCoContext()) {
            throw new TaskException('Please deliver task by http!');
        }
        $server = App::$server->getServer();
        // Deliver coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0] = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;
            App::profileStart($prifleKey);
            $result = $server->taskCo($tasks, $timeout);
            App::profileEnd($prifleKey);
            return $result;
        }
        // Deliver async task
        return $server->task($data);
    }
}
Task::deliver() packs the call parameters and, depending on the $type parameter, delivers them to a Task process through Swoole's $server->taskCo() or $server->task() interface.
A Task itself is always executed synchronously; $type only affects the behavior of the delivery operation. Task::TYPE_ASYNC corresponds to $server->task(), which is asynchronous delivery: Task::deliver() returns immediately after the call. Task::TYPE_CO corresponds to $server->taskCo(), which is coroutine delivery: the coroutine yields control after delivery, and Task::deliver() returns only when the task has completed or the timeout has expired.
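The branch order in Task::deliver() can be restated as a small decision function. This is only a sketch: chooseDeliveryRoute() and its return labels are hypothetical names introduced for illustration, and the type strings 'co' and 'async' are assumed values for Task::TYPE_CO and Task::TYPE_ASYNC.

```php
<?php
// Hypothetical helper mirroring the branch order of Task::deliver().
// The returned labels are illustrative, not Swoft constants.
function chooseDeliveryRoute(bool $isWorker, bool $isCoContext, string $type): string
{
    if (!$isWorker && !$isCoContext) {
        return 'queue';   // Command process: Task::deliver() calls deliverByQueue()
    }
    if (!$isWorker && $isCoContext) {
        return 'error';   // Task::deliver() throws TaskException here
    }
    // Inside a Worker process, $type only selects the delivery call.
    return $type === 'co' ? 'taskCo' : 'task';
}

echo chooseDeliveryRoute(true, true, 'co'), PHP_EOL;      // taskCo
echo chooseDeliveryRoute(true, false, 'async'), PHP_EOL;  // task
echo chooseDeliveryRoute(false, false, 'async'), PHP_EOL; // queue
```

Note that the task body runs the same way on every route; only the caller's waiting behavior differs.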
Task execution
//Swoft\Task\Bootstrap\Listeners\TaskEventListener
/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor */
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;
            // Release system resources
            App::trigger(AppEvent::RESOURCE_RELEASE);
            App::trigger(TaskEvent::AFTER_TASK);
        }
        return $result;
    }
}
This is the swoole.onTask event callback. Its only responsibility is to forward the packed data delivered by the Worker process to TaskExecutor.
The essence of Swoole's Task mechanism is that a Worker process hands time-consuming tasks to synchronous Task processes (also known as TaskWorkers) for processing, so the swoole.onTask callback runs in a Task process. As mentioned earlier, the Worker process is where most of your HTTP service code runs, but from TaskEventListener.onTask() onward the code runs in a Task process. In other words, TaskExecutor and the concrete TaskBean both execute in the Task process.
//Swoft\Task\TaskExecutor
/**
 * The task executor
 *
 * @Bean()
 */
class TaskExecutor
{
    /**
     * @param string $data
     * @return mixed
     */
    public function run(string $data)
    {
        $data = TaskHelper::unpack($data);
        $name = $data['name'];
        $type = $data['type'];
        $method = $data['method'];
        $params = $data['params'];
        $logid = $data['logid'] ?? uniqid('', true);
        $spanid = $data['spanid'] ?? 0;
        $collector = TaskCollector::getCollector();
        if (!isset($collector['task'][$name])) {
            return false;
        }
        list(, $coroutine) = $collector['task'][$name];
        $task = App::getBean($name);
        if ($coroutine) {
            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
        } else {
            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
        }
        return $result;
    }
}
The idea behind task execution is simple: unpack the data sent by the Worker process back into the original call parameters, look up the corresponding TaskBean by the $name parameter, and call the requested task method on it. A TaskBean is declared with the class-level annotation @Task(name="TaskName") or @Task("TaskName").
It is worth mentioning that, besides the name property, the @Task annotation has a coroutine property. The code above uses it to choose between the coroutine runCoTask() and the synchronous runSyncTask() when executing the Task. However, because Swoole's Task process runs fully synchronously and does not support coroutines, do not set this property to true in the current version. For the same reason, the task code written in a TaskBean must be synchronous and blocking, or must be able to downgrade asynchronous non-blocking and coroutine code to synchronous blocking depending on the environment.
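A minimal TaskBean might look like the sketch below. DemoTask, the task name "demo" and the sum() method are hypothetical, and the annotation namespace is assumed from Swoft 1.x; the class is plain PHP, so it also runs without Swoft installed.

```php
<?php
// Assumed Swoft 1.x annotation namespace. A `use` line alone does not load the class,
// so this file still runs where Swoft is not installed.
use Swoft\Task\Bean\Annotation\Task;

/**
 * Hypothetical task bean.
 *
 * @Task("demo")
 */
class DemoTask
{
    /**
     * Synchronous, blocking work only: no coroutine clients, no async callbacks.
     *
     * @param int[] $numbers
     * @return int
     */
    public function sum(array $numbers): int
    {
        return array_sum($numbers);
    }
}

// Inside a Worker process the task would be delivered roughly like this
// (assuming each element of $params becomes one method argument):
// \Swoft\Task\Task::deliver('demo', 'sum', [[1, 2, 3]], \Swoft\Task\Task::TYPE_ASYNC);
echo (new DemoTask())->sum([1, 2, 3]), PHP_EOL; // 6
```

The coroutine property is deliberately left out, in line with the advice above not to set it to true.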
Deliver tasks from Process
We mentioned earlier:
The essence of Swoole's Task mechanism is that a Worker process hands time-consuming tasks to synchronous Task processes (also known as TaskWorkers) for processing.
In other words, Swoole's $server->taskCo() and $server->task() can only be used in a Worker process.
This restriction greatly narrows the usage scenarios. How can tasks be delivered from a Process? To get around the restriction, Swoft provides the Task::deliverByProcess() method. The implementation is also simple: the call information is sent from the Process to a Worker process through Swoole's $server->sendMessage(), and the Worker process then delivers it to a Task process on the Process's behalf. The relevant code is as follows:
//Swoft\Task\Task.php
/**
 * Deliver task by process
 *
 * @param string $taskName
 * @param string $methodName
 * @param array  $params
 * @param string $type
 * @param int    $timeout
 * @param int    $workId
 *
 * @return bool
 */
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
    /* @var PipeMessageInterface $pipeMessage */
    $server = App::$server->getServer();
    $pipeMessage = App::getBean(PipeMessage::class);
    $data = [
        'name'    => $taskName,
        'method'  => $methodName,
        'params'  => $params,
        'timeout' => $timeout,
        'type'    => $type,
    ];
    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
    return $server->sendMessage($message, $workId);
}
After the data is packed, it is delivered to the Worker with $server->sendMessage():
//Swoft\Bootstrap\Server\ServerTrait.php
/**
 * onPipeMessage event callback
 *
 * @param \Swoole\Server $server
 * @param int            $srcWorkerId
 * @param string         $message
 * @return void
 * @throws \InvalidArgumentException
 */
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
    /* @var PipeMessageInterface $pipeMessage */
    $pipeMessage = App::getBean(PipeMessage::class);
    list($type, $data) = $pipeMessage->unpack($message);
    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}
After $server->sendMessage() is called, the Worker process fires the swoole.pipeMessage event callback when it receives the data. Swoft converts it into its own swoft.pipeMessage event and triggers that.
//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
 * The pipe message listener
 *
 * @Listener(event=AppEvent::PIPE_MESSAGE)
 */
class PipeMessageListener implements EventHandlerInterface
{
    /**
     * @param \Swoft\Event\EventInterface $event
     */
    public function handle(EventInterface $event)
    {
        $params = $event->getParams();
        if (count($params) < 3) {
            return;
        }
        list($type, $data, $srcWorkerId) = $params;
        if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
            return;
        }
        $type = $data['type'];
        $taskName = $data['name'];
        $params = $data['params'];
        $timeout = $data['timeout'];
        $methodName = $data['method'];
        // deliver task
        Task::deliver($taskName, $methodName, $params, $type, $timeout);
    }
}
The swoft.pipeMessage event is ultimately handled by PipeMessageListener. If the listener finds that the event was produced by Task::deliverByProcess(), the Worker process calls Task::deliver() on the Process's behalf, and the task data finally reaches the TaskWorker process.
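The relay can be simulated without Swoole to make the hand-off explicit. This is a sketch under assumptions: the JSON envelope is a hypothetical stand-in for PipeMessage's real wire format, which the article does not show, and packMessage(), unpackMessage() and relayInWorker() are illustrative names.

```php
<?php
// Hypothetical JSON envelope standing in for PipeMessage::pack()/unpack().
const MESSAGE_TYPE_TASK = 'task';

function packMessage(string $type, array $data): string
{
    return json_encode(['type' => $type, 'data' => $data]);
}

function unpackMessage(string $message): array
{
    $decoded = json_decode($message, true);
    return [$decoded['type'], $decoded['data']];
}

// Stand-in for the Worker-side listener: returns the arguments it would
// pass to Task::deliver(), or null when the message is not a task message.
function relayInWorker(string $message): ?array
{
    [$type, $data] = unpackMessage($message);
    if ($type !== MESSAGE_TYPE_TASK) {
        return null;
    }
    return [$data['name'], $data['method'], $data['params'], $data['type'], $data['timeout']];
}

// Process side: the fields deliverByProcess() packs before sendMessage().
$message = packMessage(MESSAGE_TYPE_TASK, [
    'name' => 'demo', 'method' => 'sum', 'params' => [[1, 2, 3]],
    'timeout' => 3, 'type' => 'async',
]);
var_dump(relayInWorker($message) === ['demo', 'sum', [[1, 2, 3]], 'async', 3]); // bool(true)
```

The Process never touches the task API itself; it only sends a string, and the Worker decides what to do with it.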
A simple review exercise: from Task::deliverByProcess() to the final execution of a TaskBean's task, which processes does the call pass through, and which part of the call chain runs in which process?
Post tasks from the Command process or its child processes
//Swoft\Task\QueueTask.php
/**
 * @param string $data
 * @param int    $taskWorkerId
 * @param int    $srcWorkerId
 *
 * @return bool
 */
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
    if ($taskWorkerId === null) {
        $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
    }
    if ($srcWorkerId === null) {
        $srcWorkerId = mt_rand(0, $this->workerNum - 1);
    }
    $this->check();
    $data = $this->pack($data, $srcWorkerId);
    $result = \msg_send($this->queueId, $taskWorkerId, $data, false);
    if (!$result) {
        return false;
    }
    return true;
}
Delivering tasks from a Command process is somewhat more complicated.
The Process mentioned above is usually derived from the Http/Rpc service. As descendants of the same Manager process, such processes can obtain the Swoole\Server handle and deliver tasks through methods such as $server->sendMessage() and $server->task().
But Swoft also has a rather marginal role: Command. A Command process is started independently from the shell or cron and is unrelated to the processes of the Http/Rpc service. Therefore a Command process, and any Process started from a Command, has no way to obtain the Swoole\Server handle and so cannot deliver tasks directly through the UnixSocket.
To support task delivery from such processes, Swoft uses a special feature of Swoole's Task process: the message queue.
A Command and the Http\RpcServer of the same project agree on a message_queue_key and thereby obtain the same message queue in the system kernel. The Command process can then deliver tasks to the Task process through that queue.
This mechanism does not expose a public method. It is used only inside Task::deliver(), where Swoft switches the delivery method implicitly according to the current environment. Note that this message queue implementation depends on the Semaphore extension; to use it, add the --enable-sysvmsg option when compiling PHP.
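The idea of two unrelated processes meeting at a kernel queue through an agreed key can be tried with PHP's sysvmsg functions. This is only a sketch: queueRoundTrip() and the random key are hypothetical, both ends run in one process here, and the payload is not in the format Swoole's Task processes expect.

```php
<?php
// Sketch: two handles opened with the same key refer to the same System V queue.
// Returns null when the sysvmsg extension is missing or the queue cannot be used.
function queueRoundTrip(int $key, string $payload): ?string
{
    if (!function_exists('msg_get_queue')) {
        return null; // PHP was built without --enable-sysvmsg
    }
    $sender   = msg_get_queue($key, 0600); // e.g. the Command process
    $receiver = msg_get_queue($key, 0600); // e.g. the server side, same key
    if ($sender === false || $receiver === false) {
        return null;
    }

    $type = 1; // message type; QueueTask::deliver() puts the task worker id here
    if (!msg_send($sender, $type, $payload, false, false)) {
        msg_remove_queue($sender);
        return null;
    }

    $received = null;
    $receivedType = 0;
    $ok = msg_receive($receiver, $type, $receivedType, 8192, $received, false, MSG_IPC_NOWAIT);
    msg_remove_queue($receiver); // clean up the demo queue
    return $ok ? $received : null;
}

// A throwaway key for the demo; a real deployment would use the agreed message_queue_key.
var_dump(queueRoundTrip(random_int(0x10000, 0x7fffffff), 'hello'));
```

Because the message type doubles as the receiver selector, passing a task worker id as the type lets the sender address one specific Task process.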
Scheduled task
Besides ordinary tasks that are delivered manually, Swoft also provides scheduled tasks with second-level precision, intended to replace Linux Crontab within a project.
Swoft uses two pre-started Processes, the task planning process CronTimerProcess and the task execution process CronExecProcess, together with two in-memory tables, OriginTable (the task configuration table) and RunTimeTable (the task execution table), to manage and schedule scheduled tasks.
The structure of each row of records in the two tables is as follows:
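//Swoft\Task\Crontab\TableCrontab.php
/**
 * Task table: records the task information configured by the user.
 * Each row contains the fields below; `rule`, `taskClass` and `taskMethod` generate the key that uniquely identifies a record.
 * @var array $originStruct
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100], // execution rule of the scheduled task, the cron property of the @Scheduled annotation
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255], // task name, the name property of @Task (defaults to the class name)
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255], // task method, the method that carries the @Scheduled annotation
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],  // 10-digit timestamp taken when the table contents are initialized
];
/**
 * Execution table: records the tasks to be run in the near future and their execution status.
 * Each row contains the fields below; `taskClass`, `taskMethod`, `minute` and `sec` generate the key that uniquely identifies a record.
 * @var array $runTimeStruct
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255], // same as above
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255], // same as above
    'minute'     => [\Swoole\Table::TYPE_STRING, 20],  // time at which the task should run, to the minute, format date('YmdHi')
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],  // time at which the task should run, to the second, 10-digit timestamp
    'runStatus'  => [\Swoole\Table::TYPE_INT, 4],      // task status: 0 (not executed), 1 (executed), 2 (executing)
    // Note: "executed" is easy to misread here. It does not refer to the execution of the task itself
    // but to the execution of the task delivery operation; "not delivered", "delivered" and "delivering"
    // would describe the three states more accurately.
];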
Why use Swoole’s memory Table here?
Swoft's scheduled-task management is split between the task planning process and the task execution process, which jointly manage the scheduled tasks. If each process kept its own structure such as an array(), the two would need frequent inter-process communication. A cross-process Table (in this article, Table means Swoole's Swoole\Table structure unless stated otherwise) shares the data directly between processes instead, which is fast, simple to operate, and also decouples the two processes.
For the Table to be shared between the two processes, it must be created and have its memory allocated before the Swoole Server starts. The code is in Swoft\Task\Bootstrap\Listeners->onBeforeStart(); it is fairly simple, and interested readers can go through it themselves.
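Creating such a table takes only a few calls. The sketch below assumes the swoole extension is loaded and does nothing otherwise; createSharedTable(), the reduced column set and the row key are illustrative, not Swoft's actual definitions.

```php
<?php
// Returns a created Swoole\Table, or null when the swoole extension is not loaded.
function createSharedTable(): ?object
{
    if (!class_exists('Swoole\Table')) {
        return null;
    }
    $table = new \Swoole\Table(1024); // row capacity is fixed at creation
    $table->column('taskClass', \Swoole\Table::TYPE_STRING, 255);
    $table->column('runStatus', \Swoole\Table::TYPE_INT, 4);
    $table->create();                 // allocates the shared memory
    return $table;
}

// Call this before the server starts, so that every process forked later
// inherits the same shared memory.
$table = createSharedTable();
if ($table !== null) {
    $table->set('demo-key', ['taskClass' => 'demo', 'runStatus' => 0]);
    var_dump($table->get('demo-key'));
}
```

A table created after the processes have forked would be private to the process that created it, which is why the creation has to happen in a before-start hook.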
After the background introduction, let’s take a look at the behavior of these two timed task processes.
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
 * Crontab timer process
 *
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        //code....
        /* @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');
        // Swoole/HttpServer
        $server = App::$server->getServer();
        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $cron->checkTask();
            $server->tick(60 * 1000, function () use ($cron) {
                $cron->checkTask();
            });
        });
    }
}
//Swoft\Task\Crontab\Crontab.php
/**
 * Initialize the runTimeTable data
 *
 * @param array $task        the task
 * @param array $parseResult result of parsing the crontab rule, i.e. the seconds within the current minute at which the task must run
 * @return bool
 */
private function initRunTimeTableData(array $task, array $parseResult): bool
{
    $runTimeTableTasks = $this->getRunTimeTable()->table;
    $min = date('YmdHi');
    $sec = strtotime(date('Y-m-d H:i'));
    foreach ($parseResult as $time) {
        $this->checkTaskQueue(false);
        $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
        $runTimeTableTasks->set($key, [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $min,
            'sec'        => $time + $sec,
            'runStatus'  => self::NORMAL
        ]);
    }
    return true;
}
CronTimerProcess is Swoft's scheduled-task planning process; its core method is Crontab->initRunTimeTableData().
The process uses Swoole's timer feature: a Swoole\Timer callback runs at the first second of every minute. Each time CronTimerProcess wakes up, it traverses the task table, works out which tasks must run during the 60 seconds of the current minute, writes them into the execution table, and marks them as not executed.
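The two calculations involved, the delay until the next minute boundary and the rows written for one task, can be sketched as pure functions. msUntilNextMinute() and buildRunTimeRows() are hypothetical names, and the md5 key is only a stand-in for Crontab::getKey(), whose implementation is not shown in the article.

```php
<?php
// Milliseconds until the next minute boundary, as computed in CronTimerProcess::run().
function msUntilNextMinute(int $now): int
{
    return (60 - $now % 60) * 1000;
}

// Builds execution-table rows for one task. $seconds is the parsed cron result:
// the offsets within the current minute at which the task must run.
function buildRunTimeRows(array $task, array $seconds, int $now): array
{
    $minuteStart = $now - $now % 60;
    $minute = date('YmdHi', $minuteStart);
    $rows = [];
    foreach ($seconds as $offset) {
        $sec = $minuteStart + $offset;
        // Hypothetical key; the real one comes from Crontab::getKey().
        $key = md5(implode('|', [$task['rule'], $task['taskClass'], $task['taskMethod'], $minute, $sec]));
        $rows[$key] = [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $minute,
            'sec'        => $sec,
            'runStatus'  => 0, // not delivered yet
        ];
    }
    return $rows;
}

$task = ['rule' => '0,30 * * * * *', 'taskClass' => 'demo', 'taskMethod' => 'sum'];
$rows = buildRunTimeRows($task, [0, 30], 125);
echo msUntilNextMinute(125), PHP_EOL;                    // 55000
echo implode(',', array_column($rows, 'sec')), PHP_EOL;  // 120,150
```

Writing one row per second offset is what gives the scheduler its second-level precision while the planner itself only wakes once a minute.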
//Swoft\Task\Bootstrap\Process
/**
 * Crontab process
 *
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));
        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');
        // Swoole/HttpServer
        $server = App::$server->getServer();
        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Deliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                    $cron->finishTask($task['key']);
                }
            }
        });
    }
}
CronExecProcess is the executor of scheduled tasks. Using Swoole\Timer, it wakes up every 0.5s, traverses the execution table, selects the tasks that are due, delivers them through sendMessage(), and updates their status in the execution table.
This process is only responsible for delivering tasks. The actual execution still happens in a Task process, where it is handled by TaskExecutor.
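One polling pass over the execution table can be sketched with a plain array. The selection rule used here (status 0 and sec not later than now) is an assumption, since Crontab::getExecTasks() is not shown in the article; takeDueTasks() and the status constants are hypothetical names.

```php
<?php
const STATUS_PENDING = 0; // not delivered
const STATUS_DONE    = 1; // delivered

// Returns the rows that are due at $now and marks them as delivered,
// which is roughly what getExecTasks() plus finishTask() achieve together.
function takeDueTasks(array &$rows, int $now): array
{
    $due = [];
    foreach ($rows as $key => $row) {
        if ($row['runStatus'] === STATUS_PENDING && $row['sec'] <= $now) {
            $due[] = ['key' => $key] + $row;
            $rows[$key]['runStatus'] = STATUS_DONE;
        }
    }
    return $due;
}

$rows = [
    'a' => ['taskClass' => 'demo', 'taskMethod' => 'sum', 'sec' => 120, 'runStatus' => STATUS_PENDING],
    'b' => ['taskClass' => 'demo', 'taskMethod' => 'sum', 'sec' => 150, 'runStatus' => STATUS_PENDING],
];
$due = takeDueTasks($rows, 125);
echo count($due), ' ', $due[0]['key'], PHP_EOL; // 1 a
echo count(takeDueTasks($rows, 125)), PHP_EOL;  // 0
```

Recording the status change in the same pass is what keeps a task from being delivered twice by consecutive 0.5s ticks.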