diff --git a/composer.json b/composer.json index cd0f6b4..de40a1a 100644 --- a/composer.json +++ b/composer.json @@ -10,6 +10,8 @@ "php-di/slim-bridge": "^3.4", "symfony/console": "^5.4 | ^6 | ^7", "symfony/finder": "^5.4 | ^6 | ^7", + "symfony/cache": "^6 | ^7", + "symfony/messenger": "^6 | ^7", "symfony/property-access": "^7.2", "symfony/serializer": "^7.2", "phpstan/phpdoc-parser": "^2.0", @@ -30,7 +32,8 @@ "WonderNetwork\\SlimKernel\\": "tests/", "Acme\\": [ "tests/Resources/App/src/", - "tests/Resources/ErrorMiddleware/src/" + "tests/Resources/ErrorMiddleware/src/", + "tests/Resources/Messenger/src/" ] } }, diff --git a/composer.lock b/composer.lock index 64c6b50..35c3ea2 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "458f4c80b3a9b76bb55091c9d6a4ce54", + "content-hash": "5772a61c3ce824269fccf4122016f2c7", "packages": [ { "name": "doctrine/deprecations", @@ -610,6 +610,103 @@ }, "time": "2024-10-13T11:29:49+00:00" }, + { + "name": "psr/cache", + "version": "3.0.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/cache.git", + "reference": "aa5030cfa5405eccfdcb1083ce040c2cb8d253bf" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/cache/zipball/aa5030cfa5405eccfdcb1083ce040c2cb8d253bf", + "reference": "aa5030cfa5405eccfdcb1083ce040c2cb8d253bf", + "shasum": "" + }, + "require": { + "php": ">=8.0.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Cache\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "Common interface for caching libraries", + "keywords": [ + "cache", + "psr", + "psr-6" + ], + "support": { + "source": "https://github.com/php-fig/cache/tree/3.0.0" + }, + "time": "2021-02-03T23:26:27+00:00" + }, + { + "name": "psr/clock", + "version": "1.0.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/clock.git", + "reference": "e41a24703d4560fd0acb709162f73b8adfc3aa0d" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/clock/zipball/e41a24703d4560fd0acb709162f73b8adfc3aa0d", + "reference": "e41a24703d4560fd0acb709162f73b8adfc3aa0d", + "shasum": "" + }, + "require": { + "php": "^7.0 || ^8.0" + }, + "type": "library", + "autoload": { + "psr-4": { + "Psr\\Clock\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "Common interface for reading the clock.", + "homepage": "https://github.com/php-fig/clock", + "keywords": [ + "clock", + "now", + "psr", + "psr-20", + "time" + ], + "support": { + "issues": "https://github.com/php-fig/clock/issues", + "source": "https://github.com/php-fig/clock/tree/1.0.0" + }, + "time": "2022-11-25T14:36:26+00:00" + }, { "name": "psr/container", "version": "2.0.2", @@ -1176,6 +1273,264 @@ ], "time": "2024-06-13T08:54:48+00:00" }, + { + "name": "symfony/cache", + "version": "v7.4.5", + "source": { + "type": "git", + "url": "https://github.com/symfony/cache.git", + "reference": "8dde98d5a4123b53877aca493f9be57b333f14bd" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/cache/zipball/8dde98d5a4123b53877aca493f9be57b333f14bd", + "reference": "8dde98d5a4123b53877aca493f9be57b333f14bd", + "shasum": "" + }, + "require": { + "php": ">=8.2", + "psr/cache": "^2.0|^3.0", + "psr/log": "^1.1|^2|^3", + "symfony/cache-contracts": "^3.6", + "symfony/deprecation-contracts": "^2.5|^3", + "symfony/service-contracts": "^2.5|^3", + "symfony/var-exporter": "^6.4|^7.0|^8.0" + }, + "conflict": { + "doctrine/dbal": "<3.6", + "ext-redis": "<6.1", + "ext-relay": "<0.12.1", + "symfony/dependency-injection": "<6.4", + "symfony/http-kernel": "<6.4", + "symfony/var-dumper": "<6.4" + }, + "provide": { + "psr/cache-implementation": "2.0|3.0", + "psr/simple-cache-implementation": "1.0|2.0|3.0", + "symfony/cache-implementation": "1.1|2.0|3.0" + }, + "require-dev": { + "cache/integration-tests": "dev-master", + "doctrine/dbal": "^3.6|^4", + "predis/predis": "^1.1|^2.0", + "psr/simple-cache": "^1.0|^2.0|^3.0", + "symfony/clock": "^6.4|^7.0|^8.0", + "symfony/config": "^6.4|^7.0|^8.0", + "symfony/dependency-injection": "^6.4|^7.0|^8.0", + "symfony/filesystem": "^6.4|^7.0|^8.0", + "symfony/http-kernel": "^6.4|^7.0|^8.0", + "symfony/messenger": "^6.4|^7.0|^8.0", + "symfony/var-dumper": "^6.4|^7.0|^8.0" + }, + "type": "library", + "autoload": { + "psr-4": { + "Symfony\\Component\\Cache\\": "" + }, + "classmap": [ + "Traits/ValueWrapper.php" + ], + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nicolas Grekas", + "email": "p@tchwork.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Provides extended PSR-6, PSR-16 (and tags) implementations", + "homepage": "https://symfony.com", + "keywords": [ + "caching", + "psr6" + ], + "support": { + "source": "https://github.com/symfony/cache/tree/v7.4.5" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2026-01-27T16:16:02+00:00" + }, + { + "name": "symfony/cache-contracts", + "version": "v3.6.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/cache-contracts.git", + "reference": "5d68a57d66910405e5c0b63d6f0af941e66fc868" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/cache-contracts/zipball/5d68a57d66910405e5c0b63d6f0af941e66fc868", + "reference": "5d68a57d66910405e5c0b63d6f0af941e66fc868", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "psr/cache": "^3.0" + }, + "type": "library", + "extra": { + "thanks": { + "url": "https://github.com/symfony/contracts", + "name": "symfony/contracts" + }, + "branch-alias": { + "dev-main": "3.6-dev" + } + }, + "autoload": { + "psr-4": { + "Symfony\\Contracts\\Cache\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nicolas Grekas", + "email": "p@tchwork.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Generic abstractions related to caching", + "homepage": "https://symfony.com", + "keywords": [ + "abstractions", + "contracts", + "decoupling", + "interfaces", + "interoperability", + "standards" + ], + "support": { + "source": "https://github.com/symfony/cache-contracts/tree/v3.6.0" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2025-03-13T15:25:07+00:00" + }, + { + "name": "symfony/clock", + "version": "v7.4.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/clock.git", + "reference": "9169f24776edde469914c1e7a1442a50f7a4e110" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/clock/zipball/9169f24776edde469914c1e7a1442a50f7a4e110", + "reference": "9169f24776edde469914c1e7a1442a50f7a4e110", + "shasum": "" + }, + "require": { + "php": ">=8.2", + "psr/clock": "^1.0", + "symfony/polyfill-php83": "^1.28" + }, + "provide": { + "psr/clock-implementation": "1.0" + }, + "type": "library", + "autoload": { + "files": [ + "Resources/now.php" + ], + "psr-4": { + "Symfony\\Component\\Clock\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nicolas Grekas", + "email": "p@tchwork.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Decouples applications from the system clock", + "homepage": "https://symfony.com", + "keywords": [ + "clock", + "psr20", + "time" + ], + "support": { + "source": "https://github.com/symfony/clock/tree/v7.4.0" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2025-11-12T15:39:26+00:00" + }, { "name": "symfony/console", "version": "v7.3.6", @@ -1409,6 +1764,100 @@ ], "time": "2025-10-15T18:45:57+00:00" }, + { + "name": "symfony/messenger", + "version": "v7.4.4", + "source": { + "type": "git", + "url": "https://github.com/symfony/messenger.git", + "reference": "0a39e1b256f280762293f2f441e430c8baf74f9c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/messenger/zipball/0a39e1b256f280762293f2f441e430c8baf74f9c", + "reference": "0a39e1b256f280762293f2f441e430c8baf74f9c", + "shasum": "" + }, + "require": { + "php": ">=8.2", + "psr/log": "^1|^2|^3", + "symfony/clock": "^6.4|^7.0|^8.0", + "symfony/deprecation-contracts": "^2.5|^3" + }, + "conflict": { + "symfony/console": "<7.2", + "symfony/event-dispatcher": "<6.4", + "symfony/event-dispatcher-contracts": "<2.5", + "symfony/framework-bundle": "<6.4", + "symfony/http-kernel": "<7.3", + "symfony/lock": "<7.4", + "symfony/serializer": "<6.4.32|>=7.3,<7.3.10|>=7.4,<7.4.4|>=8.0,<8.0.4" + }, + "require-dev": { + "psr/cache": "^1.0|^2.0|^3.0", + "symfony/console": "^7.2|^8.0", + "symfony/dependency-injection": "^6.4|^7.0|^8.0", + "symfony/event-dispatcher": "^6.4|^7.0|^8.0", + "symfony/http-kernel": "^7.3|^8.0", + "symfony/lock": "^7.4|^8.0", + "symfony/process": "^6.4|^7.0|^8.0", + "symfony/property-access": "^6.4|^7.0|^8.0", + "symfony/rate-limiter": "^6.4|^7.0|^8.0", + "symfony/routing": "^6.4|^7.0|^8.0", + "symfony/serializer": "^6.4.32|~7.3.10|^7.4.4|^8.0.4", + "symfony/service-contracts": "^2.5|^3", + "symfony/stopwatch": "^6.4|^7.0|^8.0", + "symfony/validator": "^6.4|^7.0|^8.0", + "symfony/var-dumper": "^6.4|^7.0|^8.0" + }, + "type": "library", + "autoload": { + "psr-4": { + "Symfony\\Component\\Messenger\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Samuel Roze", + "email": "samuel.roze@gmail.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Helps applications send and receive messages to/from other applications or via message queues", + "homepage": "https://symfony.com", + "support": { + "source": "https://github.com/symfony/messenger/tree/v7.4.4" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2026-01-08T14:50:10+00:00" + }, { "name": "symfony/polyfill-ctype", "version": "v1.33.0", @@ -1828,6 +2277,86 @@ ], "time": "2025-01-02T08:10:11+00:00" }, + { + "name": "symfony/polyfill-php83", + "version": "v1.33.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/polyfill-php83.git", + "reference": "17f6f9a6b1735c0f163024d959f700cfbc5155e5" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/polyfill-php83/zipball/17f6f9a6b1735c0f163024d959f700cfbc5155e5", + "reference": "17f6f9a6b1735c0f163024d959f700cfbc5155e5", + "shasum": "" + }, + "require": { + "php": ">=7.2" + }, + "type": "library", + "extra": { + "thanks": { + "url": "https://github.com/symfony/polyfill", + "name": "symfony/polyfill" + } + }, + "autoload": { + "files": [ + "bootstrap.php" + ], + "psr-4": { + "Symfony\\Polyfill\\Php83\\": "" + }, + "classmap": [ + "Resources/stubs" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nicolas Grekas", + "email": "p@tchwork.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Symfony polyfill backporting some PHP 8.3+ features to lower PHP versions", + "homepage": "https://symfony.com", + "keywords": [ + "compatibility", + "polyfill", + "portable", + "shim" + ], + "support": { + "source": "https://github.com/symfony/polyfill-php83/tree/v1.33.0" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2025-07-08T02:45:35+00:00" + }, { "name": "symfony/property-access", "version": "v7.2.3", @@ -2339,6 +2868,87 @@ ], "time": "2024-12-20T13:38:37+00:00" }, + { + "name": "symfony/var-exporter", + "version": "v7.4.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/var-exporter.git", + "reference": "03a60f169c79a28513a78c967316fbc8bf17816f" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/var-exporter/zipball/03a60f169c79a28513a78c967316fbc8bf17816f", + "reference": "03a60f169c79a28513a78c967316fbc8bf17816f", + "shasum": "" + }, + "require": { + "php": ">=8.2", + "symfony/deprecation-contracts": "^2.5|^3" + }, + "require-dev": { + "symfony/property-access": "^6.4|^7.0|^8.0", + "symfony/serializer": "^6.4|^7.0|^8.0", + "symfony/var-dumper": "^6.4|^7.0|^8.0" + }, + "type": "library", + "autoload": { + "psr-4": { + "Symfony\\Component\\VarExporter\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nicolas Grekas", + "email": "p@tchwork.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Allows exporting any serializable PHP data structure to plain PHP code", + "homepage": "https://symfony.com", + "keywords": [ + "clone", + "construct", + "export", + "hydrate", + "instantiate", + "lazy-loading", + "proxy", + "serialize" + ], + "support": { + "source": "https://github.com/symfony/var-exporter/tree/v7.4.0" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2025-09-11T10:15:23+00:00" + }, { "name": "webmozart/assert", "version": "1.11.0", diff --git a/src/Messenger/AsyncCommand.php b/src/Messenger/AsyncCommand.php new file mode 100644 index 0000000..9aad119 --- /dev/null +++ b/src/Messenger/AsyncCommand.php @@ -0,0 +1,8 @@ +messageBus = $messageBus; + } + + /** + * @template T + * @param Command $command + * @phpstan-return T + * @throws Throwable + * @noinspection PhpParameterNameChangedDuringInheritanceInspection + */ + public function handle(Command $command): mixed { + try { + return $this->handleAndReturn($command); + } catch (HandlerFailedException $e) { + [$e] = array_values($e->getWrappedExceptions()); + + throw $e; + } + } + + public function queue(AsyncCommand $command, string $transport): void { + $transportNamesStamp = new TransportNamesStamp($transport); + $this->messageBus->dispatch($command, [$transportNamesStamp]); + } + + public function delay(AsyncCommand $command, string $transport, Delay $delay): void { + $transportNamesStamp = new TransportNamesStamp($transport); + $delayStamp = new DelayStamp(delay: $delay->milliseconds); + $this->messageBus->dispatch($command, [$transportNamesStamp, $delayStamp]); + } +} diff --git a/src/Messenger/CommandHandler.php b/src/Messenger/CommandHandler.php new file mode 100644 index 0000000..a70879b --- /dev/null +++ b/src/Messenger/CommandHandler.php @@ -0,0 +1,17 @@ + + */ + public function __invoke(Command $command): mixed; +} diff --git a/src/Messenger/Delay.php b/src/Messenger/Delay.php new file mode 100644 index 0000000..eb2e29a --- /dev/null +++ b/src/Messenger/Delay.php @@ -0,0 +1,26 @@ + + */ + private array $handlers; + + /** + * @param array $handlers + */ + public function __construct(array $handlers) { + $this->handlers = collection($handlers)->indexBy(new HandlerToMessageMapping())->toArray(); + } + + public function getHandlers(Envelope $envelope): iterable { + $class = $envelope->getMessage()::class; + $handler = $this->handlers[$class] ?? null; + + if ($handler) { + yield new HandlerDescriptor($handler); + } + } +} diff --git a/src/Messenger/Kernel/CommandBusDependencies.php b/src/Messenger/Kernel/CommandBusDependencies.php new file mode 100644 index 0000000..406b2fd --- /dev/null +++ b/src/Messenger/Kernel/CommandBusDependencies.php @@ -0,0 +1,12 @@ +hasMethod('__invoke')) { + throw new RuntimeException( + sprintf( + 'Handler %s does not have an __invoke method', + $handler::class, + ), + ); + } + + $reflectionMethod = $reflectionObject->getMethod('__invoke'); + + if ($reflectionMethod->getNumberOfParameters() !== 1) { + throw new RuntimeException( + sprintf( + 'Handler %s::__invoke() is expected to have exactly one parameter, actual: %d', + $handler::class, + $reflectionMethod->getNumberOfParameters(), + ), + ); + } + + $type = $reflectionMethod->getParameters()[0]->getType(); + + return match (true) { + $type instanceof ReflectionNamedType => $type->getName(), + $type instanceof ReflectionUnionType => $this->handleUnionTypes($handler::class, ...$type->getTypes()), + default => throw new RuntimeException( + sprintf( + 'Handler %s::__invoke($message) is not properly typehinted', + $handler::class, + ), + ), + }; + } + + private function handleUnionTypes(string $class, ReflectionIntersectionType|ReflectionNamedType ...$types): string { + if (count($types) > 2) { + throw new RuntimeException( + sprintf( + 'Handler %s::invoke($message) has %d types in union. At most two are supported in the form: %s', + $class, + count($types), + 'RealMessageImpl | MessageMarkerInterface', + ), + ); + } + + foreach ($types as $type) { + if ($type instanceof ReflectionIntersectionType) { + throw new RuntimeException( + sprintf( + 'Handler %s::__invoke($message) cannot use an intersection typehint', + $class, + ), + ); + } + + if ($type->isBuiltin()) { + throw new RuntimeException( + sprintf( + 'Handler %s::__invoke($message) needs to typehint a class name', + $class, + ), + ); + } + + if (interface_exists($type->getName())) { + continue; + } + + return $type->getName(); + } + + throw new RuntimeException( + sprintf( + 'Handler %s::__invoke($message) needs to typehint a class name', + $class, + ), + ); + } +} diff --git a/src/Messenger/Kernel/MessengerServiceFactory.php b/src/Messenger/Kernel/MessengerServiceFactory.php new file mode 100644 index 0000000..7c04688 --- /dev/null +++ b/src/Messenger/Kernel/MessengerServiceFactory.php @@ -0,0 +1,130 @@ +autowire()->glob($this->commandPath); + yield from $queries = $builder->autowire()->glob($this->queryPath); + yield AutowiredHandlerLocator::class => autowire() + ->constructor([ + ...collection($commands)->keys()->map(static fn (string $className) => get($className)), + ...collection($queries)->keys()->map(static fn (string $className) => get($className)), + ]); + yield HandlersLocatorInterface::class => get(AutowiredHandlerLocator::class); + yield HandleMessageMiddleware::class => autowire()->constructor( + handlersLocator: get(HandlersLocatorInterface::class), + ); + // endregion + + // region utilities + yield CommandBusDependencies::Serializer->value => factory(Serializer::create(...)); + yield SerializerInterface::class => get(CommandBusDependencies::Serializer->value); + yield CommandBusDependencies::EventDispatcher->value => $this->eventDispatcher ?? new EventDispatcher(); + yield CommandBusDependencies::Logger->value => $this->logger ?? new NullLogger(); + yield CommandBusDependencies::CachePool->value => $this->cachePool ?? new ArrayAdapter(); + // endregion + + // region senders + yield TransportLocatorBuilder::class => $this->transports ?? TransportLocatorBuilder::empty(); + yield SendersLocator::class => function (TransportLocatorBuilder $config, ContainerInterface $container) { + return new SendersLocator( + sendersMap: [], + sendersLocator: $config->sendersLocator($container), + ); + }; + + yield SendersLocatorInterface::class => get(SendersLocator::class); + yield SendMessageMiddleware::class => autowire()->constructor( + sendersLocator: get(SendersLocatorInterface::class), + eventDispatcher: get(CommandBusDependencies::EventDispatcher->value), + )->method('setLogger', get(CommandBusDependencies::Logger->value)); + // endregion + + yield MessageBusInterface::class => create(MessageBus::class) + ->constructor([ + get(SendMessageMiddleware::class), + get(HandleMessageMiddleware::class), + ]); + yield CommandBus::class => autowire(); + yield QueryBus::class => autowire(); + + yield ConsumeMessagesCommand::class => function (TransportLocatorBuilder $config, ContainerInterface $container) { + /** @var LoggerInterface $logger */ + $logger = $container->get(CommandBusDependencies::Logger->value); + /** @var CacheItemPoolInterface $pool */ + $pool = $container->get(CommandBusDependencies::CachePool->value); + /** @var EventDispatcher $eventDispatcher */ + $eventDispatcher = $container->get(CommandBusDependencies::EventDispatcher->value); + $eventDispatcher->addSubscriber( + new StopWorkerOnRestartSignalListener( + cachePool: $pool, + logger: $logger, + ), + ); + + return new ConsumeMessagesCommand( + // there is only one bus, and we pass it as fallback + // the locator would never find anything, because it’s an empty container + routableBus: new RoutableMessageBus( + busLocator: new Container(), + fallbackBus: $container->get(MessageBusInterface::class), + ), + receiverLocator: $config->receiversLocator($container), + eventDispatcher: $eventDispatcher, + logger: $logger, + receiverNames: array_keys($config->receivers), + ); + }; + + yield StopWorkersCommand::class => autowire()->constructor( + restartSignalCachePool: get(CommandBusDependencies::CachePool->value), + ); + } +} diff --git a/src/Messenger/Kernel/TransportLocatorBuilder.php b/src/Messenger/Kernel/TransportLocatorBuilder.php new file mode 100644 index 0000000..51e12d2 --- /dev/null +++ b/src/Messenger/Kernel/TransportLocatorBuilder.php @@ -0,0 +1,57 @@ + $senders + * @param array $receivers + */ + private function __construct(public array $senders, public array $receivers) { + } + + /** + * @param class-string $sender + * @param class-string $receiver + */ + public function withTransport(string $name, string $sender, string $receiver): self { + return new self([$name => $sender] + $this->senders, [$name => $receiver] + $this->receivers); + } + + public function sendersLocator(ContainerInterface $container): Container { + return $this->createLocator($container, $this->senders); + } + + public function receiversLocator(ContainerInterface $container): Container { + return $this->createLocator($container, $this->receivers); + } + + /** + * @param array $map + */ + private function createLocator(ContainerInterface $container, array $map): Container { + return new Container( + // lazy copy the definitions into the service locator + map( + $map, + static fn (string $className) => fn () => $container->get($className), + ), + ); + } +} diff --git a/src/Messenger/Query.php b/src/Messenger/Query.php new file mode 100644 index 0000000..7a19e7a --- /dev/null +++ b/src/Messenger/Query.php @@ -0,0 +1,11 @@ +messageBus = $messageBus; + } + + /** + * @template T of mixed + * @param Query $query + * @return T + * @throws Throwable + */ + public function query(Query $query): mixed { + try { + return $this->handle($query); + } catch (HandlerFailedException $e) { + [$e] = array_values($e->getWrappedExceptions()); + + throw $e; + } + } +} diff --git a/src/Messenger/QueryHandler.php b/src/Messenger/QueryHandler.php new file mode 100644 index 0000000..c24ae35 --- /dev/null +++ b/src/Messenger/QueryHandler.php @@ -0,0 +1,17 @@ + + */ + public function __invoke(Query $query): mixed; +} diff --git a/tests/Messenger/HoldsState.php b/tests/Messenger/HoldsState.php new file mode 100644 index 0000000..879b81c --- /dev/null +++ b/tests/Messenger/HoldsState.php @@ -0,0 +1,17 @@ +value; + } + + public function setValue(?string $value): void { + $this->value = $value; + } +} diff --git a/tests/Messenger/MessengerTest.php b/tests/Messenger/MessengerTest.php new file mode 100644 index 0000000..3fcf067 --- /dev/null +++ b/tests/Messenger/MessengerTest.php @@ -0,0 +1,62 @@ +register( + new MessengerServiceFactory( + commandPath: 'src/*AsyncHandler.php', + queryPath: 'src/*QueryHandler.php', + transports: TransportLocatorBuilder::start() + ->withTransport( + name: $transportName, + sender: InMemoryTransport::class, + receiver: InMemoryTransport::class, + ), + ), + ) + ->add( + [ + InMemoryTransport::class => new InMemoryTransport(), + HoldsState::class => new HoldsState(), + ], + ) + ->build(); + + /** @var CommandBus $commandBus */ + $commandBus = $container->get(CommandBus::class); + /** @var QueryBus $queryBus */ + $queryBus = $container->get(QueryBus::class); + /** @var ConsumeMessagesCommand $consumeMessagesCommand */ + $consumeMessagesCommand = $container->get(ConsumeMessagesCommand::class); + + $some = bin2hex(random_bytes(16)); + $commandBus->queue(new SideEffectsCommand($some), $transportName); + + self::assertNull($queryBus->query(new StateQuery())); + + $consumeMessagesCommand->run(new ArrayInput(['--limit' => 1]), new BufferedOutput()); + + self::assertSame($some, $queryBus->query(new StateQuery())); + } +} diff --git a/tests/Resources/Messenger/composer.json b/tests/Resources/Messenger/composer.json new file mode 100644 index 0000000..836440f --- /dev/null +++ b/tests/Resources/Messenger/composer.json @@ -0,0 +1,7 @@ +{ + "autoload": { + "psr-4": { + "Acme\\": "src/" + } + } +} diff --git a/tests/Resources/Messenger/src/SideEffectsAsyncHandler.php b/tests/Resources/Messenger/src/SideEffectsAsyncHandler.php new file mode 100644 index 0000000..d49f8ce --- /dev/null +++ b/tests/Resources/Messenger/src/SideEffectsAsyncHandler.php @@ -0,0 +1,21 @@ + + */ +final readonly class SideEffectsAsyncHandler implements AsyncCommandHandler { + public function __construct(private HoldsState $state) { + } + + public function __invoke(SideEffectsCommand|AsyncCommand $command): void { + $this->state->setValue($command->value); + } +} diff --git a/tests/Resources/Messenger/src/SideEffectsCommand.php b/tests/Resources/Messenger/src/SideEffectsCommand.php new file mode 100644 index 0000000..58758c2 --- /dev/null +++ b/tests/Resources/Messenger/src/SideEffectsCommand.php @@ -0,0 +1,14 @@ + + */ +final readonly class StateQuery implements Query { +} diff --git a/tests/Resources/Messenger/src/StateQueryHandler.php b/tests/Resources/Messenger/src/StateQueryHandler.php new file mode 100644 index 0000000..ca3e0a1 --- /dev/null +++ b/tests/Resources/Messenger/src/StateQueryHandler.php @@ -0,0 +1,21 @@ + + */ +final readonly class StateQueryHandler implements QueryHandler { + public function __construct(private HoldsState $state) { + } + + public function __invoke(StateQuery|Query $query): mixed { + return $this->state->getValue(); + } +}