Skip to content

Commit 38d7cd7

Browse files
authored
FRW-11084 introduced new worker (#124)
FRW-11084 As a Project Engineer I want to have P&S workers being more stable
1 parent f4b6a8f commit 38d7cd7

29 files changed

Lines changed: 2588 additions & 30 deletions

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
"spryker/log": "^3.0.0",
1111
"spryker/monolog": "^2.0.0",
1212
"spryker/propel-orm": "^1.0.0",
13-
"spryker/queue-extension": "^1.0.0",
13+
"spryker/queue-extension": "^1.1.0",
14+
"spryker/store": "^1.25.0",
1415
"spryker/symfony": "^3.0.0",
1516
"spryker/transfer": "^3.0.0",
1617
"spryker/util-encoding": "^2.1.0"

phpstan.neon

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ parameters:
22
level: 7
33
ignoreErrors:
44
- '#Call to an undefined method Propel\\Runtime\\Collection\\Collection::toArray\(\).#'
5+
- '#Comparison operation ">" between 0 and 0 is always false.#'
56
-
67
identifier: missingType.iterableValue
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/**
4+
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
5+
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
6+
*/
7+
8+
namespace Spryker\Shared\Queue\Enum;
9+
10+
enum QueueReadModeEnum: int
11+
{
12+
/**
13+
* Default mode, based on the order of queues in \Pyz\Zed\Queue\QueueDependencyProvider::getProcessorMessagePlugins
14+
*/
15+
case MODE_READ_ORDER = 0;
16+
17+
case MODE_PREFER_PUB = 1;
18+
19+
case MODE_PREFER_SYNC = 2;
20+
21+
case MODE_PREFER_BIG = 4;
22+
23+
case MODE_PREFER_SMALL = 8;
24+
25+
case MODE_PREFER_DEFAULT_STORE = 16;
26+
27+
case MODE_PREFER_FAST = 32;
28+
29+
case MODE_PREFER_SLOW = 64;
30+
31+
case MODE_ONLY_PREFERRED = 128;
32+
}

src/Spryker/Shared/Queue/QueueConstants.php

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,4 +189,115 @@ interface QueueConstants
189189
* @var string
190190
*/
191191
public const QUEUE_WORKER_MAX_WAITING_ROUNDS = 'QUEUE:QUEUE_WORKER_MAX_WAITING_ROUNDS';
192+
193+
/**
194+
* Specification:
195+
* - Enables processing of queues with resource aware queue worker.
196+
*
197+
* @api
198+
*
199+
* @var string
200+
*/
201+
public const RESOURCE_AWARE_QUEUE_WORKER_ENABLED = 'QUEUE:RESOURCE_AWARE_QUEUE_WORKER_ENABLED';
202+
203+
/**
204+
* Specification:
205+
* - Max concurrent PHP processes for all queues/stores.
206+
*
207+
* @api
208+
*
209+
* @var string
210+
*/
211+
public const QUEUE_WORKER_MAX_PROCESSES = 'QUEUE:QUEUE_WORKER_MAX_PROCESSES';
212+
213+
/**
214+
* Specification:
215+
* - Defines whether to ignore cases when system free memory can't be detected/read or parsed.
216+
*
217+
* @api
218+
*
219+
* @var string
220+
*/
221+
public const QUEUE_WORKER_IGNORE_MEMORY_READ_FAILURE = 'QUEUE:QUEUE_WORKER_IGNORE_MEMORY_READ_FAILURE';
222+
223+
/**
224+
* Specification:
225+
* - Defines free memory buffer for reliability in MBs.
226+
*
227+
* @api
228+
*
229+
* @var string
230+
*/
231+
public const QUEUE_WORKER_FREE_MEMORY_BUFFER = 'QUEUE:QUEUE_WORKER_FREE_MEMORY_BUFFER';
232+
233+
/**
234+
* Specification:
235+
* - Defines timeout for memory read process(command) in seconds.
236+
*
237+
* @api
238+
*
239+
* @var string
240+
*/
241+
public const QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT = 'QUEUE:QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT';
242+
243+
/**
244+
* Specification:
245+
* - Defines a percentage by how much Worker can increase its own memory consumption within PHP process limit.
246+
* - When a limit reached - Worker will finish its job as usual to prevent memory leaks.
247+
*
248+
* @api
249+
*
250+
* @var string
251+
*/
252+
public const QUEUE_WORKER_MEMORY_MAX_GROWTH_FACTOR = 'QUEUE:QUEUE_WORKER_MEMORY_MAX_GROWTH_FACTOR';
253+
254+
/**
255+
* Specification:
256+
* - Defines timeout for waiting all run processes become completed in seconds.
257+
*
258+
* @api
259+
*
260+
* @var string
261+
*/
262+
public const QUEUE_WORKER_PROCESSES_COMPLETE_TIMEOUT = 'QUEUE:QUEUE_WORKER_PROCESSES_COMPLETE_TIMEOUT';
263+
264+
/**
265+
* Specification:
266+
* - Defines interval for checking the processes are completed in milliseconds.
267+
*
268+
* @api
269+
*
270+
* @var string
271+
*/
272+
public const QUEUE_WORKER_CHECK_PROCESSES_COMPLETE_INTERVAL_MILLISECONDS = 'QUEUE:QUEUE_WORKER_CHECK_PROCESSES_COMPLETE_INTERVAL_MILLISECONDS';
273+
274+
/**
275+
* Specification:
276+
* - Defines the mode of the queue processing worker.
277+
*
278+
* @api
279+
*
280+
* @var string
281+
*/
282+
public const QUEUE_PROCESSING_WORKER_DYNAMIC_MODE = 'QUEUE:QUEUE_PROCESSING_WORKER_DYNAMIC_MODE';
283+
284+
/**
285+
* Specification:
286+
* - Defines the threshold of the big queue.
287+
*
288+
* @api
289+
*
290+
* @var string
291+
*/
292+
public const QUEUE_PROCESSING_BIG_QUEUE_THRESHOLD_BATCHES_AMOUNT = 'QUEUE:QUEUE_PROCESSING_BIG_QUEUE_THRESHOLD_BATCHES_AMOUNT';
293+
294+
/**
295+
* Specification:
296+
* - Defines the limit of the processes per queue.
297+
*
298+
* @api
299+
*
300+
* @var string
301+
*/
302+
public const QUEUE_PROCESSING_LIMIT_OF_PROCESSES_PER_QUEUE = 'QUEUE:PROCESSING_LIMIT_OF_PROCESSES_PER_QUEUE';
192303
}

src/Spryker/Shared/Queue/Transfer/queue.transfer.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,37 @@
4242
<property name="processedMessageCount" type="int"/>
4343
<property name="isSuccesful" type="bool"/>
4444
</transfer>
45+
46+
<transfer name="QueueDynamicSettings" strict="true">
47+
<property name="mode" type="int"/>
48+
<property name="bigQueueBatches" type="int"/>
49+
<property name="limitPerQueue" type="int"/>
50+
</transfer>
51+
52+
<transfer name="QueueMetricsRequest" strict="true">
53+
<property name="queueName" type="string"/>
54+
<property name="storeName" type="string"/>
55+
<property name="localeName" type="string"/>
56+
</transfer>
57+
58+
<transfer name="QueueMetricsResponse" strict="true">
59+
<property name="messageCount" type="int"/>
60+
<property name="consumerCount" type="int"/>
61+
</transfer>
62+
63+
<transfer name="Store">
64+
<property name="name" type="string"/>
65+
</transfer>
66+
67+
<transfer name="StoreCollection">
68+
<property name="stores" type="Store[]" singular="store"/>
69+
</transfer>
70+
71+
<transfer name="StoreCriteria">
72+
</transfer>
73+
74+
<transfer name="StoreConditions">
75+
<property name="withExpanders" type="bool"/>
76+
</transfer>
77+
4578
</transfers>

src/Spryker/Zed/Queue/Business/Checker/TaskMemoryUsageChecker.php

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,14 @@ class TaskMemoryUsageChecker implements TaskMemoryUsageCheckerInterface
1515
{
1616
use LoggerTrait;
1717

18-
/**
19-
* @var \Spryker\Zed\Queue\QueueConfig
20-
*/
21-
protected QueueConfig $queueConfig;
22-
23-
/**
24-
* @var \Spryker\Zed\Queue\Business\Reader\QueueConfigReaderInterface
25-
*/
26-
protected QueueConfigReaderInterface $queueConfigReader;
27-
2818
/**
2919
* @param \Spryker\Zed\Queue\QueueConfig $queueConfig
3020
* @param \Spryker\Zed\Queue\Business\Reader\QueueConfigReaderInterface $queueConfigReader
3121
*/
32-
public function __construct(QueueConfig $queueConfig, QueueConfigReaderInterface $queueConfigReader)
33-
{
34-
$this->queueConfig = $queueConfig;
35-
$this->queueConfigReader = $queueConfigReader;
22+
public function __construct(
23+
protected QueueConfig $queueConfig,
24+
protected QueueConfigReaderInterface $queueConfigReader,
25+
) {
3626
}
3727

3828
/**
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
/**
4+
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
5+
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
6+
*/
7+
8+
namespace Spryker\Zed\Queue\Business\Logger;
9+
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
12+
class WorkerLogger implements WorkerLoggerInterface
13+
{
14+
protected const string ERROR_MESSAGE_TEMPLATE = '\033[31m%s\033[0m';
15+
16+
/**
17+
* @var array<string, float>
18+
*/
19+
protected array $timers = [];
20+
21+
public function __construct(protected OutputInterface $output)
22+
{
23+
}
24+
25+
public function logNotOftenThan(string $timerName, string|callable $message, string $level = 'debug', int $intervalSec = 1): void
26+
{
27+
if (microtime(true) - ($this->timers[$timerName] ?? 0) >= $intervalSec) {
28+
$this->timers[$timerName] = microtime(true);
29+
if (is_callable($message)) {
30+
$message = $message();
31+
}
32+
if ($level === 'debug' && $this->output->isDebug()) {
33+
$this->output->writeln($message);
34+
}
35+
if ($level === 'info' && $this->output->isVerbose()) {
36+
$this->output->writeln($message);
37+
}
38+
}
39+
}
40+
41+
public function info(string $message): void
42+
{
43+
if (!$this->output->isVerbose()) {
44+
return;
45+
}
46+
47+
$this->output->writeln($message);
48+
}
49+
50+
public function error(string $message): void
51+
{
52+
$this->output->writeln(sprintf(
53+
static::ERROR_MESSAGE_TEMPLATE,
54+
$message,
55+
));
56+
}
57+
58+
public function debug(string $message): void
59+
{
60+
if (!$this->output->isDebug()) {
61+
return;
62+
}
63+
64+
$this->output->writeln($message);
65+
}
66+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/**
4+
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
5+
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
6+
*/
7+
8+
namespace Spryker\Zed\Queue\Business\Logger;
9+
10+
interface WorkerLoggerInterface
11+
{
12+
public function logNotOftenThan(string $timerName, string|callable $message, string $level = 'debug', int $intervalSec = 1): void;
13+
14+
public function info(string $message): void;
15+
16+
public function error(string $message): void;
17+
18+
public function debug(string $message): void;
19+
}

src/Spryker/Zed/Queue/Business/Process/ProcessManager.php

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,13 @@
1313
use Propel\Runtime\Formatter\SimpleArrayFormatter;
1414
use Spryker\Shared\Log\LoggerTrait;
1515
use Spryker\Zed\Queue\Persistence\QueueQueryContainerInterface;
16+
use Spryker\Zed\Queue\QueueConfig;
1617
use Symfony\Component\Process\Process;
1718

1819
class ProcessManager implements ProcessManagerInterface
1920
{
2021
use LoggerTrait;
2122

22-
/**
23-
* @var \Spryker\Zed\Queue\Persistence\QueueQueryContainerInterface
24-
*/
25-
protected $queryContainer;
26-
27-
/**
28-
* @var string
29-
*/
30-
protected $serverUniqueId;
31-
3223
/**
3324
* @var array<string>
3425
*/
@@ -43,10 +34,11 @@ class ProcessManager implements ProcessManagerInterface
4334
* @param \Spryker\Zed\Queue\Persistence\QueueQueryContainerInterface $queryContainer
4435
* @param string $serverUniqueId
4536
*/
46-
public function __construct(QueueQueryContainerInterface $queryContainer, $serverUniqueId)
47-
{
48-
$this->queryContainer = $queryContainer;
49-
$this->serverUniqueId = $serverUniqueId;
37+
public function __construct(
38+
protected QueueQueryContainerInterface $queryContainer,
39+
protected string $serverUniqueId,
40+
protected QueueConfig $queueConfig,
41+
) {
5042
}
5143

5244
/**
@@ -261,6 +253,10 @@ protected function createQueueProcessTransfer($queue, $processId)
261253
*/
262254
protected function saveProcess(QueueProcessTransfer $queueProcessTransfer)
263255
{
256+
if ($this->queueConfig->isResourceAwareQueueWorkerEnabled()) {
257+
return $queueProcessTransfer;
258+
}
259+
264260
$processEntity = new SpyQueueProcess();
265261
$processEntity->fromArray($queueProcessTransfer->toArray());
266262
$processEntity->save();

0 commit comments

Comments
 (0)