diff --git a/hooks/hookexecution/context.go b/hooks/hookexecution/context.go index 75ad2a82179..934e39d85c3 100644 --- a/hooks/hookexecution/context.go +++ b/hooks/hookexecution/context.go @@ -17,6 +17,10 @@ type executionContext struct { account *config.Account moduleContexts *moduleContexts activityControl privacy.ActivityControl + // holdReadLock indicates whether to hold moduleContexts read lock + // for the duration of hook group execution. This is needed for stages + // that run concurrently (bidder stages) to prevent concurrent map access. + holdReadLock bool } func (ctx executionContext) getModuleContext(moduleName string) hookstage.ModuleInvocationContext { @@ -30,7 +34,32 @@ func (ctx executionContext) getModuleContext(moduleName string) hookstage.Module if ctx.account != nil { cfg, err := ctx.account.Hooks.Modules.ModuleConfig(moduleName) if err != nil { - logger.Warnf("Failed to get account config for %s module: %s", moduleName, err) + logger.Warnf("Failed to get account config for %s module: %v", moduleName, err) + } + + moduleInvocationCtx.AccountID = ctx.accountID + moduleInvocationCtx.AccountConfig = cfg + } + + return moduleInvocationCtx +} + +// getModuleContextLocked reads from moduleContexts without locking. +// IMPORTANT: Caller must hold moduleContexts.RLock. +func (ctx executionContext) getModuleContextLocked(moduleName string) hookstage.ModuleInvocationContext { + moduleInvocationCtx := hookstage.ModuleInvocationContext{Endpoint: ctx.endpoint} + + // Direct map access - caller holds lock + if ctx.moduleContexts != nil && ctx.moduleContexts.ctxs != nil { + if mc, ok := ctx.moduleContexts.ctxs[moduleName]; ok { + moduleInvocationCtx.ModuleContext = mc + } + } + + if ctx.account != nil { + cfg, err := ctx.account.Hooks.Modules.ModuleConfig(moduleName) + if err != nil { + logger.Warnf("Failed to get account config for %s module: %v", moduleName, err) } moduleInvocationCtx.AccountID = ctx.accountID diff --git a/hooks/hookexecution/execution.go b/hooks/hookexecution/execution.go index 57a145dd1c3..68c4a731cf5 100644 --- a/hooks/hookexecution/execution.go +++ b/hooks/hookexecution/execution.go @@ -67,27 +67,48 @@ func executeGroup[H any, P any]( hookHandler hookHandler[H, P], metricEngine metrics.MetricsEngine, ) (GroupOutcome, P, groupModuleContext, *RejectError) { - var wg sync.WaitGroup - rejected := make(chan struct{}) - resp := make(chan hookResponse[P], len(group.Hooks)) - - for _, hook := range group.Hooks { - mCtx := executionCtx.getModuleContext(hook.Module) - mCtx.HookImplCode = hook.Code - newPayload := handleModuleActivities(hook.Code, executionCtx.activityControl, payload, executionCtx.account) - wg.Add(1) - go func(hw hooks.HookWrapper[H], moduleCtx hookstage.ModuleInvocationContext) { - defer wg.Done() - executeHook(moduleCtx, hw, newPayload, hookHandler, group.Timeout, resp, rejected) - }(hook, mCtx) - } + // For concurrent stages (bidder request/response), spawn goroutines + // and collect responses within a scoped lock to prevent concurrent + // map access to moduleContexts + hookResponses := func() []hookResponse[P] { + var wg sync.WaitGroup + rejected := make(chan struct{}) + resp := make(chan hookResponse[P], len(group.Hooks)) + // Hold read lock for the duration that goroutines are spawned and executing + if executionCtx.holdReadLock && executionCtx.moduleContexts != nil { + executionCtx.moduleContexts.RLock() + defer executionCtx.moduleContexts.RUnlock() + } - go func() { - wg.Wait() - close(resp) - }() + for _, hook := range group.Hooks { + var mCtx hookstage.ModuleInvocationContext + + if executionCtx.holdReadLock { + // Lock already held, use lockless accessor + mCtx = executionCtx.getModuleContextLocked(hook.Module) + } else { + // Normal case: acquire and release lock per hook + mCtx = executionCtx.getModuleContext(hook.Module) + } - hookResponses := collectHookResponses(resp, rejected) + mCtx.HookImplCode = hook.Code + newPayload := handleModuleActivities(hook.Code, executionCtx.activityControl, payload, executionCtx.account) + wg.Add(1) + go func(hw hooks.HookWrapper[H], moduleCtx hookstage.ModuleInvocationContext) { + defer wg.Done() + executeHook(moduleCtx, hw, newPayload, hookHandler, group.Timeout, resp, rejected) + }(hook, mCtx) + } + + go func() { + wg.Wait() + close(resp) + }() + + // Collect responses while lock is still held (goroutines are executing) + return collectHookResponses(resp, rejected) + }() + // Lock is now released - goroutines have completed return handleHookResponses(executionCtx, hookResponses, payload, metricEngine) } diff --git a/hooks/hookexecution/executor.go b/hooks/hookexecution/executor.go index 0e5b28fff8e..c11212a7f78 100644 --- a/hooks/hookexecution/executor.go +++ b/hooks/hookexecution/executor.go @@ -203,6 +203,7 @@ func (e *hookExecutor) ExecuteBidderRequestStage(req *openrtb_ext.RequestWrapper stageName := hooks.StageBidderRequest.String() executionCtx := e.newContext(stageName) + executionCtx.holdReadLock = true // Enable lock holding for concurrent bidder stage payload := hookstage.BidderRequestPayload{Request: req, Bidder: bidder} outcome, _, contexts, reject := executeStage(executionCtx, plan, payload, handler, e.metricEngine) outcome.Entity = entity(bidder) @@ -231,6 +232,7 @@ func (e *hookExecutor) ExecuteRawBidderResponseStage(response *adapters.BidderRe stageName := hooks.StageRawBidderResponse.String() executionCtx := e.newContext(stageName) + executionCtx.holdReadLock = true // Enable lock holding for concurrent bidder stage payload := hookstage.RawBidderResponsePayload{BidderResponse: response, Bidder: bidder} outcome, payload, contexts, reject := executeStage(executionCtx, plan, payload, handler, e.metricEngine)