Skip to content

[Possibly] Add an event system or messagebus #9

@YanGusik

Description

@YanGusik

Overview

Distributed event-driven job system with pluggable transports and language-agnostic event contracts.

The system is designed to work across multiple runtimes (PHP, Go, etc.) using a shared event envelope and interchangeable transport layers (Redis, RabbitMQ, Kafka, etc.).

It supports both simple event dispatching and batch/chain job execution with lifecycle events.


Event System

Events are identified by a contract name. This can be either a string identifier or a PHP class (as a convenience layer in PHP-only contexts).

PHP

eventDispatch->event('order_created', [
    'order_id' => 123,
]);

eventDispatch->event(UserCreated::class, [
    'user_id' => 1,
]);

Go example

message := map[string]any{
    "event": "go_message",
    "payload": map[string]any{
        "text": "hello",
    },
}

data, _ := json.Marshal(message)

rdb.LPush(ctx, "events", data)

Event Envelope

All events are transported using a unified structure:

{
  "event": "order_created",
  "payload": {
    "order_id": 123
  },
  "meta": {
    "event_id": "uuid",
    "timestamp": "2026-01-01T00:00:00Z",
    "source": "php|go"
  }
}

Event Listeners (PHP)

Listeners can be registered manually or via attribute-based discovery.

Attribute-based

#[ThrunEventListener('go_message')]
final class GoMessageHandler
{
    public function __invoke(array $payload): void
    {
        // handle event
    }
}

Manual subscription

eventDispatch->subscribe('order_created', OrderCreatedHandler::class);

Auto-discovery (concept)

PHP may support auto-discovery of event listeners via attributes or mapping registry.

Example:

#[ThrunEventListener(UserCreated::class)]
final class UserCreatedHandler
{
    public function __invoke(array $payload): void
    {
    }
}

or

#[ThrunEventListener('order_created')]
final class OrderCreatedHandler
{
    public function __invoke(array $payload): void
    {
    }
}

Transport Layer

The system is transport-agnostic via interfaces.

Supported or planned transports:

  • Redis (List / Streams)
  • RabbitMQ
  • Kafka
  • Custom implementations

The transport layer is responsible only for message delivery, not interpretation.


Batch / Chain Job System

Batch execution allows sequential job orchestration without closures or serialized callbacks.

Basic usage

$batchId = $bus->batch([
    new CreatePayment(),
    new CreateOrder(),
    new SendOrderEmail(),
])
->then('order.success')
->catch('order.failed')
->finally('order.finished')
->progress('order.progress');

Batch Event Lifecycle

Batch execution emits events instead of relying on closures.

Example emitted events:

{
  "event": "order.progress",
  "payload": {
    "batch_id": "uuid",
    "completed": 2,
    "total": 5,
    "percent": 40
  }
}

Design Goals

  • Language-agnostic event contracts
  • Pluggable transport layer
  • No closure serialization
  • Predictable execution model
  • Event-driven job orchestration
  • PHP-friendly developer experience (attributes + auto-discovery)

Notes

  • ::class usage is optional syntactic sugar for PHP-only environments
  • Internally, events should resolve to a stable contract identifier
  • Distributed systems should rely on string-based event names
  • PHP class-based events are a convenience layer, not a transport contract

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions