Skip to content

Commit 4a605ec

Browse files
authored
Merge pull request #29 from 21TORR/run-worker-command
2 parents 627476d + da8e3c5 commit 4a605ec

2 files changed

Lines changed: 108 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
2.1.0
2+
=====
3+
4+
* (feature) Add `task-manager:run-worker` command as wrapper for Symfony messengers `consume` command.
5+
* (improvement) Default to limit of 5 messages in run worker command, if no other limit is given.
6+
7+
18
2.0.3
29
=====
310

src/Command/RunWorkerCommand.php

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Torr\TaskManager\Command;
4+
5+
use Symfony\Component\Console\Attribute\AsCommand;
6+
use Symfony\Component\Console\Command\Command;
7+
use Symfony\Component\Console\Input\ArrayInput;
8+
use Symfony\Component\Console\Input\InputInterface;
9+
use Symfony\Component\Console\Input\InputOption;
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
use Torr\Cli\Console\Style\TorrStyle;
12+
use Torr\TaskManager\Transport\TransportsHelper;
13+
14+
/**
15+
* @final
16+
*/
17+
#[AsCommand("task-manager:run-worker")]
18+
class RunWorkerCommand extends Command
19+
{
20+
/**
21+
*/
22+
public function __construct (
23+
private readonly TransportsHelper $transportsHelper,
24+
) {
25+
parent::__construct();
26+
}
27+
28+
/**
29+
*
30+
*/
31+
#[\Override]
32+
protected function configure () : void
33+
{
34+
$this
35+
->addOption(
36+
'limit',
37+
'l',
38+
InputOption::VALUE_REQUIRED,
39+
'Limit the number of received messages',
40+
)
41+
->addOption(
42+
'time-limit',
43+
't',
44+
InputOption::VALUE_REQUIRED,
45+
'The time limit in seconds the worker can handle new messages',
46+
)
47+
->addOption(
48+
'failure-limit',
49+
'f',
50+
InputOption::VALUE_REQUIRED,
51+
'The number of failed messages the worker can consume',
52+
)
53+
->addOption(
54+
'memory-limit',
55+
'm',
56+
InputOption::VALUE_REQUIRED,
57+
'The memory limit the worker can consume',
58+
);
59+
}
60+
61+
/**
62+
*/
63+
public function execute (InputInterface $input, OutputInterface $output) : int
64+
{
65+
$io = new TorrStyle($input, $output);
66+
$io->title("Task Manager: Run Worker");
67+
$io->comment("Starting task workers for all registered queues.");
68+
69+
if ($this->transportsHelper->hasSyncTransport())
70+
{
71+
$io->caution("The app is using sync transports: that means that registered tasks are directly worked on and not in this worker. These queues are automatically filtered in this worker by Symfony.");
72+
}
73+
74+
$limits = array_filter([
75+
'--limit' => $input->getOption("limit"),
76+
'--time-limit' => $input->getOption("time-limit"),
77+
'--failure-limit' => $input->getOption("failure-limit"),
78+
'--memory-limit' => $input->getOption("memory-limit"),
79+
]);
80+
81+
// if no limits are set, default to 5 messages
82+
if (empty($limits))
83+
{
84+
$limits["--limit"] = 5;
85+
}
86+
87+
$messengerConsumeArguments = new ArrayInput(array_filter([
88+
'command' => 'messenger:consume',
89+
'receivers' => $this->transportsHelper->getOrderedQueueNames(),
90+
...$limits,
91+
]));
92+
93+
// disable interactive behavior for the greet command
94+
$messengerConsumeArguments->setInteractive(false);
95+
96+
$application = $this->getApplication();
97+
\assert(null !== $application);
98+
99+
return $application->doRun($messengerConsumeArguments, $output);
100+
}
101+
}

0 commit comments

Comments
 (0)