Skip to content

Commit 50090d1

Browse files
committed
Add comprehensive tests for EventBus and LetsEncrypt modules
- Implemented multi-engine routing tests to verify correct engine selection based on routing rules. - Added tests to ensure publishing before starting the EventBus returns an error. - Created additional tests for Redis EventBus to validate behavior when starting, publishing, and subscribing before initialization. - Developed statistics tests to confirm correct accumulation of event delivery counts per engine. - Introduced a noopLogger for testing purposes to avoid logging during tests. - Added topic prefix filter tests to ensure filtering works as expected. - Enhanced LetsEncrypt module with tests for configuration validation, certificate handling, and error paths. - Implemented tests for DNS provider configurations to cover various error scenarios. - Added tests for certificate renewal and revocation processes, ensuring proper error handling and state management. - Created storage helper tests to validate certificate storage and expiration checks.
1 parent 96ac97b commit 50090d1

32 files changed

Lines changed: 2322 additions & 14 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
# Output of the go coverage tool, specifically when used with LiteIDE
2727
*.out
2828

29+
# coverage files
30+
*.cov
31+
2932
# Dependency directories (remove the comment below to include it)
3033
# vendor/
3134

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
// Test basic publish/subscribe lifecycle using memory engine ensuring message receipt and stats increments.
10+
func TestEventBusPublishSubscribeBasic(t *testing.T) {
11+
m := NewModule().(*EventBusModule)
12+
app := newMockApp()
13+
// Register default config section as RegisterConfig would
14+
if err := m.RegisterConfig(app); err != nil {
15+
t.Fatalf("register config: %v", err)
16+
}
17+
if err := m.Init(app); err != nil {
18+
t.Fatalf("init: %v", err)
19+
}
20+
if err := m.Start(context.Background()); err != nil {
21+
t.Fatalf("start: %v", err)
22+
}
23+
defer m.Stop(context.Background())
24+
25+
received := make(chan struct{}, 1)
26+
_, err := m.Subscribe(context.Background(), "test.topic", func(ctx context.Context, e Event) error {
27+
if e.Topic != "test.topic" {
28+
t.Errorf("unexpected topic %s", e.Topic)
29+
}
30+
received <- struct{}{}
31+
return nil
32+
})
33+
if err != nil {
34+
t.Fatalf("subscribe: %v", err)
35+
}
36+
37+
if err := m.Publish(context.Background(), "test.topic", "payload"); err != nil {
38+
t.Fatalf("publish: %v", err)
39+
}
40+
41+
select {
42+
case <-received:
43+
case <-time.After(2 * time.Second):
44+
t.Fatal("timeout waiting for event delivery")
45+
}
46+
47+
del, _ := m.Stats()
48+
if del == 0 {
49+
t.Fatalf("expected delivered stats > 0")
50+
}
51+
}
52+
53+
// Test unsubscribe removes subscription and no further deliveries occur.
54+
func TestEventBusUnsubscribe(t *testing.T) {
55+
m := NewModule().(*EventBusModule)
56+
app := newMockApp()
57+
if err := m.RegisterConfig(app); err != nil {
58+
t.Fatalf("register config: %v", err)
59+
}
60+
if err := m.Init(app); err != nil {
61+
t.Fatalf("init: %v", err)
62+
}
63+
if err := m.Start(context.Background()); err != nil {
64+
t.Fatalf("start: %v", err)
65+
}
66+
defer m.Stop(context.Background())
67+
68+
count := 0
69+
sub, err := m.Subscribe(context.Background(), "once.topic", func(ctx context.Context, e Event) error { count++; return nil })
70+
if err != nil {
71+
t.Fatalf("subscribe: %v", err)
72+
}
73+
74+
if err := m.Publish(context.Background(), "once.topic", 1); err != nil {
75+
t.Fatalf("publish1: %v", err)
76+
}
77+
time.Sleep(50 * time.Millisecond)
78+
if count != 1 {
79+
t.Fatalf("expected 1 delivery got %d", count)
80+
}
81+
82+
if err := m.Unsubscribe(context.Background(), sub); err != nil {
83+
t.Fatalf("unsubscribe: %v", err)
84+
}
85+
if err := m.Publish(context.Background(), "once.topic", 2); err != nil {
86+
t.Fatalf("publish2: %v", err)
87+
}
88+
time.Sleep(50 * time.Millisecond)
89+
if count != 1 {
90+
t.Fatalf("expected no additional deliveries after unsubscribe")
91+
}
92+
}
93+
94+
// Test async subscription processes events without blocking publisher.
95+
func TestEventBusAsyncSubscription(t *testing.T) {
96+
m := NewModule().(*EventBusModule)
97+
app := newMockApp()
98+
if err := m.RegisterConfig(app); err != nil {
99+
t.Fatalf("register config: %v", err)
100+
}
101+
if err := m.Init(app); err != nil {
102+
t.Fatalf("init: %v", err)
103+
}
104+
if err := m.Start(context.Background()); err != nil {
105+
t.Fatalf("start: %v", err)
106+
}
107+
defer m.Stop(context.Background())
108+
109+
received := make(chan struct{}, 1)
110+
_, err := m.SubscribeAsync(context.Background(), "async.topic", func(ctx context.Context, e Event) error { received <- struct{}{}; return nil })
111+
if err != nil {
112+
t.Fatalf("subscribe async: %v", err)
113+
}
114+
115+
start := time.Now()
116+
if err := m.Publish(context.Background(), "async.topic", 123); err != nil {
117+
t.Fatalf("publish: %v", err)
118+
}
119+
// We expect Publish to return quickly (well under 100ms) even if handler not yet executed.
120+
if time.Since(start) > 200*time.Millisecond {
121+
t.Fatalf("publish blocked unexpectedly long")
122+
}
123+
124+
select {
125+
case <-received:
126+
case <-time.After(2 * time.Second):
127+
t.Fatal("timeout waiting for async delivery")
128+
}
129+
}
130+
131+
// Removed local mockApp (reuse the one defined in module_test.go)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
// TestCancelIdempotency ensures calling Cancel multiple times on subscriptions is safe.
10+
func TestCancelIdempotency(t *testing.T) {
11+
// Memory event bus setup
12+
memCfg := &EventBusConfig{MaxEventQueueSize: 10, DefaultEventBufferSize: 1, WorkerCount: 1, RetentionDays: 1}
13+
mem := NewMemoryEventBus(memCfg)
14+
if err := mem.Start(context.Background()); err != nil {
15+
t.Fatalf("start memory: %v", err)
16+
}
17+
sub, err := mem.Subscribe(context.Background(), "idempotent.topic", func(ctx context.Context, e Event) error { return nil })
18+
if err != nil {
19+
t.Fatalf("subscribe mem: %v", err)
20+
}
21+
if err := sub.Cancel(); err != nil {
22+
t.Fatalf("first cancel mem: %v", err)
23+
}
24+
// Second cancel should be no-op
25+
if err := sub.Cancel(); err != nil {
26+
t.Fatalf("second cancel mem: %v", err)
27+
}
28+
29+
// Custom memory event bus setup
30+
busRaw, err := NewCustomMemoryEventBus(map[string]interface{}{"enableMetrics": false, "defaultEventBufferSize": 1})
31+
if err != nil {
32+
t.Fatalf("create custom: %v", err)
33+
}
34+
cust := busRaw.(*CustomMemoryEventBus)
35+
if err := cust.Start(context.Background()); err != nil {
36+
t.Fatalf("start custom: %v", err)
37+
}
38+
csub, err := cust.Subscribe(context.Background(), "idempotent.custom", func(ctx context.Context, e Event) error { return nil })
39+
if err != nil {
40+
t.Fatalf("subscribe custom: %v", err)
41+
}
42+
if err := csub.Cancel(); err != nil {
43+
t.Fatalf("first cancel custom: %v", err)
44+
}
45+
if err := csub.Cancel(); err != nil {
46+
t.Fatalf("second cancel custom: %v", err)
47+
}
48+
49+
// Publish after cancellation should not trigger handler (cannot easily assert directly without races; rely on no panic).
50+
_ = mem.Publish(context.Background(), Event{Topic: "idempotent.topic"})
51+
_ = cust.Publish(context.Background(), Event{Topic: "idempotent.custom"})
52+
time.Sleep(10 * time.Millisecond)
53+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
)
9+
10+
// TestCustomMemoryErrorPaths covers Publish/Subscribe before Start and nil handler validation.
11+
func TestCustomMemoryErrorPaths(t *testing.T) {
12+
ctx := context.Background()
13+
ebRaw, err := NewCustomMemoryEventBus(map[string]interface{}{"enableMetrics": false})
14+
if err != nil {
15+
t.Fatalf("new bus: %v", err)
16+
}
17+
eb := ebRaw.(*CustomMemoryEventBus)
18+
19+
// Publish before Start
20+
if err := eb.Publish(ctx, Event{Topic: "x"}); !errors.Is(err, ErrEventBusNotStarted) {
21+
t.Fatalf("expected ErrEventBusNotStarted publish, got %v", err)
22+
}
23+
// Subscribe before Start
24+
if _, err := eb.Subscribe(ctx, "x", func(context.Context, Event) error { return nil }); !errors.Is(err, ErrEventBusNotStarted) {
25+
t.Fatalf("expected ErrEventBusNotStarted subscribe, got %v", err)
26+
}
27+
if _, err := eb.SubscribeAsync(ctx, "x", func(context.Context, Event) error { return nil }); !errors.Is(err, ErrEventBusNotStarted) {
28+
t.Fatalf("expected ErrEventBusNotStarted subscribe async, got %v", err)
29+
}
30+
31+
// Start now
32+
if err := eb.Start(ctx); err != nil {
33+
t.Fatalf("start: %v", err)
34+
}
35+
36+
// Nil handler
37+
if _, err := eb.Subscribe(ctx, "y", nil); !errors.Is(err, ErrEventHandlerNil) {
38+
t.Fatalf("expected ErrEventHandlerNil got %v", err)
39+
}
40+
41+
// Basic successful subscription after start
42+
sub, err := eb.Subscribe(ctx, "y", func(context.Context, Event) error { return nil })
43+
if err != nil {
44+
t.Fatalf("subscribe after start: %v", err)
45+
}
46+
if sub.Topic() != "y" {
47+
t.Fatalf("unexpected topic %s", sub.Topic())
48+
}
49+
50+
// Publish should succeed now
51+
if err := eb.Publish(ctx, Event{Topic: "y"}); err != nil {
52+
t.Fatalf("publish after start: %v", err)
53+
}
54+
55+
// Allow processing
56+
time.Sleep(20 * time.Millisecond)
57+
_ = eb.Stop(ctx)
58+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
// TestCustomMemoryFilterReject ensures events not matching TopicPrefixFilter are skipped without metrics increment.
10+
func TestCustomMemoryFilterReject(t *testing.T) {
11+
busRaw, err := NewCustomMemoryEventBus(map[string]interface{}{
12+
"enableMetrics": true,
13+
"defaultEventBufferSize": 1,
14+
})
15+
if err != nil {
16+
t.Fatalf("create bus: %v", err)
17+
}
18+
bus := busRaw.(*CustomMemoryEventBus)
19+
20+
// Inject a filter allowing only topics starting with "allow.".
21+
bus.eventFilters = []EventFilter{&TopicPrefixFilter{AllowedPrefixes: []string{"allow."}, name: "topicPrefix"}}
22+
if err := bus.Start(context.Background()); err != nil {
23+
t.Fatalf("start: %v", err)
24+
}
25+
26+
// Subscribe to both allowed and denied topics; only allowed should receive events.
27+
allowedCount := int64(0)
28+
deniedCount := int64(0)
29+
_, err = bus.Subscribe(context.Background(), "allow.test", func(ctx context.Context, e Event) error { allowedCount++; return nil })
30+
if err != nil {
31+
t.Fatalf("subscribe allow: %v", err)
32+
}
33+
_, err = bus.Subscribe(context.Background(), "deny.test", func(ctx context.Context, e Event) error { deniedCount++; return nil })
34+
if err != nil {
35+
t.Fatalf("subscribe deny: %v", err)
36+
}
37+
38+
// Publish one denied event and one allowed; denied should be filtered out early.
39+
_ = bus.Publish(context.Background(), Event{Topic: "deny.test"})
40+
_ = bus.Publish(context.Background(), Event{Topic: "allow.test"})
41+
42+
// Wait briefly for allowed delivery.
43+
time.Sleep(20 * time.Millisecond)
44+
45+
if allowedCount != 1 {
46+
t.Fatalf("expected allowedCount=1 got %d", allowedCount)
47+
}
48+
if deniedCount != 0 {
49+
t.Fatalf("expected deniedCount=0 got %d", deniedCount)
50+
}
51+
52+
metrics := bus.GetMetrics()
53+
if metrics.TotalEvents != 1 {
54+
t.Fatalf("expected metrics.TotalEvents=1 got %d", metrics.TotalEvents)
55+
}
56+
if metrics.EventsPerTopic["deny.test"] != 0 {
57+
t.Fatalf("deny.test should not be counted")
58+
}
59+
if metrics.EventsPerTopic["allow.test"] != 1 {
60+
t.Fatalf("allow.test metrics missing")
61+
}
62+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"testing"
6+
)
7+
8+
// foreignSub implements Subscription but is not the concrete type expected by CustomMemoryEventBus.
9+
type foreignSub struct{}
10+
11+
func (f foreignSub) Topic() string { return "valid.topic" }
12+
func (f foreignSub) ID() string { return "foreign" }
13+
func (f foreignSub) IsAsync() bool { return false }
14+
func (f foreignSub) Cancel() error { return nil }
15+
16+
// TestCustomMemoryInvalidUnsubscribe exercises the ErrInvalidSubscriptionType branch.
17+
func TestCustomMemoryInvalidUnsubscribe(t *testing.T) {
18+
busRaw, err := NewCustomMemoryEventBus(map[string]interface{}{"enableMetrics": false})
19+
if err != nil {
20+
t.Fatalf("create bus: %v", err)
21+
}
22+
bus := busRaw.(*CustomMemoryEventBus)
23+
if err := bus.Start(context.Background()); err != nil {
24+
t.Fatalf("start: %v", err)
25+
}
26+
27+
// Create a valid subscription to ensure bus started logic executed (not strictly required for invalid path).
28+
sub, err := bus.Subscribe(context.Background(), "valid.topic", func(ctx context.Context, e Event) error { return nil })
29+
if err != nil {
30+
t.Fatalf("subscribe: %v", err)
31+
}
32+
if sub == nil {
33+
t.Fatalf("expected non-nil subscription")
34+
}
35+
36+
if err := bus.Unsubscribe(context.Background(), foreignSub{}); err == nil || err != ErrInvalidSubscriptionType {
37+
t.Fatalf("expected ErrInvalidSubscriptionType, got %v", err)
38+
}
39+
}

0 commit comments

Comments
 (0)