Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion hooks/hookexecution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type executionContext struct {
account *config.Account
moduleContexts *moduleContexts
activityControl privacy.ActivityControl
// holdReadLock indicates whether to hold moduleContexts read lock
// for the duration of hook group execution. This is needed for stages
// that run concurrently (bidder stages) to prevent concurrent map access.
holdReadLock bool
}

func (ctx executionContext) getModuleContext(moduleName string) hookstage.ModuleInvocationContext {
Expand All @@ -30,7 +34,32 @@ func (ctx executionContext) getModuleContext(moduleName string) hookstage.Module
if ctx.account != nil {
cfg, err := ctx.account.Hooks.Modules.ModuleConfig(moduleName)
if err != nil {
logger.Warnf("Failed to get account config for %s module: %s", moduleName, err)
logger.Warnf("Failed to get account config for %s module: %v", moduleName, err)
}

moduleInvocationCtx.AccountID = ctx.accountID
moduleInvocationCtx.AccountConfig = cfg
}

return moduleInvocationCtx
}

// getModuleContextLocked reads from moduleContexts without locking.
// IMPORTANT: Caller must hold moduleContexts.RLock.
func (ctx executionContext) getModuleContextLocked(moduleName string) hookstage.ModuleInvocationContext {
moduleInvocationCtx := hookstage.ModuleInvocationContext{Endpoint: ctx.endpoint}

// Direct map access - caller holds lock
if ctx.moduleContexts != nil && ctx.moduleContexts.ctxs != nil {
if mc, ok := ctx.moduleContexts.ctxs[moduleName]; ok {
moduleInvocationCtx.ModuleContext = mc
}
}

if ctx.account != nil {
cfg, err := ctx.account.Hooks.Modules.ModuleConfig(moduleName)
if err != nil {
logger.Warnf("Failed to get account config for %s module: %v", moduleName, err)
}

moduleInvocationCtx.AccountID = ctx.accountID
Expand Down
59 changes: 40 additions & 19 deletions hooks/hookexecution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,48 @@ func executeGroup[H any, P any](
hookHandler hookHandler[H, P],
metricEngine metrics.MetricsEngine,
) (GroupOutcome, P, groupModuleContext, *RejectError) {
var wg sync.WaitGroup
rejected := make(chan struct{})
resp := make(chan hookResponse[P], len(group.Hooks))

for _, hook := range group.Hooks {
mCtx := executionCtx.getModuleContext(hook.Module)
mCtx.HookImplCode = hook.Code
newPayload := handleModuleActivities(hook.Code, executionCtx.activityControl, payload, executionCtx.account)
wg.Add(1)
go func(hw hooks.HookWrapper[H], moduleCtx hookstage.ModuleInvocationContext) {
defer wg.Done()
executeHook(moduleCtx, hw, newPayload, hookHandler, group.Timeout, resp, rejected)
}(hook, mCtx)
}
// For concurrent stages (bidder request/response), spawn goroutines
// and collect responses within a scoped lock to prevent concurrent
// map access to moduleContexts
hookResponses := func() []hookResponse[P] {
var wg sync.WaitGroup
rejected := make(chan struct{})
resp := make(chan hookResponse[P], len(group.Hooks))
// Hold read lock for the duration that goroutines are spawned and executing
if executionCtx.holdReadLock && executionCtx.moduleContexts != nil {
executionCtx.moduleContexts.RLock()
defer executionCtx.moduleContexts.RUnlock()
}

go func() {
wg.Wait()
close(resp)
}()
for _, hook := range group.Hooks {
var mCtx hookstage.ModuleInvocationContext

if executionCtx.holdReadLock {
// Lock already held, use lockless accessor
mCtx = executionCtx.getModuleContextLocked(hook.Module)
} else {
// Normal case: acquire and release lock per hook
mCtx = executionCtx.getModuleContext(hook.Module)
}

hookResponses := collectHookResponses(resp, rejected)
mCtx.HookImplCode = hook.Code
newPayload := handleModuleActivities(hook.Code, executionCtx.activityControl, payload, executionCtx.account)
wg.Add(1)
go func(hw hooks.HookWrapper[H], moduleCtx hookstage.ModuleInvocationContext) {
defer wg.Done()
executeHook(moduleCtx, hw, newPayload, hookHandler, group.Timeout, resp, rejected)
}(hook, mCtx)
}

go func() {
wg.Wait()
close(resp)
}()

// Collect responses while lock is still held (goroutines are executing)
return collectHookResponses(resp, rejected)
}()
// Lock is now released - goroutines have completed

return handleHookResponses(executionCtx, hookResponses, payload, metricEngine)
}
Expand Down
2 changes: 2 additions & 0 deletions hooks/hookexecution/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (e *hookExecutor) ExecuteBidderRequestStage(req *openrtb_ext.RequestWrapper

stageName := hooks.StageBidderRequest.String()
executionCtx := e.newContext(stageName)
executionCtx.holdReadLock = true // Enable lock holding for concurrent bidder stage
payload := hookstage.BidderRequestPayload{Request: req, Bidder: bidder}
outcome, _, contexts, reject := executeStage(executionCtx, plan, payload, handler, e.metricEngine)
outcome.Entity = entity(bidder)
Expand Down Expand Up @@ -231,6 +232,7 @@ func (e *hookExecutor) ExecuteRawBidderResponseStage(response *adapters.BidderRe

stageName := hooks.StageRawBidderResponse.String()
executionCtx := e.newContext(stageName)
executionCtx.holdReadLock = true // Enable lock holding for concurrent bidder stage
payload := hookstage.RawBidderResponsePayload{BidderResponse: response, Bidder: bidder}

outcome, payload, contexts, reject := executeStage(executionCtx, plan, payload, handler, e.metricEngine)
Expand Down
Loading