diff --git a/examples/reverse-proxy/config.yaml b/examples/reverse-proxy/config.yaml index 785805dd..c8ac8d30 100644 --- a/examples/reverse-proxy/config.yaml +++ b/examples/reverse-proxy/config.yaml @@ -28,6 +28,14 @@ reverseproxy: # Custom Transformer Backends profile-backend: "http://localhost:9013" analytics-backend: "http://localhost:9014" + + # Pipeline Strategy Backends + conversations-backend: "http://localhost:9015" + followup-backend: "http://localhost:9016" + + # Fan-Out-Merge Strategy Backends + tickets-backend: "http://localhost:9017" + assignments-backend: "http://localhost:9018" default_backend: "global-default" tenant_id_header: "X-Tenant-ID" @@ -100,6 +108,29 @@ reverseproxy: - "analytics-backend" strategy: "merge" # Strategy is set, but transformer overrides merge behavior + # STRATEGY 4: PIPELINE + # Executes backends sequentially where each stage's response informs the next request. + # Use case: A list page showing queued conversations. Backend A returns conversation + # details, those IDs are fed into Backend B to fetch follow-up information, + # and the responses are merged into a unified view. + "/api/composite/pipeline": + pattern: "/api/composite/pipeline" + backends: + - "conversations-backend" + - "followup-backend" + strategy: "pipeline" + + # STRATEGY 5: FAN-OUT-MERGE + # Executes all backends in parallel, then merges responses by matching IDs. + # Use case: A ticket dashboard where tickets come from one service and + # assignment/priority data comes from another. The merger correlates by ticket ID. + "/api/composite/fanout-merge": + pattern: "/api/composite/fanout-merge" + backends: + - "tickets-backend" + - "assignments-backend" + strategy: "fan-out-merge" + # ChiMux router configuration chimux: basepath: "" diff --git a/examples/reverse-proxy/main.go b/examples/reverse-proxy/main.go index 6285b495..3f938b44 100644 --- a/examples/reverse-proxy/main.go +++ b/examples/reverse-proxy/main.go @@ -2,12 +2,14 @@ package main import ( "bytes" + "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "os" + "strings" "time" "github.com/CrisisTextLine/modular" @@ -168,6 +170,118 @@ func main() { return resp, nil }) + // PIPELINE STRATEGY EXAMPLE: + // This demonstrates chained backend requests where backend B's request is constructed + // using data from backend A's response. This is the map/reduce pattern. + // + // Use case: A list page shows queued conversations. Backend A returns conversation details, + // then those conversation IDs are fed into Backend B to fetch follow-up information. + // The responses are then merged to produce a unified view. + proxyModule.SetPipelineConfig("/api/composite/pipeline", reverseproxy.PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + if nextBackendID == "followup-backend" { + // Extract conversation IDs from the conversations backend response + var convResp struct { + Conversations []struct { + ID string `json:"id"` + } `json:"conversations"` + } + if body, ok := previousResponses["conversations-backend"]; ok { + if err := json.Unmarshal(body, &convResp); err != nil { + return nil, fmt.Errorf("failed to parse conversations: %w", err) + } + } + + // Build the follow-up request with those IDs + ids := make([]string, 0, len(convResp.Conversations)) + for _, c := range convResp.Conversations { + ids = append(ids, c.ID) + } + idsParam := "" + for i, id := range ids { + if i > 0 { + idsParam += "," + } + idsParam += id + } + + url := "http://localhost:9016/followups?ids=" + idsParam + return http.NewRequestWithContext(ctx, "GET", url, nil) + } + return nil, fmt.Errorf("unknown pipeline backend: %s", nextBackendID) + }, + ResponseMerger: func(ctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + // Parse conversations + var convResp struct { + Conversations []map[string]interface{} `json:"conversations"` + } + if body, ok := allResponses["conversations-backend"]; ok { + json.Unmarshal(body, &convResp) + } + + // Parse follow-ups + var fuResp struct { + FollowUps map[string]interface{} `json:"follow_ups"` + } + if body, ok := allResponses["followup-backend"]; ok { + json.Unmarshal(body, &fuResp) + } + + // Merge follow-up data into each conversation + for i, conv := range convResp.Conversations { + if id, ok := conv["id"].(string); ok { + if fu, exists := fuResp.FollowUps[id]; exists { + convResp.Conversations[i]["follow_up"] = fu + } + } + } + + return reverseproxy.MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "conversations": convResp.Conversations, + "strategy": "pipeline", + }) + }, + }) + + // FAN-OUT-MERGE STRATEGY EXAMPLE: + // This demonstrates parallel requests to multiple backends with custom ID-based + // response merging. Both backends are called simultaneously, then their responses + // are correlated by matching IDs. + // + // Use case: Show a ticket dashboard where tickets come from one service and + // priority/assignment data comes from another. The merger matches by ticket ID. + proxyModule.SetFanOutMerger("/api/composite/fanout-merge", func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + // Parse tickets from the tickets backend + var ticketsResp struct { + Tickets []map[string]interface{} `json:"tickets"` + } + if body, ok := responses["tickets-backend"]; ok { + json.Unmarshal(body, &ticketsResp) + } + + // Parse assignments from the assignments backend + var assignResp struct { + Assignments map[string]interface{} `json:"assignments"` + } + if body, ok := responses["assignments-backend"]; ok { + json.Unmarshal(body, &assignResp) + } + + // Merge assignments into tickets by ID + for i, ticket := range ticketsResp.Tickets { + if id, ok := ticket["id"].(string); ok { + if assignment, exists := assignResp.Assignments[id]; exists { + ticketsResp.Tickets[i]["assignment"] = assignment + } + } + } + + return reverseproxy.MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "tickets": ticketsResp.Tickets, + "strategy": "fan-out-merge", + }) + }) + app.RegisterModule(proxyModule) app.RegisterModule(httpserver.NewHTTPServerModule()) @@ -403,4 +517,89 @@ func startMockBackends() { fmt.Printf("Backend server error on :9014: %v\n", err) } }() + + // ======================================== + // Backends for PIPELINE strategy demonstration + // ======================================== + + // Conversations backend (port 9015) - Returns queued conversations + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"conversations":[{"id":"conv-1","status":"queued","counselor":"Alice","created_at":"2024-01-01T10:00:00Z"},{"id":"conv-2","status":"queued","counselor":"Bob","created_at":"2024-01-01T10:05:00Z"},{"id":"conv-3","status":"active","counselor":"Carol","created_at":"2024-01-01T10:10:00Z"}]}`) + }) + fmt.Println("Starting conversations-backend (pipeline demo) on :9015") + if err := http.ListenAndServe(":9015", mux); err != nil { //nolint:gosec + fmt.Printf("Backend server error on :9015: %v\n", err) + } + }() + + // Follow-up backend (port 9016) - Returns follow-up details for given conversation IDs + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + idsParam := r.URL.Query().Get("ids") + followUps := make(map[string]interface{}) + if idsParam != "" { + for _, id := range strings.Split(idsParam, ",") { + switch id { + case "conv-1": + followUps[id] = map[string]interface{}{ + "is_follow_up": true, + "original_conv_id": "conv-50", + "follow_up_count": 2, + } + case "conv-3": + followUps[id] = map[string]interface{}{ + "is_follow_up": true, + "original_conv_id": "conv-90", + "follow_up_count": 1, + } + } + } + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + resp, _ := json.Marshal(map[string]interface{}{"follow_ups": followUps}) + w.Write(resp) //nolint:errcheck + }) + fmt.Println("Starting followup-backend (pipeline demo) on :9016") + if err := http.ListenAndServe(":9016", mux); err != nil { //nolint:gosec + fmt.Printf("Backend server error on :9016: %v\n", err) + } + }() + + // ======================================== + // Backends for FAN-OUT-MERGE strategy demonstration + // ======================================== + + // Tickets backend (port 9017) - Returns support tickets + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"tickets":[{"id":"ticket-1","subject":"Login issue","status":"open","created":"2024-01-15"},{"id":"ticket-2","subject":"Billing question","status":"open","created":"2024-01-16"},{"id":"ticket-3","subject":"Feature request","status":"pending","created":"2024-01-17"}]}`) + }) + fmt.Println("Starting tickets-backend (fan-out-merge demo) on :9017") + if err := http.ListenAndServe(":9017", mux); err != nil { //nolint:gosec + fmt.Printf("Backend server error on :9017: %v\n", err) + } + }() + + // Assignments backend (port 9018) - Returns ticket assignments and priorities + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"assignments":{"ticket-1":{"assignee":"Alice","priority":"high","sla_deadline":"2024-01-16T12:00:00Z"},"ticket-3":{"assignee":"Bob","priority":"low","sla_deadline":"2024-01-20T12:00:00Z"}}}`) + }) + fmt.Println("Starting assignments-backend (fan-out-merge demo) on :9018") + if err := http.ListenAndServe(":9018", mux); err != nil { //nolint:gosec + fmt.Printf("Backend server error on :9018: %v\n", err) + } + }() } diff --git a/modules/reverseproxy/README.md b/modules/reverseproxy/README.md index af8858e6..16ab9512 100644 --- a/modules/reverseproxy/README.md +++ b/modules/reverseproxy/README.md @@ -24,6 +24,9 @@ The Reverse Proxy module functions as a versatile API gateway that can route req * **Tenant Awareness**: Support for multi-tenant environments with tenant-specific routing * **Pattern-Based Routing**: Direct requests to specific backends based on URL patterns * **Custom Endpoint Mapping**: Define flexible mappings from frontend endpoints to backend services +* **Pipeline Strategy**: Chain backend requests where each stage's response informs the next (map/reduce) +* **Fan-Out-Merge Strategy**: Parallel backend requests with custom ID-based response merging +* **Empty Response Policies**: Configurable handling of empty backend responses (allow, skip, or fail) * **Health Checking**: Continuous monitoring of backend service availability with DNS resolution and HTTP checks * **Circuit Breaker**: Automatic failure detection and recovery with configurable thresholds * **Response Caching**: Performance optimization with TTL-based caching @@ -366,6 +369,139 @@ The module supports several advanced features: 11. **Connection Pooling**: Advanced connection pool management with configurable limits 12. **Queue Management**: Request queueing with configurable sizes and timeouts 13. **Error Handling**: Comprehensive error handling with custom pages and retry logic +14. **Pipeline Strategy**: Chain backend requests where each stage's response informs the next request (map/reduce pattern) +15. **Fan-Out-Merge Strategy**: Parallel backend requests with custom ID-based response merging +16. **Empty Response Policies**: Configurable handling of empty backend responses (allow, skip, or fail) + +### Composite Route Strategies + +Composite routes allow combining responses from multiple backend services. The module supports five strategies: + +#### first-success +Tries backends sequentially until one succeeds. Use case: High-availability setup with primary and fallback backends. + +```yaml +composite_routes: + "/api/data": + pattern: "/api/data" + backends: ["primary-backend", "fallback-backend"] + strategy: "first-success" +``` + +#### merge +Executes all backend requests in parallel and merges JSON responses by backend ID. + +```yaml +composite_routes: + "/api/user/profile": + pattern: "/api/user/profile" + backends: ["user-backend", "analytics-backend"] + strategy: "merge" +``` + +#### sequential +Executes requests one at a time, returning the last successful response. + +```yaml +composite_routes: + "/api/process": + pattern: "/api/process" + backends: ["auth-backend", "processing-backend"] + strategy: "sequential" +``` + +#### pipeline +Executes backends sequentially where each stage's response can inform the next stage's request. Requires programmatic configuration via `SetPipelineConfig()`. + +Use case: A list page shows queued conversations. Backend A returns conversation details, those IDs are fed into Backend B to fetch follow-up information, and the responses are merged. + +```yaml +composite_routes: + "/api/conversations": + pattern: "/api/conversations" + backends: ["conversations-backend", "followup-backend"] + strategy: "pipeline" + empty_policy: "skip-empty" # Optional: allow-empty, skip-empty, fail-on-empty +``` + +```go +proxyModule.SetPipelineConfig("/api/conversations", reverseproxy.PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, + previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + // Extract IDs from previous response and build next request + var convResp struct { + Conversations []struct{ ID string `json:"id"` } `json:"conversations"` + } + json.Unmarshal(previousResponses["conversations-backend"], &convResp) + + ids := []string{} + for _, c := range convResp.Conversations { + ids = append(ids, c.ID) + } + url := "http://followup-service/followups?ids=" + strings.Join(ids, ",") + return http.NewRequestWithContext(ctx, "GET", url, nil) + }, + ResponseMerger: func(ctx context.Context, originalReq *http.Request, + allResponses map[string][]byte) (*http.Response, error) { + // Merge follow-up data into conversations + // ... custom merging logic ... + return reverseproxy.MakeJSONResponse(http.StatusOK, mergedResult) + }, +}) +``` + +#### fan-out-merge +Executes all backends in parallel (like merge), then applies a custom merger function for ID-based matching, filtering, or complex data correlation. Requires programmatic configuration via `SetFanOutMerger()`. + +Use case: A ticket dashboard where tickets come from one service and priority/assignment data comes from another. The merger matches by ticket ID. + +```yaml +composite_routes: + "/api/tickets": + pattern: "/api/tickets" + backends: ["tickets-backend", "assignments-backend"] + strategy: "fan-out-merge" + empty_policy: "allow-empty" # Optional +``` + +```go +proxyModule.SetFanOutMerger("/api/tickets", func(ctx context.Context, + originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + // Parse both responses + var ticketsResp struct { Tickets []map[string]interface{} `json:"tickets"` } + json.Unmarshal(responses["tickets-backend"], &ticketsResp) + + var assignResp struct { Assignments map[string]interface{} `json:"assignments"` } + json.Unmarshal(responses["assignments-backend"], &assignResp) + + // Merge by ID + for i, ticket := range ticketsResp.Tickets { + if id, ok := ticket["id"].(string); ok { + if assignment, exists := assignResp.Assignments[id]; exists { + ticketsResp.Tickets[i]["assignment"] = assignment + } + } + } + return reverseproxy.MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "tickets": ticketsResp.Tickets, + }) +}) +``` + +#### Empty Response Policies + +For `pipeline` and `fan-out-merge` strategies, you can control how empty backend responses are handled: + +| Policy | Description | +|--------|-------------| +| `allow-empty` | Include empty responses in the result set (default) | +| `skip-empty` | Silently drop empty responses from the result | +| `fail-on-empty` | Fail the entire request if any backend returns empty | + +Set via config (`empty_policy` field) or programmatically: +```go +proxyModule.SetEmptyResponsePolicy("/api/route", reverseproxy.EmptyResponseSkip) +``` ### Debug Endpoints diff --git a/modules/reverseproxy/bdd_composite_pipeline_test.go b/modules/reverseproxy/bdd_composite_pipeline_test.go new file mode 100644 index 00000000..0334cd6f --- /dev/null +++ b/modules/reverseproxy/bdd_composite_pipeline_test.go @@ -0,0 +1,720 @@ +package reverseproxy + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "time" +) + +// ============================================================================ +// Pipeline Strategy BDD Steps +// ============================================================================ + +func (ctx *ReverseProxyBDDTestContext) iHaveAPipelineCompositeRouteWithTwoBackends() error { + ctx.resetContext() + + // Backend 1: returns a list of items with IDs + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "items": []map[string]interface{}{ + {"id": "item-1", "name": "First Item"}, + {"id": "item-2", "name": "Second Item"}, + }, + }) + })) + ctx.testServers = append(ctx.testServers, backend1) + + // Backend 2: returns details for given IDs + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + idsParam := r.URL.Query().Get("ids") + details := make(map[string]interface{}) + if idsParam != "" { + for _, id := range strings.Split(idsParam, ",") { + if id == "item-1" { + details[id] = map[string]interface{}{"category": "A", "priority": "high"} + } + if id == "item-2" { + details[id] = map[string]interface{}{"category": "B", "priority": "low"} + } + } + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "details": details, + }) + })) + ctx.testServers = append(ctx.testServers, backend2) + + ctx.config = &ReverseProxyConfig{ + DefaultBackend: "items-backend", + BackendServices: map[string]string{ + "items-backend": backend1.URL, + "details-backend": backend2.URL, + }, + CompositeRoutes: map[string]CompositeRoute{ + "/api/pipeline": { + Pattern: "/api/pipeline", + Backends: []string{"items-backend", "details-backend"}, + Strategy: "pipeline", + }, + }, + HealthCheck: HealthCheckConfig{ + Enabled: false, + Interval: 30 * time.Second, + }, + CircuitBreakerConfig: CircuitBreakerConfig{ + Enabled: false, + }, + } + + // Capture backend2 URL for use in the closure + backend2URL := backend2.URL + + if err := ctx.setupApplicationWithConfig(); err != nil { + return err + } + + // Set pipeline config on the module AFTER setup creates it + ctx.module.SetPipelineConfig("/api/pipeline", PipelineConfig{ + RequestBuilder: func(rctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + if nextBackendID == "details-backend" { + var itemsResp struct { + Items []struct { + ID string `json:"id"` + } `json:"items"` + } + if body, ok := previousResponses["items-backend"]; ok { + if err := json.Unmarshal(body, &itemsResp); err != nil { + return nil, err + } + } + ids := make([]string, 0, len(itemsResp.Items)) + for _, item := range itemsResp.Items { + ids = append(ids, item.ID) + } + url := backend2URL + "/details?ids=" + strings.Join(ids, ",") + return http.NewRequestWithContext(rctx, "GET", url, nil) + } + return nil, fmt.Errorf("unknown backend: %s", nextBackendID) + }, + ResponseMerger: func(rctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + var itemsResp struct { + Items []map[string]interface{} `json:"items"` + } + json.Unmarshal(allResponses["items-backend"], &itemsResp) + + var detailsResp struct { + Details map[string]interface{} `json:"details"` + } + json.Unmarshal(allResponses["details-backend"], &detailsResp) + + for i, item := range itemsResp.Items { + if id, ok := item["id"].(string); ok { + if detail, exists := detailsResp.Details[id]; exists { + itemsResp.Items[i]["detail"] = detail + } + } + } + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "items": itemsResp.Items, + }) + }, + }) + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) iSendARequestToThePipelineRoute() error { + if ctx.module == nil { + return fmt.Errorf("module not initialized") + } + + backends := []*Backend{ + {ID: "items-backend", URL: ctx.module.config.BackendServices["items-backend"], Client: http.DefaultClient}, + {ID: "details-backend", URL: ctx.module.config.BackendServices["details-backend"], Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + + if pipelineCfg, ok := ctx.module.pipelineConfigs["/api/pipeline"]; ok { + handler.SetPipelineConfig(pipelineCfg) + } + + req := httptest.NewRequest("GET", "/api/pipeline", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + ctx.lastResponse = resp + body, _ := io.ReadAll(resp.Body) + ctx.lastResponseBody = body + resp.Body = io.NopCloser(strings.NewReader(string(body))) + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) theFirstBackendShouldBeCalledWithTheOriginalRequest() error { + if ctx.lastResponse == nil { + return fmt.Errorf("no response received") + } + if ctx.lastResponse.StatusCode != http.StatusOK { + return fmt.Errorf("expected status 200, got %d", ctx.lastResponse.StatusCode) + } + return nil +} + +func (ctx *ReverseProxyBDDTestContext) theSecondBackendShouldReceiveDataDerivedFromTheFirstResponse() error { + if ctx.lastResponseBody == nil { + return fmt.Errorf("no response body") + } + var result map[string]interface{} + if err := json.Unmarshal(ctx.lastResponseBody, &result); err != nil { + return fmt.Errorf("failed to parse response: %w", err) + } + items, ok := result["items"].([]interface{}) + if !ok { + return fmt.Errorf("expected items array in response") + } + if len(items) == 0 { + return fmt.Errorf("expected at least one item") + } + // Check that items have detail data (proving second backend was called with IDs from first) + item1 := items[0].(map[string]interface{}) + if _, hasDetail := item1["detail"]; !hasDetail { + return fmt.Errorf("item should have detail from second backend") + } + return nil +} + +func (ctx *ReverseProxyBDDTestContext) theFinalResponseShouldContainMergedDataFromAllStages() error { + var result map[string]interface{} + if err := json.Unmarshal(ctx.lastResponseBody, &result); err != nil { + return fmt.Errorf("failed to parse response: %w", err) + } + items := result["items"].([]interface{}) + if len(items) != 2 { + return fmt.Errorf("expected 2 items, got %d", len(items)) + } + + // Verify item-1 has detail with category A + item1 := items[0].(map[string]interface{}) + detail1 := item1["detail"].(map[string]interface{}) + if detail1["category"] != "A" { + return fmt.Errorf("item-1 should have category A") + } + return nil +} + +// ============================================================================ +// Fan-Out-Merge Strategy BDD Steps +// ============================================================================ + +func (ctx *ReverseProxyBDDTestContext) iHaveAFanOutMergeCompositeRouteWithTwoBackends() error { + ctx.resetContext() + + // Backend A: returns items + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "records": []map[string]interface{}{ + {"id": "r1", "title": "Record One"}, + {"id": "r2", "title": "Record Two"}, + {"id": "r3", "title": "Record Three"}, + }, + }) + })) + ctx.testServers = append(ctx.testServers, backendA) + + // Backend B: returns tags keyed by ID + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "tags": map[string]interface{}{ + "r1": []string{"urgent", "new"}, + "r3": []string{"follow-up"}, + }, + }) + })) + ctx.testServers = append(ctx.testServers, backendB) + + ctx.config = &ReverseProxyConfig{ + DefaultBackend: "records-backend", + BackendServices: map[string]string{ + "records-backend": backendA.URL, + "tags-backend": backendB.URL, + }, + CompositeRoutes: map[string]CompositeRoute{ + "/api/fanout": { + Pattern: "/api/fanout", + Backends: []string{"records-backend", "tags-backend"}, + Strategy: "fan-out-merge", + }, + }, + HealthCheck: HealthCheckConfig{ + Enabled: false, + Interval: 30 * time.Second, + }, + CircuitBreakerConfig: CircuitBreakerConfig{ + Enabled: false, + }, + } + + if err := ctx.setupApplicationWithConfig(); err != nil { + return err + } + + // Set fan-out merger AFTER setup creates the module + ctx.module.SetFanOutMerger("/api/fanout", func(rctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + var recordsResp struct { + Records []map[string]interface{} `json:"records"` + } + json.Unmarshal(responses["records-backend"], &recordsResp) + + var tagsResp struct { + Tags map[string]interface{} `json:"tags"` + } + json.Unmarshal(responses["tags-backend"], &tagsResp) + + for i, record := range recordsResp.Records { + if id, ok := record["id"].(string); ok { + if tags, exists := tagsResp.Tags[id]; exists { + recordsResp.Records[i]["tags"] = tags + } + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "records": recordsResp.Records, + }) + }) + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) iSendARequestToTheFanOutMergeRoute() error { + if ctx.module == nil { + return fmt.Errorf("module not initialized") + } + + backends := []*Backend{ + {ID: "records-backend", URL: ctx.module.config.BackendServices["records-backend"], Client: http.DefaultClient}, + {ID: "tags-backend", URL: ctx.module.config.BackendServices["tags-backend"], Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + + if merger, ok := ctx.module.fanOutMergers["/api/fanout"]; ok { + handler.SetFanOutMerger(merger) + } + + req := httptest.NewRequest("GET", "/api/fanout", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + ctx.lastResponse = resp + body, _ := io.ReadAll(resp.Body) + ctx.lastResponseBody = body + resp.Body = io.NopCloser(strings.NewReader(string(body))) + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) bothBackendsShouldBeCalledInParallel() error { + if ctx.lastResponse == nil { + return fmt.Errorf("no response received") + } + if ctx.lastResponse.StatusCode != http.StatusOK { + return fmt.Errorf("expected status 200, got %d", ctx.lastResponse.StatusCode) + } + return nil +} + +func (ctx *ReverseProxyBDDTestContext) theResponsesShouldBeMergedByMatchingIDs() error { + var result map[string]interface{} + if err := json.Unmarshal(ctx.lastResponseBody, &result); err != nil { + return fmt.Errorf("failed to parse response: %w", err) + } + records, ok := result["records"].([]interface{}) + if !ok { + return fmt.Errorf("expected records array") + } + if len(records) != 3 { + return fmt.Errorf("expected 3 records, got %d", len(records)) + } + return nil +} + +func (ctx *ReverseProxyBDDTestContext) itemsWithMatchingAncillaryDataShouldBeEnriched() error { + var result map[string]interface{} + json.Unmarshal(ctx.lastResponseBody, &result) + + records := result["records"].([]interface{}) + + // r1 should have tags + r1 := records[0].(map[string]interface{}) + if _, hasTags := r1["tags"]; !hasTags { + return fmt.Errorf("r1 should have tags") + } + + // r2 should NOT have tags + r2 := records[1].(map[string]interface{}) + if _, hasTags := r2["tags"]; hasTags { + return fmt.Errorf("r2 should NOT have tags") + } + + // r3 should have tags + r3 := records[2].(map[string]interface{}) + if _, hasTags := r3["tags"]; !hasTags { + return fmt.Errorf("r3 should have tags") + } + + return nil +} + +// ============================================================================ +// Empty Response Policy BDD Steps +// ============================================================================ + +func (ctx *ReverseProxyBDDTestContext) iHaveAPipelineRouteWithSkipEmptyPolicy() error { + ctx.resetContext() + + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"from-stage1","value":42}`)) + })) + ctx.testServers = append(ctx.testServers, backend1) + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) // Empty response + })) + ctx.testServers = append(ctx.testServers, backend2) + + // Capture URL for closure + backend2URL := backend2.URL + + ctx.config = &ReverseProxyConfig{ + DefaultBackend: "data-backend", + BackendServices: map[string]string{ + "data-backend": backend1.URL, + "empty-backend": backend2.URL, + }, + CompositeRoutes: map[string]CompositeRoute{ + "/api/skip-empty": { + Pattern: "/api/skip-empty", + Backends: []string{"data-backend", "empty-backend"}, + Strategy: "pipeline", + EmptyPolicy: "skip-empty", + }, + }, + HealthCheck: HealthCheckConfig{Enabled: false, Interval: 30 * time.Second}, + CircuitBreakerConfig: CircuitBreakerConfig{Enabled: false}, + } + + if err := ctx.setupApplicationWithConfig(); err != nil { + return err + } + + // Set pipeline config and empty policy AFTER setup + ctx.module.SetPipelineConfig("/api/skip-empty", PipelineConfig{ + RequestBuilder: func(rctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return http.NewRequestWithContext(rctx, "GET", backend2URL+"/test", nil) + }, + }) + ctx.module.SetEmptyResponsePolicy("/api/skip-empty", EmptyResponseSkip) + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) iSendARequestAndABackendReturnsAnEmptyResponse() error { + if ctx.module == nil { + return fmt.Errorf("module not initialized") + } + + var strategy CompositeStrategy + var pattern string + for p, route := range ctx.module.config.CompositeRoutes { + strategy = CompositeStrategy(route.Strategy) + pattern = p + break + } + + var backends []*Backend + for _, name := range ctx.module.config.CompositeRoutes[pattern].Backends { + backends = append(backends, &Backend{ + ID: name, + URL: ctx.module.config.BackendServices[name], + Client: http.DefaultClient, + }) + } + + handler := NewCompositeHandler(backends, strategy, 10*time.Second) + + emptyPolicy := EmptyResponsePolicy(ctx.module.config.CompositeRoutes[pattern].EmptyPolicy) + if policy, ok := ctx.module.emptyResponsePolicies[pattern]; ok { + emptyPolicy = policy + } + handler.SetEmptyResponsePolicy(emptyPolicy) + + if pipelineCfg, ok := ctx.module.pipelineConfigs[pattern]; ok { + handler.SetPipelineConfig(pipelineCfg) + } + if merger, ok := ctx.module.fanOutMergers[pattern]; ok { + handler.SetFanOutMerger(merger) + } + + req := httptest.NewRequest("GET", pattern, nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + ctx.lastResponse = resp + body, _ := io.ReadAll(resp.Body) + ctx.lastResponseBody = body + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) theEmptyResponseShouldBeExcludedFromTheResult() error { + var result map[string]interface{} + if err := json.Unmarshal(ctx.lastResponseBody, &result); err != nil { + return fmt.Errorf("failed to parse response: %w", err) + } + + // "empty-backend" should not be in the response + if _, found := result["empty-backend"]; found { + return fmt.Errorf("empty backend response should be excluded") + } + return nil +} + +func (ctx *ReverseProxyBDDTestContext) theNonEmptyResponsesShouldStillBePresent() error { + var result map[string]interface{} + json.Unmarshal(ctx.lastResponseBody, &result) + + if _, found := result["data-backend"]; !found { + return fmt.Errorf("data-backend response should be present") + } + return nil +} + +func (ctx *ReverseProxyBDDTestContext) iHaveAFanOutMergeRouteWithFailOnEmptyPolicy() error { + ctx.resetContext() + + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"ok"}`)) + })) + ctx.testServers = append(ctx.testServers, backendA) + + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(``)) // Completely empty + })) + ctx.testServers = append(ctx.testServers, backendB) + + ctx.config = &ReverseProxyConfig{ + DefaultBackend: "ok-backend", + BackendServices: map[string]string{ + "ok-backend": backendA.URL, + "empty-backend": backendB.URL, + }, + CompositeRoutes: map[string]CompositeRoute{ + "/api/fail-empty": { + Pattern: "/api/fail-empty", + Backends: []string{"ok-backend", "empty-backend"}, + Strategy: "fan-out-merge", + EmptyPolicy: "fail-on-empty", + }, + }, + HealthCheck: HealthCheckConfig{Enabled: false, Interval: 30 * time.Second}, + CircuitBreakerConfig: CircuitBreakerConfig{Enabled: false}, + } + + if err := ctx.setupApplicationWithConfig(); err != nil { + return err + } + + // Set merger and policy AFTER setup + ctx.module.SetFanOutMerger("/api/fail-empty", func(rctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + return MakeJSONResponse(http.StatusOK, responses) + }) + ctx.module.SetEmptyResponsePolicy("/api/fail-empty", EmptyResponseFail) + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) theRequestShouldFailWithABadGatewayError() error { + if ctx.lastResponse == nil { + return fmt.Errorf("no response received") + } + if ctx.lastResponse.StatusCode != http.StatusBadGateway { + return fmt.Errorf("expected status 502, got %d", ctx.lastResponse.StatusCode) + } + return nil +} + +// ============================================================================ +// Pipeline Filter BDD Steps +// ============================================================================ + +func (ctx *ReverseProxyBDDTestContext) iHaveAPipelineRouteThatFiltersByAncillaryBackendData() error { + ctx.resetContext() + + // Backend A: returns all conversations + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "conversations": []map[string]interface{}{ + {"id": "c1", "title": "Conv 1", "status": "queued"}, + {"id": "c2", "title": "Conv 2", "status": "queued"}, + {"id": "c3", "title": "Conv 3", "status": "active"}, + {"id": "c4", "title": "Conv 4", "status": "queued"}, + }, + }) + })) + ctx.testServers = append(ctx.testServers, backendA) + + // Backend B: returns which conversations are flagged + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "flagged_ids": []string{"c1", "c4"}, + }) + })) + ctx.testServers = append(ctx.testServers, backendB) + + // Capture URL for closure + backendBURL := backendB.URL + + ctx.config = &ReverseProxyConfig{ + DefaultBackend: "conv-backend", + BackendServices: map[string]string{ + "conv-backend": backendA.URL, + "flags-backend": backendB.URL, + }, + CompositeRoutes: map[string]CompositeRoute{ + "/api/filter": { + Pattern: "/api/filter", + Backends: []string{"conv-backend", "flags-backend"}, + Strategy: "pipeline", + }, + }, + HealthCheck: HealthCheckConfig{Enabled: false, Interval: 30 * time.Second}, + CircuitBreakerConfig: CircuitBreakerConfig{Enabled: false}, + } + + if err := ctx.setupApplicationWithConfig(); err != nil { + return err + } + + // Set pipeline config AFTER setup + ctx.module.SetPipelineConfig("/api/filter", PipelineConfig{ + RequestBuilder: func(rctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return http.NewRequestWithContext(rctx, "GET", backendBURL+"/flags", nil) + }, + ResponseMerger: func(rctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + var convResp struct { + Conversations []map[string]interface{} `json:"conversations"` + } + json.Unmarshal(allResponses["conv-backend"], &convResp) + + var flagsResp struct { + FlaggedIDs []string `json:"flagged_ids"` + } + json.Unmarshal(allResponses["flags-backend"], &flagsResp) + + flagSet := make(map[string]bool) + for _, id := range flagsResp.FlaggedIDs { + flagSet[id] = true + } + + var filtered []map[string]interface{} + for _, conv := range convResp.Conversations { + if id, ok := conv["id"].(string); ok && flagSet[id] { + conv["flagged"] = true + filtered = append(filtered, conv) + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "filtered_conversations": filtered, + "total_filtered": len(filtered), + }) + }, + }) + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) iSendARequestToFetchFilteredResults() error { + if ctx.module == nil { + return fmt.Errorf("module not initialized") + } + + backends := []*Backend{ + {ID: "conv-backend", URL: ctx.module.config.BackendServices["conv-backend"], Client: http.DefaultClient}, + {ID: "flags-backend", URL: ctx.module.config.BackendServices["flags-backend"], Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + if pipelineCfg, ok := ctx.module.pipelineConfigs["/api/filter"]; ok { + handler.SetPipelineConfig(pipelineCfg) + } + + req := httptest.NewRequest("GET", "/api/filter", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + ctx.lastResponse = resp + body, _ := io.ReadAll(resp.Body) + ctx.lastResponseBody = body + + return nil +} + +func (ctx *ReverseProxyBDDTestContext) onlyItemsMatchingTheAncillaryCriteriaShouldBeReturned() error { + var result map[string]interface{} + if err := json.Unmarshal(ctx.lastResponseBody, &result); err != nil { + return fmt.Errorf("failed to parse response: %w", err) + } + + filtered := result["filtered_conversations"].([]interface{}) + totalFiltered := result["total_filtered"].(float64) + + if int(totalFiltered) != 2 { + return fmt.Errorf("expected 2 filtered conversations, got %v", totalFiltered) + } + if len(filtered) != 2 { + return fmt.Errorf("expected 2 filtered conversations in array, got %d", len(filtered)) + } + + // Verify only c1 and c4 are present + ids := make(map[string]bool) + for _, item := range filtered { + m := item.(map[string]interface{}) + ids[m["id"].(string)] = true + if m["flagged"] != true { + return fmt.Errorf("expected flagged=true on filtered item") + } + } + + if !ids["c1"] || !ids["c4"] { + return fmt.Errorf("expected c1 and c4 in filtered results, got %v", ids) + } + + return nil +} diff --git a/modules/reverseproxy/bdd_step_registry_test.go b/modules/reverseproxy/bdd_step_registry_test.go index d6915677..b509bd8b 100644 --- a/modules/reverseproxy/bdd_step_registry_test.go +++ b/modules/reverseproxy/bdd_step_registry_test.go @@ -380,6 +380,31 @@ func registerAllStepDefinitions(s *godog.ScenarioContext, ctx *ReverseProxyBDDTe // Timeout-related scenario steps (removing duplicate to avoid ambiguity) s.Then(`^appropriate timeout error responses should be returned$`, ctx.appropriateTimeoutErrorResponsesShouldBeReturned) + // Pipeline and Fan-Out-Merge Composite Strategy Steps (from bdd_composite_pipeline_test.go) + s.Given(`^I have a pipeline composite route with two backends$`, ctx.iHaveAPipelineCompositeRouteWithTwoBackends) + s.When(`^I send a request to the pipeline route$`, ctx.iSendARequestToThePipelineRoute) + s.Then(`^the first backend should be called with the original request$`, ctx.theFirstBackendShouldBeCalledWithTheOriginalRequest) + s.Then(`^the second backend should receive data derived from the first response$`, ctx.theSecondBackendShouldReceiveDataDerivedFromTheFirstResponse) + s.Then(`^the final response should contain merged data from all stages$`, ctx.theFinalResponseShouldContainMergedDataFromAllStages) + + s.Given(`^I have a fan-out-merge composite route with two backends$`, ctx.iHaveAFanOutMergeCompositeRouteWithTwoBackends) + s.When(`^I send a request to the fan-out-merge route$`, ctx.iSendARequestToTheFanOutMergeRoute) + s.Then(`^both backends should be called in parallel$`, ctx.bothBackendsShouldBeCalledInParallel) + s.Then(`^the responses should be merged by matching IDs$`, ctx.theResponsesShouldBeMergedByMatchingIDs) + s.Then(`^items with matching ancillary data should be enriched$`, ctx.itemsWithMatchingAncillaryDataShouldBeEnriched) + + s.Given(`^I have a pipeline route with skip-empty policy$`, ctx.iHaveAPipelineRouteWithSkipEmptyPolicy) + s.When(`^I send a request and a backend returns an empty response$`, ctx.iSendARequestAndABackendReturnsAnEmptyResponse) + s.Then(`^the empty response should be excluded from the result$`, ctx.theEmptyResponseShouldBeExcludedFromTheResult) + s.Then(`^the non-empty responses should still be present$`, ctx.theNonEmptyResponsesShouldStillBePresent) + + s.Given(`^I have a fan-out-merge route with fail-on-empty policy$`, ctx.iHaveAFanOutMergeRouteWithFailOnEmptyPolicy) + s.Then(`^the request should fail with a bad gateway error$`, ctx.theRequestShouldFailWithABadGatewayError) + + s.Given(`^I have a pipeline route that filters by ancillary backend data$`, ctx.iHaveAPipelineRouteThatFiltersByAncillaryBackendData) + s.When(`^I send a request to fetch filtered results$`, ctx.iSendARequestToFetchFilteredResults) + s.Then(`^only items matching the ancillary criteria should be returned$`, ctx.onlyItemsMatchingTheAncillaryCriteriaShouldBeReturned) + // Note: Most comprehensive step implementations are already in existing BDD files // Only add new steps here for scenarios that are completely missing implementations } diff --git a/modules/reverseproxy/composite.go b/modules/reverseproxy/composite.go index 2aa91982..651bf87f 100644 --- a/modules/reverseproxy/composite.go +++ b/modules/reverseproxy/composite.go @@ -55,6 +55,15 @@ type CompositeHandler struct { responseCache *responseCache eventEmitter func(eventType string, data map[string]interface{}) responseTransformer ResponseTransformer + + // Pipeline strategy configuration + pipelineConfig *PipelineConfig + + // Fan-out-merge strategy merger function + fanOutMerger FanOutMerger + + // Empty response policy for pipeline and fan-out-merge strategies + emptyResponsePolicy EmptyResponsePolicy } // NewCompositeHandler creates a new composite handler with the given backends and strategy. @@ -121,6 +130,21 @@ func (h *CompositeHandler) SetResponseTransformer(transformer ResponseTransforme h.responseTransformer = transformer } +// SetPipelineConfig sets the pipeline configuration for pipeline strategy routes. +func (h *CompositeHandler) SetPipelineConfig(config *PipelineConfig) { + h.pipelineConfig = config +} + +// SetFanOutMerger sets the fan-out merger function for fan-out-merge strategy routes. +func (h *CompositeHandler) SetFanOutMerger(merger FanOutMerger) { + h.fanOutMerger = merger +} + +// SetEmptyResponsePolicy sets the empty response policy for pipeline and fan-out-merge strategies. +func (h *CompositeHandler) SetEmptyResponsePolicy(policy EmptyResponsePolicy) { + h.emptyResponsePolicy = policy +} + // ServeHTTP handles the request by forwarding it to all backends // and merging the responses. func (h *CompositeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -173,6 +197,10 @@ func (h *CompositeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.executeMerge(ctx, recorder, r, bodyBytes) case StrategySequential: h.executeSequential(ctx, recorder, r, bodyBytes) + case StrategyPipeline: + h.executePipeline(ctx, recorder, r, bodyBytes) + case StrategyFanOutMerge: + h.executeFanOutMerge(ctx, recorder, r, bodyBytes) default: // Default to first-success for unknown strategies h.executeFirstSuccess(ctx, recorder, r, bodyBytes) @@ -377,7 +405,7 @@ func (h *CompositeHandler) executeBackendRequest(ctx context.Context, backend *B } // Create a new request with the same method, URL, and headers. - req, err := http.NewRequestWithContext(ctx, r.Method, backendURL, nil) //nolint:gosec // G704: backendURL is built from configured backend.URL, not user input + req, err := http.NewRequestWithContext(ctx, r.Method, backendURL, nil) //nolint:gosec // G704: reverse proxy intentionally forwards requests to configured backends if err != nil { return nil, fmt.Errorf("failed to create new request: %w", err) } @@ -504,6 +532,17 @@ func (m *ReverseProxyModule) createCompositeHandler(ctx context.Context, routeCo // Create and configure the handler handler := NewCompositeHandler(backends, strategy, responseTimeout) + // Set empty response policy from config if specified + if routeConfig.EmptyPolicy != "" { + switch routeConfig.EmptyPolicy { + case string(EmptyResponseAllow), string(EmptyResponseSkip), string(EmptyResponseFail): + handler.SetEmptyResponsePolicy(EmptyResponsePolicy(routeConfig.EmptyPolicy)) + default: + return nil, fmt.Errorf("route %q empty_policy %q: %w", + routeConfig.Pattern, routeConfig.EmptyPolicy, ErrInvalidEmptyResponsePolicy) + } + } + // Set event emitter for circuit breaker events handler.SetEventEmitter(func(eventType string, data map[string]interface{}) { m.emitEvent(ctx, eventType, data) @@ -536,6 +575,21 @@ func (m *ReverseProxyModule) createCompositeHandler(ctx context.Context, routeCo handler.SetResponseTransformer(transformer) } + // Set pipeline config if available for this route + if pipelineCfg, exists := m.pipelineConfigs[routeConfig.Pattern]; exists { + handler.SetPipelineConfig(pipelineCfg) + } + + // Set fan-out merger if available for this route + if merger, exists := m.fanOutMergers[routeConfig.Pattern]; exists { + handler.SetFanOutMerger(merger) + } + + // Set empty response policy if available for this route + if policy, exists := m.emptyResponsePolicies[routeConfig.Pattern]; exists { + handler.SetEmptyResponsePolicy(policy) + } + return handler, nil } diff --git a/modules/reverseproxy/composite_pipeline.go b/modules/reverseproxy/composite_pipeline.go new file mode 100644 index 00000000..bf38f35f --- /dev/null +++ b/modules/reverseproxy/composite_pipeline.go @@ -0,0 +1,417 @@ +// Package reverseproxy provides a flexible reverse proxy module with support for multiple backends, +// composite responses, and tenant awareness. +package reverseproxy + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" +) + +const ( + // StrategyPipeline executes backends sequentially where each stage's response + // can inform the next stage's request. This enables map/reduce patterns where + // backend B's request is constructed from backend A's response. + // + // Example: Backend A returns a list of conversation IDs, backend B is called + // with those IDs to fetch ancillary details, and the responses are merged. + // + // Requires a PipelineConfig to be set via SetPipelineConfig. + StrategyPipeline CompositeStrategy = "pipeline" + + // StrategyFanOutMerge executes all backend requests in parallel (like merge), + // then applies a custom FanOutMerger function to perform ID-based matching, + // filtering, and complex merging logic across all responses. + // + // Example: Backend A returns conversations, backend B returns follow-up flags. + // The merger matches by conversation ID and produces a unified response. + // + // Requires a FanOutMerger to be set via SetFanOutMerger. + StrategyFanOutMerge CompositeStrategy = "fan-out-merge" +) + +// EmptyResponsePolicy defines how empty backend responses should be handled +// in pipeline and fan-out-merge strategies. +type EmptyResponsePolicy string + +const ( + // EmptyResponseAllow includes empty responses in the result set. + // Backends that return no data are represented as empty/nil in the response map. + EmptyResponseAllow EmptyResponsePolicy = "allow-empty" + + // EmptyResponseSkip silently drops empty responses from the result set. + // The merger/pipeline receives only non-empty responses. + EmptyResponseSkip EmptyResponsePolicy = "skip-empty" + + // EmptyResponseFail causes the entire composite request to fail if any backend + // returns an empty response. Returns 502 Bad Gateway. + EmptyResponseFail EmptyResponsePolicy = "fail-on-empty" +) + +// PipelineRequestBuilder builds the HTTP request for the next pipeline stage. +// It receives: +// - ctx: the request context +// - originalReq: the original incoming HTTP request +// - previousResponses: accumulated parsed response bodies keyed by backend ID +// - nextBackendID: the ID of the next backend to call +// +// It returns the HTTP request to send to the next backend, or an error. +// If it returns nil for the request (with no error), the stage is skipped. +type PipelineRequestBuilder func( + ctx context.Context, + originalReq *http.Request, + previousResponses map[string][]byte, + nextBackendID string, +) (*http.Request, error) + +// PipelineResponseMerger merges all pipeline stage responses into a single HTTP response. +// It receives: +// - ctx: the request context +// - originalReq: the original incoming HTTP request +// - allResponses: all accumulated response bodies keyed by backend ID +// +// It returns the final merged HTTP response, or an error. +type PipelineResponseMerger func( + ctx context.Context, + originalReq *http.Request, + allResponses map[string][]byte, +) (*http.Response, error) + +// FanOutMerger merges parallel backend responses using custom logic such as +// ID-based matching, filtering, or complex data correlation. +// It receives: +// - ctx: the request context +// - originalReq: the original incoming HTTP request +// - responses: response bodies keyed by backend ID +// +// It returns the final merged HTTP response, or an error. +type FanOutMerger func( + ctx context.Context, + originalReq *http.Request, + responses map[string][]byte, +) (*http.Response, error) + +// PipelineConfig holds configuration for a pipeline strategy route. +type PipelineConfig struct { + // RequestBuilder constructs the request for each subsequent pipeline stage + // using responses from previous stages. + RequestBuilder PipelineRequestBuilder + + // ResponseMerger combines all pipeline stage responses into a final response. + // If nil, a default merger is used that wraps all responses in a JSON object + // keyed by backend ID. + ResponseMerger PipelineResponseMerger +} + +// isEmptyBody returns true if the body bytes represent an empty or null response. +func isEmptyBody(body []byte) bool { + if len(body) == 0 { + return true + } + trimmed := bytes.TrimSpace(body) + if len(trimmed) == 0 { + return true + } + // Check for JSON null + if string(trimmed) == "null" { + return true + } + // Check for empty JSON object + if string(trimmed) == "{}" { + return true + } + // Check for empty JSON array + if string(trimmed) == "[]" { + return true + } + return false +} + +// executePipeline executes backends sequentially, passing each response to the +// PipelineRequestBuilder to construct the next request. +func (h *CompositeHandler) executePipeline(ctx context.Context, w http.ResponseWriter, r *http.Request, bodyBytes []byte) { + if h.pipelineConfig == nil || h.pipelineConfig.RequestBuilder == nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("Pipeline strategy requires a PipelineConfig with RequestBuilder")) + return + } + + allResponses := make(map[string][]byte) + + for i, backend := range h.backends { + // Check the circuit breaker before making the request. + circuitBreaker := h.circuitBreakers[backend.ID] + if circuitBreaker != nil && circuitBreaker.IsOpen() { + continue + } + + var req *http.Request + var err error + + if i == 0 { + // First stage: use the original request + req, err = h.buildBackendRequest(ctx, backend, r, bodyBytes) + if err != nil { + if circuitBreaker != nil { + circuitBreaker.RecordFailure() + } + continue + } + } else { + // Subsequent stages: use the PipelineRequestBuilder + req, err = h.pipelineConfig.RequestBuilder(ctx, r, allResponses, backend.ID) + if err != nil { + if circuitBreaker != nil { + circuitBreaker.RecordFailure() + } + continue + } + // If builder returns nil, skip this stage + if req == nil { + continue + } + } + + // Execute the request using the backend's client + resp, err := backend.Client.Do(req) //nolint:gosec // G704: reverse proxy intentionally forwards requests to configured backends + if err != nil { + if circuitBreaker != nil { + circuitBreaker.RecordFailure() + } + continue + } + + // Read and store the response body + respBody, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + if circuitBreaker != nil { + circuitBreaker.RecordFailure() + } + continue + } + + // Record success in the circuit breaker. + if circuitBreaker != nil { + circuitBreaker.RecordSuccess() + } + + // Apply empty response policy + if isEmptyBody(respBody) { + switch h.emptyResponsePolicy { + case EmptyResponseFail: + w.WriteHeader(http.StatusBadGateway) + fmt.Fprintf(w, "Backend %s returned empty response", backend.ID) + return + case EmptyResponseSkip: + continue + case EmptyResponseAllow: + // Include empty response + default: + // Include empty response + } + } + + allResponses[backend.ID] = respBody + } + + // Merge all responses + if h.pipelineConfig.ResponseMerger != nil { + mergedResp, err := h.pipelineConfig.ResponseMerger(ctx, r, allResponses) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Pipeline response merge failed: %v", err) + return + } + if mergedResp != nil { + h.writeResponse(mergedResp, w) + mergedResp.Body.Close() + return + } + } + + // Default: wrap all responses in a JSON object keyed by backend ID + h.writeDefaultPipelineResponse(allResponses, w) +} + +// executeFanOutMerge executes all backend requests in parallel, reads their bodies, +// then applies the FanOutMerger to produce the final response. +func (h *CompositeHandler) executeFanOutMerge(ctx context.Context, w http.ResponseWriter, r *http.Request, bodyBytes []byte) { + if h.fanOutMerger == nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("Fan-out-merge strategy requires a FanOutMerger")) + return + } + + var wg sync.WaitGroup + var mu sync.Mutex + responses := make(map[string][]byte) + + for _, backend := range h.backends { + b := backend + wg.Go(func() { + // Check the circuit breaker + circuitBreaker := h.circuitBreakers[b.ID] + if circuitBreaker != nil && circuitBreaker.IsOpen() { + return + } + + // Execute the request + resp, err := h.executeBackendRequest(ctx, b, r, bodyBytes) //nolint:bodyclose // Response body is closed below + if err != nil { + if circuitBreaker != nil { + circuitBreaker.RecordFailure() + } + return + } + + // Read the response body + body, readErr := io.ReadAll(resp.Body) + resp.Body.Close() + if readErr != nil { + if circuitBreaker != nil { + circuitBreaker.RecordFailure() + } + return + } + + // Record success + if circuitBreaker != nil { + circuitBreaker.RecordSuccess() + } + + mu.Lock() + responses[b.ID] = body + mu.Unlock() + }) + } + + wg.Wait() + + // Short-circuit if all backends failed or were skipped by open circuit breakers + if len(responses) == 0 { + w.WriteHeader(http.StatusBadGateway) + fmt.Fprintf(w, "No successful responses from fan-out backends") + return + } + + // Apply empty response policy + filteredResponses := make(map[string][]byte) + for backendID, body := range responses { + if isEmptyBody(body) { + switch h.emptyResponsePolicy { + case EmptyResponseFail: + w.WriteHeader(http.StatusBadGateway) + fmt.Fprintf(w, "Backend %s returned empty response", backendID) + return + case EmptyResponseSkip: + continue + case EmptyResponseAllow: + filteredResponses[backendID] = body + default: + filteredResponses[backendID] = body + } + } else { + filteredResponses[backendID] = body + } + } + + // Short-circuit if all responses were filtered out (e.g., all empty with skip policy) + if len(filteredResponses) == 0 { + w.WriteHeader(http.StatusBadGateway) + fmt.Fprintf(w, "No non-empty responses from fan-out backends") + return + } + + // Apply the fan-out merger + mergedResp, err := h.fanOutMerger(ctx, r, filteredResponses) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Fan-out merge failed: %v", err) + return + } + if mergedResp != nil { + h.writeResponse(mergedResp, w) + mergedResp.Body.Close() + return + } + + // If merger returned nil, return empty response + w.WriteHeader(http.StatusNoContent) +} + +// buildBackendRequest creates an HTTP request for a backend (used by pipeline for the first stage). +func (h *CompositeHandler) buildBackendRequest(ctx context.Context, backend *Backend, r *http.Request, bodyBytes []byte) (*http.Request, error) { + backendURL := backend.URL + r.URL.Path + if r.URL.RawQuery != "" { + backendURL += "?" + r.URL.RawQuery + } + + req, err := http.NewRequestWithContext(ctx, r.Method, backendURL, nil) //nolint:gosec // G704: reverse proxy intentionally forwards requests to configured backends + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + for k, v := range r.Header { + for _, val := range v { + req.Header.Add(k, val) + } + } + + if len(bodyBytes) > 0 { + req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + req.ContentLength = int64(len(bodyBytes)) + } + + return req, nil +} + +// writeDefaultPipelineResponse writes a default JSON response containing all pipeline stage responses. +func (h *CompositeHandler) writeDefaultPipelineResponse(allResponses map[string][]byte, w http.ResponseWriter) { + if len(allResponses) == 0 { + w.WriteHeader(http.StatusBadGateway) + _, _ = w.Write([]byte("No successful responses from pipeline backends")) + return + } + + merged := make(map[string]interface{}) + for backendID, body := range allResponses { + var data interface{} + if err := json.Unmarshal(body, &data); err != nil { + merged[backendID] = string(body) + } else { + merged[backendID] = data + } + } + + encoded, err := json.Marshal(merged) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("Failed to encode pipeline response")) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(encoded) +} + +// MakeJSONResponse is a helper that creates an HTTP response from a JSON-serializable value. +// It's provided for use by PipelineResponseMerger and FanOutMerger implementations. +func MakeJSONResponse(statusCode int, data interface{}) (*http.Response, error) { + body, err := json.Marshal(data) + if err != nil { + return nil, fmt.Errorf("failed to marshal response: %w", err) + } + + return &http.Response{ + Status: fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)), + StatusCode: statusCode, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(bytes.NewReader(body)), + }, nil +} diff --git a/modules/reverseproxy/composite_pipeline_test.go b/modules/reverseproxy/composite_pipeline_test.go new file mode 100644 index 00000000..ad034ca5 --- /dev/null +++ b/modules/reverseproxy/composite_pipeline_test.go @@ -0,0 +1,1676 @@ +package reverseproxy + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// ============================================================================ +// Pipeline Strategy Tests +// ============================================================================ + +func TestPipelineStrategy_BasicChaining(t *testing.T) { + // Backend A returns a list of conversation IDs + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "conversations": []map[string]interface{}{ + {"id": "conv-1", "title": "First conversation"}, + {"id": "conv-2", "title": "Second conversation"}, + {"id": "conv-3", "title": "Third conversation"}, + }, + }) + })) + defer backendA.Close() + + // Backend B returns follow-up details for given IDs (received via query params) + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ids := r.URL.Query().Get("ids") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + // Return follow-up details for the requested IDs + followUps := map[string]interface{}{} + for _, id := range strings.Split(ids, ",") { + if id == "conv-1" { + followUps[id] = map[string]interface{}{"is_followup": true, "original_id": "conv-0"} + } + // conv-2 and conv-3 have no follow-up data + } + json.NewEncoder(w).Encode(map[string]interface{}{ + "follow_ups": followUps, + }) + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "conversations", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "followups", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + if nextBackendID == "followups" { + // Parse conversation IDs from the previous response + var convResp struct { + Conversations []struct { + ID string `json:"id"` + } `json:"conversations"` + } + if convBody, ok := previousResponses["conversations"]; ok { + if err := json.Unmarshal(convBody, &convResp); err != nil { + return nil, fmt.Errorf("failed to parse conversations: %w", err) + } + } + + // Build query with IDs + ids := make([]string, 0, len(convResp.Conversations)) + for _, c := range convResp.Conversations { + ids = append(ids, c.ID) + } + + url := backends[1].URL + "/followups?ids=" + strings.Join(ids, ",") + return http.NewRequestWithContext(ctx, "GET", url, nil) + } + return nil, fmt.Errorf("unknown backend: %s", nextBackendID) + }, + ResponseMerger: func(ctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + // Parse both responses + var convResp struct { + Conversations []map[string]interface{} `json:"conversations"` + } + if convBody, ok := allResponses["conversations"]; ok { + json.Unmarshal(convBody, &convResp) + } + + var followUpResp struct { + FollowUps map[string]interface{} `json:"follow_ups"` + } + if fuBody, ok := allResponses["followups"]; ok { + json.Unmarshal(fuBody, &followUpResp) + } + + // Merge follow-up data into conversations + for i, conv := range convResp.Conversations { + if id, ok := conv["id"].(string); ok { + if fu, exists := followUpResp.FollowUps[id]; exists { + convResp.Conversations[i]["follow_up"] = fu + } + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "conversations": convResp.Conversations, + }) + }, + }) + + req := httptest.NewRequest("GET", "/api/conversations", nil) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var result map[string]interface{} + err = json.Unmarshal(body, &result) + require.NoError(t, err) + + conversations, ok := result["conversations"].([]interface{}) + require.True(t, ok, "expected conversations array") + assert.Len(t, conversations, 3) + + // Verify conv-1 has follow-up data + conv1 := conversations[0].(map[string]interface{}) + assert.Equal(t, "conv-1", conv1["id"]) + followUp, hasFollowUp := conv1["follow_up"] + assert.True(t, hasFollowUp, "conv-1 should have follow_up data") + fuMap := followUp.(map[string]interface{}) + assert.Equal(t, true, fuMap["is_followup"]) + + // Verify conv-2 has no follow-up data + conv2 := conversations[1].(map[string]interface{}) + assert.Equal(t, "conv-2", conv2["id"]) + _, hasFollowUp2 := conv2["follow_up"] + assert.False(t, hasFollowUp2, "conv-2 should not have follow_up data") +} + +func TestPipelineStrategy_ThreeStageChain(t *testing.T) { + // Stage 1: returns user IDs + stage1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "users": []string{"user-1", "user-2"}, + }) + })) + defer stage1.Close() + + // Stage 2: returns user profiles + stage2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "profiles": map[string]interface{}{ + "user-1": map[string]interface{}{"name": "Alice", "dept": "eng"}, + "user-2": map[string]interface{}{"name": "Bob", "dept": "sales"}, + }, + }) + })) + defer stage2.Close() + + // Stage 3: returns permissions + stage3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "permissions": map[string]interface{}{ + "user-1": []string{"admin", "read", "write"}, + "user-2": []string{"read"}, + }, + }) + })) + defer stage3.Close() + + backends := []*Backend{ + {ID: "users", URL: stage1.URL, Client: http.DefaultClient}, + {ID: "profiles", URL: stage2.URL, Client: http.DefaultClient}, + {ID: "permissions", URL: stage3.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + switch nextBackendID { + case "profiles": + url := backends[1].URL + "/profiles" + return http.NewRequestWithContext(ctx, "GET", url, nil) + case "permissions": + url := backends[2].URL + "/permissions" + return http.NewRequestWithContext(ctx, "GET", url, nil) + } + return nil, fmt.Errorf("unknown backend: %s", nextBackendID) + }, + ResponseMerger: func(ctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + result := make(map[string]interface{}) + for k, v := range allResponses { + var parsed interface{} + json.Unmarshal(v, &parsed) + result[k] = parsed + } + return MakeJSONResponse(http.StatusOK, result) + }, + }) + + req := httptest.NewRequest("GET", "/api/users", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + // All three stages should be present + assert.Contains(t, result, "users") + assert.Contains(t, result, "profiles") + assert.Contains(t, result, "permissions") +} + +func TestPipelineStrategy_NoPipelineConfig(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"ok":true}`)) + })) + defer backend.Close() + + backends := []*Backend{ + {ID: "b1", URL: backend.URL, Client: http.DefaultClient}, + } + handler := NewCompositeHandler(backends, StrategyPipeline, 5*time.Second) + // Intentionally not setting pipeline config + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) +} + +func TestPipelineStrategy_DefaultMerger(t *testing.T) { + // When no ResponseMerger is set, the default wraps responses by backend ID + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"step":"one"}`)) + })) + defer backend1.Close() + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"step":"two"}`)) + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "step1", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "step2", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return http.NewRequestWithContext(ctx, "GET", backends[1].URL+"/step2", nil) + }, + // No ResponseMerger - uses default + }) + + req := httptest.NewRequest("GET", "/api/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + assert.Contains(t, result, "step1") + assert.Contains(t, result, "step2") +} + +func TestPipelineStrategy_SkipStage(t *testing.T) { + // When PipelineRequestBuilder returns nil, the stage is skipped + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"from-stage1"}`)) + })) + defer backend1.Close() + + callCount := 0 + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"from-stage2"}`)) + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "stage1", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "stage2", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + // Skip stage2 by returning nil + return nil, nil + }, + }) + + req := httptest.NewRequest("GET", "/api/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + // Stage 2 should not have been called + assert.Equal(t, 0, callCount, "stage2 should not have been called") + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + assert.Contains(t, result, "stage1") + assert.NotContains(t, result, "stage2") +} + +func TestPipelineStrategy_BackendError(t *testing.T) { + // First backend succeeds, second fails + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"from-stage1"}`)) + })) + defer backend1.Close() + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "stage1", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "stage2", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return http.NewRequestWithContext(ctx, "GET", backends[1].URL+"/test", nil) + }, + }) + + req := httptest.NewRequest("GET", "/api/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + + // Pipeline should still return stage1 results even if stage2 fails + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + assert.Contains(t, result, "stage1") +} + +func TestPipelineStrategy_RequestBuilderError(t *testing.T) { + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"ok"}`)) + })) + defer backend1.Close() + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"stage2"}`)) + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "stage1", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "stage2", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return nil, fmt.Errorf("intentional builder error") + }, + }) + + req := httptest.NewRequest("GET", "/api/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + + // Should still return stage1 data despite stage2 builder error + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +// ============================================================================ +// Fan-Out-Merge Strategy Tests +// ============================================================================ + +func TestFanOutMerge_IDBasedMerging(t *testing.T) { + // Backend A returns conversations + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "items": []map[string]interface{}{ + {"id": "item-1", "name": "Item One", "status": "active"}, + {"id": "item-2", "name": "Item Two", "status": "pending"}, + {"id": "item-3", "name": "Item Three", "status": "active"}, + }, + }) + })) + defer backendA.Close() + + // Backend B returns ancillary details (some items may not be present) + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "details": map[string]interface{}{ + "item-1": map[string]interface{}{"priority": "high", "assignee": "Alice"}, + "item-3": map[string]interface{}{"priority": "low", "assignee": "Bob"}, + }, + }) + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "items", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "details", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + // Parse items + var itemsResp struct { + Items []map[string]interface{} `json:"items"` + } + if body, ok := responses["items"]; ok { + json.Unmarshal(body, &itemsResp) + } + + // Parse details + var detailsResp struct { + Details map[string]interface{} `json:"details"` + } + if body, ok := responses["details"]; ok { + json.Unmarshal(body, &detailsResp) + } + + // Merge by ID + for i, item := range itemsResp.Items { + if id, ok := item["id"].(string); ok { + if detail, exists := detailsResp.Details[id]; exists { + itemsResp.Items[i]["details"] = detail + } + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "items": itemsResp.Items, + }) + }) + + req := httptest.NewRequest("GET", "/api/items", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + items := result["items"].([]interface{}) + assert.Len(t, items, 3) + + // item-1 should have details + item1 := items[0].(map[string]interface{}) + assert.Equal(t, "item-1", item1["id"]) + assert.NotNil(t, item1["details"]) + + // item-2 should NOT have details + item2 := items[1].(map[string]interface{}) + assert.Equal(t, "item-2", item2["id"]) + _, hasDetails := item2["details"] + assert.False(t, hasDetails) + + // item-3 should have details + item3 := items[2].(map[string]interface{}) + assert.Equal(t, "item-3", item3["id"]) + assert.NotNil(t, item3["details"]) +} + +func TestFanOutMerge_FilterByAncillaryData(t *testing.T) { + // Backend A returns all conversations + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "conversations": []map[string]interface{}{ + {"id": "c1", "title": "Conversation 1"}, + {"id": "c2", "title": "Conversation 2"}, + {"id": "c3", "title": "Conversation 3"}, + }, + }) + })) + defer backendA.Close() + + // Backend B returns which conversations are follow-ups (acts as filter) + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "follow_up_ids": []string{"c1", "c3"}, + }) + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "conversations", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "followups", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + var convResp struct { + Conversations []map[string]interface{} `json:"conversations"` + } + if body, ok := responses["conversations"]; ok { + json.Unmarshal(body, &convResp) + } + + var fuResp struct { + FollowUpIDs []string `json:"follow_up_ids"` + } + if body, ok := responses["followups"]; ok { + json.Unmarshal(body, &fuResp) + } + + // Create lookup set + followUpSet := make(map[string]bool) + for _, id := range fuResp.FollowUpIDs { + followUpSet[id] = true + } + + // Filter: only include conversations that are follow-ups + var filtered []map[string]interface{} + for _, conv := range convResp.Conversations { + if id, ok := conv["id"].(string); ok && followUpSet[id] { + conv["is_follow_up"] = true + filtered = append(filtered, conv) + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "follow_up_conversations": filtered, + }) + }) + + req := httptest.NewRequest("GET", "/api/followups", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + convs := result["follow_up_conversations"].([]interface{}) + assert.Len(t, convs, 2, "only c1 and c3 should be included") + + ids := make([]string, 0, len(convs)) + for _, c := range convs { + ids = append(ids, c.(map[string]interface{})["id"].(string)) + } + assert.Contains(t, ids, "c1") + assert.Contains(t, ids, "c3") +} + +func TestFanOutMerge_NoMerger(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"ok":true}`)) + })) + defer backend.Close() + + backends := []*Backend{ + {ID: "b1", URL: backend.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 5*time.Second) + // Intentionally not setting fan-out merger + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) +} + +func TestFanOutMerge_ThreeBackends(t *testing.T) { + // Three backends returning different types of data for the same entity + backendUsers := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "users": []map[string]interface{}{ + {"id": "u1", "name": "Alice"}, + {"id": "u2", "name": "Bob"}, + }, + }) + })) + defer backendUsers.Close() + + backendRoles := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "roles": map[string]string{ + "u1": "admin", + "u2": "user", + }, + }) + })) + defer backendRoles.Close() + + backendActivity := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "activity": map[string]int{ + "u1": 42, + "u2": 7, + }, + }) + })) + defer backendActivity.Close() + + backends := []*Backend{ + {ID: "users", URL: backendUsers.URL, Client: http.DefaultClient}, + {ID: "roles", URL: backendRoles.URL, Client: http.DefaultClient}, + {ID: "activity", URL: backendActivity.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + var usersResp struct { + Users []map[string]interface{} `json:"users"` + } + json.Unmarshal(responses["users"], &usersResp) + + var rolesResp struct { + Roles map[string]string `json:"roles"` + } + json.Unmarshal(responses["roles"], &rolesResp) + + var activityResp struct { + Activity map[string]float64 `json:"activity"` + } + json.Unmarshal(responses["activity"], &activityResp) + + // Enrich users with roles and activity + for i, user := range usersResp.Users { + id := user["id"].(string) + if role, ok := rolesResp.Roles[id]; ok { + usersResp.Users[i]["role"] = role + } + if count, ok := activityResp.Activity[id]; ok { + usersResp.Users[i]["activity_count"] = count + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "enriched_users": usersResp.Users, + }) + }) + + req := httptest.NewRequest("GET", "/api/users", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + users := result["enriched_users"].([]interface{}) + assert.Len(t, users, 2) + + u1 := users[0].(map[string]interface{}) + assert.Equal(t, "Alice", u1["name"]) + assert.Equal(t, "admin", u1["role"]) + assert.Equal(t, float64(42), u1["activity_count"]) + + u2 := users[1].(map[string]interface{}) + assert.Equal(t, "Bob", u2["name"]) + assert.Equal(t, "user", u2["role"]) + assert.Equal(t, float64(7), u2["activity_count"]) +} + +// ============================================================================ +// Empty Response Policy Tests +// ============================================================================ + +func TestEmptyResponsePolicy_AllowEmpty_Pipeline(t *testing.T) { + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"stage1"}`)) + })) + defer backend1.Close() + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) // Empty JSON object + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "s1", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "s2", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetEmptyResponsePolicy(EmptyResponseAllow) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return http.NewRequestWithContext(ctx, "GET", backends[1].URL+"/test", nil) + }, + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + // Both stages should be present + assert.Contains(t, result, "s1") + assert.Contains(t, result, "s2") +} + +func TestEmptyResponsePolicy_SkipEmpty_Pipeline(t *testing.T) { + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"stage1"}`)) + })) + defer backend1.Close() + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) // Empty JSON object + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "s1", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "s2", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetEmptyResponsePolicy(EmptyResponseSkip) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return http.NewRequestWithContext(ctx, "GET", backends[1].URL+"/test", nil) + }, + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + // Only s1 should be present; s2 was empty and skipped + assert.Contains(t, result, "s1") + assert.NotContains(t, result, "s2") +} + +func TestEmptyResponsePolicy_FailOnEmpty_Pipeline(t *testing.T) { + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"stage1"}`)) + })) + defer backend1.Close() + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`[]`)) // Empty JSON array + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "s1", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "s2", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetEmptyResponsePolicy(EmptyResponseFail) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return http.NewRequestWithContext(ctx, "GET", backends[1].URL+"/test", nil) + }, + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + + // Should fail because stage 2 returned empty + assert.Equal(t, http.StatusBadGateway, resp.StatusCode) +} + +func TestEmptyResponsePolicy_SkipEmpty_FanOutMerge(t *testing.T) { + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "items": []string{"a", "b", "c"}, + }) + })) + defer backendA.Close() + + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`null`)) // Empty/null response + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "primary", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "ancillary", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetEmptyResponsePolicy(EmptyResponseSkip) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + // ancillary should be skipped + _, hasAncillary := responses["ancillary"] + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "has_ancillary": hasAncillary, + "backends": len(responses), + }) + }) + + req := httptest.NewRequest("GET", "/api/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + assert.Equal(t, false, result["has_ancillary"]) + assert.Equal(t, float64(1), result["backends"]) +} + +func TestEmptyResponsePolicy_FailOnEmpty_FanOutMerge(t *testing.T) { + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"data":"ok"}`)) + })) + defer backendA.Close() + + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(``)) // Completely empty body + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "primary", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "ancillary", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetEmptyResponsePolicy(EmptyResponseFail) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + return MakeJSONResponse(http.StatusOK, responses) + }) + + req := httptest.NewRequest("GET", "/api/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusBadGateway, resp.StatusCode) +} + +// ============================================================================ +// isEmptyBody Tests +// ============================================================================ + +func TestIsEmptyBody(t *testing.T) { + tests := []struct { + name string + body []byte + expected bool + }{ + {"nil body", nil, true}, + {"empty body", []byte{}, true}, + {"whitespace only", []byte(" \n\t "), true}, + {"null JSON", []byte("null"), true}, + {"empty object", []byte("{}"), true}, + {"empty array", []byte("[]"), true}, + {"null with whitespace", []byte(" null "), true}, + {"non-empty object", []byte(`{"key":"value"}`), false}, + {"non-empty array", []byte(`[1,2,3]`), false}, + {"string value", []byte(`"hello"`), false}, + {"number value", []byte(`42`), false}, + {"boolean true", []byte(`true`), false}, + {"object with empty string", []byte(`{"key":""}`), false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isEmptyBody(tt.body) + assert.Equal(t, tt.expected, result) + }) + } +} + +// ============================================================================ +// MakeJSONResponse Helper Tests +// ============================================================================ + +func TestMakeJSONResponse(t *testing.T) { + data := map[string]interface{}{ + "key": "value", + "num": 42, + } + + resp, err := MakeJSONResponse(http.StatusOK, data) + require.NoError(t, err) + require.NotNil(t, resp) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "application/json", resp.Header.Get("Content-Type")) + + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + + var result map[string]interface{} + json.Unmarshal(body, &result) + assert.Equal(t, "value", result["key"]) + assert.Equal(t, float64(42), result["num"]) +} + +func TestMakeJSONResponse_CustomStatusCode(t *testing.T) { + resp, err := MakeJSONResponse(http.StatusCreated, map[string]string{"status": "created"}) + require.NoError(t, err) + assert.Equal(t, http.StatusCreated, resp.StatusCode) + resp.Body.Close() +} + +// ============================================================================ +// Complex Scenario Tests +// ============================================================================ + +func TestPipelineStrategy_ConversationListWithFollowUps(t *testing.T) { + // Scenario from the issue: list page with queued conversations + // Backend A has general conversation details + // Backend B has ancillary details (follow-ups) + + conversationsBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "conversations": []map[string]interface{}{ + {"id": "c-100", "status": "queued", "counselor": "Alice", "created_at": "2024-01-01T10:00:00Z"}, + {"id": "c-101", "status": "queued", "counselor": "Bob", "created_at": "2024-01-01T10:05:00Z"}, + {"id": "c-102", "status": "active", "counselor": "Carol", "created_at": "2024-01-01T10:10:00Z"}, + {"id": "c-103", "status": "queued", "counselor": nil, "created_at": "2024-01-01T10:15:00Z"}, + }, + "total": 4, + "page": 1, + }) + })) + defer conversationsBackend.Close() + + followUpBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // This backend receives specific conversation IDs to check + idsParam := r.URL.Query().Get("conversation_ids") + ids := strings.Split(idsParam, ",") + + followUps := make(map[string]interface{}) + // c-100 is a follow-up to c-50 + for _, id := range ids { + if id == "c-100" { + followUps[id] = map[string]interface{}{ + "is_follow_up": true, + "original_conv_id": "c-50", + "follow_up_count": 2, + } + } + if id == "c-103" { + followUps[id] = map[string]interface{}{ + "is_follow_up": true, + "original_conv_id": "c-90", + "follow_up_count": 1, + } + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "follow_ups": followUps, + }) + })) + defer followUpBackend.Close() + + backends := []*Backend{ + {ID: "conversations", URL: conversationsBackend.URL, Client: http.DefaultClient}, + {ID: "followups", URL: followUpBackend.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + if nextBackendID == "followups" { + // Extract conversation IDs from the first response + var convResp struct { + Conversations []struct { + ID string `json:"id"` + } `json:"conversations"` + } + if body, ok := previousResponses["conversations"]; ok { + if err := json.Unmarshal(body, &convResp); err != nil { + return nil, err + } + } + + ids := make([]string, 0, len(convResp.Conversations)) + for _, c := range convResp.Conversations { + ids = append(ids, c.ID) + } + + url := followUpBackend.URL + "/followups?conversation_ids=" + strings.Join(ids, ",") + return http.NewRequestWithContext(ctx, "GET", url, nil) + } + return nil, fmt.Errorf("unknown backend: %s", nextBackendID) + }, + ResponseMerger: func(ctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + var convResp struct { + Conversations []map[string]interface{} `json:"conversations"` + Total int `json:"total"` + Page int `json:"page"` + } + json.Unmarshal(allResponses["conversations"], &convResp) + + var fuResp struct { + FollowUps map[string]interface{} `json:"follow_ups"` + } + json.Unmarshal(allResponses["followups"], &fuResp) + + // Enrich conversations with follow-up data + for i, conv := range convResp.Conversations { + if id, ok := conv["id"].(string); ok { + if fu, exists := fuResp.FollowUps[id]; exists { + convResp.Conversations[i]["follow_up_info"] = fu + } + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "conversations": convResp.Conversations, + "total": convResp.Total, + "page": convResp.Page, + }) + }, + }) + + req := httptest.NewRequest("GET", "/api/conversations?status=queued", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + conversations := result["conversations"].([]interface{}) + assert.Len(t, conversations, 4) + + // c-100 should have follow_up_info + c100 := conversations[0].(map[string]interface{}) + assert.Equal(t, "c-100", c100["id"]) + fuInfo, hasFU := c100["follow_up_info"] + assert.True(t, hasFU) + fuMap := fuInfo.(map[string]interface{}) + assert.Equal(t, true, fuMap["is_follow_up"]) + assert.Equal(t, "c-50", fuMap["original_conv_id"]) + + // c-101 should NOT have follow_up_info + c101 := conversations[1].(map[string]interface{}) + _, hasFU101 := c101["follow_up_info"] + assert.False(t, hasFU101) + + // c-103 should have follow_up_info + c103 := conversations[3].(map[string]interface{}) + assert.Equal(t, "c-103", c103["id"]) + _, hasFU103 := c103["follow_up_info"] + assert.True(t, hasFU103) +} + +func TestFanOutMerge_ComplexNestedResponses(t *testing.T) { + // Complex scenario: merging nested JSON structures + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "orders": []map[string]interface{}{ + { + "id": "ord-1", + "amount": 99.99, + "items": []map[string]interface{}{ + {"sku": "SKU-001", "qty": 2}, + {"sku": "SKU-002", "qty": 1}, + }, + }, + { + "id": "ord-2", + "amount": 149.50, + "items": []map[string]interface{}{ + {"sku": "SKU-003", "qty": 3}, + }, + }, + }, + }) + })) + defer backendA.Close() + + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "shipping": map[string]interface{}{ + "ord-1": map[string]interface{}{ + "status": "shipped", + "tracking": "TRACK-12345", + "carrier": "FedEx", + }, + "ord-2": map[string]interface{}{ + "status": "processing", + "tracking": "", + "carrier": "", + }, + }, + }) + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "orders", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "shipping", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + var ordersResp struct { + Orders []map[string]interface{} `json:"orders"` + } + json.Unmarshal(responses["orders"], &ordersResp) + + var shippingResp struct { + Shipping map[string]interface{} `json:"shipping"` + } + json.Unmarshal(responses["shipping"], &shippingResp) + + for i, order := range ordersResp.Orders { + if id, ok := order["id"].(string); ok { + if shipping, exists := shippingResp.Shipping[id]; exists { + ordersResp.Orders[i]["shipping"] = shipping + } + } + } + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "orders": ordersResp.Orders, + }) + }) + + req := httptest.NewRequest("GET", "/api/orders", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + orders := result["orders"].([]interface{}) + assert.Len(t, orders, 2) + + ord1 := orders[0].(map[string]interface{}) + shipping := ord1["shipping"].(map[string]interface{}) + assert.Equal(t, "shipped", shipping["status"]) + assert.Equal(t, "TRACK-12345", shipping["tracking"]) +} + +func TestFanOutMerge_EmptyAncillaryData_AllowPolicy(t *testing.T) { + // Backend A returns data, Backend B returns empty (no ancillary data exists) + // With allow-empty policy, merger should handle gracefully + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "items": []map[string]interface{}{ + {"id": "x1", "name": "X1"}, + }, + }) + })) + defer backendA.Close() + + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) // Empty response - no ancillary data + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "primary", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "ancillary", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetEmptyResponsePolicy(EmptyResponseAllow) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + // Both responses should be present + _, hasPrimary := responses["primary"] + _, hasAncillary := responses["ancillary"] + + return MakeJSONResponse(http.StatusOK, map[string]interface{}{ + "primary_present": hasPrimary, + "ancillary_present": hasAncillary, + }) + }) + + req := httptest.NewRequest("GET", "/api/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + assert.Equal(t, true, result["primary_present"]) + assert.Equal(t, true, result["ancillary_present"]) +} + +func TestPipelineStrategy_WithRequestBody(t *testing.T) { + // Test pipeline with POST request containing body + backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var input map[string]interface{} + json.Unmarshal(body, &input) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "processed": true, + "input": input, + "result_id": "res-123", + }) + })) + defer backend1.Close() + + backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var input map[string]interface{} + json.Unmarshal(body, &input) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "stored": true, + "result_id": input["result_id"], + }) + })) + defer backend2.Close() + + backends := []*Backend{ + {ID: "process", URL: backend1.URL, Client: http.DefaultClient}, + {ID: "store", URL: backend2.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + if nextBackendID == "store" { + // Parse the result_id from process response + var processResp struct { + ResultID string `json:"result_id"` + } + json.Unmarshal(previousResponses["process"], &processResp) + + // Build store request with result_id + storeBody, _ := json.Marshal(map[string]interface{}{ + "result_id": processResp.ResultID, + "action": "save", + }) + req, _ := http.NewRequestWithContext(ctx, "POST", backends[1].URL+"/store", + bytes.NewReader(storeBody)) + req.Header.Set("Content-Type", "application/json") + return req, nil + } + return nil, fmt.Errorf("unknown backend: %s", nextBackendID) + }, + ResponseMerger: func(ctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + result := make(map[string]interface{}) + for k, v := range allResponses { + var parsed interface{} + json.Unmarshal(v, &parsed) + result[k] = parsed + } + return MakeJSONResponse(http.StatusOK, result) + }, + }) + + inputBody := `{"data":"test-payload"}` + req := httptest.NewRequest("POST", "/api/process", strings.NewReader(inputBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + + // Verify process stage + processResult := result["process"].(map[string]interface{}) + assert.Equal(t, true, processResult["processed"]) + assert.Equal(t, "res-123", processResult["result_id"]) + + // Verify store stage received the result_id from process stage + storeResult := result["store"].(map[string]interface{}) + assert.Equal(t, true, storeResult["stored"]) + assert.Equal(t, "res-123", storeResult["result_id"]) +} + +// ============================================================================ +// Module Integration Tests +// ============================================================================ + +func TestModuleSetPipelineConfig(t *testing.T) { + module := NewModule() + + config := PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return nil, nil + }, + } + + module.SetPipelineConfig("/api/pipeline", config) + assert.NotNil(t, module.pipelineConfigs["/api/pipeline"]) +} + +func TestModuleSetFanOutMerger(t *testing.T) { + module := NewModule() + + merger := func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + return MakeJSONResponse(http.StatusOK, responses) + } + + module.SetFanOutMerger("/api/fanout", merger) + assert.NotNil(t, module.fanOutMergers["/api/fanout"]) +} + +func TestModuleSetEmptyResponsePolicy(t *testing.T) { + module := NewModule() + + module.SetEmptyResponsePolicy("/api/test", EmptyResponseSkip) + assert.Equal(t, EmptyResponseSkip, module.emptyResponsePolicies["/api/test"]) +} + +// ============================================================================ +// Edge Case Tests +// ============================================================================ + +func TestPipeline_AllBackendsFail(t *testing.T) { + // When all backends fail, we should get a bad gateway + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer backend.Close() + + backends := []*Backend{ + {ID: "b1", URL: "http://localhost:1", Client: &http.Client{Timeout: 100 * time.Millisecond}}, // Will fail + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 5*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return nil, nil + }, + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusBadGateway, resp.StatusCode) +} + +func TestFanOutMerge_SingleBackend(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{"single": true}) + })) + defer backend.Close() + + backends := []*Backend{ + {ID: "only", URL: backend.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + var data interface{} + json.Unmarshal(responses["only"], &data) + return MakeJSONResponse(http.StatusOK, data) + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + json.Unmarshal(body, &result) + assert.Equal(t, true, result["single"]) +} + +func TestFanOutMerge_MergerError(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"ok":true}`)) + })) + defer backend.Close() + + backends := []*Backend{ + {ID: "b1", URL: backend.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + return nil, fmt.Errorf("intentional merger error") + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) +} + +func TestPipelineStrategy_ResponseMergerError(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"ok":true}`)) + })) + defer backend.Close() + + backends := []*Backend{ + {ID: "b1", URL: backend.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyPipeline, 10*time.Second) + handler.SetPipelineConfig(&PipelineConfig{ + RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) { + return nil, nil + }, + ResponseMerger: func(ctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) { + return nil, fmt.Errorf("intentional merger error") + }, + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) +} + +func TestFanOutMerge_AllBackendsFail_Returns502(t *testing.T) { + // When all backends fail (unreachable), executeFanOutMerge should return 502 + backends := []*Backend{ + {ID: "b1", URL: "http://127.0.0.1:1", Client: &http.Client{Timeout: 50 * time.Millisecond}}, + {ID: "b2", URL: "http://127.0.0.1:1", Client: &http.Client{Timeout: 50 * time.Millisecond}}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 5*time.Second) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + // Should never be called since all backends fail + return MakeJSONResponse(http.StatusOK, map[string]interface{}{"ok": true}) + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusBadGateway, resp.StatusCode) +} + +func TestFanOutMerge_AllEmptyWithSkipPolicy_Returns502(t *testing.T) { + // When all responses are empty and skip-empty policy is set, return 502 + backendA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`null`)) // Empty/null + })) + defer backendA.Close() + + backendB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) // Empty object + })) + defer backendB.Close() + + backends := []*Backend{ + {ID: "a", URL: backendA.URL, Client: http.DefaultClient}, + {ID: "b", URL: backendB.URL, Client: http.DefaultClient}, + } + + handler := NewCompositeHandler(backends, StrategyFanOutMerge, 10*time.Second) + handler.SetEmptyResponsePolicy(EmptyResponseSkip) + handler.SetFanOutMerger(func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) { + // Should never be called since all responses are skipped + return MakeJSONResponse(http.StatusOK, map[string]interface{}{"ok": true}) + }) + + req := httptest.NewRequest("GET", "/test", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + assert.Equal(t, http.StatusBadGateway, resp.StatusCode) +} + +func TestMakeJSONResponse_StatusFormat(t *testing.T) { + // Verify the status string is formatted per net/http conventions (e.g. "200 OK") + resp, err := MakeJSONResponse(http.StatusOK, map[string]interface{}{"ok": true}) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, "200 OK", resp.Status) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + resp2, err := MakeJSONResponse(http.StatusNotFound, map[string]interface{}{"error": "not found"}) + require.NoError(t, err) + defer resp2.Body.Close() + assert.Equal(t, "404 Not Found", resp2.Status) + assert.Equal(t, http.StatusNotFound, resp2.StatusCode) +} + +func TestCreateCompositeHandler_InvalidEmptyPolicy(t *testing.T) { + // createCompositeHandler should return an error for invalid empty_policy values + m := NewModule() + require.NotNil(t, m) + + cfg := &ReverseProxyConfig{ + DefaultBackend: "backend-a", + BackendServices: map[string]string{ + "backend-a": "http://localhost:9999", + "backend-b": "http://localhost:9998", + }, + CompositeRoutes: map[string]CompositeRoute{ + "/api/test": { + Pattern: "/api/test", + Backends: []string{"backend-a", "backend-b"}, + Strategy: "pipeline", + EmptyPolicy: "invalid-policy", + }, + }, + } + m.config = cfg + + route := cfg.CompositeRoutes["/api/test"] + _, err := m.createCompositeHandler(context.Background(), route, cfg) + require.Error(t, err) + assert.ErrorIs(t, err, ErrInvalidEmptyResponsePolicy) +} diff --git a/modules/reverseproxy/config.go b/modules/reverseproxy/config.go index 532feaca..6c8c15c8 100644 --- a/modules/reverseproxy/config.go +++ b/modules/reverseproxy/config.go @@ -96,6 +96,11 @@ type CompositeRoute struct { Backends []string `json:"backends" yaml:"backends" toml:"backends" env:"BACKENDS"` Strategy string `json:"strategy" yaml:"strategy" toml:"strategy" env:"STRATEGY"` + // EmptyPolicy defines how empty backend responses are handled. + // Valid values: "allow-empty" (default), "skip-empty", "fail-on-empty". + // This is used by pipeline and fan-out-merge strategies. + EmptyPolicy string `json:"empty_policy" yaml:"empty_policy" toml:"empty_policy" env:"EMPTY_POLICY"` + // FeatureFlagID is the ID of the feature flag that controls whether this composite route is enabled // If specified and the feature flag evaluates to false, this route will return 404 FeatureFlagID string `json:"feature_flag_id" yaml:"feature_flag_id" toml:"feature_flag_id" env:"FEATURE_FLAG_ID"` diff --git a/modules/reverseproxy/errors.go b/modules/reverseproxy/errors.go index baec962c..e9c4b151 100644 --- a/modules/reverseproxy/errors.go +++ b/modules/reverseproxy/errors.go @@ -40,8 +40,9 @@ var ( ErrNoSubjectForEventEmission = errors.New("no subject available for event emission") // Dynamic operation errors - ErrBackendIDRequired = errors.New("backend id required") - ErrServiceURLRequired = errors.New("service URL required") - ErrNoBackendsConfigured = errors.New("no backends configured") - ErrBackendNotConfigured = errors.New("backend not configured") + ErrBackendIDRequired = errors.New("backend id required") + ErrServiceURLRequired = errors.New("service URL required") + ErrNoBackendsConfigured = errors.New("no backends configured") + ErrBackendNotConfigured = errors.New("backend not configured") + ErrInvalidEmptyResponsePolicy = errors.New("invalid empty_policy: must be one of allow-empty, skip-empty, fail-on-empty") ) diff --git a/modules/reverseproxy/features/composite_pipeline.feature b/modules/reverseproxy/features/composite_pipeline.feature new file mode 100644 index 00000000..15565ae1 --- /dev/null +++ b/modules/reverseproxy/features/composite_pipeline.feature @@ -0,0 +1,37 @@ +Feature: Pipeline and Fan-Out-Merge Composite Strategies + As a developer building a multi-backend application + I want to chain backend requests and merge responses by ID + So that I can aggregate data from multiple services into unified responses + + Background: + Given I have a modular application with reverse proxy module configured + + Scenario: Pipeline strategy chains requests through multiple backends + Given I have a pipeline composite route with two backends + When I send a request to the pipeline route + Then the first backend should be called with the original request + And the second backend should receive data derived from the first response + And the final response should contain merged data from all stages + + Scenario: Fan-out-merge strategy merges responses by ID + Given I have a fan-out-merge composite route with two backends + When I send a request to the fan-out-merge route + Then both backends should be called in parallel + And the responses should be merged by matching IDs + And items with matching ancillary data should be enriched + + Scenario: Pipeline with empty response using skip policy + Given I have a pipeline route with skip-empty policy + When I send a request and a backend returns an empty response + Then the empty response should be excluded from the result + And the non-empty responses should still be present + + Scenario: Fan-out-merge with empty response using fail policy + Given I have a fan-out-merge route with fail-on-empty policy + When I send a request and a backend returns an empty response + Then the request should fail with a bad gateway error + + Scenario: Pipeline filters results using ancillary data + Given I have a pipeline route that filters by ancillary backend data + When I send a request to fetch filtered results + Then only items matching the ancillary criteria should be returned diff --git a/modules/reverseproxy/module.go b/modules/reverseproxy/module.go index 61c1cbac..a002ee67 100644 --- a/modules/reverseproxy/module.go +++ b/modules/reverseproxy/module.go @@ -73,6 +73,15 @@ type ReverseProxyModule struct { // Response transformers for composite routes (keyed by route pattern) responseTransformers map[string]ResponseTransformer + // Pipeline configurations for composite routes (keyed by route pattern) + pipelineConfigs map[string]*PipelineConfig + + // Fan-out merger functions for composite routes (keyed by route pattern) + fanOutMergers map[string]FanOutMerger + + // Empty response policies for composite routes (keyed by route pattern) + emptyResponsePolicies map[string]EmptyResponsePolicy + // Metrics collection metrics *MetricsCollector enableMetrics bool @@ -170,17 +179,20 @@ func NewModule() *ReverseProxyModule { // either in Constructor (if httpclient service is available) // or in Init (with default settings) module := &ReverseProxyModule{ - httpClient: nil, - backendProxies: make(map[string]*httputil.ReverseProxy), - backendRoutes: make(map[string]map[string]http.HandlerFunc), - compositeRoutes: make(map[string]http.HandlerFunc), - tenants: make(map[modular.TenantID]*ReverseProxyConfig), - tenantBackendProxies: make(map[modular.TenantID]map[string]*httputil.ReverseProxy), - preProxyTransforms: make(map[string]func(*http.Request)), - circuitBreakers: make(map[string]*CircuitBreaker), - enableMetrics: true, - loadBalanceCounters: make(map[string]int), - responseTransformers: make(map[string]ResponseTransformer), + httpClient: nil, + backendProxies: make(map[string]*httputil.ReverseProxy), + backendRoutes: make(map[string]map[string]http.HandlerFunc), + compositeRoutes: make(map[string]http.HandlerFunc), + tenants: make(map[modular.TenantID]*ReverseProxyConfig), + tenantBackendProxies: make(map[modular.TenantID]map[string]*httputil.ReverseProxy), + preProxyTransforms: make(map[string]func(*http.Request)), + circuitBreakers: make(map[string]*CircuitBreaker), + enableMetrics: true, + loadBalanceCounters: make(map[string]int), + responseTransformers: make(map[string]ResponseTransformer), + pipelineConfigs: make(map[string]*PipelineConfig), + fanOutMergers: make(map[string]FanOutMerger), + emptyResponsePolicies: make(map[string]EmptyResponsePolicy), } return module @@ -1565,6 +1577,27 @@ func (m *ReverseProxyModule) SetResponseTransformer(pattern string, transformer m.responseTransformers[pattern] = transformer } +// SetPipelineConfig sets the pipeline configuration for a specific composite route pattern. +// This is required for routes using the "pipeline" strategy. +// The PipelineConfig includes a RequestBuilder (to construct each subsequent request +// from previous responses) and an optional ResponseMerger (to assemble the final response). +func (m *ReverseProxyModule) SetPipelineConfig(pattern string, config PipelineConfig) { + m.pipelineConfigs[pattern] = &config +} + +// SetFanOutMerger sets the fan-out merger function for a specific composite route pattern. +// This is required for routes using the "fan-out-merge" strategy. +// The merger receives all parallel backend response bodies and produces a unified response. +func (m *ReverseProxyModule) SetFanOutMerger(pattern string, merger FanOutMerger) { + m.fanOutMergers[pattern] = merger +} + +// SetEmptyResponsePolicy sets the empty response policy for a specific composite route pattern. +// This controls how empty backend responses are handled in pipeline and fan-out-merge strategies. +func (m *ReverseProxyModule) SetEmptyResponsePolicy(pattern string, policy EmptyResponsePolicy) { + m.emptyResponsePolicies[pattern] = policy +} + // createReverseProxyForBackend creates a reverse proxy for a specific backend with per-backend configuration. func (m *ReverseProxyModule) createReverseProxyForBackend(ctx context.Context, target *url.URL, backendID string, endpoint string) *httputil.ReverseProxy { proxy := httputil.NewSingleHostReverseProxy(target)