Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/reverse-proxy/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ reverseproxy:
# Custom Transformer Backends
profile-backend: "http://localhost:9013"
analytics-backend: "http://localhost:9014"

# Pipeline Strategy Backends
conversations-backend: "http://localhost:9015"
followup-backend: "http://localhost:9016"

# Fan-Out-Merge Strategy Backends
tickets-backend: "http://localhost:9017"
assignments-backend: "http://localhost:9018"

default_backend: "global-default"
tenant_id_header: "X-Tenant-ID"
Expand Down Expand Up @@ -100,6 +108,29 @@ reverseproxy:
- "analytics-backend"
strategy: "merge" # Strategy is set, but transformer overrides merge behavior

# STRATEGY 4: PIPELINE
# Executes backends sequentially where each stage's response informs the next request.
# Use case: A list page showing queued conversations. Backend A returns conversation
# details, those IDs are fed into Backend B to fetch follow-up information,
# and the responses are merged into a unified view.
"/api/composite/pipeline":
pattern: "/api/composite/pipeline"
backends:
- "conversations-backend"
- "followup-backend"
strategy: "pipeline"

# STRATEGY 5: FAN-OUT-MERGE
# Executes all backends in parallel, then merges responses by matching IDs.
# Use case: A ticket dashboard where tickets come from one service and
# assignment/priority data comes from another. The merger correlates by ticket ID.
"/api/composite/fanout-merge":
pattern: "/api/composite/fanout-merge"
backends:
- "tickets-backend"
- "assignments-backend"
strategy: "fan-out-merge"

# ChiMux router configuration
chimux:
basepath: ""
Expand Down
199 changes: 199 additions & 0 deletions examples/reverse-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strings"
"time"

"github.com/CrisisTextLine/modular"
Expand Down Expand Up @@ -168,6 +170,118 @@ func main() {
return resp, nil
})

// PIPELINE STRATEGY EXAMPLE:
// This demonstrates chained backend requests where backend B's request is constructed
// using data from backend A's response. This is the map/reduce pattern.
//
// Use case: A list page shows queued conversations. Backend A returns conversation details,
// then those conversation IDs are fed into Backend B to fetch follow-up information.
// The responses are then merged to produce a unified view.
proxyModule.SetPipelineConfig("/api/composite/pipeline", reverseproxy.PipelineConfig{
RequestBuilder: func(ctx context.Context, originalReq *http.Request, previousResponses map[string][]byte, nextBackendID string) (*http.Request, error) {
if nextBackendID == "followup-backend" {
// Extract conversation IDs from the conversations backend response
var convResp struct {
Conversations []struct {
ID string `json:"id"`
} `json:"conversations"`
}
if body, ok := previousResponses["conversations-backend"]; ok {
if err := json.Unmarshal(body, &convResp); err != nil {
return nil, fmt.Errorf("failed to parse conversations: %w", err)
}
}

// Build the follow-up request with those IDs
ids := make([]string, 0, len(convResp.Conversations))
for _, c := range convResp.Conversations {
ids = append(ids, c.ID)
}
idsParam := ""
for i, id := range ids {
if i > 0 {
idsParam += ","
}
idsParam += id
}

url := "http://localhost:9016/followups?ids=" + idsParam
return http.NewRequestWithContext(ctx, "GET", url, nil)
}
return nil, fmt.Errorf("unknown pipeline backend: %s", nextBackendID)
},
ResponseMerger: func(ctx context.Context, originalReq *http.Request, allResponses map[string][]byte) (*http.Response, error) {
// Parse conversations
var convResp struct {
Conversations []map[string]interface{} `json:"conversations"`
}
if body, ok := allResponses["conversations-backend"]; ok {
json.Unmarshal(body, &convResp)
}

// Parse follow-ups
var fuResp struct {
FollowUps map[string]interface{} `json:"follow_ups"`
}
if body, ok := allResponses["followup-backend"]; ok {
json.Unmarshal(body, &fuResp)
}

// Merge follow-up data into each conversation
for i, conv := range convResp.Conversations {
if id, ok := conv["id"].(string); ok {
if fu, exists := fuResp.FollowUps[id]; exists {
convResp.Conversations[i]["follow_up"] = fu
}
}
}

return reverseproxy.MakeJSONResponse(http.StatusOK, map[string]interface{}{
"conversations": convResp.Conversations,
"strategy": "pipeline",
})
},
})

// FAN-OUT-MERGE STRATEGY EXAMPLE:
// This demonstrates parallel requests to multiple backends with custom ID-based
// response merging. Both backends are called simultaneously, then their responses
// are correlated by matching IDs.
//
// Use case: Show a ticket dashboard where tickets come from one service and
// priority/assignment data comes from another. The merger matches by ticket ID.
proxyModule.SetFanOutMerger("/api/composite/fanout-merge", func(ctx context.Context, originalReq *http.Request, responses map[string][]byte) (*http.Response, error) {
// Parse tickets from the tickets backend
var ticketsResp struct {
Tickets []map[string]interface{} `json:"tickets"`
}
if body, ok := responses["tickets-backend"]; ok {
json.Unmarshal(body, &ticketsResp)
}

// Parse assignments from the assignments backend
var assignResp struct {
Assignments map[string]interface{} `json:"assignments"`
}
if body, ok := responses["assignments-backend"]; ok {
json.Unmarshal(body, &assignResp)
}

// Merge assignments into tickets by ID
for i, ticket := range ticketsResp.Tickets {
if id, ok := ticket["id"].(string); ok {
if assignment, exists := assignResp.Assignments[id]; exists {
ticketsResp.Tickets[i]["assignment"] = assignment
}
}
}

return reverseproxy.MakeJSONResponse(http.StatusOK, map[string]interface{}{
"tickets": ticketsResp.Tickets,
"strategy": "fan-out-merge",
})
})

app.RegisterModule(proxyModule)
app.RegisterModule(httpserver.NewHTTPServerModule())

Expand Down Expand Up @@ -403,4 +517,89 @@ func startMockBackends() {
fmt.Printf("Backend server error on :9014: %v\n", err)
}
}()

// ========================================
// Backends for PIPELINE strategy demonstration
// ========================================

// Conversations backend (port 9015) - Returns queued conversations
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"conversations":[{"id":"conv-1","status":"queued","counselor":"Alice","created_at":"2024-01-01T10:00:00Z"},{"id":"conv-2","status":"queued","counselor":"Bob","created_at":"2024-01-01T10:05:00Z"},{"id":"conv-3","status":"active","counselor":"Carol","created_at":"2024-01-01T10:10:00Z"}]}`)
})
fmt.Println("Starting conversations-backend (pipeline demo) on :9015")
if err := http.ListenAndServe(":9015", mux); err != nil { //nolint:gosec
fmt.Printf("Backend server error on :9015: %v\n", err)
}
}()

// Follow-up backend (port 9016) - Returns follow-up details for given conversation IDs
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
idsParam := r.URL.Query().Get("ids")
followUps := make(map[string]interface{})
if idsParam != "" {
for _, id := range strings.Split(idsParam, ",") {
switch id {
case "conv-1":
followUps[id] = map[string]interface{}{
"is_follow_up": true,
"original_conv_id": "conv-50",
"follow_up_count": 2,
}
case "conv-3":
followUps[id] = map[string]interface{}{
"is_follow_up": true,
"original_conv_id": "conv-90",
"follow_up_count": 1,
}
}
}
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
resp, _ := json.Marshal(map[string]interface{}{"follow_ups": followUps})
w.Write(resp) //nolint:errcheck
})
fmt.Println("Starting followup-backend (pipeline demo) on :9016")
if err := http.ListenAndServe(":9016", mux); err != nil { //nolint:gosec
fmt.Printf("Backend server error on :9016: %v\n", err)
}
}()

// ========================================
// Backends for FAN-OUT-MERGE strategy demonstration
// ========================================

// Tickets backend (port 9017) - Returns support tickets
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"tickets":[{"id":"ticket-1","subject":"Login issue","status":"open","created":"2024-01-15"},{"id":"ticket-2","subject":"Billing question","status":"open","created":"2024-01-16"},{"id":"ticket-3","subject":"Feature request","status":"pending","created":"2024-01-17"}]}`)
})
fmt.Println("Starting tickets-backend (fan-out-merge demo) on :9017")
if err := http.ListenAndServe(":9017", mux); err != nil { //nolint:gosec
fmt.Printf("Backend server error on :9017: %v\n", err)
}
}()

// Assignments backend (port 9018) - Returns ticket assignments and priorities
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"assignments":{"ticket-1":{"assignee":"Alice","priority":"high","sla_deadline":"2024-01-16T12:00:00Z"},"ticket-3":{"assignee":"Bob","priority":"low","sla_deadline":"2024-01-20T12:00:00Z"}}}`)
})
fmt.Println("Starting assignments-backend (fan-out-merge demo) on :9018")
if err := http.ListenAndServe(":9018", mux); err != nil { //nolint:gosec
fmt.Printf("Backend server error on :9018: %v\n", err)
}
}()
}
Loading
Loading