@@ -4,6 +4,7 @@
 	"context"
 	"testing"
 	"time"
+	"sync/atomic"
 )
 
 // Test basic publish/subscribe lifecycle using memory engine ensuring message receipt and stats increments.
@@ -129,3 +130,101 @@ func TestEventBusAsyncSubscription(t *testing.T) { |
 }
 
 // Removed local mockApp (reuse the one defined in module_test.go)
+
+// TestMemoryEventBus_RotationFairness ensures subscriber delivery order rotates across publishes when RotateSubscriberOrder is enabled.
+func TestMemoryEventBus_RotationFairness(t *testing.T) {
+	ctx := context.Background()
+	cfg := &EventBusConfig{WorkerCount: 1, DefaultEventBufferSize: 1, RotateSubscriberOrder: true, DeliveryMode: "drop"}
+	bus := NewMemoryEventBus(cfg)
+	if err := bus.Start(ctx); err != nil { t.Fatalf("start: %v", err) }
+	defer bus.Stop(ctx)
+
+	orderCh := make(chan string, 16)
+	mkHandler := func(id string) EventHandler { return func(ctx context.Context, evt Event) error { orderCh <- id; return nil } }
+	for i := 0; i < 3; i++ {
+		_, err := bus.Subscribe(ctx, "rot.topic", mkHandler(string(rune('A'+i))))
+		if err != nil { t.Fatalf("subscribe %d: %v", i, err) }
+	}
+
+	firsts := make(map[string]int)
+	for i := 0; i < 9; i++ {
+		_ = bus.Publish(ctx, Event{Topic: "rot.topic"})
+		select {
+		case id := <-orderCh:
+			firsts[id]++
+		case <-time.After(500 * time.Millisecond):
+			t.Fatalf("timeout waiting for first handler")
+		}
+		// Drain the remaining handler deliveries for this publish (best-effort).
+		for j := 0; j < 2; j++ {
+			select { case <-orderCh: default: }
+		}
+	}
+	if len(firsts) < 2 { t.Fatalf("expected rotation to vary first subscriber, got %v", firsts) }
+}
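
The rotation itself is not shown in this diff. A minimal sketch of one way it could work, assuming the bus snapshots a per-topic subscriber slice and keeps a publish counter (all names here are hypothetical, not the bus's actual internals):

// rotated returns subs shifted left by n positions, so the n-th publish
// starts delivery at a different subscriber. With subscribers A, B, C the
// first receiver cycles A, B, C, A, ... which is why the test expects at
// least two distinct ids in firsts.
func rotated[T any](subs []T, n uint64) []T {
	if len(subs) < 2 {
		return subs
	}
	k := int(n % uint64(len(subs)))
	out := make([]T, 0, len(subs))
	out = append(out, subs[k:]...)
	return append(out, subs[:k]...)
}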
+
+// TestMemoryEventBus_PublishTimeoutImmediateDrop verifies that "timeout" delivery mode with a zero PublishBlockTimeout drops immediately when a subscriber's buffer is full.
+func TestMemoryEventBus_PublishTimeoutImmediateDrop(t *testing.T) {
+	ctx := context.Background()
+	cfg := &EventBusConfig{WorkerCount: 1, DefaultEventBufferSize: 1, DeliveryMode: "timeout", PublishBlockTimeout: 0}
+	bus := NewMemoryEventBus(cfg)
+	if err := bus.Start(ctx); err != nil { t.Fatalf("start: %v", err) }
+	defer bus.Stop(ctx)
+
+	// Manually construct a subscription with a full channel (no handler goroutine drains it).
+	sub := &memorySubscription{
+		id:       "manual",
+		topic:    "t",
+		handler:  func(ctx context.Context, e Event) error { return nil },
+		isAsync:  false,
+		eventCh:  make(chan Event, 1),
+		done:     make(chan struct{}),
+		finished: make(chan struct{}),
+	}
+	// Fill the channel to force the publish path into the drop branch.
+	sub.eventCh <- Event{Topic: "t"}
+	bus.topicMutex.Lock()
+	bus.subscriptions["t"] = map[string]*memorySubscription{sub.id: sub}
+	bus.topicMutex.Unlock()
+
+	before := atomic.LoadUint64(&bus.droppedCount)
+	_ = bus.Publish(ctx, Event{Topic: "t"})
+	after := atomic.LoadUint64(&bus.droppedCount)
+	if after != before+1 { t.Fatalf("expected exactly one drop, before=%d after=%d", before, after) }
+}
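
For context, a hedged sketch of the branch this test pins down, assuming "timeout" mode degrades to a plain non-blocking send when the timeout is zero; the helper name and signature are illustrative, not the bus's real API:

// deliverWithTimeout tries to enqueue e for one subscriber. A zero or
// negative timeout means no waiting at all, so a full channel counts as
// an immediate drop, matching the before+1 assertion above.
func deliverWithTimeout(ch chan<- Event, e Event, timeout time.Duration, dropped *uint64) {
	if timeout <= 0 {
		select {
		case ch <- e:
		default:
			atomic.AddUint64(dropped, 1) // buffer full: drop now
		}
		return
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ch <- e:
	case <-timer.C:
		atomic.AddUint64(dropped, 1) // still full after the timeout: drop
	}
}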
+
+// TestMemoryEventBus_AsyncWorkerSaturation ensures async publishes are dropped when the worker count is zero (no workers ever consume queued tasks).
+func TestMemoryEventBus_AsyncWorkerSaturation(t *testing.T) {
+	ctx := context.Background()
+	cfg := &EventBusConfig{WorkerCount: 0, DefaultEventBufferSize: 1}
+	bus := NewMemoryEventBus(cfg)
+	if err := bus.Start(ctx); err != nil { t.Fatalf("start: %v", err) }
+	defer bus.Stop(ctx)
+
+	_, err := bus.SubscribeAsync(ctx, "a", func(ctx context.Context, e Event) error { return nil })
+	if err != nil { t.Fatalf("subscribe async: %v", err) }
+	before := atomic.LoadUint64(&bus.droppedCount)
+	for i := 0; i < 5; i++ { _ = bus.Publish(ctx, Event{Topic: "a"}) }
+	after := atomic.LoadUint64(&bus.droppedCount)
+	if after <= before { t.Fatalf("expected drops due to saturated worker pool, before=%d after=%d", before, after) }
+}
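
A sketch of the async hand-off this test exercises, assuming tasks reach workers through a channel via a non-blocking send (names are assumptions):

// trySubmit hands task to the worker pool without blocking. With
// WorkerCount == 0 nothing receives from tasks, so every submit falls
// through to default and increments the drop counter, which is what the
// test observes after five publishes.
func trySubmit(tasks chan<- func(), task func(), dropped *uint64) bool {
	select {
	case tasks <- task:
		return true
	default:
		atomic.AddUint64(dropped, 1)
		return false
	}
}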
+
+// TestMemoryEventBus_RetentionCleanup verifies that events older than the retention window are pruned from history.
+func TestMemoryEventBus_RetentionCleanup(t *testing.T) {
+	ctx := context.Background()
+	cfg := &EventBusConfig{WorkerCount: 1, DefaultEventBufferSize: 1, RetentionDays: 1}
+	bus := NewMemoryEventBus(cfg)
+	if err := bus.Start(ctx); err != nil { t.Fatalf("start: %v", err) }
+	defer bus.Stop(ctx)
+
+	old := Event{Topic: "old", CreatedAt: time.Now().AddDate(0, 0, -2)}
+	recent := Event{Topic: "recent", CreatedAt: time.Now()}
+	bus.storeEventHistory(old)
+	bus.storeEventHistory(recent)
+	bus.cleanupOldEvents()
+	bus.historyMutex.RLock()
+	defer bus.historyMutex.RUnlock()
+	for _, evs := range bus.eventHistory {
+		for _, e := range evs { if e.Topic == "old" { t.Fatalf("old event not cleaned up") } }
+	}
+}
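
Finally, a sketch of the pruning the retention test verifies, assuming history is a topic-keyed map of events and the cutoff comes from RetentionDays (field and function names are assumptions):

// pruneHistory drops every retained event older than the retention
// window. In the test, the event dated two days ago falls outside
// RetentionDays: 1 and must disappear, while the fresh event survives.
func pruneHistory(history map[string][]Event, retentionDays int) {
	cutoff := time.Now().AddDate(0, 0, -retentionDays)
	for topic, evs := range history {
		kept := evs[:0] // filter in place, reusing the backing array
		for _, e := range evs {
			if e.CreatedAt.After(cutoff) {
				kept = append(kept, e)
			}
		}
		history[topic] = kept
	}
}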