Skip to content

Commit ce3407c

Browse files
authored
Merge pull request #4 from feedloop/feature/expander-idempotency
fix(expander): make expander idempotent and prevent duplicate occurre…
2 parents 878a2f1 + 475ee1a commit ce3407c

7 files changed

Lines changed: 168 additions & 1 deletion

File tree

internal/handlers/event_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func isValidSchedule(schedule *models.ScheduleConfig) bool {
358358

359359
// Validate frequency
360360
switch schedule.Frequency {
361-
case "daily", "weekly", "monthly", "yearly":
361+
case "minutely", "hourly", "daily", "weekly", "monthly", "yearly":
362362
// Valid frequency
363363
default:
364364
return false
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package handlers
2+
3+
import (
4+
"testing"
5+
6+
"github.com/feedloop/qhronos/internal/models"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestIsValidSchedule(t *testing.T) {
11+
valid := []*models.ScheduleConfig{
12+
{Frequency: "minutely", Interval: 1},
13+
{Frequency: "hourly", Interval: 1},
14+
{Frequency: "daily", Interval: 1},
15+
{Frequency: "weekly", Interval: 1, ByDay: []string{"MO", "WE"}},
16+
{Frequency: "monthly", Interval: 1, ByMonthDay: []int{1, 15}},
17+
{Frequency: "yearly", Interval: 1, ByMonth: []int{1, 12}},
18+
}
19+
for _, sched := range valid {
20+
assert.True(t, isValidSchedule(sched), "should be valid: %+v", sched)
21+
}
22+
23+
invalid := []*models.ScheduleConfig{
24+
{Frequency: "secondly", Interval: 1},
25+
{Frequency: "foo", Interval: 1},
26+
{Frequency: "minutely", Interval: 0},
27+
{Frequency: "weekly", Interval: 1, ByDay: []string{"XX"}},
28+
{Frequency: "monthly", Interval: 1, ByMonthDay: []int{0, 32}},
29+
{Frequency: "yearly", Interval: 1, ByMonth: []int{0, 13}},
30+
}
31+
for _, sched := range invalid {
32+
assert.False(t, isValidSchedule(sched), "should be invalid: %+v", sched)
33+
}
34+
}

internal/repository/occurrence_repository.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,15 @@ func (r *OccurrenceRepository) GetLatestByOccurrenceID(ctx context.Context, occu
204204
}
205205
return &occurrence, nil
206206
}
207+
208+
// CountCompletedByEventID returns the number of completed occurrences for a given event ID
209+
func (r *OccurrenceRepository) CountCompletedByEventID(ctx context.Context, eventID uuid.UUID) (int, error) {
210+
var count int
211+
query := `SELECT COUNT(*) FROM occurrences WHERE event_id = $1 AND status = 'completed'`
212+
err := r.db.GetContext(ctx, &count, query, eventID)
213+
if err != nil {
214+
r.logger.Error("Error counting completed occurrences", zap.Error(err), zap.String("event_id", eventID.String()))
215+
return 0, err
216+
}
217+
return count, nil
218+
}

internal/scheduler/dispatcher.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,31 @@ func (d *Dispatcher) DispatchAction(ctx context.Context, sched *models.Schedule)
164164
}
165165
}
166166

167+
// Auto-inactivate recurring events if count or until is reached
168+
if event.Schedule != nil && event.Status == models.EventStatusActive && finalStatus == models.OccurrenceStatusCompleted {
169+
shouldInactivate := false
170+
if event.Schedule.Count != nil {
171+
completedCount, err := d.occurrenceRepo.CountCompletedByEventID(ctx, event.ID)
172+
if err == nil && completedCount >= *event.Schedule.Count {
173+
shouldInactivate = true
174+
}
175+
}
176+
if !shouldInactivate && event.Schedule.Until != nil {
177+
untilTime, err := time.Parse(time.RFC3339, *event.Schedule.Until)
178+
if err == nil && time.Now().After(untilTime) {
179+
shouldInactivate = true
180+
}
181+
}
182+
if shouldInactivate {
183+
event.Status = models.EventStatusInactive
184+
if err := d.eventRepo.Update(ctx, event); err != nil {
185+
d.logger.Error("Failed to auto-inactivate recurring event", zap.String("event_id", event.ID.String()), zap.Error(err))
186+
} else {
187+
d.logger.Info("Auto-inactivated recurring event", zap.String("event_id", event.ID.String()))
188+
}
189+
}
190+
}
191+
167192
return dispatchError // This error determines if it goes to retry queue
168193
}
169194

internal/scheduler/dispatcher_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,52 @@ func TestDispatcher(t *testing.T) {
327327
}
328328
assert.Equal(t, []string{"client3:c1", "client3:c2", "client3:c1", "client3:c2"}, mockNotifier.calls)
329329
})
330+
331+
t.Run("auto-inactivate recurring event when count is reached", func(t *testing.T) {
332+
cleanup()
333+
mockHTTP := new(MockHTTPClient)
334+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP)
335+
mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{
336+
StatusCode: 200,
337+
Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})),
338+
}, nil)
339+
whParams, _ := json.Marshal(models.WebhookActionParams{URL: "http://example.com/webhook"})
340+
count := 2
341+
event := &models.Event{
342+
ID: uuid.New(),
343+
Name: "Recurring Event",
344+
Description: "Test recurring event with count limit",
345+
StartTime: time.Now(),
346+
Action: &models.Action{Type: models.ActionTypeWebhook, Params: whParams},
347+
Status: models.EventStatusActive,
348+
Metadata: []byte(`{"key": "value"}`),
349+
Tags: pq.StringArray{"test"},
350+
CreatedAt: time.Now(),
351+
Schedule: &models.ScheduleConfig{Frequency: "daily", Interval: 1, Count: &count},
352+
}
353+
err := eventRepo.Create(ctx, event)
354+
require.NoError(t, err)
355+
for i := 0; i < 2; i++ {
356+
schedule := &models.Schedule{
357+
Occurrence: models.Occurrence{
358+
OccurrenceID: uuid.New(),
359+
EventID: event.ID,
360+
ScheduledAt: time.Now().Add(time.Duration(i) * time.Hour),
361+
},
362+
Name: event.Name,
363+
Description: event.Description,
364+
Webhook: "http://example.com/webhook",
365+
Metadata: event.Metadata,
366+
Tags: event.Tags,
367+
}
368+
err = dispatcher.DispatchAction(ctx, schedule)
369+
assert.NoError(t, err)
370+
}
371+
// After two completions, event should be inactive
372+
updatedEvent, err := eventRepo.GetByID(ctx, event.ID)
373+
require.NoError(t, err)
374+
assert.Equal(t, models.EventStatusInactive, updatedEvent.Status)
375+
})
330376
}
331377

332378
func TestDispatcher_RedisOnlyDispatch(t *testing.T) {

internal/scheduler/expander.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,16 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event
235235
e.logger.Debug("Found future occurrences for event", zap.String("event_id", event.ID.String()), zap.Int("count", len(occurrences)))
236236

237237
for _, t := range occurrences {
238+
// Prevent duplicate occurrences: check if one already exists for this event and time
239+
exists, err := e.occurrenceRepo.ExistsAtTime(ctx, event.ID, t)
240+
if err != nil {
241+
e.logger.Error("Failed to check for existing occurrence", zap.Error(err), zap.String("event_id", event.ID.String()), zap.Time("scheduled_at", t))
242+
continue
243+
}
244+
if exists {
245+
e.logger.Debug("Occurrence already exists, skipping", zap.String("event_id", event.ID.String()), zap.Time("scheduled_at", t))
246+
continue
247+
}
238248
occurrence := &models.Occurrence{
239249
OccurrenceID: uuid.New(),
240250
EventID: event.ID,

internal/scheduler/expander_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,4 +384,44 @@ func TestEventExpander(t *testing.T) {
384384
require.NoError(t, err)
385385
assert.NotEmpty(t, results)
386386
})
387+
388+
t.Run("minutely frequency event expansion is idempotent", func(t *testing.T) {
389+
cleanup()
390+
startTime := time.Now().Add(-2 * time.Minute).Truncate(time.Minute)
391+
// Create a minutely schedule
392+
scheduleConfig := &models.ScheduleConfig{
393+
Frequency: "minutely",
394+
Interval: 1,
395+
}
396+
event := &models.Event{
397+
ID: uuid.New(),
398+
Name: "Minutely Event",
399+
Description: "Test minutely event for idempotency",
400+
StartTime: startTime,
401+
Webhook: "http://example.com",
402+
Schedule: scheduleConfig,
403+
Status: models.EventStatusActive,
404+
Metadata: []byte(`{"key": "value"}`),
405+
Tags: pq.StringArray{"test"},
406+
CreatedAt: time.Now(),
407+
}
408+
err := eventRepo.Create(ctx, event)
409+
require.NoError(t, err)
410+
// Run expansion twice
411+
err = expander.ExpandEvents(ctx)
412+
require.NoError(t, err)
413+
err = expander.ExpandEvents(ctx)
414+
require.NoError(t, err)
415+
// Get all occurrences for this event from the DB
416+
occurrences, err := occurrenceRepo.ListByEventID(ctx, event.ID)
417+
require.NoError(t, err)
418+
// There should be only one occurrence per scheduled time
419+
scheduledTimes := make(map[int64]struct{})
420+
for _, occ := range occurrences {
421+
ts := occ.ScheduledAt.Unix()
422+
_, exists := scheduledTimes[ts]
423+
assert.False(t, exists, "Duplicate occurrence for scheduled time: %v", occ.ScheduledAt)
424+
scheduledTimes[ts] = struct{}{}
425+
}
426+
})
387427
}

0 commit comments

Comments
 (0)