Skip to content

Commit d6d95a0

Browse files
committed
tests(eventbus,scheduler): add edge-case coverage to lift buffer over threshold
1 parent 1b04e37 commit d6d95a0

5 files changed

Lines changed: 378 additions & 0 deletions

File tree

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
)
8+
9+
// bogusSub implements Subscription but is not a *memorySubscription to trigger type error.
10+
type bogusSub struct{}
11+
12+
func (b bogusSub) Topic() string { return "t" }
13+
func (b bogusSub) ID() string { return "id" }
14+
func (b bogusSub) IsAsync() bool { return false }
15+
func (b bogusSub) Cancel() error { return nil }
16+
17+
// TestMemoryEventBusEdgeCases covers small edge branches not yet exercised to
18+
// push overall coverage safely above threshold.
19+
func TestMemoryEventBusEdgeCases(t *testing.T) {
20+
cfg := &EventBusConfig{Engine: "memory", MaxEventQueueSize: 5, DefaultEventBufferSize: 1, WorkerCount: 1, RetentionDays: 1}
21+
if err := cfg.ValidateConfig(); err != nil {
22+
t.Fatalf("validate: %v", err)
23+
}
24+
router, err := NewEngineRouter(cfg)
25+
if err != nil {
26+
t.Fatalf("router: %v", err)
27+
}
28+
if err := router.Start(context.Background()); err != nil {
29+
t.Fatalf("start: %v", err)
30+
}
31+
32+
// 1. Publish to topic with no subscribers (early return path)
33+
if err := router.Publish(context.Background(), Event{Topic: "no.subscribers"}); err != nil {
34+
t.Fatalf("publish no subs: %v", err)
35+
}
36+
37+
// Find memory engine instance (only engine configured here)
38+
var mem *MemoryEventBus
39+
for _, eng := range router.engines { // access internal map within same package
40+
if m, ok := eng.(*MemoryEventBus); ok {
41+
mem = m
42+
break
43+
}
44+
}
45+
if mem == nil {
46+
t.Fatalf("expected memory engine present")
47+
}
48+
49+
// 2. Subscribe with nil handler triggers ErrEventHandlerNil
50+
if _, err := mem.Subscribe(context.Background(), "x", nil); !errors.Is(err, ErrEventHandlerNil) {
51+
if err == nil {
52+
// Should never be nil
53+
t.Fatalf("expected error ErrEventHandlerNil, got nil")
54+
}
55+
t.Fatalf("expected ErrEventHandlerNil, got %v", err)
56+
}
57+
58+
// 3. Unsubscribe invalid subscription type -> ErrInvalidSubscriptionType
59+
if err := mem.Unsubscribe(context.Background(), bogusSub{}); !errors.Is(err, ErrInvalidSubscriptionType) {
60+
t.Fatalf("expected ErrInvalidSubscriptionType, got %v", err)
61+
}
62+
63+
// 4. Stats after Stop should stay stable and not panic
64+
delBefore, dropBefore := mem.Stats()
65+
if err := mem.Stop(context.Background()); err != nil {
66+
t.Fatalf("stop: %v", err)
67+
}
68+
delAfter, dropAfter := mem.Stats()
69+
if delAfter != delBefore || dropAfter != dropBefore {
70+
t.Fatalf("stats changed after stop")
71+
}
72+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
// TestMemoryPublishRotationAndDrops exercises:
10+
// 1. RotateSubscriberOrder branch in memory.Publish (ensures rotation logic executes)
11+
// 2. Async worker pool saturation drop path (queueEventHandler default case increments droppedCount)
12+
// 3. DeliveryMode "timeout" with zero PublishBlockTimeout immediate drop branch
13+
// 4. Module level GetRouter / Stats / PerEngineStats accessors (light touch)
14+
func TestMemoryPublishRotationAndDrops(t *testing.T) {
15+
cfg := &EventBusConfig{
16+
Engine: "memory",
17+
WorkerCount: 1,
18+
DefaultEventBufferSize: 1,
19+
MaxEventQueueSize: 10,
20+
RetentionDays: 1,
21+
RotateSubscriberOrder: true,
22+
DeliveryMode: "timeout", // exercise timeout mode with zero timeout
23+
PublishBlockTimeout: 0, // immediate drop for full buffers
24+
}
25+
if err := cfg.ValidateConfig(); err != nil { t.Fatalf("validate: %v", err) }
26+
27+
router, err := NewEngineRouter(cfg)
28+
if err != nil { t.Fatalf("router: %v", err) }
29+
if err := router.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) }
30+
31+
// Extract memory engine
32+
var mem *MemoryEventBus
33+
for _, eng := range router.engines { if m, ok := eng.(*MemoryEventBus); ok { mem = m; break } }
34+
if mem == nil { t.Fatalf("memory engine missing") }
35+
36+
// Create multiple async subscriptions so rotation has >1 subscriber list.
37+
ctx := context.Background()
38+
for i := 0; i < 3; i++ { // 3 subs ensures rotation slice logic triggers when >1
39+
_, err := mem.SubscribeAsync(ctx, "rotate.topic", func(ctx context.Context, e Event) error { time.Sleep(5 * time.Millisecond); return nil })
40+
if err != nil { t.Fatalf("subscribe async %d: %v", i, err) }
41+
}
42+
43+
// Also create a synchronous subscriber with tiny buffer to force timeout-mode drops when saturated.
44+
_, err = mem.Subscribe(ctx, "rotate.topic", func(ctx context.Context, e Event) error { time.Sleep(2 * time.Millisecond); return nil })
45+
if err != nil { t.Fatalf("sync subscribe: %v", err) }
46+
47+
// Fire a burst of events; limited worker pool + small buffers -> some drops.
48+
for i := 0; i < 50; i++ { // ample attempts to cause rotation & drops
49+
_ = mem.Publish(ctx, Event{Topic: "rotate.topic"})
50+
}
51+
52+
// Allow processing/draining
53+
time.Sleep(100 * time.Millisecond)
54+
55+
delivered, dropped := mem.Stats()
56+
if delivered == 0 { t.Fatalf("expected some delivered events (rotation path), got 0") }
57+
if dropped == 0 { t.Fatalf("expected some dropped events from timeout + saturation, got 0") }
58+
59+
// Touch module-level accessors via a lightweight module wrapper to bump coverage on module.go convenience methods.
60+
mod := &EventBusModule{router: router}
61+
if mod.GetRouter() == nil { t.Fatalf("expected router from module accessor") }
62+
td, _ := mod.Stats()
63+
if td == 0 { t.Fatalf("expected non-zero delivered via module stats") }
64+
per := mod.PerEngineStats()
65+
if len(per) == 0 { t.Fatalf("expected per-engine stats via module accessor") }
66+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/GoCodeAlone/modular"
8+
)
9+
10+
// TestModuleStatsBeforeInit ensures Stats/PerEngineStats fast-paths when router is nil.
11+
func TestModuleStatsBeforeInit(t *testing.T) {
12+
m := &EventBusModule{}
13+
d, r := m.Stats()
14+
if d != 0 || r != 0 {
15+
t.Fatalf("expected zero stats prior to init, got delivered=%d dropped=%d", d, r)
16+
}
17+
per := m.PerEngineStats()
18+
if len(per) != 0 {
19+
t.Fatalf("expected empty per-engine stats prior to init, got %v", per)
20+
}
21+
}
22+
23+
// TestModuleEmitEventNoSubject covers EmitEvent error branch when no subject registered.
24+
func TestModuleEmitEventNoSubject(t *testing.T) {
25+
m := &EventBusModule{logger: noopLogger{}}
26+
ev := modular.NewCloudEvent("com.modular.test.event", "test-source", map[string]interface{}{"k": "v"}, nil)
27+
if err := m.EmitEvent(context.Background(), ev); err == nil {
28+
t.Fatalf("expected ErrNoSubjectForEventEmission when emitting without subject")
29+
}
30+
}
31+
32+
// TestModuleStartStopIdempotency exercises Start/Stop idempotent branches directly.
33+
func TestModuleStartStopIdempotency(t *testing.T) {
34+
cfg := &EventBusConfig{Engine: "memory", WorkerCount: 1, DefaultEventBufferSize: 1, MaxEventQueueSize: 10, RetentionDays: 1}
35+
if err := cfg.ValidateConfig(); err != nil { t.Fatalf("validate: %v", err) }
36+
37+
router, err := NewEngineRouter(cfg)
38+
if err != nil { t.Fatalf("router: %v", err) }
39+
40+
m := &EventBusModule{config: cfg, router: router, logger: noopLogger{}}
41+
42+
// First start
43+
if err := m.Start(context.Background()); err != nil { t.Fatalf("first start: %v", err) }
44+
// Second start should be idempotent (no error)
45+
if err := m.Start(context.Background()); err != nil { t.Fatalf("second start (idempotent) unexpected error: %v", err) }
46+
47+
// First stop
48+
if err := m.Stop(context.Background()); err != nil { t.Fatalf("first stop: %v", err) }
49+
// Second stop should be idempotent (no error)
50+
if err := m.Stop(context.Background()); err != nil { t.Fatalf("second stop (idempotent) unexpected error: %v", err) }
51+
}
52+
53+
// TestModulePublishBeforeStart validates error path when publishing before engines started.
54+
func TestModulePublishBeforeStart(t *testing.T) {
55+
cfg := &EventBusConfig{Engine: "memory", WorkerCount: 1, DefaultEventBufferSize: 1, MaxEventQueueSize: 10, RetentionDays: 1}
56+
if err := cfg.ValidateConfig(); err != nil { t.Fatalf("validate: %v", err) }
57+
router, err := NewEngineRouter(cfg)
58+
if err != nil { t.Fatalf("router: %v", err) }
59+
m := &EventBusModule{config: cfg, router: router, logger: noopLogger{}}
60+
// Publish before Start -> underlying memory engine not started -> ErrEventBusNotStarted wrapped.
61+
if err := m.Publish(context.Background(), "pre.start.topic", "payload"); err == nil {
62+
t.Fatalf("expected error publishing before start")
63+
}
64+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package eventbus
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
// TestMemorySubscriptionLifecycle covers double cancel and second unsubscribe no-op behavior for memory engine.
10+
func TestMemorySubscriptionLifecycle(t *testing.T) {
11+
cfg := &EventBusConfig{Engine: "memory", WorkerCount: 1, DefaultEventBufferSize: 2, MaxEventQueueSize: 10, RetentionDays: 1}
12+
if err := cfg.ValidateConfig(); err != nil { t.Fatalf("validate: %v", err) }
13+
router, err := NewEngineRouter(cfg)
14+
if err != nil { t.Fatalf("router: %v", err) }
15+
if err := router.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) }
16+
17+
// Locate memory engine
18+
var mem *MemoryEventBus
19+
for _, eng := range router.engines { if m, ok := eng.(*MemoryEventBus); ok { mem = m; break } }
20+
if mem == nil { t.Fatalf("memory engine missing") }
21+
22+
delivered, dropped := mem.Stats()
23+
24+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
25+
defer cancel()
26+
sub, err := mem.Subscribe(ctx, "lifecycle.topic", func(ctx context.Context, e Event) error { return nil })
27+
if err != nil { t.Fatalf("subscribe: %v", err) }
28+
29+
// First unsubscribe
30+
if err := mem.Unsubscribe(ctx, sub); err != nil { t.Fatalf("unsubscribe first: %v", err) }
31+
// Second unsubscribe on memory engine is a silent no-op (returns nil). Ensure it doesn't error.
32+
if err := mem.Unsubscribe(ctx, sub); err != nil { t.Fatalf("second unsubscribe should be no-op, got error: %v", err) }
33+
34+
// Direct double cancel path also returns nil.
35+
if err := sub.Cancel(); err != nil { t.Fatalf("second direct cancel: %v", err) }
36+
37+
// Publish events to confirm no delivery after unsubscribe.
38+
if err := mem.Publish(ctx, Event{Topic: "lifecycle.topic"}); err != nil { t.Fatalf("publish: %v", err) }
39+
newDelivered, newDropped := mem.Stats()
40+
if newDelivered != delivered || newDropped != dropped { t.Fatalf("expected stats unchanged after publishing to removed subscription: got %d/%d -> %d/%d", delivered, dropped, newDelivered, newDropped) }
41+
}
42+
43+
// TestEngineRouterDoubleUnsubscribeIdempotent verifies router-level double unsubscribe is idempotent
44+
// (returns nil just like the underlying memory engine). The ErrSubscriptionNotFound branch is
45+
// covered separately using a dummy subscription of an unknown concrete type in
46+
// engine_router_additional_test.go.
47+
func TestEngineRouterDoubleUnsubscribeIdempotent(t *testing.T) {
48+
cfg := &EventBusConfig{Engine: "memory", WorkerCount: 1, DefaultEventBufferSize: 1, MaxEventQueueSize: 5, RetentionDays: 1}
49+
if err := cfg.ValidateConfig(); err != nil { t.Fatalf("validate: %v", err) }
50+
router, err := NewEngineRouter(cfg)
51+
if err != nil { t.Fatalf("router: %v", err) }
52+
if err := router.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) }
53+
54+
sub, err := router.Subscribe(context.Background(), "router.lifecycle", func(ctx context.Context, e Event) error { return nil })
55+
if err != nil { t.Fatalf("subscribe: %v", err) }
56+
if err := router.Unsubscribe(context.Background(), sub); err != nil { t.Fatalf("first unsubscribe: %v", err) }
57+
// Second unsubscribe should traverse all engines, none handle it, yielding ErrSubscriptionNotFound.
58+
if err := router.Unsubscribe(context.Background(), sub); err != nil {
59+
t.Fatalf("second unsubscribe should be idempotent (nil), got %v", err)
60+
}
61+
}

modules/scheduler/module_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,3 +556,118 @@ func TestJobPersistence(t *testing.T) {
556556
}
557557
})
558558
}
559+
560+
// Additional coverage tests for validation errors, resume logic, cleanup, and persistence edge cases.
561+
func TestSchedulerEdgeCases(t *testing.T) {
562+
module := NewModule().(*SchedulerModule)
563+
app := newMockApp()
564+
module.RegisterConfig(app)
565+
module.Init(app)
566+
ctx := context.Background()
567+
require.NoError(t, module.Start(ctx))
568+
defer module.Stop(ctx)
569+
570+
t.Run("ScheduleJobMissingTiming", func(t *testing.T) {
571+
_, err := module.ScheduleJob(Job{Name: "no-timing"})
572+
assert.ErrorIs(t, err, ErrJobInvalidSchedule)
573+
})
574+
575+
t.Run("ScheduleRecurringMissingSchedule", func(t *testing.T) {
576+
_, err := module.ScheduleJob(Job{Name: "rec-missing", IsRecurring: true})
577+
// Current implementation returns ErrJobInvalidSchedule before specific recurring check
578+
assert.ErrorIs(t, err, ErrJobInvalidSchedule)
579+
})
580+
581+
t.Run("ScheduleRecurringInvalidCron", func(t *testing.T) {
582+
_, err := module.ScheduleJob(Job{Name: "rec-invalid", IsRecurring: true, Schedule: "* * *"})
583+
assert.Error(t, err)
584+
})
585+
586+
t.Run("ResumeJobMissingID", func(t *testing.T) {
587+
_, err := module.scheduler.ResumeJob(Job{})
588+
assert.ErrorIs(t, err, ErrJobIDRequired)
589+
})
590+
591+
t.Run("ResumeJobNoNextRunTime", func(t *testing.T) {
592+
// Past run time with no future next run forces ErrJobNoValidNextRunTime
593+
_, err := module.scheduler.ResumeJob(Job{ID: "abc", RunAt: time.Now().Add(-1 * time.Hour)})
594+
assert.ErrorIs(t, err, ErrJobNoValidNextRunTime)
595+
})
596+
597+
t.Run("ResumeRecurringJobMissingID", func(t *testing.T) {
598+
_, err := module.scheduler.ResumeRecurringJob(Job{IsRecurring: true, Schedule: "* * * * *"})
599+
assert.ErrorIs(t, err, ErrRecurringJobIDRequired)
600+
})
601+
602+
t.Run("ResumeRecurringJobNotRecurring", func(t *testing.T) {
603+
_, err := module.scheduler.ResumeRecurringJob(Job{ID: "id1", IsRecurring: false})
604+
assert.ErrorIs(t, err, ErrJobMustBeRecurring)
605+
})
606+
607+
t.Run("ResumeRecurringJobInvalidCron", func(t *testing.T) {
608+
_, err := module.scheduler.ResumeRecurringJob(Job{ID: "id2", IsRecurring: true, Schedule: "* * *"})
609+
assert.Error(t, err)
610+
})
611+
612+
// Success path: resume one-time job with future RunAt
613+
t.Run("ResumeJobSuccess", func(t *testing.T) {
614+
future := time.Now().Add(30 * time.Minute)
615+
job := Job{ID: "resume-one", Name: "resume-one", RunAt: future, Status: JobStatusCancelled}
616+
// Add job to store first
617+
require.NoError(t, module.scheduler.jobStore.AddJob(job))
618+
_, err := module.scheduler.ResumeJob(job)
619+
assert.NoError(t, err)
620+
stored, err := module.scheduler.GetJob("resume-one")
621+
require.NoError(t, err)
622+
assert.Equal(t, JobStatusPending, stored.Status)
623+
assert.NotNil(t, stored.NextRun)
624+
if stored.NextRun != nil {
625+
assert.WithinDuration(t, future, *stored.NextRun, time.Minute) // allow minute boundary drift
626+
}
627+
})
628+
629+
// Success path: resume recurring job with valid cron schedule
630+
t.Run("ResumeRecurringJobSuccess", func(t *testing.T) {
631+
job := Job{ID: "resume-rec", Name: "resume-rec", IsRecurring: true, Schedule: "* * * * *", Status: JobStatusCancelled}
632+
require.NoError(t, module.scheduler.jobStore.AddJob(job))
633+
_, err := module.scheduler.ResumeRecurringJob(job)
634+
assert.NoError(t, err)
635+
stored, err := module.scheduler.GetJob("resume-rec")
636+
require.NoError(t, err)
637+
assert.Equal(t, JobStatusPending, stored.Status)
638+
assert.NotNil(t, stored.NextRun)
639+
})
640+
}
641+
642+
func TestMemoryJobStoreCleanupAndPersistenceEdges(t *testing.T) {
643+
store := NewMemoryJobStore(24 * time.Hour)
644+
645+
// Add executions with different times
646+
oldExec := JobExecution{JobID: "job1", StartTime: time.Now().Add(-48 * time.Hour), Status: "completed"}
647+
recentExec := JobExecution{JobID: "job1", StartTime: time.Now(), Status: "completed"}
648+
require.NoError(t, store.AddJobExecution(oldExec))
649+
require.NoError(t, store.AddJobExecution(recentExec))
650+
651+
// Cleanup older than 24h
652+
cutoff := time.Now().Add(-24 * time.Hour)
653+
require.NoError(t, store.CleanupOldExecutions(cutoff))
654+
execs, err := store.GetJobExecutions("job1")
655+
require.NoError(t, err)
656+
assert.Len(t, execs, 1)
657+
assert.Equal(t, recentExec.StartTime, execs[0].StartTime)
658+
659+
t.Run("LoadFromFileNonexistent", func(t *testing.T) {
660+
jobs, err := store.LoadFromFile("/tmp/nonexistent-file-should-not-exist.json")
661+
require.NoError(t, err)
662+
assert.Len(t, jobs, 0)
663+
})
664+
665+
t.Run("SaveAndLoadEmptyJobs", func(t *testing.T) {
666+
tmp := fmt.Sprintf("/tmp/scheduler-empty-%d.json", time.Now().UnixNano())
667+
require.NoError(t, store.SaveToFile([]Job{}, tmp))
668+
jobs, err := store.LoadFromFile(tmp)
669+
require.NoError(t, err)
670+
assert.Len(t, jobs, 0)
671+
_ = os.Remove(tmp)
672+
})
673+
}

0 commit comments

Comments
 (0)