Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions module/http_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ type Route struct {

// StandardHTTPRouter implements both HTTPRouter and http.Handler interfaces
type StandardHTTPRouter struct {
name string
routes []Route
mu sync.RWMutex
serverDeps []string // Names of HTTP server modules this router depends on
serveMux *http.ServeMux
name string
routes []Route
mu sync.RWMutex
serverDeps []string // Names of HTTP server modules this router depends on
serveMux *http.ServeMux
globalMiddlewares []HTTPMiddleware
}

// NewStandardHTTPRouter creates a new HTTP router
Expand Down Expand Up @@ -138,6 +139,17 @@ func (r *StandardHTTPRouter) AddRouteWithMiddleware(method, path string, handler
}
}

// AddGlobalMiddleware appends a middleware that wraps every request served by
// this router, regardless of which route is matched. Global middlewares are
// applied in the order they are added, before any per-route middlewares.
// This is the correct place to attach cross-cutting concerns such as
// distributed tracing that must observe all traffic.
func (r *StandardHTTPRouter) AddGlobalMiddleware(mw HTTPMiddleware) {
r.mu.Lock()
defer r.mu.Unlock()
r.globalMiddlewares = append(r.globalMiddlewares, mw)
Comment on lines +149 to +150
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddGlobalMiddleware/ServeHTTP introduce new global middleware behavior, but there are existing router tests (module/http_test.go) that don't currently validate that global middlewares run (and run in the expected order) for matched routes and 404s. Adding unit tests for AddGlobalMiddleware + ServeHTTP would help prevent regressions in tracing and other cross-cutting middleware.

Suggested change
defer r.mu.Unlock()
r.globalMiddlewares = append(r.globalMiddlewares, mw)
defer r.mu.Unlock()
r.globalMiddlewares = append(r.globalMiddlewares, mw)
// Rebuild the mux if we've already started so that newly added global
// middlewares take effect for all subsequent requests (hot-add support).
if r.serveMux != nil {
r.rebuildMuxLocked()
}

Copilot uses AI. Check for mistakes.
}

// HasRoute checks if a route with the given method and path already exists
func (r *StandardHTTPRouter) HasRoute(method, path string) bool {
r.mu.RLock()
Expand All @@ -150,16 +162,29 @@ func (r *StandardHTTPRouter) HasRoute(method, path string) bool {
return false
}

// ServeHTTP implements the http.Handler interface
// ServeHTTP implements the http.Handler interface.
// Global middlewares (e.g. OTEL tracing) are applied around the entire mux so
// every request — including health-checks and pipeline-triggered routes — is
// instrumented, regardless of how the route was registered.
func (r *StandardHTTPRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.mu.RLock()
defer r.mu.RUnlock()

Comment on lines 169 to 172
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ServeHTTP holds r.mu.RLock for the entire request (due to the deferred RUnlock) and then calls middleware Process() and handler.ServeHTTP while the lock is held. This risks self-deadlock if any handler/middleware tries to mutate the router (e.g., AddRouteWithMiddleware/AddGlobalMiddleware) and also blocks hot-add route updates until the request completes. Consider copying serveMux and globalMiddlewares under the lock, releasing the lock, then building the middleware chain and serving the request without holding r.mu.

Copilot uses AI. Check for mistakes.
var base http.Handler
if r.serveMux != nil {
r.serveMux.ServeHTTP(w, req)
base = r.serveMux
} else {
http.NotFound(w, req)
base = http.NotFoundHandler()
}

// Wrap with global middlewares in reverse registration order so that the
// first-added middleware is outermost (executes first).
handler := base
for i := len(r.globalMiddlewares) - 1; i >= 0; i-- {
handler = r.globalMiddlewares[i].Process(handler)
}

handler.ServeHTTP(w, req)
}

// Start compiles all registered routes into the internal ServeMux.
Expand Down
1 change: 1 addition & 0 deletions plugins/observability/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func New() *ObservabilityPlugin {
"http.middleware.otel",
},
WiringHooks: []string{
"observability.otel-middleware",
"observability.health-endpoints",
"observability.metrics-endpoint",
"observability.log-endpoint",
Expand Down
11 changes: 7 additions & 4 deletions plugins/observability/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func TestManifestValidation(t *testing.T) {
if m.Name != "observability" {
t.Errorf("manifest Name = %q, want %q", m.Name, "observability")
}
if len(m.ModuleTypes) != 5 {
t.Errorf("manifest ModuleTypes count = %d, want 5", len(m.ModuleTypes))
if len(m.ModuleTypes) != 6 {
t.Errorf("manifest ModuleTypes count = %d, want 6", len(m.ModuleTypes))
}
}

Expand Down Expand Up @@ -79,6 +79,7 @@ func TestModuleFactories(t *testing.T) {
"log.collector",
"observability.otel",
"openapi.generator",
"http.middleware.otel",
}

if len(factories) != len(expectedTypes) {
Expand Down Expand Up @@ -166,6 +167,7 @@ func TestModuleSchemas(t *testing.T) {
"log.collector": false,
"observability.otel": false,
"openapi.generator": false,
"http.middleware.otel": false,
}

if len(schemas) != len(expectedTypes) {
Expand Down Expand Up @@ -235,11 +237,12 @@ func TestWiringHooks(t *testing.T) {
p := New()
hooks := p.WiringHooks()

if len(hooks) != 4 {
t.Fatalf("WiringHooks() count = %d, want 4", len(hooks))
if len(hooks) != 5 {
t.Fatalf("WiringHooks() count = %d, want 5", len(hooks))
}

expectedNames := map[string]bool{
"observability.otel-middleware": false,
"observability.health-endpoints": false,
"observability.metrics-endpoint": false,
"observability.log-endpoint": false,
Expand Down
28 changes: 28 additions & 0 deletions plugins/observability/wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ import (
// modules to the HTTP router.
func wiringHooks() []plugin.WiringHook {
return []plugin.WiringHook{
{
// Run at priority 100 (highest) so OTEL wraps all other middleware
// and every request — including health, metrics, and pipeline routes
// registered later — is captured in a trace span.
Name: "observability.otel-middleware",
Priority: 100,
Hook: wireOTelMiddleware,
},
{
Name: "observability.health-endpoints",
Priority: 50,
Expand Down Expand Up @@ -132,6 +140,26 @@ func wireLogEndpoint(app modular.Application, _ *config.WorkflowConfig) error {
return nil
}

// wireOTelMiddleware registers any OTelMiddleware instances as global middleware
// on every available StandardHTTPRouter so that all inbound HTTP requests are
// wrapped in an OpenTelemetry trace span.
func wireOTelMiddleware(app modular.Application, _ *config.WorkflowConfig) error {
for _, svc := range app.SvcRegistry() {
otelMw, ok := svc.(*module.OTelMiddleware)
if !ok {
continue
}
for _, routerSvc := range app.SvcRegistry() {
router, ok := routerSvc.(*module.StandardHTTPRouter)
if !ok {
continue
}
router.AddGlobalMiddleware(otelMw)
}
}
Comment on lines +147 to +159
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wireOTelMiddleware currently does a nested iteration over app.SvcRegistry() (OTelMiddleware × routers). Even if the registry is usually small, this is avoidable and makes the hook harder to extend. Consider collecting the routers once (and possibly the OTel middlewares once) and then wiring them in a single pass.

Suggested change
for _, svc := range app.SvcRegistry() {
otelMw, ok := svc.(*module.OTelMiddleware)
if !ok {
continue
}
for _, routerSvc := range app.SvcRegistry() {
router, ok := routerSvc.(*module.StandardHTTPRouter)
if !ok {
continue
}
router.AddGlobalMiddleware(otelMw)
}
}
var (
routers []*module.StandardHTTPRouter
middlewares []*module.OTelMiddleware
)
// Collect all routers and OTel middleware instances in a single pass.
for _, svc := range app.SvcRegistry() {
if router, ok := svc.(*module.StandardHTTPRouter); ok {
routers = append(routers, router)
continue
}
if otelMw, ok := svc.(*module.OTelMiddleware); ok {
middlewares = append(middlewares, otelMw)
}
}
// Register each OTel middleware as global middleware on every router.
for _, router := range routers {
for _, otelMw := range middlewares {
router.AddGlobalMiddleware(otelMw)
}
}

Copilot uses AI. Check for mistakes.
return nil
}

// wireOpenAPIEndpoints builds OpenAPI specs and registers the JSON/YAML endpoints
// on any available router.
func wireOpenAPIEndpoints(app modular.Application, cfg *config.WorkflowConfig) error {
Expand Down
Loading