Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions admin/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# - Phase B: V1 CRUD routes decomposed into pipeline primitives (db_query/db_exec)
# - Phase C: Missing DB tables added, broken routes replaced with working pipelines
# - Phase D: All remaining delegate steps given meaningful names
# - Phase D+: Pre-processing pipeline steps (validation, pagination, rate limiting) for domain routes

modules:
# --- HTTP Server (admin port) ---
Expand Down Expand Up @@ -266,6 +267,10 @@ workflows:
middlewares: [admin-cors, admin-auth-middleware]
pipeline:
steps:
- name: validate-body
type: step.validate_request_body
config:
required_fields: [config]
- name: validate-engine-config
type: step.delegate
config:
Expand All @@ -276,6 +281,17 @@ workflows:
middlewares: [admin-cors, admin-auth-middleware]
pipeline:
steps:
- name: rate-limit
type: step.rate_limit
config:
requests_per_minute: 5
burst_size: 2
key_from: "global"
- name: log-reload-attempt
type: step.log
config:
level: warn
message: "Engine reload requested via admin API"
- name: reload-engine
type: step.delegate
config:
Expand Down Expand Up @@ -1226,6 +1242,12 @@ workflows:
type: step.request_parse
config:
path_params: [id]
- name: validate-id
type: step.validate_path_param
config:
params: [id]
format: uuid
source: "steps.parse-request.path_params"
- name: get-execution
type: step.db_query
config:
Expand Down Expand Up @@ -1261,6 +1283,18 @@ workflows:
type: step.request_parse
config:
path_params: [id]
- name: validate-id
type: step.validate_path_param
config:
params: [id]
format: uuid
source: "steps.parse-request.path_params"
- name: validate-pagination
type: step.validate_pagination
config:
max_limit: 100
default_limit: 50
default_page: 1
- name: list-steps
type: step.db_query
config:
Expand Down Expand Up @@ -1696,6 +1730,12 @@ workflows:
middlewares: [admin-cors, admin-auth-middleware]
pipeline:
steps:
- name: validate-pagination
type: step.validate_pagination
config:
max_limit: 100
default_limit: 20
default_page: 1
- name: list-all-executions
type: step.delegate
config:
Expand Down Expand Up @@ -1781,6 +1821,22 @@ workflows:
middlewares: [admin-cors, admin-auth-middleware]
pipeline:
steps:
- name: parse-request
type: step.request_parse
config:
path_params: [id]
- name: validate-id
type: step.validate_path_param
config:
params: [id]
format: uuid
source: "steps.parse-request.path_params"
- name: rate-limit
type: step.rate_limit
config:
requests_per_minute: 30
burst_size: 5
key_from: "global"
- name: retry-dlq-entry
type: step.delegate
config:
Expand All @@ -1791,6 +1847,22 @@ workflows:
middlewares: [admin-cors, admin-auth-middleware]
pipeline:
steps:
- name: parse-request
type: step.request_parse
config:
path_params: [id]
- name: validate-id
type: step.validate_path_param
config:
params: [id]
format: uuid
source: "steps.parse-request.path_params"
- name: rate-limit
type: step.rate_limit
config:
requests_per_minute: 30
burst_size: 5
key_from: "global"
- name: discard-dlq-entry
type: step.delegate
config:
Expand Down
1 change: 0 additions & 1 deletion module/command_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) {
t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code)
}
}

109 changes: 109 additions & 0 deletions module/pipeline_step_validate_pagination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package module

import (
"context"
"fmt"
"net/http"
"strconv"

"github.com/CrisisTextLine/modular"
)

// ValidatePaginationStep validates and normalises page/limit query parameters.
// It reads from the HTTP request in pipeline metadata and outputs resolved
// pagination values with defaults applied.
type ValidatePaginationStep struct {
name string
maxLimit int
defaultLimit int
defaultPage int
}

// NewValidatePaginationStepFactory returns a StepFactory that creates ValidatePaginationStep instances.
func NewValidatePaginationStepFactory() StepFactory {
return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) {
maxLimit := 100
if v, ok := config["max_limit"]; ok {
switch val := v.(type) {
case int:
maxLimit = val
case float64:
maxLimit = int(val)
}
}
if maxLimit <= 0 {
return nil, fmt.Errorf("validate_pagination step %q: max_limit must be positive", name)
}

defaultLimit := 20
if v, ok := config["default_limit"]; ok {
switch val := v.(type) {
case int:
defaultLimit = val
case float64:
defaultLimit = int(val)
}
}

defaultPage := 1
if v, ok := config["default_page"]; ok {
switch val := v.(type) {
case int:
defaultPage = val
case float64:
defaultPage = int(val)
}
}

return &ValidatePaginationStep{
name: name,
maxLimit: maxLimit,
defaultLimit: defaultLimit,
defaultPage: defaultPage,
}, nil
}
}

// Name returns the step name.
func (s *ValidatePaginationStep) Name() string { return s.name }

// Execute reads page and limit query parameters from the HTTP request,
// validates their ranges, applies defaults, and outputs the resolved values.
func (s *ValidatePaginationStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
page := s.defaultPage
limit := s.defaultLimit

req, _ := pc.Metadata["_http_request"].(*http.Request)
if req != nil {
q := req.URL.Query()

if pageStr := q.Get("page"); pageStr != "" {
p, err := strconv.Atoi(pageStr)
if err != nil || p < 1 {
return nil, fmt.Errorf("validate_pagination step %q: invalid page parameter %q — must be a positive integer", s.name, pageStr)
}
page = p
}

if limitStr := q.Get("limit"); limitStr != "" {
l, err := strconv.Atoi(limitStr)
if err != nil || l < 1 {
return nil, fmt.Errorf("validate_pagination step %q: invalid limit parameter %q — must be a positive integer", s.name, limitStr)
}
if l > s.maxLimit {
return nil, fmt.Errorf("validate_pagination step %q: limit %d exceeds maximum %d", s.name, l, s.maxLimit)
}
limit = l
}
}

offset := (page - 1) * limit

return &StepResult{
Output: map[string]any{
"page": page,
"limit": limit,
"offset": offset,
},
}, nil
}
121 changes: 121 additions & 0 deletions module/pipeline_step_validate_pagination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package module

import (
"context"
"net/http"
"net/http/httptest"
"testing"
)

func TestValidatePaginationStepFactory(t *testing.T) {
factory := NewValidatePaginationStepFactory()

t.Run("defaults", func(t *testing.T) {
step, err := factory("test", map[string]any{}, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if step.Name() != "test" {
t.Fatalf("expected name 'test', got %q", step.Name())
}
})

t.Run("invalid max_limit", func(t *testing.T) {
_, err := factory("test", map[string]any{"max_limit": 0}, nil)
if err == nil {
t.Fatal("expected error for zero max_limit")
}
})
}

func TestValidatePaginationStep_Execute(t *testing.T) {
factory := NewValidatePaginationStepFactory()

t.Run("defaults applied", func(t *testing.T) {
step, _ := factory("test", map[string]any{}, nil)
req := httptest.NewRequest(http.MethodGet, "/test", nil)
pc := &PipelineContext{
Metadata: map[string]any{"_http_request": req},
Current: map[string]any{},
}

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Output["page"] != 1 {
t.Fatalf("expected page=1, got %v", result.Output["page"])
}
if result.Output["limit"] != 20 {
t.Fatalf("expected limit=20, got %v", result.Output["limit"])
}
if result.Output["offset"] != 0 {
t.Fatalf("expected offset=0, got %v", result.Output["offset"])
}
})

t.Run("custom values", func(t *testing.T) {
step, _ := factory("test", map[string]any{"max_limit": 50}, nil)
req := httptest.NewRequest(http.MethodGet, "/test?page=3&limit=10", nil)
pc := &PipelineContext{
Metadata: map[string]any{"_http_request": req},
Current: map[string]any{},
}

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Output["page"] != 3 {
t.Fatalf("expected page=3, got %v", result.Output["page"])
}
if result.Output["limit"] != 10 {
t.Fatalf("expected limit=10, got %v", result.Output["limit"])
}
if result.Output["offset"] != 20 {
t.Fatalf("expected offset=20, got %v", result.Output["offset"])
}
})

t.Run("invalid page", func(t *testing.T) {
step, _ := factory("test", map[string]any{}, nil)
req := httptest.NewRequest(http.MethodGet, "/test?page=-1", nil)
pc := &PipelineContext{
Metadata: map[string]any{"_http_request": req},
Current: map[string]any{},
}

_, err := step.Execute(context.Background(), pc)
if err == nil {
t.Fatal("expected error for negative page")
}
})

t.Run("limit exceeds max", func(t *testing.T) {
step, _ := factory("test", map[string]any{"max_limit": 50}, nil)
req := httptest.NewRequest(http.MethodGet, "/test?limit=200", nil)
pc := &PipelineContext{
Metadata: map[string]any{"_http_request": req},
Current: map[string]any{},
}

_, err := step.Execute(context.Background(), pc)
if err == nil {
t.Fatal("expected error for limit exceeding max")
}
})

t.Run("non-numeric limit", func(t *testing.T) {
step, _ := factory("test", map[string]any{}, nil)
req := httptest.NewRequest(http.MethodGet, "/test?limit=abc", nil)
pc := &PipelineContext{
Metadata: map[string]any{"_http_request": req},
Current: map[string]any{},
}

_, err := step.Execute(context.Background(), pc)
if err == nil {
t.Fatal("expected error for non-numeric limit")
}
})
}
Loading
Loading