diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index bf4b154e..806b80cc 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -100,6 +100,22 @@ All modules are registered in `engine.go` and instantiated from YAML config. Org | `step.db_query` | Executes parameterized SQL SELECT queries against a named database | | `step.db_exec` | Executes parameterized SQL INSERT/UPDATE/DELETE against a named database | | `step.json_response` | Writes HTTP JSON response with custom status code and headers | +| `step.jq` | Applies a JQ expression to pipeline data for complex transformations | +| `step.ai_complete` | AI text completion using a configured provider | +| `step.ai_classify` | AI text classification into named categories | +| `step.ai_extract` | AI structured data extraction using tool use or prompt-based parsing | + +### CI/CD Pipeline Steps +| Type | Description | +|------|-------------| +| `step.docker_build` | Builds a Docker image from a context directory and Dockerfile | +| `step.docker_push` | Pushes a Docker image to a remote registry | +| `step.docker_run` | Runs a command inside a Docker container via sandbox | +| `step.scan_sast` | Static Application Security Testing (SAST) via configurable scanner | +| `step.scan_container` | Container image vulnerability scanning via Trivy | +| `step.scan_deps` | Dependency vulnerability scanning via Grype | +| `step.artifact_push` | Stores a file in the artifact store for cross-step sharing | +| `step.artifact_pull` | Retrieves an artifact from a prior execution, URL, or S3 | ### Template Functions @@ -115,6 +131,14 @@ Pipeline steps support Go template syntax with these built-in functions: Template expressions can reference previous step outputs via `{{ .steps.step-name.field }}` or for hyphenated names `{{index .steps "step-name" "field"}}`. +### Infrastructure +| Type | Description | +|------|-------------| +| `license.validator` | License key validation against a remote server with caching and grace period | +| `platform.provider` | Cloud infrastructure provider declaration (e.g., Terraform, Pulumi) | +| `platform.resource` | Infrastructure resource managed by a platform provider | +| `platform.context` | Execution context for platform operations (org, environment, tier) | + ### Observability | Type | Description | |------|-------------| @@ -161,6 +185,548 @@ Template expressions can reference previous step outputs via `{{ .steps.step-nam | `data.transformer` | Data transformation | | `workflow.registry` | Workflow registration and discovery | +## Module Type Reference + +Detailed configuration reference for module types not covered in the main table above. + +### Audit Logging (`audit/`) + +The `audit/` package provides a structured JSON audit logger for recording security-relevant events. It is used internally by the engine and admin platform -- not a YAML module type, but rather a Go library used by other modules. + +**Event types:** `auth`, `auth_failure`, `admin_op`, `escalation`, `data_access`, `config_change`, `component_op` + +Each audit event is written as a single JSON line containing `timestamp`, `type`, `action`, `actor`, `resource`, `detail`, `source_ip`, `success`, and `metadata` fields. + +--- + +### `license.validator` + +Validates license keys against a remote server with local caching and an offline grace period. When no `server_url` is configured the module operates in offline/starter mode and synthesizes a valid starter-tier license locally. + +**Configuration:** + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `server_url` | string | `""` | License validation server URL. Leave empty for offline/starter mode. | +| `license_key` | string | `""` | License key. Supports `$ENV_VAR` expansion. Falls back to `WORKFLOW_LICENSE_KEY` env var. | +| `cache_ttl` | duration | `1h` | How long to cache a valid license result before re-validating. | +| `grace_period` | duration | `72h` | How long to allow operation when the license server is unreachable. | +| `refresh_interval` | duration | `1h` | How often the background goroutine re-validates the license. | + +**Outputs:** Provides the `license-validator` service (`LicenseValidator`). + +**Example:** + +```yaml +modules: + - name: license + type: license.validator + config: + server_url: "https://license.gocodalone.com/api/v1" + license_key: "$WORKFLOW_LICENSE_KEY" + cache_ttl: "1h" + grace_period: "72h" + refresh_interval: "1h" +``` + +--- + +### `platform.provider` + +Declares a cloud infrastructure provider (e.g., Terraform, Pulumi) for use with the platform workflow handler and reconciliation trigger. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `name` | string | yes | Provider name used to construct the service name `platform.provider.`. | + +**Example:** + +```yaml +modules: + - name: cloud-provider + type: platform.provider + config: + name: "aws" +``` + +--- + +### `platform.resource` + +Declares an infrastructure resource managed by a platform provider. Config keys are provider-specific and passed through as-is. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `type` | string | yes | Infrastructure resource type (e.g., `database`, `queue`, `container_runtime`). | +| *(additional keys)* | any | no | Provider-specific resource properties. | + +**Example:** + +```yaml +modules: + - name: orders-db + type: platform.resource + config: + type: database + engine: postgresql + storage: "10Gi" +``` + +--- + +### `platform.context` + +Provides the execution context for platform operations. Used to identify the organization, environment, and tier for a deployment. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `path` | string | yes | Path identifying this context. | +| `org` | string | no | Organization name. | +| `environment` | string | no | Deployment environment (e.g., `production`, `staging`). | +| `tier` | number | no | Platform tier level. | + +**Example:** + +```yaml +modules: + - name: platform-ctx + type: platform.context + config: + path: "acme-corp/production" + org: "acme-corp" + environment: "production" + tier: 3 +``` + +--- + +### `observability.otel` + +Initializes an OpenTelemetry distributed tracing provider that exports spans via OTLP/HTTP to a collector. Sets the global OTel tracer provider so all instrumented code in the process is covered. + +**Configuration:** + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `endpoint` | string | `localhost:4318` | OTLP collector endpoint (host:port). | +| `serviceName` | string | `workflow` | Service name used for trace attribution. | + +**Outputs:** Provides the `tracer` service (`trace.Tracer`). + +**Example:** + +```yaml +modules: + - name: tracing + type: observability.otel + config: + endpoint: "otel-collector:4318" + serviceName: "order-api" +``` + +--- + +### `step.jq` + +Applies a JQ expression to pipeline data for complex transformations. Uses the `gojq` pure-Go JQ implementation, supporting the full JQ language: field access, pipes, `map`/`select`, object construction, arithmetic, conditionals, and more. + +The expression is compiled at startup so syntax errors are caught early. When the result is a single object, its keys are merged into the step output so downstream steps can access fields directly. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `expression` | string | yes | JQ expression to evaluate. | +| `input_from` | string | no | Dotted path to the input value (e.g., `steps.fetch.items`). Defaults to the full current pipeline context. | + +**Output fields:** `result` — the JQ result. When the result is a single object, its keys are also promoted to the top level. + +**Example:** + +```yaml +steps: + - name: extract-active + type: step.jq + config: + input_from: "steps.fetch-users.users" + expression: "[.[] | select(.active == true) | {id, email}]" +``` + +--- + +### `step.ai_complete` + +Invokes an AI provider to produce a text completion. Provider resolution order: explicit `provider` name, then model-based lookup, then first registered provider. + +**Configuration:** + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `provider` | string | `""` | Named AI provider to use. Omit to auto-select. | +| `model` | string | `""` | Model name (e.g., `claude-3-5-sonnet-20241022`). Used for provider lookup if `provider` is unset. | +| `system_prompt` | string | `""` | System prompt. Supports Go template syntax with pipeline context. | +| `input_from` | string | `""` | Template expression to resolve the user message (e.g., `.body`). Falls back to `text` or `body` fields in current context. | +| `max_tokens` | number | `1024` | Maximum tokens in the completion. | +| `temperature` | number | `0` | Sampling temperature (0.0–1.0). | + +**Output fields:** `content`, `model`, `finish_reason`, `usage.input_tokens`, `usage.output_tokens`. + +**Example:** + +```yaml +steps: + - name: summarize + type: step.ai_complete + config: + model: "claude-3-5-haiku-20241022" + system_prompt: "You are a helpful assistant. Summarize the following text concisely." + input_from: ".body" + max_tokens: 512 +``` + +--- + +### `step.ai_classify` + +Classifies input text into one of a configured set of categories using an AI provider. Returns the winning category, a confidence score (0.0–1.0), and brief reasoning. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `categories` | array of strings | yes | List of valid classification categories. | +| `provider` | string | no | Named AI provider. Auto-selected if omitted. | +| `model` | string | no | Model name for provider lookup. | +| `input_from` | string | no | Template expression for the input text. Falls back to `text` or `body` fields. | +| `max_tokens` | number | `256` | Maximum tokens for the classification response. | +| `temperature` | number | `0` | Sampling temperature. | + +**Output fields:** `category`, `confidence`, `reasoning`, `raw`, `model`, `usage.input_tokens`, `usage.output_tokens`. + +**Example:** + +```yaml +steps: + - name: classify-ticket + type: step.ai_classify + config: + input_from: ".body" + categories: + - "billing" + - "technical-support" + - "account" + - "general-inquiry" +``` + +--- + +### `step.ai_extract` + +Extracts structured data from text using an AI provider. When the provider supports tool use, it uses the tool-calling API for reliable structured output. Otherwise it falls back to prompt-based JSON extraction. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `schema` | object | yes | JSON Schema object describing the fields to extract. | +| `provider` | string | no | Named AI provider. Auto-selected if omitted. | +| `model` | string | no | Model name for provider lookup. | +| `input_from` | string | no | Template expression for the input text. Falls back to `text` or `body` fields. | +| `max_tokens` | number | `1024` | Maximum tokens. | +| `temperature` | number | `0` | Sampling temperature. | + +**Output fields:** `extracted` (map of extracted fields), `method` (`tool_use`, `text_parse`, or `prompt`), `model`, `usage.input_tokens`, `usage.output_tokens`. + +**Example:** + +```yaml +steps: + - name: extract-order + type: step.ai_extract + config: + input_from: ".body" + schema: + type: object + properties: + customer_name: {type: string} + order_items: {type: array, items: {type: string}} + total_amount: {type: number} +``` + +--- + +### `step.docker_build` + +Builds a Docker image from a context directory and Dockerfile using the Docker SDK. The context directory is tar-archived and sent to the Docker daemon. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `context` | string | yes | Path to the build context directory. | +| `dockerfile` | string | `Dockerfile` | Dockerfile path relative to the context directory. | +| `tags` | array of strings | no | Image tags to apply (e.g., `["myapp:latest", "myapp:1.2.3"]`). | +| `build_args` | map | no | Build argument key/value pairs. | +| `cache_from` | array of strings | no | Image references to use as layer cache sources. | + +**Output fields:** `image_id`, `tags`, `context`. + +**Example:** + +```yaml +steps: + - name: build-image + type: step.docker_build + config: + context: "./src" + dockerfile: "Dockerfile" + tags: + - "myapp:latest" + build_args: + APP_VERSION: "1.2.3" +``` + +--- + +### `step.docker_push` + +Pushes a Docker image to a remote registry. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `image` | string | yes | Image name/tag to push. | +| `registry` | string | no | Registry hostname prefix (prepended to `image` when constructing the reference). | +| `auth_provider` | string | no | Named auth provider for registry credentials (informational; credentials are read from Docker daemon config). | + +**Output fields:** `image`, `registry`, `digest`, `auth_provider`. + +**Example:** + +```yaml +steps: + - name: push-image + type: step.docker_push + config: + image: "myapp:latest" + registry: "ghcr.io/myorg" +``` + +--- + +### `step.docker_run` + +Runs a command inside a Docker container using the sandbox. Returns exit code, stdout, and stderr. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `image` | string | yes | Docker image to run. | +| `command` | array of strings | no | Command to execute inside the container. Uses image default entrypoint if omitted. | +| `env` | map | no | Environment variables to set in the container. | +| `wait_for_exit` | boolean | `true` | Whether to wait for the container to exit. | +| `timeout` | duration | `""` | Maximum time to wait for the container. | + +**Output fields:** `exit_code`, `stdout`, `stderr`, `image`. + +**Example:** + +```yaml +steps: + - name: run-tests + type: step.docker_run + config: + image: "golang:1.25" + command: ["go", "test", "./..."] + env: + CI: "true" + timeout: "10m" +``` + +--- + +### `step.scan_sast` + +Runs a Static Application Security Testing (SAST) scanner inside a Docker container and evaluates findings against a severity gate. Supports Semgrep and generic scanner commands. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `scanner` | string | yes | Scanner to use. Supported: `semgrep`. Generic commands also accepted. | +| `image` | string | `semgrep/semgrep:latest` | Docker image for the scanner. | +| `source_path` | string | `/workspace` | Path to the source code to scan. | +| `rules` | array of strings | no | Semgrep rule configs to apply (e.g., `auto`, `p/owasp-top-ten`). | +| `fail_on_severity` | string | `error` | Minimum severity that causes the step to fail (`error`, `warning`, `info`). | +| `output_format` | string | `sarif` | Output format: `sarif` or `json`. | + +**Output fields:** `scan_result`, `command`, `image`. + +**Example:** + +```yaml +steps: + - name: sast-scan + type: step.scan_sast + config: + scanner: "semgrep" + source_path: "/workspace/src" + rules: + - "p/owasp-top-ten" + - "p/golang" + fail_on_severity: "error" +``` + +--- + +### `step.scan_container` + +Scans a container image for vulnerabilities using Trivy. Evaluates findings against a configurable severity threshold. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `target_image` | string | yes | Container image to scan (e.g., `myapp:latest`). | +| `scanner` | string | `trivy` | Scanner to use. | +| `severity_threshold` | string | `HIGH` | Minimum severity to report: `CRITICAL`, `HIGH`, `MEDIUM`, `LOW`, or `INFO`. | +| `ignore_unfixed` | boolean | `false` | Skip vulnerabilities without a known fix. | +| `output_format` | string | `sarif` | Output format: `sarif` or `json`. | + +**Output fields:** `scan_result`, `command`, `image`, `target_image`. + +**Example:** + +```yaml +steps: + - name: scan-image + type: step.scan_container + config: + target_image: "myapp:latest" + severity_threshold: "HIGH" + ignore_unfixed: true +``` + +--- + +### `step.scan_deps` + +Scans project dependencies for known vulnerabilities using Grype. Evaluates findings against a severity gate. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `scanner` | string | `grype` | Scanner to use. | +| `image` | string | `anchore/grype:latest` | Docker image for the scanner. | +| `source_path` | string | `/workspace` | Path to the project source to scan. | +| `fail_on_severity` | string | `high` | Minimum severity that causes the step to fail: `critical`, `high`, `medium`, `low`, or `info`. | +| `output_format` | string | `sarif` | Output format: `sarif` or `json`. | + +**Output fields:** `scan_result`, `command`, `image`. + +**Example:** + +```yaml +steps: + - name: dep-scan + type: step.scan_deps + config: + source_path: "/workspace" + fail_on_severity: "high" +``` + +--- + +### `step.artifact_push` + +Reads a file from `source_path` and stores it in the pipeline's artifact store. Computes a SHA-256 checksum of the artifact. Requires `artifact_store` and `execution_id` in pipeline metadata. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `source_path` | string | yes | Path to the file to store. | +| `key` | string | yes | Artifact key under which to store the file. | +| `dest` | string | `artifact_store` | Destination identifier (informational). | + +**Output fields:** `key`, `size`, `checksum`, `dest`. + +**Example:** + +```yaml +steps: + - name: upload-binary + type: step.artifact_push + config: + source_path: "./bin/server" + key: "server-binary" +``` + +--- + +### `step.artifact_pull` + +Retrieves an artifact from a prior execution, a URL, or S3 and writes it to a local destination path. + +**Configuration:** + +| Key | Type | Required | Description | +|-----|------|----------|-------------| +| `source` | string | yes | Source type: `previous_execution`, `url`, or `s3`. | +| `dest` | string | yes | Local file path to write the artifact to. | +| `key` | string | yes (for `previous_execution`, `s3`) | Artifact key to retrieve. | +| `execution_id` | string | no | Specific execution ID to pull from. Defaults to current execution. | +| `url` | string | yes (for `url`) | URL to fetch the artifact from. | + +**Output fields:** `source`, `key`, `dest`, `size`, `bytes_written`. + +**Example:** + +```yaml +steps: + - name: download-binary + type: step.artifact_pull + config: + source: "previous_execution" + key: "server-binary" + dest: "./bin/server" +``` + +--- + +### Admin Core Plugin (`plugin/admincore/`) + +The `admincore` plugin is a NativePlugin that registers the built-in admin UI page definitions. It declares no HTTP routes -- all views are rendered entirely in the React frontend. Registering this plugin ensures navigation is driven by the plugin system with no static fallbacks. + +**UI pages declared:** + +| ID | Label | Category | +|----|-------|----------| +| `dashboard` | Dashboard | global | +| `editor` | Editor | global | +| `marketplace` | Marketplace | global | +| `templates` | Templates | global | +| `environments` | Environments | global | +| `settings` | Settings | global | +| `executions` | Executions | workflow | +| `logs` | Logs | workflow | +| `events` | Events | workflow | + +Global pages appear in the main navigation. Workflow-scoped pages (`executions`, `logs`, `events`) are only shown when a workflow is open. + +The plugin is auto-registered via `init()` in `plugin/admincore/plugin.go`. No YAML configuration is required. + +--- + ## Workflow Types Workflows are configured in YAML and dispatched by the engine through registered handlers (`handlers/` package): diff --git a/admin/config.yaml b/admin/config.yaml index 3f64d3be..8da1acd6 100644 --- a/admin/config.yaml +++ b/admin/config.yaml @@ -42,6 +42,20 @@ modules: requestsPerMinute: 120 burstSize: 20 + # Strict per-IP rate limiter for login: 10 attempts/minute, burst 10 + - name: auth-login-ratelimit + type: http.middleware.ratelimit + config: + requestsPerMinute: 10 + burstSize: 10 + + # Strict per-IP rate limiter for registration: 5 attempts/hour, burst 5 + - name: auth-register-ratelimit + type: http.middleware.ratelimit + config: + requestsPerHour: 5 + burstSize: 5 + # --- Data Layer --- - name: admin-db type: storage.sqlite @@ -143,14 +157,16 @@ workflows: middlewares: [admin-cors, admin-ratelimit] # === Auth (unauthenticated) === + # login: strict per-IP rate limit (10/minute) to defend against brute-force - method: POST path: "/api/v1/auth/login" handler: admin-auth - middlewares: [admin-cors, admin-ratelimit] + middlewares: [admin-cors, auth-login-ratelimit] + # register: strict per-IP rate limit (5/hour) to defend against account enumeration - method: POST path: "/api/v1/auth/register" handler: admin-auth - middlewares: [admin-cors, admin-ratelimit] + middlewares: [admin-cors, auth-register-ratelimit] - method: POST path: "/api/v1/auth/refresh" handler: admin-auth diff --git a/module/api_gateway.go b/module/api_gateway.go index e4090020..1a03d09d 100644 --- a/module/api_gateway.go +++ b/module/api_gateway.go @@ -48,6 +48,10 @@ type AuthConfig struct { // APIGateway is a composable gateway module that combines routing, auth, // rate limiting, and proxying into a single module. +// +// Each APIGateway instance maintains its own independent rate limiter state. +// Rate limiters are never shared across instances, so multiple APIGateway +// instances (e.g. in multi-tenant deployments) do not interfere with each other. type APIGateway struct { name string routes []GatewayRoute @@ -55,10 +59,10 @@ type APIGateway struct { auth *AuthConfig // internal state - sortedRoutes []GatewayRoute // sorted by prefix length (longest first) - proxies map[string]*httputil.ReverseProxy - rateLimiters map[string]*gatewayRateLimiter // keyed by path prefix - globalLimiter *gatewayRateLimiter + sortedRoutes []GatewayRoute // sorted by prefix length (longest first) + proxies map[string]*httputil.ReverseProxy + rateLimiters map[string]*gatewayRateLimiter // keyed by path prefix + instanceRateLimiter *gatewayRateLimiter // instance-scoped limiter applied before per-route limits } // gatewayRateLimiter is a simple per-client token bucket limiter for the gateway. @@ -90,13 +94,36 @@ func (rl *gatewayRateLimiter) allow(clientIP string) bool { return bucket.allow() } -// NewAPIGateway creates a new APIGateway module. -func NewAPIGateway(name string) *APIGateway { - return &APIGateway{ +// APIGatewayOption is a functional option for configuring an APIGateway at construction time. +type APIGatewayOption func(*APIGateway) + +// WithRateLimit sets an instance-level rate limit applied to all requests before per-route +// limits are checked. The limiter is scoped to this APIGateway instance and does not affect +// any other instance. +func WithRateLimit(cfg *RateLimitConfig) APIGatewayOption { + return func(g *APIGateway) { + if cfg != nil && cfg.RequestsPerMinute > 0 { + burst := cfg.BurstSize + if burst <= 0 { + burst = cfg.RequestsPerMinute + } + g.instanceRateLimiter = newGatewayRateLimiter(cfg.RequestsPerMinute, burst) + } + } +} + +// NewAPIGateway creates a new APIGateway module. Optional functional options can be +// provided to configure the instance at construction time (e.g. WithRateLimit). +func NewAPIGateway(name string, opts ...APIGatewayOption) *APIGateway { + g := &APIGateway{ name: name, proxies: make(map[string]*httputil.ReverseProxy), rateLimiters: make(map[string]*gatewayRateLimiter), } + for _, opt := range opts { + opt(g) + } + return g } // SetRoutes configures the gateway routes. @@ -146,17 +173,27 @@ func (g *APIGateway) SetRoutes(routes []GatewayRoute) error { return nil } -// SetGlobalRateLimit configures a global rate limit applied to all routes. -func (g *APIGateway) SetGlobalRateLimit(cfg *RateLimitConfig) { +// SetRateLimit configures an instance-level rate limit applied to all routes on this gateway. +// The limiter is scoped to this APIGateway instance and does not affect any other instance. +// Prefer injecting rate limit config via WithRateLimit at construction time when possible. +func (g *APIGateway) SetRateLimit(cfg *RateLimitConfig) { if cfg != nil && cfg.RequestsPerMinute > 0 { burst := cfg.BurstSize if burst <= 0 { burst = cfg.RequestsPerMinute } - g.globalLimiter = newGatewayRateLimiter(cfg.RequestsPerMinute, burst) + g.instanceRateLimiter = newGatewayRateLimiter(cfg.RequestsPerMinute, burst) } } +// SetGlobalRateLimit is deprecated: use SetRateLimit instead. +// The rate limiter has always been instance-scoped; this method was misleadingly named. +// +// Deprecated: Use SetRateLimit. +func (g *APIGateway) SetGlobalRateLimit(cfg *RateLimitConfig) { + g.SetRateLimit(cfg) +} + // SetCORS configures CORS settings. func (g *APIGateway) SetCORS(cfg *CORSConfig) { g.cors = cfg @@ -214,9 +251,9 @@ func (g *APIGateway) Handle(w http.ResponseWriter, r *http.Request) { clientIP := extractClientIP(r) - // Global rate limiting - if g.globalLimiter != nil { - if !g.globalLimiter.allow(clientIP) { + // Instance-level rate limiting (applied before per-route limits) + if g.instanceRateLimiter != nil { + if !g.instanceRateLimiter.allow(clientIP) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusTooManyRequests) _ = json.NewEncoder(w).Encode(map[string]string{ diff --git a/module/api_gateway_test.go b/module/api_gateway_test.go index 1941e691..da1f0876 100644 --- a/module/api_gateway_test.go +++ b/module/api_gateway_test.go @@ -219,14 +219,14 @@ func TestAPIGateway_CORS(t *testing.T) { } } -func TestAPIGateway_GlobalRateLimit(t *testing.T) { +func TestAPIGateway_InstanceRateLimit_SetRateLimit(t *testing.T) { backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) })) defer backend.Close() gw := NewAPIGateway("gw") - gw.SetGlobalRateLimit(&RateLimitConfig{RequestsPerMinute: 60, BurstSize: 2}) + gw.SetRateLimit(&RateLimitConfig{RequestsPerMinute: 60, BurstSize: 2}) _ = gw.SetRoutes([]GatewayRoute{ {PathPrefix: "/api", Backend: backend.URL}, }) @@ -252,6 +252,75 @@ func TestAPIGateway_GlobalRateLimit(t *testing.T) { } } +func TestAPIGateway_InstanceRateLimit_WithRateLimit(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + gw := NewAPIGateway("gw", WithRateLimit(&RateLimitConfig{RequestsPerMinute: 60, BurstSize: 1})) + _ = gw.SetRoutes([]GatewayRoute{ + {PathPrefix: "/api", Backend: backend.URL}, + }) + + // First should succeed (burst=1) + req := httptest.NewRequest("GET", "/api/test", nil) + req.RemoteAddr = "10.0.0.2:1234" + w := httptest.NewRecorder() + gw.Handle(w, req) + if w.Code != http.StatusOK { + t.Errorf("first request expected 200, got %d", w.Code) + } + + // Second should be rate limited + req = httptest.NewRequest("GET", "/api/test", nil) + req.RemoteAddr = "10.0.0.2:1234" + w = httptest.NewRecorder() + gw.Handle(w, req) + if w.Code != http.StatusTooManyRequests { + t.Errorf("expected 429, got %d", w.Code) + } +} + +func TestAPIGateway_InstanceRateLimiters_AreIsolated(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + cfg := &RateLimitConfig{RequestsPerMinute: 60, BurstSize: 1} + gw1 := NewAPIGateway("gw1", WithRateLimit(cfg)) + gw2 := NewAPIGateway("gw2", WithRateLimit(cfg)) + _ = gw1.SetRoutes([]GatewayRoute{{PathPrefix: "/api", Backend: backend.URL}}) + _ = gw2.SetRoutes([]GatewayRoute{{PathPrefix: "/api", Backend: backend.URL}}) + + // Exhaust gw1's burst for this client + req := httptest.NewRequest("GET", "/api/test", nil) + req.RemoteAddr = "10.0.0.3:1234" + w := httptest.NewRecorder() + gw1.Handle(w, req) + if w.Code != http.StatusOK { + t.Errorf("gw1 first request expected 200, got %d", w.Code) + } + + req = httptest.NewRequest("GET", "/api/test", nil) + req.RemoteAddr = "10.0.0.3:1234" + w = httptest.NewRecorder() + gw1.Handle(w, req) + if w.Code != http.StatusTooManyRequests { + t.Errorf("gw1 second request expected 429, got %d", w.Code) + } + + // gw2 should be unaffected — its burst is independent + req = httptest.NewRequest("GET", "/api/test", nil) + req.RemoteAddr = "10.0.0.3:1234" + w = httptest.NewRecorder() + gw2.Handle(w, req) + if w.Code != http.StatusOK { + t.Errorf("gw2 should be isolated from gw1; expected 200, got %d", w.Code) + } +} + func TestAPIGateway_PerRouteRateLimit(t *testing.T) { backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) diff --git a/module/http_middleware.go b/module/http_middleware.go index 89665ec2..484ab696 100644 --- a/module/http_middleware.go +++ b/module/http_middleware.go @@ -33,6 +33,7 @@ const ( type RateLimitMiddleware struct { name string requestsPerMinute int + ratePerMinute float64 // fractional rate, used when requestsPerHour is set burstSize int strategy RateLimitStrategy tokenHeader string // HTTP header to extract token from @@ -44,7 +45,7 @@ type RateLimitMiddleware struct { // client tracks the rate limiting state for a single client type client struct { - tokens int + tokens float64 lastTimestamp time.Time } @@ -53,6 +54,7 @@ func NewRateLimitMiddleware(name string, requestsPerMinute, burstSize int) *Rate return &RateLimitMiddleware{ name: name, requestsPerMinute: requestsPerMinute, + ratePerMinute: float64(requestsPerMinute), burstSize: burstSize, strategy: RateLimitByIP, tokenHeader: "Authorization", @@ -62,6 +64,24 @@ func NewRateLimitMiddleware(name string, requestsPerMinute, burstSize int) *Rate } } +// NewRateLimitMiddlewareWithHourlyRate creates a rate limiting middleware using +// a per-hour rate. Useful for low-frequency endpoints like registration where +// fractional per-minute rates are needed. +func NewRateLimitMiddlewareWithHourlyRate(name string, requestsPerHour, burstSize int) *RateLimitMiddleware { + m := &RateLimitMiddleware{ + name: name, + requestsPerMinute: 0, // not used when ratePerMinute is set + ratePerMinute: float64(requestsPerHour) / 60.0, + burstSize: burstSize, + strategy: RateLimitByIP, + tokenHeader: "Authorization", + clients: make(map[string]*client), + cleanupInterval: 5 * time.Minute, + stopCleanup: make(chan struct{}), + } + return m +} + // NewRateLimitMiddlewareWithStrategy creates a rate limiting middleware with // a specific client identification strategy. func NewRateLimitMiddlewareWithStrategy(name string, requestsPerMinute, burstSize int, strategy RateLimitStrategy) *RateLimitMiddleware { @@ -132,20 +152,20 @@ func (m *RateLimitMiddleware) Process(next http.Handler) http.Handler { m.mu.Lock() c, exists := m.clients[key] if !exists { - c = &client{tokens: m.burstSize, lastTimestamp: time.Now()} + c = &client{tokens: float64(m.burstSize), lastTimestamp: time.Now()} m.clients[key] = c } else { - // Refill tokens based on elapsed time + // Refill tokens based on elapsed time using fractional rate elapsed := time.Since(c.lastTimestamp).Minutes() - tokensToAdd := int(elapsed * float64(m.requestsPerMinute)) + tokensToAdd := elapsed * m.ratePerMinute if tokensToAdd > 0 { - c.tokens = min(c.tokens+tokensToAdd, m.burstSize) + c.tokens = min(c.tokens+tokensToAdd, float64(m.burstSize)) c.lastTimestamp = time.Now() } } // Check if request can proceed - if c.tokens <= 0 { + if c.tokens < 1 { m.mu.Unlock() w.Header().Set("Retry-After", "60") http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests) @@ -163,7 +183,12 @@ func (m *RateLimitMiddleware) Process(next http.Handler) http.Handler { // cleanupStaleClients removes client entries that haven't been seen in over // twice the refill window. This prevents unbounded memory growth. func (m *RateLimitMiddleware) cleanupStaleClients() { - staleThreshold := 2 * time.Minute * time.Duration(max(1, m.burstSize/max(1, m.requestsPerMinute))) + // Use fractional ratePerMinute to compute refill window correctly + refillWindow := 1.0 + if m.ratePerMinute > 0 { + refillWindow = float64(m.burstSize) / m.ratePerMinute + } + staleThreshold := time.Duration(2*refillWindow) * time.Minute if staleThreshold < 10*time.Minute { staleThreshold = 10 * time.Minute } diff --git a/module/http_middleware_test.go b/module/http_middleware_test.go index cd7a820f..11866d3f 100644 --- a/module/http_middleware_test.go +++ b/module/http_middleware_test.go @@ -527,11 +527,11 @@ func TestRateLimitMiddleware_CleanupStaleClients(t *testing.T) { // Manually add a stale client m.mu.Lock() m.clients["ip:stale-client"] = &client{ - tokens: 10, + tokens: 10.0, lastTimestamp: time.Now().Add(-1 * time.Hour), // 1 hour old } m.clients["ip:fresh-client"] = &client{ - tokens: 10, + tokens: 10.0, lastTimestamp: time.Now(), } m.mu.Unlock() @@ -562,3 +562,81 @@ func TestRateLimitMiddleware_StartStop_Lifecycle(t *testing.T) { t.Fatalf("Stop failed: %v", err) } } + +// -- Hourly rate middleware tests -- + +func TestNewRateLimitMiddlewareWithHourlyRate_AllowsBurst(t *testing.T) { + // 5 requests/hour, burst 5 + m := NewRateLimitMiddlewareWithHourlyRate("rl-hour", 5, 5) + + handler := m.Process(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + // First 5 requests should succeed (burst exhausted) + for i := range 5 { + req := httptest.NewRequest("POST", "/api/v1/auth/register", nil) + req.RemoteAddr = "10.0.0.1:1234" + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Errorf("request %d: expected 200, got %d", i+1, rec.Code) + } + } + + // 6th request should be rate limited + req := httptest.NewRequest("POST", "/api/v1/auth/register", nil) + req.RemoteAddr = "10.0.0.1:1234" + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + if rec.Code != http.StatusTooManyRequests { + t.Errorf("expected 429 after burst exhausted, got %d", rec.Code) + } +} + +func TestNewRateLimitMiddlewareWithHourlyRate_PerIP(t *testing.T) { + // 5 requests/hour, burst 2 + m := NewRateLimitMiddlewareWithHourlyRate("rl-hour", 5, 2) + + handler := m.Process(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + // Exhaust IP A burst + for i := range 2 { + req := httptest.NewRequest("POST", "/api/v1/auth/register", nil) + req.RemoteAddr = "10.0.0.1:1234" + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Errorf("IP A request %d: expected 200, got %d", i+1, rec.Code) + } + } + + // IP A should be rate limited + reqA := httptest.NewRequest("POST", "/api/v1/auth/register", nil) + reqA.RemoteAddr = "10.0.0.1:1234" + recA := httptest.NewRecorder() + handler.ServeHTTP(recA, reqA) + if recA.Code != http.StatusTooManyRequests { + t.Errorf("IP A: expected 429, got %d", recA.Code) + } + + // IP B should still be allowed (separate bucket) + reqB := httptest.NewRequest("POST", "/api/v1/auth/register", nil) + reqB.RemoteAddr = "10.0.0.2:5678" + recB := httptest.NewRecorder() + handler.ServeHTTP(recB, reqB) + if recB.Code != http.StatusOK { + t.Errorf("IP B: expected 200, got %d", recB.Code) + } +} + +func TestNewRateLimitMiddlewareWithHourlyRate_RatePerMinute(t *testing.T) { + // 60 requests/hour = 1 request/minute + m := NewRateLimitMiddlewareWithHourlyRate("rl-hour", 60, 1) + // ratePerMinute should be 1.0 + if m.ratePerMinute != 1.0 { + t.Errorf("expected ratePerMinute=1.0, got %f", m.ratePerMinute) + } +} diff --git a/plugins/api/plugin.go b/plugins/api/plugin.go index 2f049fa8..10833c75 100644 --- a/plugins/api/plugin.go +++ b/plugins/api/plugin.go @@ -218,7 +218,7 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { if v, ok := glCfg["burstSize"].(float64); ok { rl.BurstSize = int(v) } - gw.SetGlobalRateLimit(rl) + gw.SetRateLimit(rl) } // CORS if corsCfg, ok := cfg["cors"].(map[string]any); ok { diff --git a/plugins/http/modules.go b/plugins/http/modules.go index db85ea03..c72c7b61 100644 --- a/plugins/http/modules.go +++ b/plugins/http/modules.go @@ -133,18 +133,27 @@ func loggingMiddlewareFactory(name string, cfg map[string]any) modular.Module { } func rateLimitMiddlewareFactory(name string, cfg map[string]any) modular.Module { - requestsPerMinute := 60 burstSize := 10 - if rpm, ok := cfg["requestsPerMinute"].(int); ok { - requestsPerMinute = rpm - } else if rpm, ok := cfg["requestsPerMinute"].(float64); ok { - requestsPerMinute = int(rpm) - } if bs, ok := cfg["burstSize"].(int); ok { burstSize = bs } else if bs, ok := cfg["burstSize"].(float64); ok { burstSize = int(bs) } + + // requestsPerHour takes precedence over requestsPerMinute for low-frequency + // endpoints (e.g. registration) where fractional per-minute rates are needed. + if rph, ok := cfg["requestsPerHour"].(int); ok { + return module.NewRateLimitMiddlewareWithHourlyRate(name, rph, burstSize) + } else if rph, ok := cfg["requestsPerHour"].(float64); ok { + return module.NewRateLimitMiddlewareWithHourlyRate(name, int(rph), burstSize) + } + + requestsPerMinute := 60 + if rpm, ok := cfg["requestsPerMinute"].(int); ok { + requestsPerMinute = rpm + } else if rpm, ok := cfg["requestsPerMinute"].(float64); ok { + requestsPerMinute = int(rpm) + } return module.NewRateLimitMiddleware(name, requestsPerMinute, burstSize) } diff --git a/plugins/http/plugin_test.go b/plugins/http/plugin_test.go index ab061522..acea60ef 100644 --- a/plugins/http/plugin_test.go +++ b/plugins/http/plugin_test.go @@ -327,6 +327,35 @@ func TestWorkflowHandlerFactorySmoke(t *testing.T) { } } +func TestRateLimitMiddlewareFactory_RequestsPerHour(t *testing.T) { + factories := moduleFactories() + factory, ok := factories["http.middleware.ratelimit"] + if !ok { + t.Fatal("no factory for http.middleware.ratelimit") + } + + // requestsPerHour as int + mod := factory("auth-register-rl", map[string]any{ + "requestsPerHour": 5, + "burstSize": 5, + }) + if mod == nil { + t.Fatal("factory returned nil for requestsPerHour config") + } + if mod.Name() != "auth-register-rl" { + t.Errorf("expected name %q, got %q", "auth-register-rl", mod.Name()) + } + + // requestsPerHour as float64 (YAML unmarshals numbers as float64) + mod2 := factory("auth-register-rl2", map[string]any{ + "requestsPerHour": float64(5), + "burstSize": float64(5), + }) + if mod2 == nil { + t.Fatal("factory returned nil for requestsPerHour float64 config") + } +} + func TestPluginLoaderIntegration(t *testing.T) { p := New() diff --git a/plugins/http/schemas.go b/plugins/http/schemas.go index 3c625069..ef019ee8 100644 --- a/plugins/http/schemas.go +++ b/plugins/http/schemas.go @@ -159,10 +159,11 @@ func rateLimitMiddlewareSchema() *schema.ModuleSchema { Inputs: []schema.ServiceIODef{{Name: "request", Type: "http.Request", Description: "HTTP request to rate-limit"}}, Outputs: []schema.ServiceIODef{{Name: "limited", Type: "http.Request", Description: "HTTP request (passed through if within limit)"}}, ConfigFields: []schema.ConfigFieldDef{ - {Key: "requestsPerMinute", Label: "Requests Per Minute", Type: schema.FieldTypeNumber, DefaultValue: 60, Description: "Maximum number of requests per minute per client"}, + {Key: "requestsPerMinute", Label: "Requests Per Minute", Type: schema.FieldTypeNumber, DefaultValue: 60, Description: "Maximum number of requests per minute per client; used when requestsPerHour is not set"}, + {Key: "requestsPerHour", Label: "Requests Per Hour", Type: schema.FieldTypeNumber, DefaultValue: 0, Description: "Maximum number of requests per hour per client; takes precedence over requestsPerMinute when set"}, {Key: "burstSize", Label: "Burst Size", Type: schema.FieldTypeNumber, DefaultValue: 10, Description: "Maximum burst of requests allowed above the rate limit"}, }, - DefaultConfig: map[string]any{"requestsPerMinute": 60, "burstSize": 10}, + DefaultConfig: map[string]any{"requestsPerMinute": 60, "requestsPerHour": 0, "burstSize": 10}, } }