Skip to content

Commit f1fc42a

Browse files
committed
Add task-manager:run-worker command as wrapper for Symfony messengers consume command
1 parent 627476d commit f1fc42a

2 files changed

Lines changed: 97 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
2.1.0
2+
=====
3+
4+
* (feature) Add `task-manager:run-worker` command as wrapper for Symfony messengers `consume` command.
5+
6+
17
2.0.3
28
=====
39

src/Command/RunWorkerCommand.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Torr\TaskManager\Command;
4+
5+
use Symfony\Component\Console\Attribute\AsCommand;
6+
use Symfony\Component\Console\Command\Command;
7+
use Symfony\Component\Console\Input\ArrayInput;
8+
use Symfony\Component\Console\Input\InputInterface;
9+
use Symfony\Component\Console\Input\InputOption;
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
use Torr\Cli\Console\Style\TorrStyle;
12+
use Torr\TaskManager\Transport\TransportsHelper;
13+
14+
/**
15+
* @final
16+
*/
17+
#[AsCommand("task-manager:run-worker")]
18+
class RunWorkerCommand extends Command
19+
{
20+
/**
21+
*/
22+
public function __construct (
23+
private readonly TransportsHelper $transportsHelper,
24+
) {
25+
parent::__construct();
26+
}
27+
28+
/**
29+
*
30+
*/
31+
#[\Override]
32+
protected function configure () : void
33+
{
34+
$this
35+
->addOption(
36+
'limit',
37+
'l',
38+
InputOption::VALUE_REQUIRED,
39+
'Limit the number of received messages',
40+
)
41+
->addOption(
42+
'time-limit',
43+
't',
44+
InputOption::VALUE_REQUIRED,
45+
'The time limit in seconds the worker can handle new messages',
46+
)
47+
->addOption(
48+
'failure-limit',
49+
'f',
50+
InputOption::VALUE_REQUIRED,
51+
'The number of failed messages the worker can consume',
52+
)
53+
->addOption(
54+
'memory-limit',
55+
'm',
56+
InputOption::VALUE_REQUIRED,
57+
'The memory limit the worker can consume',
58+
);
59+
}
60+
61+
/**
62+
*/
63+
public function execute (InputInterface $input, OutputInterface $output) : int
64+
{
65+
$io = new TorrStyle($input, $output);
66+
$io->title("Task Manager: Run Worker");
67+
$io->comment("Starting task workers for all registered queues.");
68+
69+
if ($this->transportsHelper->hasSyncTransport())
70+
{
71+
$io->caution("The app is using sync transports: that means that registered tasks are directly worked on and not in this worker. These queues are automatically filtered in this worker by Symfony.");
72+
}
73+
74+
$messengerConsumeArguments = new ArrayInput(array_filter([
75+
'command' => 'messenger:consume',
76+
'receivers' => $this->transportsHelper->getOrderedQueueNames(),
77+
'--limit' => $input->getOption("limit"),
78+
'--time-limit' => $input->getOption("time-limit"),
79+
'--failure-limit' => $input->getOption("failure-limit"),
80+
'--memory-limit' => $input->getOption("memory-limit"),
81+
]));
82+
83+
// disable interactive behavior for the greet command
84+
$messengerConsumeArguments->setInteractive(false);
85+
86+
$application = $this->getApplication();
87+
\assert(null !== $application);
88+
89+
return $application->doRun($messengerConsumeArguments, $output);
90+
}
91+
}

0 commit comments

Comments
 (0)