From 7c691de000ef7d95984d6e20aba0f02d7ddabaac Mon Sep 17 00:00:00 2001 From: tanhongbin-php <55487039+tanhongbin-php@users.noreply.github.com> Date: Wed, 10 Dec 2025 16:42:11 +0800 Subject: [PATCH 1/4] Add files via upload --- src/plugin/saiadmin/process/Async.php | 83 +++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 src/plugin/saiadmin/process/Async.php diff --git a/src/plugin/saiadmin/process/Async.php b/src/plugin/saiadmin/process/Async.php new file mode 100644 index 0000000..0228879 --- /dev/null +++ b/src/plugin/saiadmin/process/Async.php @@ -0,0 +1,83 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +namespace plugin\saiadmin\process; + +use support\Container; +use support\exception\BusinessException; +use support\Log; +use Workerman\Connection\TcpConnection; + +/** + * Class Consumer + * @package process + */ +class Async +{ + public function __construct() + { + + } + + public function onWorkerStart(object $worker) + { + + } + + public function onConnect(TcpConnection $connection) + { + + } + + public function onMessage(TcpConnection $connection, $data) + { + try { + //接受请求数据 + $data = json_decode($data, true); + //验证参数 + if(!is_array($data)){ + throw new BusinessException('parameter exception', 404); + } + //验证类 + $class = $data['class'] ?? ''; + if ($class === '' || !class_exists($class)) { + throw new BusinessException('class not found', 404); + } + //验证方法 + $method = $data['method'] ?? ''; + if (!method_exists($class, $method)) { + throw new BusinessException('method not found',404); + } + //获取参数 + $args = $data['args'] ?? []; + $class = Container::get($class); + call_user_func_array([$class, $method], [...$args]); + $json = ['code' => 200, 'msg' => 'success']; + } catch (BusinessException $exception) { + $json = ['code' => $exception->getCode(), 'msg' => $exception->getMessage()]; + Log::channel('async')->info(json_encode($json, JSON_UNESCAPED_UNICODE)); + } catch (\Throwable $exception) { + $json = ['code' => 500, 'msg' => ['errMessage'=>$exception->getMessage(), 'errCode'=>$exception->getCode(), 'errFile'=>$exception->getFile(), 'errLine'=>$exception->getLine()]]; + Log::channel('async')->error(json_encode($json, JSON_UNESCAPED_UNICODE)); + } + $connection->send(json_encode($json)); + + } + + public function onClose(TcpConnection $connection) + { + + } +} From a8d89fbc777a310aa82b72ef8b03d7e1ea07c81c Mon Sep 17 00:00:00 2001 From: tanhongbin-php <55487039+tanhongbin-php@users.noreply.github.com> Date: Wed, 10 Dec 2025 16:43:25 +0800 Subject: [PATCH 2/4] Add async task configuration to process.php Added configuration for async task processing with handler, listener, process count, and port reuse settings. --- src/plugin/saiadmin/config/process.php | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/plugin/saiadmin/config/process.php b/src/plugin/saiadmin/config/process.php index 44a49b7..a01d713 100644 --- a/src/plugin/saiadmin/config/process.php +++ b/src/plugin/saiadmin/config/process.php @@ -2,5 +2,12 @@ return [ 'task' => [ 'handler' => plugin\saiadmin\process\Task::class + ], + // 异步任务处理定时进程 + 'async' => [ + 'handler' => plugin\saiadmin\process\Async::class, + 'listen' => 'text://127.0.0.1:8900', // 这里用了text协议,也可以用frame或其它协议 + 'count' => 4, // 可以设置多进程 + 'reusePort' => true, // 平均分配进程 ] ]; From b77fbca4cbd655508a156c6b70bde8d1c3c70e60 Mon Sep 17 00:00:00 2001 From: tanhongbin-php <55487039+tanhongbin-php@users.noreply.github.com> Date: Wed, 10 Dec 2025 16:44:00 +0800 Subject: [PATCH 3/4] Refactor Task class to use static CrontabLogic methods --- src/plugin/saiadmin/process/Task.php | 47 +++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/plugin/saiadmin/process/Task.php b/src/plugin/saiadmin/process/Task.php index cc4a032..afc8db9 100644 --- a/src/plugin/saiadmin/process/Task.php +++ b/src/plugin/saiadmin/process/Task.php @@ -12,14 +12,12 @@ class Task { - protected $logic; //login对象 public $crontabIds = []; //定时任务表主键id => Crontab对象id public function __construct() { $dbName = env('DB_NAME'); if (!empty($dbName)) { - $this->logic = new CrontabLogic(); // 连接webman channel服务 Client::connect(); // 订阅某个自定义事件并注册回调,收到事件后会自动触发此回调 @@ -38,11 +36,10 @@ public function onWorkerStart() public function initStart() { - $logic = new CrontabLogic(); - $taskList = $logic->where('status', 1)->select(); + $taskList = CrontabLogic::where('status', 1)->select(); foreach ($taskList as $item) { $crontab = new Crontab($item->rule, function () use ($item) { - $this->logic->run($item->id); + $this->run($item->id); }); $this->crontabIds[intval($item->id)] = $crontab->getId(); //存储定时任务表主键id => Crontab对象id echo date('Y-m-d H:i:s')." => 定时任务[".$item->id."][".$item->name."]:启动成功".PHP_EOL; @@ -57,13 +54,47 @@ public function reload($data) unset($this->crontabIds[$id]); //删除定时任务表主键id => Crontab对象id echo date('Y-m-d H:i:s')." => 定时任务[".$id."]:移除成功".PHP_EOL; } - $item = $this->logic->findOrEmpty($id);// 查询定时任务表数据 + $item = CrontabLogic::findOrEmpty($id);// 查询定时任务表数据 if (!$item->isEmpty() && $item->status == 1) { $crontab = new Crontab($item->rule, function () use ($item) { - $this->logic->run($item->id); + $this->run($item->id); }); $this->crontabIds[$id] = $crontab->getId(); //存储定时任务表主键id => Crontab对象id echo date('Y-m-d H:i:s')." => 定时任务[".$item->id."][".$item->name."]:启动成功".PHP_EOL; } } -} \ No newline at end of file + + private function run($id) : void + { + // 创建异步TCP连接 + $connection = new \Workerman\Connection\AsyncTcpConnection('tcp://127.0.0.1:8900'); + // 设置连接超时时间 + $connection->timeout = 5.0; + // 连接成功回调 + $connection->onConnect = function ($connection) use ($id) { + $request = [ + 'class' => CrontabLogic::class, + 'method' => 'run', + 'args' => [$id], + ]; + // 发送数据 + $connection->send(json_encode($request) . "\n"); + }; + // 接收数据回调 + $connection->onMessage = function ($connection, $data) { + // 关闭连接 + $connection->close(); + }; + // 连接错误回调 + $connection->onError = function ($connection, $code, $msg){ + \support\Log::error("异步进程连接错误: $code - $msg"); + $connection->close(); + }; + // 连接关闭回调 + $connection->onClose = function ($connection) { + // 连接关闭后的处理 + }; + // 发起连接 + $connection->connect(); + } +} From eb6820d35e15b85180c4c5de3c09ddcc9199584c Mon Sep 17 00:00:00 2001 From: tanhongbin-php <55487039+tanhongbin-php@users.noreply.github.com> Date: Wed, 10 Dec 2025 17:03:43 +0800 Subject: [PATCH 4/4] Refactor Task class to use CrontabLogic instance --- src/plugin/saiadmin/process/Task.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/plugin/saiadmin/process/Task.php b/src/plugin/saiadmin/process/Task.php index afc8db9..4f3f5cd 100644 --- a/src/plugin/saiadmin/process/Task.php +++ b/src/plugin/saiadmin/process/Task.php @@ -12,10 +12,12 @@ class Task { + protected $logic; public $crontabIds = []; //定时任务表主键id => Crontab对象id public function __construct() { + $this->logic = new CrontabLogic; $dbName = env('DB_NAME'); if (!empty($dbName)) { // 连接webman channel服务 @@ -36,7 +38,7 @@ public function onWorkerStart() public function initStart() { - $taskList = CrontabLogic::where('status', 1)->select(); + $taskList = $this->logic->where('status', 1)->select(); foreach ($taskList as $item) { $crontab = new Crontab($item->rule, function () use ($item) { $this->run($item->id); @@ -54,7 +56,7 @@ public function reload($data) unset($this->crontabIds[$id]); //删除定时任务表主键id => Crontab对象id echo date('Y-m-d H:i:s')." => 定时任务[".$id."]:移除成功".PHP_EOL; } - $item = CrontabLogic::findOrEmpty($id);// 查询定时任务表数据 + $item = $this->logic->findOrEmpty($id);// 查询定时任务表数据 if (!$item->isEmpty() && $item->status == 1) { $crontab = new Crontab($item->rule, function () use ($item) { $this->run($item->id);