Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/uno/bootstrap/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"uno/domain/service"
"uno/http"
"uno/infrastructure/external"
"uno/infrastructure/decorator"
"uno/infrastructure/logging"
"uno/infrastructure/postgres"
"uno/infrastructure/telemetry"
Expand Down Expand Up @@ -70,6 +71,9 @@ func RunApi() {
weatherRepo := external.NewYrRepo(logger)
databrusRepo := external.NewDatabrusRepo(logger)

// Initialize decorators
registrationRepo = decorator.NewRaffleDecorator(registrationRepo, notif)

// Initialize services
authService := service.NewAuthService(sessionRepo, userRepo)
happeningService := service.NewHappeningService(happeningRepo, userRepo, registrationRepo, banInfoRepo)
Expand Down
34 changes: 17 additions & 17 deletions apps/uno/domain/port/mocks/RegistrationRepo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/uno/domain/port/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ type RegistrationRepo interface {
GetByUserAndHappening(ctx context.Context, userID, happeningID string) (*model.Registration, error)
CreateRegistration(
ctx context.Context,
userID, happeningID string,
userID string,
happening model.Happening,
spotRanges []model.SpotRange,
hostGroups []string,
canSkipSpotRange bool,
Expand Down
2 changes: 1 addition & 1 deletion apps/uno/domain/service/happening.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (hs *HappeningService) Register(
_, isWaitlisted, err := hs.registrationRepo.CreateRegistration(
ctx,
userID,
happeningID,
happening,
spotRanges,
hostGroups,
canSkipSpotRange,
Expand Down
136 changes: 136 additions & 0 deletions apps/uno/infrastructure/decorator/raffle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package decorator

import (
"context"
"sync"
"time"
"uno/domain/model"
"uno/domain/port"

"github.com/jesperkha/notifier"
)

// TODO: raffle test: create a registration and poll its state for 15 seconds for a 10 second window
// TODO: add ID field to all Registration models (dto, model)
// TODO: make regRepo.BatchUpdateStatus() and insert the pending regs immediately as pending and batch update later
// TODO: make /happenings/{id}/registrations/{id}/status endpoint

const (
// Maximum length of registration queue before blocking
MAX_QUEUE = 200

// How long the raffle period lasts at the start of a bedpres registration period
RAFFLE_DURATION = 10 * time.Second
)

// RaffleDecorator wraps port.RegistrationRepo and adds raffle
// registration for bedpres happenings in a given time window.
// Implements port.RegistrationRepo
type RaffleDecorator struct {
repo port.RegistrationRepo
mu *sync.Mutex
queues map[string]*eventQueue
}

type eventQueue struct {
queue chan registration
}

type registration struct {
regId int
}

func NewRaffleDecorator(repo port.RegistrationRepo, notif *notifier.Notifier) port.RegistrationRepo {
return &RaffleDecorator{
repo: repo,
queues: map[string]*eventQueue{},
mu: &sync.Mutex{},
}
}

// CreateRegistration implements port.RegistrationRepo.
func (r *RaffleDecorator) CreateRegistration(
ctx context.Context,
userID string,
happening model.Happening,
spotRanges []model.SpotRange,
hostGroups []string,
canSkipSpotRange bool,
) (*model.Registration, bool, error) {
shouldEnqueue := happening.IsBedpres() && happening.RegistrationStart != nil

if !shouldEnqueue {
return r.repo.CreateRegistration(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange)
}

// This lock is also held by r.flushAndDeleteQueue, ensuring that the queue
// is flushed before any new registrations have the chance to go through.
r.mu.Lock()
defer r.mu.Unlock()

// SAFETY: asserted not nil
windowClose := (*happening.RegistrationStart).Add(RAFFLE_DURATION)

// If raffle window is closed, register normally without enqueuing.
if time.Now().After(windowClose) {
return r.repo.CreateRegistration(ctx, userID, happening, spotRanges, hostGroups, canSkipSpotRange)
}

eq, ok := r.queues[happening.ID]
if !ok {
eq = &eventQueue{
queue: make(chan registration, MAX_QUEUE),
}

go func() {
<-time.After(time.Until(windowClose))
_ = r.flushAndDeleteQueue(happening.ID)
}()

r.queues[happening.ID] = eq
}

eq.queue <- registration{
regId: 0, // TODO: add regid
}

return &model.Registration{
UserID: userID,
HappeningID: happening.ID,
Status: model.RegistrationStatusPending,
UnregisterReason: nil,
CreatedAt: time.Now(),
PrevStatus: nil,
ChangedAt: nil,
ChangedBy: nil,
}, false, nil
}

func (r *RaffleDecorator) flushAndDeleteQueue(happeningId string) error {
// Lock any new incomming registrations for this
// happening from trying to enqueue or register.
r.mu.Lock()
defer r.mu.Unlock()

_, ok := r.queues[happeningId]

// No queue to flush
if !ok {
return nil
}

// TODO: randomize and flush queue.

delete(r.queues, happeningId) // SAFETY: locked
return nil
}

// GetByUserAndHappening implements port.RegistrationRepo.
func (r *RaffleDecorator) GetByUserAndHappening(ctx context.Context, userID string, happeningID string) (*model.Registration, error) {
return r.repo.GetByUserAndHappening(ctx, userID, happeningID)
}

// InsertAnswers implements port.RegistrationRepo.
func (r *RaffleDecorator) InsertAnswers(ctx context.Context, userID string, happeningID string, questions []model.QuestionAnswer) error {
return r.repo.InsertAnswers(ctx, userID, happeningID, questions)
}
15 changes: 8 additions & 7 deletions apps/uno/infrastructure/postgres/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ func (r *RegistrationRepo) GetByUserAndHappening(ctx context.Context, userID, ha
// An error here could be because of an issue with locking the table or a DB error.
func (r *RegistrationRepo) CreateRegistration(
ctx context.Context,

userID, happeningID string,
userID string,
happening model.Happening,
spotRanges []model.SpotRange,
hostGroups []string,
canSkipSpotRange bool,
) (*model.Registration, bool, error) {
r.logger.Info(ctx, "creating registration",
"user_id", userID,
"happening_id", happeningID,
"happening_id", happening.ID,
)

tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{
Expand Down Expand Up @@ -94,11 +94,11 @@ func (r *RegistrationRepo) CreateRegistration(
WHERE happening_id = $1
AND (status = 'registered' OR status = 'waiting')
`
err = tx.SelectContext(ctx, &existingRegsDB, query, happeningID)
err = tx.SelectContext(ctx, &existingRegsDB, query, happening.ID)
if err != nil {
r.logger.Error(ctx, "failed to get existing registrations for happening",
"error", err,
"happening_id", happeningID,
"happening_id", happening.ID,
)
return nil, false, err
}
Expand Down Expand Up @@ -182,6 +182,7 @@ func (r *RegistrationRepo) CreateRegistration(
return nil, false, sql.ErrNoRows
}

// TODO: rar import-cycle hvor service bruker dette repo, men dette repo bruker service
isRegistered := service.IsAvailableSpot(
spotRanges,
existingRegs,
Expand Down Expand Up @@ -209,12 +210,12 @@ func (r *RegistrationRepo) CreateRegistration(
RETURNING user_id, happening_id, status, unregister_reason,
created_at, prev_status, changed_at, changed_by
`
err = tx.GetContext(ctx, &registrationDB, upsertQuery, userID, happeningID, string(status))
err = tx.GetContext(ctx, &registrationDB, upsertQuery, userID, happening.ID, string(status))
if err != nil {
r.logger.Error(ctx, "failed to upsert registration",
"error", err,
"user_id", userID,
"happening_id", happeningID,
"happening_id", happening.ID,
)
return nil, false, err
}
Expand Down
6 changes: 3 additions & 3 deletions apps/uno/infrastructure/postgres/registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestRegistrationRepo_CreateRegistration(t *testing.T) {
registration, isWaitlisted, err := repo.CreateRegistration(
ctx,
createdUser.ID,
createdHappening.ID,
createdHappening,
[]model.SpotRange{spotRange},
[]string{},
false,
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestRegistrationRepo_CreateRegistrationWaitlisted(t *testing.T) {
_, isWaitlisted1, err := repo.CreateRegistration(
ctx,
createdUser1.ID,
createdHappening.ID,
createdHappening,
[]model.SpotRange{spotRange},
[]string{},
false,
Expand All @@ -352,7 +352,7 @@ func TestRegistrationRepo_CreateRegistrationWaitlisted(t *testing.T) {
registration2, isWaitlisted2, err := repo.CreateRegistration(
ctx,
createdUser2.ID,
createdHappening.ID,
createdHappening,
[]model.SpotRange{spotRange},
[]string{},
false,
Expand Down
Loading