Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/plugin/saiadmin/config/process.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,12 @@
return [
'task' => [
'handler' => plugin\saiadmin\process\Task::class
],
// 异步任务处理定时进程
'async' => [
'handler' => plugin\saiadmin\process\Async::class,
'listen' => 'text://127.0.0.1:8900', // 这里用了text协议,也可以用frame或其它协议
'count' => 4, // 可以设置多进程
'reusePort' => true, // 平均分配进程
]
];
83 changes: 83 additions & 0 deletions src/plugin/saiadmin/process/Async.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

/**
* This file is part of webman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/

namespace plugin\saiadmin\process;

use support\Container;
use support\exception\BusinessException;
use support\Log;
use Workerman\Connection\TcpConnection;

/**
* Class Consumer
* @package process
*/
class Async
{
public function __construct()
{

}

public function onWorkerStart(object $worker)
{

}

public function onConnect(TcpConnection $connection)
{

}

public function onMessage(TcpConnection $connection, $data)
{
try {
//接受请求数据
$data = json_decode($data, true);
//验证参数
if(!is_array($data)){
throw new BusinessException('parameter exception', 404);
}
//验证类
$class = $data['class'] ?? '';
if ($class === '' || !class_exists($class)) {
throw new BusinessException('class not found', 404);
}
//验证方法
$method = $data['method'] ?? '';
if (!method_exists($class, $method)) {
throw new BusinessException('method not found',404);
}
//获取参数
$args = $data['args'] ?? [];
$class = Container::get($class);
call_user_func_array([$class, $method], [...$args]);
$json = ['code' => 200, 'msg' => 'success'];
} catch (BusinessException $exception) {
$json = ['code' => $exception->getCode(), 'msg' => $exception->getMessage()];
Log::channel('async')->info(json_encode($json, JSON_UNESCAPED_UNICODE));
} catch (\Throwable $exception) {
$json = ['code' => 500, 'msg' => ['errMessage'=>$exception->getMessage(), 'errCode'=>$exception->getCode(), 'errFile'=>$exception->getFile(), 'errLine'=>$exception->getLine()]];
Log::channel('async')->error(json_encode($json, JSON_UNESCAPED_UNICODE));
}
$connection->send(json_encode($json));

}

public function onClose(TcpConnection $connection)
{

}
}
47 changes: 40 additions & 7 deletions src/plugin/saiadmin/process/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@

class Task
{
protected $logic; //login对象
protected $logic;
public $crontabIds = []; //定时任务表主键id => Crontab对象id

public function __construct()
{
$this->logic = new CrontabLogic;
$dbName = env('DB_NAME');
if (!empty($dbName)) {
$this->logic = new CrontabLogic();
// 连接webman channel服务
Client::connect();
// 订阅某个自定义事件并注册回调,收到事件后会自动触发此回调
Expand All @@ -38,11 +38,10 @@ public function onWorkerStart()

public function initStart()
{
$logic = new CrontabLogic();
$taskList = $logic->where('status', 1)->select();
$taskList = $this->logic->where('status', 1)->select();
foreach ($taskList as $item) {
$crontab = new Crontab($item->rule, function () use ($item) {
$this->logic->run($item->id);
$this->run($item->id);
});
$this->crontabIds[intval($item->id)] = $crontab->getId(); //存储定时任务表主键id => Crontab对象id
echo date('Y-m-d H:i:s')." => 定时任务[".$item->id."][".$item->name."]:启动成功".PHP_EOL;
Expand All @@ -60,10 +59,44 @@ public function reload($data)
$item = $this->logic->findOrEmpty($id);// 查询定时任务表数据
if (!$item->isEmpty() && $item->status == 1) {
$crontab = new Crontab($item->rule, function () use ($item) {
$this->logic->run($item->id);
$this->run($item->id);
});
$this->crontabIds[$id] = $crontab->getId(); //存储定时任务表主键id => Crontab对象id
echo date('Y-m-d H:i:s')." => 定时任务[".$item->id."][".$item->name."]:启动成功".PHP_EOL;
}
}
}

private function run($id) : void
{
// 创建异步TCP连接
$connection = new \Workerman\Connection\AsyncTcpConnection('tcp://127.0.0.1:8900');
// 设置连接超时时间
$connection->timeout = 5.0;
// 连接成功回调
$connection->onConnect = function ($connection) use ($id) {
$request = [
'class' => CrontabLogic::class,
'method' => 'run',
'args' => [$id],
];
// 发送数据
$connection->send(json_encode($request) . "\n");
};
// 接收数据回调
$connection->onMessage = function ($connection, $data) {
// 关闭连接
$connection->close();
};
// 连接错误回调
$connection->onError = function ($connection, $code, $msg){
\support\Log::error("异步进程连接错误: $code - $msg");
$connection->close();
};
// 连接关闭回调
$connection->onClose = function ($connection) {
// 连接关闭后的处理
};
// 发起连接
$connection->connect();
}
}