diff --git a/apps/uno/bootstrap/api.go b/apps/uno/bootstrap/api.go index a29fa5184d..b71e968ae5 100644 --- a/apps/uno/bootstrap/api.go +++ b/apps/uno/bootstrap/api.go @@ -9,6 +9,7 @@ import ( "uno/domain/service" "uno/http" "uno/infrastructure/external" + "uno/infrastructure/decorator" "uno/infrastructure/logging" "uno/infrastructure/postgres" "uno/infrastructure/telemetry" @@ -70,6 +71,9 @@ func RunApi() { weatherRepo := external.NewYrRepo(logger) databrusRepo := external.NewDatabrusRepo(logger) + // Initialize decorators + registrationRepo = decorator.NewRaffleDecorator(registrationRepo, notif) + // Initialize services authService := service.NewAuthService(sessionRepo, userRepo) happeningService := service.NewHappeningService(happeningRepo, userRepo, registrationRepo, banInfoRepo) diff --git a/apps/uno/domain/port/mocks/RegistrationRepo.go b/apps/uno/domain/port/mocks/RegistrationRepo.go index 3b14d363e3..87eb419f9f 100644 --- a/apps/uno/domain/port/mocks/RegistrationRepo.go +++ b/apps/uno/domain/port/mocks/RegistrationRepo.go @@ -39,8 +39,8 @@ func (_m *RegistrationRepo) EXPECT() *RegistrationRepo_Expecter { } // CreateRegistration provides a mock function for the type RegistrationRepo -func (_mock *RegistrationRepo) CreateRegistration(ctx context.Context, userID string, happeningID string, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool) (*model.Registration, bool, error) { - ret := _mock.Called(ctx, userID, happeningID, spotRanges, hostGroups, canSkipSpotRange) +func (_mock *RegistrationRepo) CreateRegistration(ctx context.Context, userID string, happening model.Happening, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool) (*model.Registration, bool, error) { + ret := _mock.Called(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange) if len(ret) == 0 { panic("no return value specified for CreateRegistration") @@ -49,23 +49,23 @@ func (_mock *RegistrationRepo) CreateRegistration(ctx context.Context, userID st var r0 *model.Registration var r1 bool var r2 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string, string, []model.SpotRange, []string, bool) (*model.Registration, bool, error)); ok { - return returnFunc(ctx, userID, happeningID, spotRanges, hostGroups, canSkipSpotRange) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, model.Happening, []model.SpotRange, []string, bool) (*model.Registration, bool, error)); ok { + return returnFunc(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange) } - if returnFunc, ok := ret.Get(0).(func(context.Context, string, string, []model.SpotRange, []string, bool) *model.Registration); ok { - r0 = returnFunc(ctx, userID, happeningID, spotRanges, hostGroups, canSkipSpotRange) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, model.Happening, []model.SpotRange, []string, bool) *model.Registration); ok { + r0 = returnFunc(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*model.Registration) } } - if returnFunc, ok := ret.Get(1).(func(context.Context, string, string, []model.SpotRange, []string, bool) bool); ok { - r1 = returnFunc(ctx, userID, happeningID, spotRanges, hostGroups, canSkipSpotRange) + if returnFunc, ok := ret.Get(1).(func(context.Context, string, model.Happening, []model.SpotRange, []string, bool) bool); ok { + r1 = returnFunc(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange) } else { r1 = ret.Get(1).(bool) } - if returnFunc, ok := ret.Get(2).(func(context.Context, string, string, []model.SpotRange, []string, bool) error); ok { - r2 = returnFunc(ctx, userID, happeningID, spotRanges, hostGroups, canSkipSpotRange) + if returnFunc, ok := ret.Get(2).(func(context.Context, string, model.Happening, []model.SpotRange, []string, bool) error); ok { + r2 = returnFunc(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange) } else { r2 = ret.Error(2) } @@ -80,15 +80,15 @@ type RegistrationRepo_CreateRegistration_Call struct { // CreateRegistration is a helper method to define mock.On call // - ctx context.Context // - userID string -// - happeningID string +// - happening model.Happening // - spotRanges []model.SpotRange // - hostGroups []string // - canSkipSpotRange bool -func (_e *RegistrationRepo_Expecter) CreateRegistration(ctx interface{}, userID interface{}, happeningID interface{}, spotRanges interface{}, hostGroups interface{}, canSkipSpotRange interface{}) *RegistrationRepo_CreateRegistration_Call { - return &RegistrationRepo_CreateRegistration_Call{Call: _e.mock.On("CreateRegistration", ctx, userID, happeningID, spotRanges, hostGroups, canSkipSpotRange)} +func (_e *RegistrationRepo_Expecter) CreateRegistration(ctx interface{}, userID interface{}, happening interface{}, spotRanges interface{}, hostGroups interface{}, canSkipSpotRange interface{}) *RegistrationRepo_CreateRegistration_Call { + return &RegistrationRepo_CreateRegistration_Call{Call: _e.mock.On("CreateRegistration", ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange)} } -func (_c *RegistrationRepo_CreateRegistration_Call) Run(run func(ctx context.Context, userID string, happeningID string, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool)) *RegistrationRepo_CreateRegistration_Call { +func (_c *RegistrationRepo_CreateRegistration_Call) Run(run func(ctx context.Context, userID string, happening model.Happening, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool)) *RegistrationRepo_CreateRegistration_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -98,9 +98,9 @@ func (_c *RegistrationRepo_CreateRegistration_Call) Run(run func(ctx context.Con if args[1] != nil { arg1 = args[1].(string) } - var arg2 string + var arg2 model.Happening if args[2] != nil { - arg2 = args[2].(string) + arg2 = args[2].(model.Happening) } var arg3 []model.SpotRange if args[3] != nil { @@ -131,7 +131,7 @@ func (_c *RegistrationRepo_CreateRegistration_Call) Return(registration *model.R return _c } -func (_c *RegistrationRepo_CreateRegistration_Call) RunAndReturn(run func(ctx context.Context, userID string, happeningID string, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool) (*model.Registration, bool, error)) *RegistrationRepo_CreateRegistration_Call { +func (_c *RegistrationRepo_CreateRegistration_Call) RunAndReturn(run func(ctx context.Context, userID string, happening model.Happening, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool) (*model.Registration, bool, error)) *RegistrationRepo_CreateRegistration_Call { _c.Call.Return(run) return _c } diff --git a/apps/uno/domain/port/registration.go b/apps/uno/domain/port/registration.go index 5fc2f9ee22..618c6884a5 100644 --- a/apps/uno/domain/port/registration.go +++ b/apps/uno/domain/port/registration.go @@ -9,7 +9,8 @@ type RegistrationRepo interface { GetByUserAndHappening(ctx context.Context, userID, happeningID string) (*model.Registration, error) CreateRegistration( ctx context.Context, - userID, happeningID string, + userID string, + happening model.Happening, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool, diff --git a/apps/uno/domain/service/happening.go b/apps/uno/domain/service/happening.go index 43337538ea..5015fcaca7 100644 --- a/apps/uno/domain/service/happening.go +++ b/apps/uno/domain/service/happening.go @@ -371,7 +371,7 @@ func (hs *HappeningService) Register( _, isWaitlisted, err := hs.registrationRepo.CreateRegistration( ctx, userID, - happeningID, + happening, spotRanges, hostGroups, canSkipSpotRange, diff --git a/apps/uno/infrastructure/decorator/raffle.go b/apps/uno/infrastructure/decorator/raffle.go new file mode 100644 index 0000000000..fb354c4eae --- /dev/null +++ b/apps/uno/infrastructure/decorator/raffle.go @@ -0,0 +1,136 @@ +package decorator + +import ( + "context" + "sync" + "time" + "uno/domain/model" + "uno/domain/port" + + "github.com/jesperkha/notifier" +) + +// TODO: raffle test: create a registration and poll its state for 15 seconds for a 10 second window +// TODO: add ID field to all Registration models (dto, model) +// TODO: make regRepo.BatchUpdateStatus() and insert the pending regs immediately as pending and batch update later +// TODO: make /happenings/{id}/registrations/{id}/status endpoint + +const ( + // Maximum length of registration queue before blocking + MAX_QUEUE = 200 + + // How long the raffle period lasts at the start of a bedpres registration period + RAFFLE_DURATION = 10 * time.Second +) + +// RaffleDecorator wraps port.RegistrationRepo and adds raffle +// registration for bedpres happenings in a given time window. +// Implements port.RegistrationRepo +type RaffleDecorator struct { + repo port.RegistrationRepo + mu *sync.Mutex + queues map[string]*eventQueue +} + +type eventQueue struct { + queue chan registration +} + +type registration struct { + regId int +} + +func NewRaffleDecorator(repo port.RegistrationRepo, notif *notifier.Notifier) port.RegistrationRepo { + return &RaffleDecorator{ + repo: repo, + queues: map[string]*eventQueue{}, + mu: &sync.Mutex{}, + } +} + +// CreateRegistration implements port.RegistrationRepo. +func (r *RaffleDecorator) CreateRegistration( + ctx context.Context, + userID string, + happening model.Happening, + spotRanges []model.SpotRange, + hostGroups []string, + canSkipSpotRange bool, +) (*model.Registration, bool, error) { + shouldEnqueue := happening.IsBedpres() && happening.RegistrationStart != nil + + if !shouldEnqueue { + return r.repo.CreateRegistration(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange) + } + + // This lock is also held by r.flushAndDeleteQueue, ensuring that the queue + // is flushed before any new registrations have the chance to go through. + r.mu.Lock() + defer r.mu.Unlock() + + // SAFETY: asserted not nil + windowClose := (*happening.RegistrationStart).Add(RAFFLE_DURATION) + + // If raffle window is closed, register normally without enqueuing. + if time.Now().After(windowClose) { + return r.repo.CreateRegistration(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange) + } + + eq, ok := r.queues[happening.ID] + if !ok { + eq = &eventQueue{ + queue: make(chan registration, MAX_QUEUE), + } + + go func() { + <-time.After(time.Until(windowClose)) + _ = r.flushAndDeleteQueue(happening.ID) + }() + + r.queues[happening.ID] = eq + } + + eq.queue <- registration{ + regId: 0, // TODO: add regid + } + + return &model.Registration{ + UserID: userID, + HappeningID: happening.ID, + Status: model.RegistrationStatusPending, + UnregisterReason: nil, + CreatedAt: time.Now(), + PrevStatus: nil, + ChangedAt: nil, + ChangedBy: nil, + }, false, nil +} + +func (r *RaffleDecorator) flushAndDeleteQueue(happeningId string) error { + // Lock any new incomming registrations for this + // happening from trying to enqueue or register. + r.mu.Lock() + defer r.mu.Unlock() + + _, ok := r.queues[happeningId] + + // No queue to flush + if !ok { + return nil + } + + // TODO: randomize and flush queue. + + delete(r.queues, happeningId) // SAFETY: locked + return nil +} + +// GetByUserAndHappening implements port.RegistrationRepo. +func (r *RaffleDecorator) GetByUserAndHappening(ctx context.Context, userID string, happeningID string) (*model.Registration, error) { + return r.repo.GetByUserAndHappening(ctx, userID, happeningID) +} + +// InsertAnswers implements port.RegistrationRepo. +func (r *RaffleDecorator) InsertAnswers(ctx context.Context, userID string, happeningID string, questions []model.QuestionAnswer) error { + return r.repo.InsertAnswers(ctx, userID, happeningID, questions) +} diff --git a/apps/uno/infrastructure/postgres/registration.go b/apps/uno/infrastructure/postgres/registration.go index 418387a9b4..d809a2bf5b 100644 --- a/apps/uno/infrastructure/postgres/registration.go +++ b/apps/uno/infrastructure/postgres/registration.go @@ -55,15 +55,15 @@ func (r *RegistrationRepo) GetByUserAndHappening(ctx context.Context, userID, ha // An error here could be because of an issue with locking the table or a DB error. func (r *RegistrationRepo) CreateRegistration( ctx context.Context, - - userID, happeningID string, + userID string, + happening model.Happening, spotRanges []model.SpotRange, hostGroups []string, canSkipSpotRange bool, ) (*model.Registration, bool, error) { r.logger.Info(ctx, "creating registration", "user_id", userID, - "happening_id", happeningID, + "happening_id", happening.ID, ) tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{ @@ -94,11 +94,11 @@ func (r *RegistrationRepo) CreateRegistration( WHERE happening_id = $1 AND (status = 'registered' OR status = 'waiting') ` - err = tx.SelectContext(ctx, &existingRegsDB, query, happeningID) + err = tx.SelectContext(ctx, &existingRegsDB, query, happening.ID) if err != nil { r.logger.Error(ctx, "failed to get existing registrations for happening", "error", err, - "happening_id", happeningID, + "happening_id", happening.ID, ) return nil, false, err } @@ -182,6 +182,7 @@ func (r *RegistrationRepo) CreateRegistration( return nil, false, sql.ErrNoRows } + // TODO: rar import-cycle hvor service bruker dette repo, men dette repo bruker service isRegistered := service.IsAvailableSpot( spotRanges, existingRegs, @@ -209,12 +210,12 @@ func (r *RegistrationRepo) CreateRegistration( RETURNING user_id, happening_id, status, unregister_reason, created_at, prev_status, changed_at, changed_by ` - err = tx.GetContext(ctx, ®istrationDB, upsertQuery, userID, happeningID, string(status)) + err = tx.GetContext(ctx, ®istrationDB, upsertQuery, userID, happening.ID, string(status)) if err != nil { r.logger.Error(ctx, "failed to upsert registration", "error", err, "user_id", userID, - "happening_id", happeningID, + "happening_id", happening.ID, ) return nil, false, err } diff --git a/apps/uno/infrastructure/postgres/registration_test.go b/apps/uno/infrastructure/postgres/registration_test.go index 505c35d0a6..8429d6e96d 100644 --- a/apps/uno/infrastructure/postgres/registration_test.go +++ b/apps/uno/infrastructure/postgres/registration_test.go @@ -257,7 +257,7 @@ func TestRegistrationRepo_CreateRegistration(t *testing.T) { registration, isWaitlisted, err := repo.CreateRegistration( ctx, createdUser.ID, - createdHappening.ID, + createdHappening, []model.SpotRange{spotRange}, []string{}, false, @@ -340,7 +340,7 @@ func TestRegistrationRepo_CreateRegistrationWaitlisted(t *testing.T) { _, isWaitlisted1, err := repo.CreateRegistration( ctx, createdUser1.ID, - createdHappening.ID, + createdHappening, []model.SpotRange{spotRange}, []string{}, false, @@ -352,7 +352,7 @@ func TestRegistrationRepo_CreateRegistrationWaitlisted(t *testing.T) { registration2, isWaitlisted2, err := repo.CreateRegistration( ctx, createdUser2.ID, - createdHappening.ID, + createdHappening, []model.SpotRange{spotRange}, []string{}, false,