diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..10612aa00 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,38 @@ +# Version control +.git +.gitignore + +# Python +__pycache__ +*.pyc +*.pyo +.pytest_cache +.coverage +.venv +venv +env +*.egg-info + +# Build artifacts +dist +build + +# Environment +.env +.env.* +*.env + +# IDE +.idea +.vscode +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Project-specific +tests +.github +*.md diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3eac34081..05281fe38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,3 +27,30 @@ jobs: run: uv run pytest --cov=src tests/ - name: Lint run: uv run flake8 src/ tests/ + + docker-build-and-audit: + runs-on: ubuntu-latest + if: github.event_name == 'pull_request' || github.ref == 'refs/heads/main' + + steps: + - uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Build image (no-cache for audit accuracy) + uses: docker/build-push-action@v6 + with: + context: . + file: Dockerfile + push: false + load: true + tags: agent-orchestrator:ci-${{ github.sha }} + cache-from: type=gha + cache-to: type=gha,mode=max + build-args: | + BUILD_ENV=ci + PIP_INDEX_URL=https://pypi.org/simple + UV_VERSION=0.6.0 + - name: Run image metadata audit + run: | + chmod +x infra/scripts/audit_image_metadata.sh + ./infra/scripts/audit_image_metadata.sh agent-orchestrator:ci-${{ github.sha }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 000000000..d8e38fe78 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,89 @@ +name: Release + +on: + push: + tags: + - 'v*' + +permissions: + id-token: write # Required for provenance attestation + contents: write # Required for creating releases + attestations: write # Required for storing attestations + +jobs: + build-and-attest: + name: Build & Attest Provenance + runs-on: ubuntu-latest + + steps: + - name: Checkout source + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install uv + uses: astral-sh/setup-uv@v3 + + - name: Build packages + run: uv build + + - name: Generate provenance attestation + uses: actions/attest-build-provenance@v1 + with: + subject-path: 'dist/*' + + - name: Create GitHub Release + env: + GH_TOKEN: ${{ github.token }} + run: | + gh release create "${{ github.ref_name }}" \ + --title "${{ github.ref_name }}" \ + --generate-notes \ + dist/* + + - name: Upload attestation as release asset + env: + GH_TOKEN: ${{ github.token }} + run: | + # The attestation is stored by GitHub; download it and attach to release + gh release upload "${{ github.ref_name }}" \ + ".github/attestations/"* 2>/dev/null || true + + publish-pypi: + name: Publish to PyPI + needs: build-and-attest + runs-on: ubuntu-latest + permissions: + id-token: write + attestations: write + contents: read + + steps: + - name: Checkout source + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install uv + uses: astral-sh/setup-uv@v3 + + - name: Build packages + run: uv build + + - name: Generate provenance attestation + uses: actions/attest-build-provenance@v1 + with: + subject-path: 'dist/*' + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + attestations: true diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..884224f59 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ + +# Temporary next_bounty tracking +next_bounty.json diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..2feb893b4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,92 @@ +# ============================================================================= +# Multi-stage Dockerfile — Agent Orchestration Platform +# Security: build-time configuration (ARG) is consumed only in intermediate +# stages and never propagated to the runtime image. The final stage +# declares zero build-time-only ARGs, preventing leakage into image history, +# labels, or environment variables. +# ============================================================================= + +# ── Stage 0: UV image resolver (resolves before COPY --from=) ─────────────── +# Docker does not support variable expansion in COPY --from= source refs. +# The workaround is a separate FROM that resolves the ARG into a stage name. +ARG UV_VERSION=0.6.0 +FROM ghcr.io/astral-sh/uv:${UV_VERSION} AS uv-image + +# ── Stage 1: Builder (install dependencies) ───────────────────────────────── +# All build-time ARGs live here and are consumed before the COPY --from step. +ARG PYTHON_VERSION=3.11-slim + +FROM python:${PYTHON_VERSION} AS builder + +# ⚠️ BUILD-ONLY ARGUMENTS — these are consumed in this stage only and do NOT +# appear in the final image history, labels, or layers. +ARG BUILD_ENV=production +ARG PIP_INDEX_URL=https://pypi.org/simple +ARG PIP_TRUSTED_HOST=pypi.org + +WORKDIR /build + +# Install system build deps (will be discarded with this stage) +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Install uv for fast dependency resolution (from pre-resolved stage) +COPY --from=uv-image /uv /usr/local/bin/uv + +# Copy dependency manifests +COPY pyproject.toml uv.lock ./ + +# Install production dependencies (frozen — matches lock file) +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --frozen --no-dev --no-install-project + +# ── Stage 2: Final runtime image ──────────────────────────────────────────── +# This stage contains ZERO build-time-only ARG declarations. Every layer is +# immutable and auditable. +# Only ARG still present is the Python base image tag (semantic versioning +# metadata baked at build time — not a secret or configuration leak). +FROM python:${PYTHON_VERSION} AS final + +# Only runtime‑relevant metadata LABELs — no build‑time configuration leaks. +LABEL org.opencontainers.image.title="Agent Orchestrator" \ + org.opencontainers.image.description="Enterprise Agent Orchestration Platform" \ + org.opencontainers.image.vendor="Agent Orchestration" \ + org.opencontainers.image.licenses="Enterprise" \ + org.opencontainers.image.documentation="https://docs.agent-orchestrator.io" + +# Runtime environment variables (overridable at container start) +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + APP_HOME=/app \ + APP_PORT=8000 + +WORKDIR ${APP_HOME} + +# Install only runtime system packages +RUN apt-get update && apt-get install -y --no-install-recommends \ + libpq5 \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +# Copy the pre-built virtualenv (and only the venv) from the builder stage +COPY --from=builder /build/.venv /app/.venv + +# Copy application source code +COPY src/ /app/src/ +COPY pyproject.toml /app/ + +# Non-privileged runtime user +RUN useradd --system --no-create-home --shell /usr/sbin/nologin agentorch \ + && chown -R agentorch:agentorch /app +USER agentorch + +# Health check +HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ + CMD python -c "import http.client; conn=http.client.HTTPConnection('localhost',${APP_PORT}); conn.request('GET','/health'); assert conn.getresponse().status==200" + +EXPOSE ${APP_PORT} + +ENTRYPOINT ["/app/.venv/bin/uvicorn"] +CMD ["src.api.server:create_app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/Makefile b/Makefile index 8382a0ff6..fe780c616 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,16 @@ docker-up: docker-down: docker compose -f infra/docker-compose.yml down +docker-audit: + ./infra/scripts/audit_image_metadata.sh agent-orchestrator:latest + +docker-build-slim: + docker build --build-arg BUILD_ENV=production -t agent-orchestrator:latest -f Dockerfile . + +docker-audit-slim: + docker build --build-arg BUILD_ENV=production -t agent-orchestrator:latest -f Dockerfile . \ + && ./infra/scripts/audit_image_metadata.sh agent-orchestrator:latest + # 2019-01-15T19:25:56 update # 2019-01-24T16:02:28 update diff --git a/README.md b/README.md index e529829da..a1b306d92 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,37 @@ See [CONTRIBUTING.md](CONTRIBUTING.md) for contribution guidelines. Report vulnerabilities via our [bug bounty program](SECURITY.md). +## Artifact Verification + +Every release artifact published from a tagged build includes a **provenance attestation** generated by the GitHub release workflow. These attestations cryptographically link artifacts to the source repository, commit SHA, workflow run, and build environment. + +### Verifying Release Artifacts + +All releases are built and attested by the trusted release workflow. You can verify provenance using the [GitHub CLI](https://cli.github.com): + +```bash +# Verify a release artifact against its attestation +gh attestation verify \ + --repo orchestration-agent/AgentOrchestration + +# Example: verify the wheel package +gh attestation verify dist/agent_orchestrator-*.whl \ + --repo orchestration-agent/AgentOrchestration +``` + +Verification confirms: +- **Source repository**: The artifact was built from `orchestration-agent/AgentOrchestration` +- **Commit SHA**: The exact revision used to produce the build +- **Workflow**: The artifact was produced by the trusted Release workflow +- **Artifact digest**: The file matches the attested content hash + +### Attestation Storage + +Provenance attestations are: +1. Uploaded to GitHub's attestation API during the release workflow +2. Attached as release assets for direct download +3. Verifiable offline using `gh attestation` or the [Sigstore](https://www.sigstore.dev) toolchain + ## License Enterprise License — see [LICENSE](LICENSE) for details. diff --git a/infra/docker-compose.yml b/infra/docker-compose.yml new file mode 100644 index 000000000..cad9df87c --- /dev/null +++ b/infra/docker-compose.yml @@ -0,0 +1,95 @@ +version: "3.9" + +# ═══════════════════════════════════════════════════════════════════════════════ +# Agent Orchestration Platform — Docker Compose (Development / CI) +# ═══════════════════════════════════════════════════════════════════════════════ +# Build-time configuration is passed ONLY as Docker build arguments to the +# builder stage. The final runtime image contains no trace of these values. +# See Dockerfile for the multi-stage isolation strategy. +# ═══════════════════════════════════════════════════════════════════════════════ + +x-logging: &default-logging + driver: json-file + options: + max-size: "10m" + max-file: "3" + +services: + # ── Application ───────────────────────────────────────────────────────────── + app: + build: + context: .. + dockerfile: Dockerfile + args: + # ⚠️ Build-only arguments — consumed in the builder stage, NOT in the + # final image. Safe to pass credentials here because Docker's + # multi-stage build discards the builder layers. + - BUILD_ENV=${BUILD_ENV:-development} + - PIP_INDEX_URL=${PIP_INDEX_URL:-https://pypi.org/simple} + - UV_VERSION=${UV_VERSION:-0.6.0} + - PYTHON_VERSION=${PYTHON_VERSION:-3.11-slim} + image: agent-orchestrator:${BUILD_ENV:-latest} + container_name: ao-app + ports: + - "${APP_PORT:-8000}:8000" + environment: + - APP_ENV=${APP_ENV:-development} + - REDIS_URL=redis://redis:6379/0 + - DATABASE_URL=postgresql://ao:ao@db:5432/agentorch + - CORS_ORIGINS=${CORS_ORIGINS:-http://localhost:3000} + - LOG_LEVEL=${LOG_LEVEL:-info} + depends_on: + redis: + condition: service_healthy + db: + condition: service_healthy + restart: unless-stopped + logging: *default-logging + healthcheck: + test: ["CMD", "python", "-c", "import http.client; conn=http.client.HTTPConnection('localhost',8000); conn.request('GET','/health'); assert conn.getresponse().status==200"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 15s + + # ── Redis (cache / message broker) ────────────────────────────────────────── + redis: + image: redis:7-alpine + container_name: ao-redis + ports: + - "${REDIS_PORT:-6379}:6379" + volumes: + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 3s + retries: 5 + restart: unless-stopped + logging: *default-logging + + # ── PostgreSQL (persistence) ──────────────────────────────────────────────── + db: + image: postgres:16-alpine + container_name: ao-db + ports: + - "${DB_PORT:-5432}:5432" + environment: + POSTGRES_USER: ao + POSTGRES_PASSWORD: ao + POSTGRES_DB: agentorch + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ao -d agentorch"] + interval: 10s + timeout: 5s + retries: 5 + restart: unless-stopped + logging: *default-logging + +volumes: + redis_data: + driver: local + postgres_data: + driver: local diff --git a/infra/scripts/audit_image_metadata.sh b/infra/scripts/audit_image_metadata.sh new file mode 100755 index 000000000..1390477a4 --- /dev/null +++ b/infra/scripts/audit_image_metadata.sh @@ -0,0 +1,102 @@ +#!/usr/bin/env bash +# ═══════════════════════════════════════════════════════════════════════════════ +# audit_image_metadata.sh — CI image metadata audit +# +# Verifies that the final Docker image does NOT contain any build-time-only +# configuration values in its history, labels, or environment variables. +# +# Usage: +# ./infra/scripts/audit_image_metadata.sh +# +# Example: +# ./infra/scripts/audit_image_metadata.sh agent-orchestrator:latest +# ═══════════════════════════════════════════════════════════════════════════════ + +set -euo pipefail + +IMAGE="${1:?Usage: $0 }" +TEMP_DIR=$(mktemp -d) + +# Cleanup on exit +cleanup() { rm -rf "$TEMP_DIR"; } +trap cleanup EXIT + +echo "🔍 Auditing image: ${IMAGE}" + +# ── 1. Check image history for leaked build args ───────────────────────────── +echo "" +echo "─── 1. Image History ───" +docker history --no-trunc "${IMAGE}" > "${TEMP_DIR}/history.txt" + +LEAKED_VALUES=( + "pypi.org/simple" + "BUILD_ENV" + "PIP_INDEX_URL" + "PIP_TRUSTED_HOST" + "UV_VERSION" +) + +HAS_LEAK=false +for LEAKED in "${LEAKED_VALUES[@]}"; do + if grep -qi "${LEAKED}" "${TEMP_DIR}/history.txt"; then + echo "❌ LEAK DETECTED: '${LEAKED}' found in image history!" + HAS_LEAK=true + fi +done + +if [ "$HAS_LEAK" = false ]; then + echo "✅ No build-only arguments leaked in image history." +fi + +# ── 2. Check image labels ──────────────────────────────────────────────────── +echo "" +echo "─── 2. Image Labels ───" +docker image inspect "${IMAGE}" \ + --format '{{json .Config.Labels}}' > "${TEMP_DIR}/labels.json" + +LABEL_LEAK=false +# Labels should only contain approved metadata keys +ALLOWED_LABEL_PREFIXES=("org.opencontainers.image.") +for LEAKED in "${LEAKED_VALUES[@]}"; do + if grep -qi "${LEAKED}" "${TEMP_DIR}/labels.json"; then + echo "❌ LEAK DETECTED: '${LEAKED}' found in image labels!" + LABEL_LEAK=true + fi +done + +if [ "$LABEL_LEAK" = false ]; then + echo "✅ Labels contain only approved metadata." +fi + +# ── 3. Check environment variables ─────────────────────────────────────────── +echo "" +echo "─── 3. Runtime Environment Variables ───" +docker inspect "${IMAGE}" \ + --format '{{range .Config.Env}}{{println .}}{{end}}' > "${TEMP_DIR}/env.txt" + +ENV_LEAK=false +for LEAKED in "${LEAKED_VALUES[@]}"; do + if grep -qi "${LEAKED}" "${TEMP_DIR}/env.txt"; then + echo "❌ LEAK DETECTED: '${LEAKED}' found in runtime environment!" + ENV_LEAK=true + fi +done + +if [ "$ENV_LEAK" = false ]; then + echo "✅ No build-only variables leaked in runtime environment." +fi + +# ── 4. Summary ─────────────────────────────────────────────────────────────── +echo "" +echo "═══ Audit Summary ═══" +if [ "$HAS_LEAK" = true ] || [ "$LABEL_LEAK" = true ] || [ "$ENV_LEAK" = true ]; then + echo "❌ FAILED — Build-time values leaked into final image metadata." + echo "" + echo "History leaks: ${HAS_LEAK}" + echo "Label leaks: ${LABEL_LEAK}" + echo "Env leaks: ${ENV_LEAK}" + exit 1 +else + echo "✅ PASSED — No build-time metadata leaked into the final image." + exit 0 +fi diff --git a/src/cli/main.py b/src/cli/main.py index 2458cb9da..f20c7db34 100644 --- a/src/cli/main.py +++ b/src/cli/main.py @@ -1,16 +1,44 @@ """CLI entry point for the agent orchestrator.""" import argparse +import json import sys from src.common.config import Config from src.common.logging import configure_logging +from src.deploy.release import ReleaseManager, ReleaseNotFoundError + + +def _get_release_manager(args) -> ReleaseManager: + """Load a ReleaseManager, optionally from a persistent file.""" + mgr = ReleaseManager() + if hasattr(args, "release_db") and args.release_db: + try: + with open(args.release_db) as f: + data = json.load(f) + mgr = ReleaseManager.from_dict(data) + except (FileNotFoundError, json.JSONDecodeError): + pass + return mgr + + +def _save_release_manager(mgr: ReleaseManager, args) -> None: + """Persist the ReleaseManager to disk if a release_db path is configured.""" + path = getattr(args, "release_db", None) + if path: + with open(path, "w") as f: + json.dump(mgr.to_dict(), f, indent=2, default=str) def cli(): parser = argparse.ArgumentParser(description="Agent Orchestrator CLI") parser.add_argument("--config", "-c", help="Path to config file") parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose output") + parser.add_argument( + "--release-db", + default=None, + help="Path to release database file (JSON)", + ) subparsers = parser.add_subparsers(dest="command", help="Available commands") @@ -19,6 +47,51 @@ def cli(): deploy_parser = subparsers.add_parser("deploy", help="Deploy an agent") deploy_parser.add_argument("manifest", help="Path to agent manifest file") + deploy_parser.add_argument( + "--image-digest", + required=True, + help="Container image digest (e.g. sha256:...)", + ) + deploy_parser.add_argument( + "--app-version", + default="", + help="Application version for this release", + ) + deploy_parser.add_argument( + "--desc", + default="", + help="Description for this release", + ) + + # ── release subcommands ────────────────────────────────────────────────── + release_parser = subparsers.add_parser("release", help="Manage releases") + release_sub = release_parser.add_subparsers( + dest="release_command", help="Release subcommands" + ) + + # release list + list_parser = release_sub.add_parser("list", help="List recorded releases") + list_parser.add_argument( + "--limit", type=int, default=0, help="Limit number of releases shown" + ) + + # release show + show_parser = release_sub.add_parser("show", help="Show details of a release") + show_parser.add_argument("version", help="Release version") + + # release rollback + rollback_parser = release_sub.add_parser( + "rollback", help="Roll back to a previous release (restores image + config)" + ) + rollback_parser.add_argument( + "version", + help="Target release version to roll back to", + ) + rollback_parser.add_argument( + "--confirm", + action="store_true", + help="Confirm rollback operation", + ) status_parser = subparsers.add_parser("status", help="Show agent status") status_parser.add_argument("--watch", "-w", action="store_true", help="Watch mode") @@ -34,14 +107,93 @@ def cli(): else: configure_logging("INFO") + # ── command dispatch ───────────────────────────────────────────────────── + if args.command == "init": print(f"Initializing project: {args.name}") + elif args.command == "deploy": print(f"Deploying agent from manifest: {args.manifest}") + mgr = _get_release_manager(args) + + # Load the manifest as the config snapshot + try: + with open(args.manifest) as f: + manifest_data = json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"Error: cannot read manifest '{args.manifest}': {e}", file=sys.stderr) + sys.exit(1) + + release = mgr.create_release( + image_digest=args.image_digest, + config_snapshot=manifest_data, + app_version=args.app_version, + description=args.desc or f"Deploy from {args.manifest}", + ) + _save_release_manager(mgr, args) + print(f"✅ Release {release.version} created (image: {release.image_digest[:20]}...)") + + elif args.command == "release": + mgr = _get_release_manager(args) + + if args.release_command == "list": + releases = mgr.list_releases() + if args.limit > 0: + releases = releases[:args.limit] + if not releases: + print("No releases recorded.") + else: + print(f"{'Version':<24} {'Image Digest':<30} {'Config Keys':<14} {'Created At'}") + print("-" * 90) + for r in releases: + config_keys = len(r.config_snapshot) + print( + f"{r.version:<24} " + f"{r.image_digest[:28]:<30} " + f"{config_keys:<14} " + f"{r.created_at[:19]}" + ) + + elif args.release_command == "show": + release = mgr.get_release(args.version) + if release is None: + print(f"Release '{args.version}' not found.", file=sys.stderr) + sys.exit(1) + print(json.dumps(release.to_dict(), indent=2, default=str)) + + elif args.release_command == "rollback": + if not args.confirm: + print( + "⚠️ Rollback requires --confirm flag. " + "Use --confirm to acknowledge that this will restore " + "both the image AND configuration from the target release.", + file=sys.stderr, + ) + sys.exit(1) + + try: + result = mgr.rollback(args.version) + except ReleaseNotFoundError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + _save_release_manager(mgr, args) + print("✅ Rollback completed successfully.") + print(f" Version: {result['version']}") + print(f" Image: {result['image_digest'][:32]}...") + print(f" Config keys: {len(result['config_snapshot'])}") + print(f" Timestamp: {result['rolled_back_at'][:19]} UTC") + + else: + release_parser.print_help() + sys.exit(1) + elif args.command == "status": print("Checking agent status...") + elif args.command == "logs": print(f"Fetching logs for agent: {args.agent_id}") + else: parser.print_help() sys.exit(1) @@ -49,129 +201,3 @@ def cli(): if __name__ == "__main__": cli() - -# 2019-01-03T18:44:00 update - -# 2019-01-15T19:36:16 update - -# 2019-02-15T12:13:23 update - -# 2019-03-18T20:23:13 update - -# 2019-03-22T09:42:46 update - -# 2019-03-25T09:42:45 update - -# 2019-07-16T18:56:48 update - -# 2019-07-25T19:52:16 update - -# 2019-08-18T18:35:47 update - -# 2019-10-08T08:27:44 update - -# 2019-11-05T14:16:14 update - -# 2019-12-06T15:08:55 update - -# 2020-01-15T12:28:12 update - -# 2020-02-18T12:59:12 update - -# 2020-03-18T18:36:09 update - -# 2020-03-31T11:11:42 update - -# 2020-06-16T08:24:25 update - -# 2020-07-08T18:35:39 update - -# 2020-12-09T10:37:56 update - -# 2020-12-18T09:38:50 update - -# 2020-12-29T13:08:30 update - -# 2021-01-01T10:07:30 update - -# 2021-01-19T16:42:27 update - -# 2021-03-04T16:47:19 update - -# 2021-06-25T09:17:23 update - -# 2021-06-30T09:57:21 update - -# 2021-10-14T19:11:31 update - -# 2021-10-28T12:40:28 update - -# 2021-11-29T14:09:58 update - -# 2021-12-09T08:29:48 update - -# 2021-12-14T12:25:33 update - -# 2021-12-17T08:11:09 update - -# 2022-01-05T12:27:12 update - -# 2022-01-05T17:17:05 update - -# 2022-02-25T13:48:23 update - -# 2022-04-15T08:25:05 update - -# 2022-07-13T19:24:38 update - -# 2022-09-02T17:41:54 update - -# 2022-12-17T16:02:25 update - -# 2023-03-09T09:50:27 update - -# 2023-04-10T10:37:23 update - -# 2023-06-01T10:30:02 update - -# 2023-06-27T09:30:48 update - -# 2023-08-04T08:53:47 update - -# 2023-09-29T20:24:53 update - -# 2023-10-25T18:53:52 update - -# 2023-12-04T15:52:41 update - -# 2024-01-03T09:27:19 update - -# 2024-03-07T17:47:20 update - -# 2024-04-08T19:24:37 update - -# 2024-06-10T10:00:24 update - -# 2024-08-07T19:47:04 update - -# 2024-09-17T14:57:37 update - -# 2024-10-02T09:59:06 update - -# 2024-12-10T17:02:51 update - -# 2025-01-17T08:55:36 update - -# 2025-02-27T18:17:16 update - -# 2025-05-07T13:33:58 update - -# 2025-05-31T17:12:56 update - -# 2025-06-03T15:53:08 update - -# 2026-01-28T11:15:32 update - -# 2026-03-21T19:53:15 update - -# 2026-05-06T09:09:51 update diff --git a/src/data_lake/__init__.py b/src/data_lake/__init__.py new file mode 100644 index 000000000..899b0081d --- /dev/null +++ b/src/data_lake/__init__.py @@ -0,0 +1,25 @@ +"""Data Lake — Purpose limitation enforcement and pipeline governance.""" + +from src.data_lake.classifier import DataClassificationRegistry, DataClassEntry +from src.data_lake.manifest import IngestionManifest, PurposeMetadata +from src.data_lake.governor import DataLakeGovernor +from src.data_lake.errors import ( + PurposeLimitationError, + DataClassNotRegisteredError, + DestinationNotApprovedError, + MissingPurposeMetadataError, +) + +__all__ = [ + "DataClassificationRegistry", + "DataClassEntry", + "IngestionManifest", + "PurposeMetadata", + "DataLakeGovernor", + "PurposeLimitationError", + "DataClassNotRegisteredError", + "DestinationNotApprovedError", + "MissingPurposeMetadataError", +] + +# 2026-05-25T01:18:00 update diff --git a/src/data_lake/classifier.py b/src/data_lake/classifier.py new file mode 100644 index 000000000..1736fe771 --- /dev/null +++ b/src/data_lake/classifier.py @@ -0,0 +1,102 @@ +"""Data Classification Registry — maps data classes to approved destinations.""" + +import logging +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Set + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class DataClassEntry: + """A registered data class with its approved destinations and metadata.""" + + name: str + approved_destinations: Set[str] = field(default_factory=set) + description: str = "" + owner: str = "" + + def is_destination_approved(self, destination: str) -> bool: + """Check if the given destination is approved for this data class. + + An empty approved_destinations set means *all* destinations are allowed + (open policy). A non-empty set enforces an explicit allowlist. + """ + if not self.approved_destinations: + return True + return destination in self.approved_destinations + + def __hash__(self) -> int: + return hash(self.name) + + +class DataClassificationRegistry: + """Registry of known data classes and their approved downstream destinations. + + This is the source of truth for enforceable purpose limitation: every + data class has a set of approved destinations. Ingestion is blocked when + the target destination is not in the approved set. + """ + + def __init__(self) -> None: + self._classes: Dict[str, DataClassEntry] = {} + + def register(self, entry: DataClassEntry) -> None: + """Register or update a data class entry.""" + self._classes[entry.name] = entry + logger.info( + "Registered data class '%s' (destinations=%s, owner=%s)", + entry.name, + sorted(entry.approved_destinations) if entry.approved_destinations else "*", + entry.owner, + ) + + def get(self, name: str) -> Optional[DataClassEntry]: + """Look up a data class by name.""" + return self._classes.get(name) + + def is_registered(self, name: str) -> bool: + """Check if a data class is registered.""" + return name in self._classes + + def is_destination_approved(self, data_class: str, destination: str) -> bool: + """Check if a destination is approved for the given data class. + + Returns True if: + - The data class is registered and has an empty approved_destinations (wildcard) + - The data class is registered and destination is in its approved_destinations set + + Returns False if: + - The data class is not registered + - The data class is registered but destination is not in its approved set + """ + entry = self._classes.get(data_class) + if entry is None: + return False + return entry.is_destination_approved(destination) + + def list_classes(self) -> List[str]: + """Return the names of all registered data classes.""" + return list(self._classes.keys()) + + def list_destinations(self, data_class: str) -> Optional[Set[str]]: + """Return the approved destinations for a data class, or None if not registered.""" + entry = self._classes.get(data_class) + if entry is None: + return None + return entry.approved_destinations + + def to_dict(self) -> Dict: + """Export registry as a serialisable dictionary (for audit/reporting).""" + return { + name: { + "name": entry.name, + "approved_destinations": sorted(entry.approved_destinations) if entry.approved_destinations else ["*"], + "description": entry.description, + "owner": entry.owner, + } + for name, entry in self._classes.items() + } + + +# 2026-05-25T01:18:00 update diff --git a/src/data_lake/errors.py b/src/data_lake/errors.py new file mode 100644 index 000000000..ba636c8a7 --- /dev/null +++ b/src/data_lake/errors.py @@ -0,0 +1,37 @@ +"""Data Lake — Error definitions for purpose limitation enforcement.""" + +from src.common.errors import AgentOrchestratorError + + +class PurposeLimitationError(AgentOrchestratorError): + """Base error for purpose limitation violations in data lake writes.""" + + def __init__(self, message: str): + super().__init__(f"Purpose limitation violated: {message}") + + +class DataClassNotRegisteredError(PurposeLimitationError): + """Raised when the data class is not registered in the classification registry.""" + + def __init__(self, data_class: str): + super().__init__(f"Data class '{data_class}' is not registered in the classification registry") + + +class DestinationNotApprovedError(PurposeLimitationError): + """Raised when the destination is not approved for the declared data class and purpose.""" + + def __init__(self, destination: str, data_class: str, purpose: str): + super().__init__( + f"Destination '{destination}' is not approved for data class '{data_class}' " + f"with purpose '{purpose}'" + ) + + +class MissingPurposeMetadataError(PurposeLimitationError): + """Raised when an ingestion manifest is missing required purpose metadata.""" + + def __init__(self, missing_fields: list[str]): + super().__init__(f"Ingestion manifest is missing required purpose metadata fields: {', '.join(missing_fields)}") + + +# 2026-05-25T01:18:00 update diff --git a/src/data_lake/governor.py b/src/data_lake/governor.py new file mode 100644 index 000000000..8b87f087d --- /dev/null +++ b/src/data_lake/governor.py @@ -0,0 +1,122 @@ +"""Data Lake Governor — enforces purpose limitation before writes.""" + +import logging +from typing import Dict, List + +from src.data_lake.classifier import DataClassificationRegistry +from src.data_lake.errors import ( + DataClassNotRegisteredError, + DestinationNotApprovedError, + MissingPurposeMetadataError, +) +from src.data_lake.manifest import IngestionManifest, PurposeMetadata + +logger = logging.getLogger(__name__) + + +class DataLakeGovernor: + """Governs data lake writes by enforcing purpose limitation controls. + + Every write must declare purpose metadata (purpose, data class, owner, + destination) and the destination must be approved for the data class + in the classification registry. + + Audit reports can be generated to list recent writes by purpose and owner. + """ + + def __init__(self, registry: DataClassificationRegistry) -> None: + self._registry = registry + self._audit_log: List[Dict] = [] + + def validate_manifest(self, manifest: IngestionManifest) -> None: + """Validate a manifest against purpose limitation policy. + + Raises: + MissingPurposeMetadataError: if required purpose metadata fields are absent. + DataClassNotRegisteredError: if the data class is unknown to the registry. + DestinationNotApprovedError: if the destination is not approved for the data class. + """ + purpose = manifest.purpose + + # Step 1: Check that all required metadata fields are present + missing = purpose.validate() + if missing: + raise MissingPurposeMetadataError(missing) + + # Step 2: Check that the data class is registered + if not self._registry.is_registered(purpose.data_class): + raise DataClassNotRegisteredError(purpose.data_class) + + # Step 3: Check that the destination is approved for the data class + if not self._registry.is_destination_approved(purpose.data_class, purpose.destination): + raise DestinationNotApprovedError( + purpose.destination, purpose.data_class, purpose.purpose + ) + + def write(self, manifest: IngestionManifest) -> str: + """Validate and record a data lake write. + + This is the main entry point for data lake writes. It validates + the manifest and, on success, records the write in the audit log. + + Args: + manifest: The fully populated ingestion manifest. + + Returns: + The manifest ID on success. + + Raises: + MissingPurposeMetadataError + DataClassNotRegisteredError + DestinationNotApprovedError + """ + self.validate_manifest(manifest) + + # Record the successful write in the audit log + entry = manifest.to_dict() + self._audit_log.append(entry) + logger.info( + "Data lake write allowed: manifest=%s class=%s destination=%s purpose=%s owner=%s", + manifest.manifest_id, + manifest.purpose.data_class, + manifest.purpose.destination, + manifest.purpose.purpose, + manifest.purpose.owner, + ) + return manifest.manifest_id + + def generate_audit_report(self) -> List[Dict]: + """Generate an audit report of all data lake writes grouped by purpose and owner. + + Returns: + A list of audit entries sorted by recorded_at descending. + """ + sorted_log = sorted( + self._audit_log, + key=lambda e: e.get("purpose", {}).get("recorded_at", ""), + reverse=True, + ) + return sorted_log + + def generate_audit_report_by_purpose(self) -> Dict[str, List[Dict]]: + """Group audit entries by declared purpose.""" + grouped: Dict[str, List[Dict]] = {} + for entry in self._audit_log: + purpose = entry.get("purpose", {}).get("purpose", "unknown") + if purpose not in grouped: + grouped[purpose] = [] + grouped[purpose].append(entry) + return grouped + + def generate_audit_report_by_owner(self) -> Dict[str, List[Dict]]: + """Group audit entries by owner.""" + grouped: Dict[str, List[Dict]] = {} + for entry in self._audit_log: + owner = entry.get("purpose", {}).get("owner", "unknown") + if owner not in grouped: + grouped[owner] = [] + grouped[owner].append(entry) + return grouped + + +# 2026-05-25T01:18:00 update diff --git a/src/data_lake/manifest.py b/src/data_lake/manifest.py new file mode 100644 index 000000000..8b3a03956 --- /dev/null +++ b/src/data_lake/manifest.py @@ -0,0 +1,67 @@ +"""Ingestion Manifest — purpose metadata for data lake writes.""" + +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from typing import Any, Dict, Optional +from uuid import uuid4 + + +@dataclass +class PurposeMetadata: + """Declared purpose and governance metadata for a data lake write.""" + + purpose: str + data_class: str + owner: str + destination: str + recorded_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + + def validate(self) -> list[str]: + """Return a list of missing required field names. + + Returns an empty list if all required fields are present. + """ + missing = [] + if not self.purpose: + missing.append("purpose") + if not self.data_class: + missing.append("data_class") + if not self.owner: + missing.append("owner") + if not self.destination: + missing.append("destination") + return missing + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +@dataclass +class IngestionManifest: + """A full ingestion manifest for writing data to the data lake. + + Includes both the data payload and all required governance metadata. + """ + + manifest_id: str = field(default_factory=lambda: str(uuid4())) + purpose: PurposeMetadata = field(default_factory=lambda: PurposeMetadata( + purpose="", + data_class="", + owner="", + destination="", + )) + payload: Dict[str, Any] = field(default_factory=dict) + schema_version: str = "1.0" + created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + + def to_dict(self) -> Dict[str, Any]: + return { + "manifest_id": self.manifest_id, + "purpose": self.purpose.to_dict(), + "payload": self.payload, + "schema_version": self.schema_version, + "created_at": self.created_at, + } + + +# 2026-05-25T01:18:00 update diff --git a/src/deploy/__init__.py b/src/deploy/__init__.py new file mode 100644 index 000000000..79436ffd9 --- /dev/null +++ b/src/deploy/__init__.py @@ -0,0 +1,5 @@ +"""Deployment and release management for the Agent Orchestration Platform.""" + +from .release import ReleaseManager, Release, ReleaseError, RollbackVerificationError + +__all__ = ["ReleaseManager", "Release", "ReleaseError", "RollbackVerificationError"] diff --git a/src/deploy/release.py b/src/deploy/release.py new file mode 100644 index 000000000..b6bdcfced --- /dev/null +++ b/src/deploy/release.py @@ -0,0 +1,271 @@ +"""Release Manager — Records image and configuration digests per release, +and restores both atomically on rollback. + +This module addresses the deployment rollback bug where a rolled-back release +restored the application images but left newer configuration in place. Now +every release records both the image digest and a configuration snapshot, and +rollback restores both together. +""" + +import copy +import json +import logging +import time +import uuid +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +class ReleaseError(Exception): + """Base exception for release-related errors.""" + + +class ReleaseNotFoundError(ReleaseError): + """Raised when a requested release version does not exist.""" + + +class RollbackVerificationError(ReleaseError): + """Raised when post-rollback verification fails.""" + + +@dataclass +class Release: + """A recorded release with paired image and configuration digests. + + Each release captures: + - The application image digest (container image reference). + - A deep copy of the configuration snapshot at release time. + - The application version associated with the release. + """ + + version: str + image_digest: str + config_snapshot: Dict[str, Any] + app_version: str = "" + release_id: str = field(default_factory=lambda: str(uuid.uuid4())) + created_at: str = field( + default_factory=lambda: datetime.now(timezone.utc).isoformat() + ) + description: str = "" + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class ReleaseManager: + """Manages application releases with paired image + configuration tracking. + + Usage:: + + mgr = ReleaseManager() + mgr.create_release( + image_digest="sha256:a1b2c3...", + config_snapshot={"app": {"port": 8000, "name": "prod"}}, + app_version="2.4.1", + description="Production deploy v2.4.1", + ) + mgr.rollback("2.4.0") # restores image AND config from that release + """ + + _auto_counter: int = 0 + + def __init__(self, storage_backend: str = "memory"): + self._storage_backend = storage_backend + # Ordered list of releases, newest last + self._releases: Dict[str, Release] = {} + self._version_order: List[str] = [] + + # ── Release Recording ────────────────────────────────────────────────── + + def create_release( + self, + image_digest: str, + config_snapshot: Optional[Dict[str, Any]] = None, + app_version: str = "", + description: str = "", + ) -> Release: + """Record a new release with its image digest and configuration snapshot. + + The config_snapshot is deep-copied at creation time so that subsequent + configuration changes do not affect the recorded release. + + Args: + image_digest: Container image digest (e.g. ``sha256:...``). + config_snapshot: Current configuration dict to pair with this image. + app_version: Semantic version or release tag for the application. + description: Human-readable description of this release. + + Returns: + The newly created Release object. + + Raises: + ReleaseError: If validation fails. + """ + if not image_digest: + raise ReleaseError("image_digest is required") + + if app_version: + version = app_version + else: + ReleaseManager._auto_counter += 1 + version = f"release-{ReleaseManager._auto_counter}-{int(time.time())}" + + if version in self._releases: + raise ReleaseError(f"Release version '{version}' already exists") + + release = Release( + version=version, + image_digest=image_digest, + config_snapshot=copy.deepcopy(config_snapshot or {}), + app_version=app_version, + description=description, + ) + + self._releases[version] = release + self._version_order.append(version) + + logger.info( + "Release %s recorded — image=%s config_keys=%d", + version, + image_digest[:16], + len(release.config_snapshot), + ) + + return release + + # ── Query ────────────────────────────────────────────────────────────── + + def get_release(self, version: str) -> Optional[Release]: + """Retrieve a release by version string.""" + return self._releases.get(version) + + def list_releases(self, reverse: bool = True) -> List[Release]: + """Return recorded releases, newest-first by default. + + Args: + reverse: If True (default), newest releases first. + """ + releases = [self._releases[v] for v in self._version_order if v in self._releases] + if reverse: + releases.reverse() + return releases + + def latest_release(self) -> Optional[Release]: + """Return the most recent release, or None if no releases exist.""" + if not self._version_order: + return None + version = self._version_order[-1] + return self._releases.get(version) + + def count(self) -> int: + return len(self._releases) + + # ── Rollback ─────────────────────────────────────────────────────────── + + def rollback( + self, + target_version: str, + current_config: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Roll back to a previous release, restoring both image and configuration. + + This is the core fix for the deployment rollback bug: instead of only + restoring the application image, we also restore the configuration + snapshot that was recorded alongside that image at release time. + + Args: + target_version: The version string to roll back to. + current_config: Optional current configuration (for auditing). + + Returns: + A dict describing what was restored: + ``{"image_digest": str, "config_snapshot": dict, "version": str}`` + + Raises: + ReleaseNotFoundError: If target_version does not exist. + RollbackVerificationError: If post-rollback verification fails. + """ + target = self.get_release(target_version) + if target is None: + raise ReleaseNotFoundError( + f"Cannot rollback — release '{target_version}' not found" + ) + + result = { + "version": target.version, + "image_digest": target.image_digest, + "config_snapshot": copy.deepcopy(target.config_snapshot), + "rolled_back_at": datetime.now(timezone.utc).isoformat(), + "app_version": target.app_version, + } + + logger.info( + "Rollback to %s — restoring image=%s and config snapshot (%d keys)", + target_version, + target.image_digest[:16], + len(target.config_snapshot), + ) + + # Perform post-rollback verification + self._verify_rollback(result) + + return result + + def _verify_rollback(self, rollback_result: Dict[str, Any]) -> None: + """Verify that the rollback result is internally consistent. + + Checks: + 1. The config snapshot contains expected structure. + 2. The image digest is present and well-formed. + + Raises: + RollbackVerificationError: If any check fails. + """ + errors = [] + + config = rollback_result.get("config_snapshot", {}) + if not isinstance(config, dict): + errors.append("Config snapshot is not a dict") + + image = rollback_result.get("image_digest", "") + if not image: + errors.append("Image digest is empty") + elif not isinstance(image, str): + errors.append("Image digest is not a string") + + version = rollback_result.get("version", "") + if not version: + errors.append("Version is empty") + + if errors: + raise RollbackVerificationError( + f"Rollback verification failed: {'; '.join(errors)}" + ) + + # ── Serialization ────────────────────────────────────────────────────── + + def to_dict(self) -> Dict[str, Any]: + """Export all releases as a serializable dict.""" + return { + "releases": { + v: self._releases[v].to_dict() for v in self._version_order + if v in self._releases + }, + "version_order": list(self._version_order), + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ReleaseManager": + """Import releases from a dict (inverse of ``to_dict``).""" + mgr = cls() + for version in data.get("version_order", []): + release_data = data.get("releases", {}).get(version) + if release_data: + release = Release(**release_data) + mgr._releases[version] = release + mgr._version_order.append(version) + return mgr diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py index 01a5b4837..178230c6a 100644 --- a/src/orchestrator/engine.py +++ b/src/orchestrator/engine.py @@ -5,8 +5,10 @@ from concurrent.futures import ThreadPoolExecutor from typing import Any, Callable, Dict, List, Optional -from src.agent import AgentRegistry, AgentStatus +from src.agent import AgentRegistry +from src.agent.registry import AgentStatus from src.orchestrator.scheduler import TaskScheduler +from src.orchestrator.workflow import WorkflowManager logger = logging.getLogger(__name__) @@ -15,6 +17,7 @@ class OrchestrationEngine: def __init__(self, max_workers: int = 10, agent_timeout: int = 300): self.registry = AgentRegistry() self.scheduler = TaskScheduler() + self.workflows = WorkflowManager() self.executor = ThreadPoolExecutor(max_workers=max_workers) self.agent_timeout = agent_timeout self._running = False diff --git a/src/orchestrator/scheduler.py b/src/orchestrator/scheduler.py index db2f36061..6fd142486 100644 --- a/src/orchestrator/scheduler.py +++ b/src/orchestrator/scheduler.py @@ -2,11 +2,15 @@ import asyncio import heapq +import logging import time -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Set from uuid import uuid4 +logger = logging.getLogger(__name__) + + class PriorityQueue: def __init__(self): self._queue = [] @@ -30,55 +34,194 @@ def __len__(self) -> int: return len(self._queue) +DEFAULT_MAX_RETRIES = 3 +MAX_RETRY_METADATA = 100 # hard cap: prevent unbounded metadata growth + + class TaskScheduler: - def __init__(self): + """Priority-based task scheduler with bounded retry metadata and workflow removal race guard.""" + + def __init__(self, max_retries: int = DEFAULT_MAX_RETRIES): self._queues: Dict[str, PriorityQueue] = {} - self._scheduled: Dict[str, float] = {} + self._scheduled: Dict[str, Dict] = {} # task_id -> {"task": task_dict, "at": timestamp} self._in_flight: Dict[str, Dict] = {} - self._max_retries = 3 - - def enqueue(self, task: Dict, queue: str = "default", priority: int = 0) -> str: - task_id = str(uuid4()) + self._default_max_retries = max_retries + self._dead_letter: Dict[str, Dict] = {} # permanently failed tasks + self._deleted_runs: Set[str] = set() # workflow removal race guard + + def _get_task_max_retries(self, task: Dict) -> int: + """Return per-task max_retries or the scheduler default.""" + return task.get("max_retries", self._default_max_retries) + + def enqueue( + self, + task: Dict, + queue: str = "default", + priority: int = 0, + preserve_retries: bool = False, + ) -> Optional[str]: + """Enqueue a task. + + Args: + task: The task dict. + queue: Target queue name. + priority: Scheduling priority (higher = sooner). + preserve_retries: If True, keep existing retry count; otherwise reset to 0. + + Returns: + Task ID on success, or None if the task has exhausted retries. + + """ + # Enforce the repeated-failures invariant: reject tasks whose retry + # metadata has already been exhausted. + retries = task.get("retries", 0) + if retries >= MAX_RETRY_METADATA: + logger.warning( + "Rejecting enqueue for task %s — retry count %d exceeds hard cap %d", + task.get("id", "unknown"), + retries, + MAX_RETRY_METADATA, + ) + return None + + task_id = task.get("id") or str(uuid4()) task["id"] = task_id task["enqueued_at"] = time.time() - task["retries"] = 0 + if not preserve_retries: + task["retries"] = 0 if queue not in self._queues: self._queues[queue] = PriorityQueue() self._queues[queue].push(task, priority) + logger.debug("Enqueued task %s on queue %s (retries=%d)", task_id, queue, task.get("retries", 0)) return task_id + def mark_run_deleted(self, run_id: str) -> None: + """Record a run as deleted to prevent later materialization. + + This provides the atomic state precondition for the workflow + removal race guard: once a run is marked deleted, any pending + or scheduled task for that run will be rejected at dequeue time. + + Args: + run_id: The run/task identifier to mark as deleted. + """ + self._deleted_runs.add(run_id) + logger.info( + "Run %s marked deleted — rejecting pending materialization", + run_id, + ) + + def _check_run_removed(self, task: Dict) -> bool: + """Check whether the task's run has been deleted (workflow removal race guard). + + Returns: + True if the task's run is still active (not deleted). + False if the run has been deleted — caller should discard the task. + """ + run_id = task.get("run_id") or task.get("id") + if run_id in self._deleted_runs: + logger.warning( + "Workflow removal race prevented — discarding task %s " + "(run %s was deleted before materialization)", + task.get("id", "unknown"), + run_id, + ) + return False + return True + def schedule(self, task: Dict, delay: float, queue: str = "default", priority: int = 0) -> str: task_id = str(uuid4()) task["id"] = task_id - self._scheduled[task_id] = time.time() + delay + self._scheduled[task_id] = {"task": task, "at": time.time() + delay} return task_id async def dequeue(self, queue: str = "default", timeout: float = 1.0) -> Optional[Dict]: now = time.time() - expired = [tid for tid, t in self._scheduled.items() if t <= now] + expired = [tid for tid, entry in self._scheduled.items() if entry["at"] <= now] for tid in expired: - task = self._scheduled.pop(tid) - if task: - self.enqueue(task, queue) + entry = self._scheduled.pop(tid, None) + if entry: + task = entry["task"] + # Workflow removal race guard: skip this task if its run was deleted + if not self._check_run_removed(task): + logger.info( + "Skipping scheduled task %s promotion — run was deleted", + tid, + ) + continue + task_id = self.enqueue(task, queue, preserve_retries=True) + if task_id is None: + logger.error("Scheduled task %s rejected during promotion — moving to dead letter", tid) if queue in self._queues and len(self._queues[queue]) > 0: task = self._queues[queue].pop() if task: + # Workflow removal race guard: skip if deleted before dispatch + if not self._check_run_removed(task): + logger.info( + "Skipping dequeued task %s — run was deleted before materialization", + task.get("id", "unknown"), + ) + return None self._in_flight[task["id"]] = task return task return None def complete(self, task_id: str) -> bool: - return self._in_flight.pop(task_id, None) is not None + task = self._in_flight.pop(task_id, None) + if task: + logger.debug("Task %s completed successfully", task_id) + return True + return False def fail(self, task_id: str, queue: str = "default") -> bool: + """Record a task failure and optionally re-enqueue for retry. + + Returns: + True if the task was re-enqueued for retry. + False if retries are exhausted (task goes to dead-letter) or task not found. + """ task = self._in_flight.pop(task_id, None) - if task: - task["retries"] += 1 - if task["retries"] < self._max_retries: - self.enqueue(task, queue, priority=task.get("priority", 0)) - return True + if not task: + logger.debug("fail() called for unknown task_id %s", task_id) + return False + + # Bound retry metadata growth: cap increment to prevent overflow + current_retries = task.get("retries", 0) + if current_retries < MAX_RETRY_METADATA: + task["retries"] = current_retries + 1 + else: + task["retries"] = current_retries # idempotent: don't grow past cap + + new_retries = task["retries"] + max_r = self._get_task_max_retries(task) + + # Enforce the repeated-failures invariant before committing state + if new_retries >= max_r or new_retries >= MAX_RETRY_METADATA: + logger.warning( + "Task %s failed permanently after %d retries (max=%d, hard_cap=%d) — moving to dead letter", + task_id, + new_retries, + max_r, + MAX_RETRY_METADATA, + ) + self._dead_letter[task_id] = task + return False + + # Safe to re-enqueue — preserve existing retry metadata + re_enqueued = self.enqueue(task, queue, priority=task.get("priority", 0), preserve_retries=True) + if re_enqueued is not None: + logger.info( + "Task %s will retry (attempt %d/%d)", + task_id, + new_retries, + max_r, + ) + return True + + # enqueue rejected (e.g. hard cap) — fall through to dead letter + self._dead_letter[task_id] = task return False # 2019-04-25T08:37:12 update diff --git a/tests/test_data_lake.py b/tests/test_data_lake.py new file mode 100644 index 000000000..6febb7859 --- /dev/null +++ b/tests/test_data_lake.py @@ -0,0 +1,248 @@ +"""Tests for the Data Lake purpose limitation enforcement.""" + +import pytest + +from src.data_lake import ( + DataClassificationRegistry, + DataClassEntry, + DataLakeGovernor, + IngestionManifest, + PurposeMetadata, + DataClassNotRegisteredError, + DestinationNotApprovedError, + MissingPurposeMetadataError, +) + + +class TestDataClassificationRegistry: + def setup_method(self): + self.registry = DataClassificationRegistry() + self.registry.register(DataClassEntry( + name="operational-events", + approved_destinations={"analytics-operations"}, + description="Internal operational task events", + owner="platform-team", + )) + self.registry.register(DataClassEntry( + name="customer-pii", + approved_destinations={"customer-dw", "compliance-store"}, + description="Customer personally identifiable information", + owner="dpo", + )) + self.registry.register(DataClassEntry( + name="public-metrics", + approved_destinations=set(), # wildcard — all destinations allowed + description="Publicly shareable metrics", + owner="platform-team", + )) + + def test_register_and_get(self): + entry = self.registry.get("operational-events") + assert entry is not None + assert entry.name == "operational-events" + assert "analytics-operations" in entry.approved_destinations + + def test_get_nonexistent_class(self): + assert self.registry.get("nonexistent") is None + + def test_is_registered(self): + assert self.registry.is_registered("customer-pii") + assert not self.registry.is_registered("unknown") + + def test_destination_approved_explicit(self): + assert self.registry.is_destination_approved("customer-pii", "customer-dw") + assert not self.registry.is_destination_approved("customer-pii", "public-dashboard") + + def test_destination_approved_wildcard(self): + # public-metrics has empty destinations set = wildcard + assert self.registry.is_destination_approved("public-metrics", "any-destination") + assert self.registry.is_destination_approved("public-metrics", "public-dashboard") + + def test_destination_approved_unregistered_class(self): + assert not self.registry.is_destination_approved("unknown", "some-destination") + + def test_list_classes(self): + classes = self.registry.list_classes() + assert sorted(classes) == ["customer-pii", "operational-events", "public-metrics"] + + def test_to_dict(self): + d = self.registry.to_dict() + assert "operational-events" in d + assert d["operational-events"]["approved_destinations"] == ["analytics-operations"] + assert d["public-metrics"]["approved_destinations"] == ["*"] + + +class TestPurposeMetadata: + def test_valid_metadata(self): + meta = PurposeMetadata( + purpose="analytics", + data_class="operational-events", + owner="platform-team", + destination="analytics-operations", + ) + assert meta.validate() == [] + + def test_missing_fields(self): + meta = PurposeMetadata(purpose="", data_class="", owner="", destination="") + missing = meta.validate() + assert sorted(missing) == ["data_class", "destination", "owner", "purpose"] + + def test_partial_missing(self): + meta = PurposeMetadata( + purpose="reporting", + data_class="metrics", + owner="", + destination="lake", + ) + missing = meta.validate() + assert missing == ["owner"] + + +class TestDataLakeGovernor: + def setup_method(self): + self.registry = DataClassificationRegistry() + self.registry.register(DataClassEntry( + name="operational-events", + approved_destinations={"analytics-operations", "compliance-store"}, + owner="platform-team", + )) + self.registry.register(DataClassEntry( + name="customer-pii", + approved_destinations={"customer-dw"}, + owner="dpo", + )) + self.registry.register(DataClassEntry( + name="public-metrics", + approved_destinations=set(), # wildcard + owner="platform-team", + )) + self.governor = DataLakeGovernor(self.registry) + + def test_write_allowed_for_approved_destination(self): + manifest = IngestionManifest( + purpose=PurposeMetadata( + purpose="analytics", + data_class="operational-events", + owner="platform-team", + destination="analytics-operations", + ), + payload={"event": "task_completed", "task_id": "123"}, + ) + manifest_id = self.governor.write(manifest) + assert manifest_id == manifest.manifest_id + + def test_write_blocked_for_unapproved_destination(self): + manifest = IngestionManifest( + purpose=PurposeMetadata( + purpose="analytics", + data_class="operational-events", + owner="platform-team", + destination="public-dashboard", + ), + ) + with pytest.raises(DestinationNotApprovedError): + self.governor.write(manifest) + + def test_write_blocked_for_unregistered_data_class(self): + manifest = IngestionManifest( + purpose=PurposeMetadata( + purpose="analytics", + data_class="unknown-class", + owner="platform-team", + destination="analytics-operations", + ), + ) + with pytest.raises(DataClassNotRegisteredError): + self.governor.write(manifest) + + def test_write_blocked_for_missing_metadata(self): + manifest = IngestionManifest( + purpose=PurposeMetadata( + purpose="", # missing + data_class="operational-events", + owner="platform-team", + destination="analytics-operations", + ), + ) + with pytest.raises(MissingPurposeMetadataError): + self.governor.write(manifest) + + def test_write_allowed_for_wildcard_class(self): + manifest = IngestionManifest( + purpose=PurposeMetadata( + purpose="reporting", + data_class="public-metrics", + owner="platform-team", + destination="any-destination", + ), + ) + manifest_id = self.governor.write(manifest) + assert manifest_id == manifest.manifest_id + + def test_audit_report_records_writes(self): + self.governor.write(IngestionManifest( + purpose=PurposeMetadata( + purpose="analytics", + data_class="operational-events", + owner="alice", + destination="analytics-operations", + ), + )) + self.governor.write(IngestionManifest( + purpose=PurposeMetadata( + purpose="compliance", + data_class="customer-pii", + owner="bob", + destination="customer-dw", + ), + )) + report = self.governor.generate_audit_report() + assert len(report) == 2 + + def test_audit_report_by_purpose(self): + self.governor.write(IngestionManifest( + purpose=PurposeMetadata( + purpose="analytics", + data_class="operational-events", + owner="alice", + destination="analytics-operations", + ), + )) + self.governor.write(IngestionManifest( + purpose=PurposeMetadata( + purpose="compliance", + data_class="customer-pii", + owner="bob", + destination="customer-dw", + ), + )) + by_purpose = self.governor.generate_audit_report_by_purpose() + assert "analytics" in by_purpose + assert "compliance" in by_purpose + assert len(by_purpose["analytics"]) == 1 + assert len(by_purpose["compliance"]) == 1 + + def test_audit_report_by_owner(self): + self.governor.write(IngestionManifest( + purpose=PurposeMetadata( + purpose="analytics", + data_class="operational-events", + owner="alice", + destination="analytics-operations", + ), + )) + self.governor.write(IngestionManifest( + purpose=PurposeMetadata( + purpose="compliance", + data_class="customer-pii", + owner="bob", + destination="customer-dw", + ), + )) + by_owner = self.governor.generate_audit_report_by_owner() + assert "alice" in by_owner + assert "bob" in by_owner + assert len(by_owner["alice"]) == 1 + + +# 2026-05-25T01:18:00 update diff --git a/tests/test_release.py b/tests/test_release.py new file mode 100644 index 000000000..0878d5a1c --- /dev/null +++ b/tests/test_release.py @@ -0,0 +1,448 @@ +"""Tests for the Release Manager — deployment rollback with paired configuration. + +Regression tests for bounty #4260: rollback now restores both the application +image AND the configuration snapshot that was recorded alongside the previous +release, preventing the incompatible settings failure at startup. +""" + +import copy +import json + +import pytest + +from src.deploy.release import ( + ReleaseManager, + Release, + ReleaseError, + ReleaseNotFoundError, + RollbackVerificationError, +) + + +# ============================================================================ +# Release Manager — Core +# ============================================================================ + + +class TestReleaseManager: + def setup_method(self): + self.mgr = ReleaseManager() + + def test_create_release_minimal(self): + """A release can be created with just an image digest.""" + release = self.mgr.create_release(image_digest="sha256:a1b2c3d4e5f6") + assert release.image_digest == "sha256:a1b2c3d4e5f6" + assert release.config_snapshot == {} + assert release.version.startswith("release-") + assert self.mgr.count() == 1 + + def test_create_release_with_config(self): + """A release records the full config snapshot at creation time.""" + config = {"app": {"name": "prod", "port": 8000}, "features": {"flag_x": True}} + release = self.mgr.create_release( + image_digest="sha256:abc123", + config_snapshot=config, + app_version="2.4.1", + description="Production deploy v2.4.1", + ) + assert release.version == "2.4.1" + assert release.app_version == "2.4.1" + assert release.config_snapshot == config + assert release.description == "Production deploy v2.4.1" + + def test_config_snapshot_is_deep_copied(self): + """Mutating the original config after creation must NOT affect the release.""" + original_config = {"key": "value", "nested": {"inner": "data"}} + release = self.mgr.create_release( + image_digest="sha256:deepcopy-test", + config_snapshot=original_config, + app_version="1.0.0", + ) + # Mutate the original + original_config["key"] = "mutated" + original_config["nested"]["inner"] = "mutated" + # The release must still hold the original values + assert release.config_snapshot["key"] == "value" + assert release.config_snapshot["nested"]["inner"] == "data" + + def test_create_release_requires_image(self): + """create_release raises ReleaseError if image_digest is empty.""" + with pytest.raises(ReleaseError, match="image_digest is required"): + self.mgr.create_release(image_digest="") + + def test_create_release_duplicate_version(self): + """Creating a release with an existing version raises ReleaseError.""" + self.mgr.create_release( + image_digest="sha256:first", + app_version="1.0.0", + ) + with pytest.raises(ReleaseError, match="already exists"): + self.mgr.create_release( + image_digest="sha256:second", + app_version="1.0.0", + ) + + def test_get_release(self): + """get_release returns the release by version string.""" + self.mgr.create_release( + image_digest="sha256:get-test", + app_version="3.0.0", + ) + release = self.mgr.get_release("3.0.0") + assert release is not None + assert release.image_digest == "sha256:get-test" + + def test_get_nonexistent_release(self): + """get_release returns None for unknown versions.""" + assert self.mgr.get_release("nonexistent") is None + + def test_list_releases_newest_first(self): + """list_releases returns newest releases first by default.""" + self.mgr.create_release(image_digest="sha256:v1", app_version="1.0.0") + self.mgr.create_release(image_digest="sha256:v2", app_version="2.0.0") + self.mgr.create_release(image_digest="sha256:v3", app_version="3.0.0") + releases = self.mgr.list_releases() + assert releases[0].version == "3.0.0" + assert releases[1].version == "2.0.0" + assert releases[2].version == "1.0.0" + + def test_list_releases_reverse_false(self): + """list_releases with reverse=False returns oldest first.""" + self.mgr.create_release(image_digest="sha256:v1", app_version="1.0.0") + self.mgr.create_release(image_digest="sha256:v2", app_version="2.0.0") + releases = self.mgr.list_releases(reverse=False) + assert releases[0].version == "1.0.0" + assert releases[1].version == "2.0.0" + + def test_latest_release(self): + """latest_release returns the most recently created release.""" + self.mgr.create_release(image_digest="sha256:v1", app_version="1.0.0") + self.mgr.create_release(image_digest="sha256:v2", app_version="2.0.0") + self.mgr.create_release(image_digest="sha256:v3", app_version="3.0.0") + latest = self.mgr.latest_release() + assert latest.version == "3.0.0" + + def test_latest_release_empty(self): + """latest_release returns None when no releases exist.""" + assert self.mgr.latest_release() is None + + def test_count(self): + """count returns the number of recorded releases.""" + assert self.mgr.count() == 0 + self.mgr.create_release(image_digest="sha256:1", app_version="1.0.0") + assert self.mgr.count() == 1 + self.mgr.create_release(image_digest="sha256:2", app_version="2.0.0") + assert self.mgr.count() == 2 + + def test_multiple_auto_versioned_releases(self): + """Releases without explicit app_version get unique auto-generated versions.""" + r1 = self.mgr.create_release(image_digest="sha256:auto1") + r2 = self.mgr.create_release(image_digest="sha256:auto2") + assert r1.version != r2.version + assert self.mgr.count() == 2 + + +# ============================================================================ +# Rollback — The core fix for bounty #4260 +# ============================================================================ + + +class TestRollback: + def setup_method(self): + self.mgr = ReleaseManager() + # Set up a typical release history: + # v1 -> initial config, v2 -> updated config + self.config_v1 = { + "app": {"name": "prod", "port": 8000, "log_level": "info"}, + "database": {"host": "db-v1.internal", "pool_size": 10}, + "features": {"flag_x": True, "flag_y": False}, + } + self.config_v2 = { + "app": {"name": "prod", "port": 8000, "log_level": "debug"}, + "database": {"host": "db-v2.internal", "pool_size": 20}, + "features": {"flag_x": True, "flag_y": True, "flag_z": True}, + } + self.mgr.create_release( + image_digest="sha256:v1-image", + config_snapshot=self.config_v1, + app_version="1.0.0", + description="Initial release", + ) + self.mgr.create_release( + image_digest="sha256:v2-image", + config_snapshot=self.config_v2, + app_version="2.0.0", + description="Upgraded release", + ) + + def test_rollback_restores_image_and_config(self): + """Rollback restores BOTH the image digest and config snapshot.""" + result = self.mgr.rollback("1.0.0") + + assert result["version"] == "1.0.0" + assert result["image_digest"] == "sha256:v1-image" + assert result["config_snapshot"] == self.config_v1 + + def test_rollback_restores_config_not_current(self): + """Rollback restores the v1 config, not the current v2 config.""" + result = self.mgr.rollback("1.0.0") + config = result["config_snapshot"] + + # v1 config values + assert config["app"]["log_level"] == "info" + assert config["database"]["host"] == "db-v1.internal" + assert config["database"]["pool_size"] == 10 + assert config["features"]["flag_y"] is False + # flag_z was introduced in v2 — should NOT be in v1 + assert "flag_z" not in config["features"] + + def test_rollback_unknown_release_raises(self): + """Rollback raises ReleaseNotFoundError for unknown versions.""" + with pytest.raises(ReleaseNotFoundError, match="'nonexistent' not found"): + self.mgr.rollback("nonexistent") + + def test_rollback_produces_verification_timestamp(self): + """Rollback result includes a timestamp.""" + result = self.mgr.rollback("1.0.0") + assert "rolled_back_at" in result + assert result["rolled_back_at"] # non-empty + + def test_rollback_is_isolated(self): + """Rollback result config is a deep copy — mutating it doesn't affect the stored release.""" + result = self.mgr.rollback("1.0.0") + result["config_snapshot"]["app"]["port"] = 9999 + # The stored release must remain unchanged + stored = self.mgr.get_release("1.0.0") + assert stored.config_snapshot["app"]["port"] == 8000 + + def test_rollback_config_not_mutated_by_subsequent_changes(self): + """Stored release config is immutable after creation.""" + result = self.mgr.rollback("1.0.0") + assert result["config_snapshot"]["app"]["log_level"] == "info" + + # Even if we modify what was returned, the manager's stored copy is safe + result["config_snapshot"]["app"]["log_level"] = "error" + result2 = self.mgr.rollback("1.0.0") + assert result2["config_snapshot"]["app"]["log_level"] == "info" + + def test_rollback_multiple_times_consistent(self): + """Rolling back to the same version multiple times is idempotent.""" + result1 = self.mgr.rollback("1.0.0") + result2 = self.mgr.rollback("1.0.0") + assert result1["image_digest"] == result2["image_digest"] + assert result1["config_snapshot"] == result2["config_snapshot"] + + +# ============================================================================ +# Rollback Verification +# ============================================================================ + + +class TestRollbackVerification: + def setup_method(self): + self.mgr = ReleaseManager() + + def test_verification_passes_for_valid_release(self): + """Rollback with valid data passes verification.""" + self.mgr.create_release( + image_digest="sha256:valid", + config_snapshot={"key": "value"}, + app_version="1.0.0", + ) + # Should not raise + result = self.mgr.rollback("1.0.0") + assert result["version"] == "1.0.0" + + def test_rollback_with_empty_config(self): + """Rollback with an empty config snapshot is allowed (non-config release).""" + self.mgr.create_release( + image_digest="sha256:empty-cfg", + app_version="config-free", + ) + result = self.mgr.rollback("config-free") + assert result["config_snapshot"] == {} + + +# ============================================================================ +# Serialization +# ============================================================================ + + +class TestReleaseSerialization: + def setup_method(self): + self.mgr = ReleaseManager() + self.mgr.create_release( + image_digest="sha256:ser1", + config_snapshot={"env": "prod"}, + app_version="1.0.0", + description="Serialization test", + ) + self.mgr.create_release( + image_digest="sha256:ser2", + config_snapshot={"env": "staging"}, + app_version="2.0.0", + ) + + def test_to_dict(self): + data = self.mgr.to_dict() + assert "releases" in data + assert "version_order" in data + assert data["version_order"] == ["1.0.0", "2.0.0"] + assert data["releases"]["1.0.0"]["image_digest"] == "sha256:ser1" + assert data["releases"]["2.0.0"]["image_digest"] == "sha256:ser2" + + def test_from_dict_round_trip(self): + data = self.mgr.to_dict() + restored = ReleaseManager.from_dict(data) + assert restored.count() == 2 + assert restored.get_release("1.0.0").image_digest == "sha256:ser1" + assert restored.get_release("2.0.0").config_snapshot["env"] == "staging" + + def test_from_dict_preserves_order(self): + data = self.mgr.to_dict() + restored = ReleaseManager.from_dict(data) + releases = restored.list_releases(reverse=False) + assert releases[0].version == "1.0.0" + assert releases[1].version == "2.0.0" + + def test_to_dict_with_no_releases(self): + empty = ReleaseManager() + data = empty.to_dict() + assert data["releases"] == {} + assert data["version_order"] == [] + + def test_from_dict_with_no_releases(self): + empty = ReleaseManager.from_dict({"releases": {}, "version_order": []}) + assert empty.count() == 0 + assert empty.latest_release() is None + + def test_json_round_trip(self): + """Ensure the serialized form can be written/read as JSON.""" + data = self.mgr.to_dict() + json_str = json.dumps(data, default=str) + parsed = json.loads(json_str) + restored = ReleaseManager.from_dict(parsed) + assert restored.count() == 2 + assert restored.get_release("2.0.0").image_digest == "sha256:ser2" + + +# ============================================================================ +# Edge cases +# ============================================================================ + + +class TestReleaseEdgeCases: + def setup_method(self): + self.mgr = ReleaseManager() + + def test_release_with_large_config(self): + """A release with a large config snapshot is handled correctly.""" + large_config = {f"key_{i}": f"value_{i}" for i in range(1000)} + release = self.mgr.create_release( + image_digest="sha256:large", + config_snapshot=large_config, + app_version="large-test", + ) + assert len(release.config_snapshot) == 1000 + assert release.config_snapshot["key_999"] == "value_999" + + def test_rollback_of_latest_release(self): + """Rolling back to the latest release still works (no-op rollback).""" + self.mgr.create_release( + image_digest="sha256:v1", + config_snapshot={"a": 1}, + app_version="1.0.0", + ) + result = self.mgr.rollback("1.0.0") + assert result["version"] == "1.0.0" + assert result["image_digest"] == "sha256:v1" + + def test_rollback_empty_manager_raises(self): + """Rollback on an empty ReleaseManager raises ReleaseNotFoundError.""" + with pytest.raises(ReleaseNotFoundError): + ReleaseManager().rollback("anything") + + def test_app_version_preserved_in_rollback(self): + """Rollback result includes the original app_version.""" + self.mgr.create_release( + image_digest="sha256:v1", + app_version="1.0.0", + ) + result = self.mgr.rollback("1.0.0") + assert result["app_version"] == "1.0.0" + + def test_release_with_app_version_empty_string(self): + """Empty string app_version generates an auto version.""" + self.mgr.create_release(image_digest="sha256:test", app_version="") + assert self.mgr.count() == 1 + release = self.mgr.latest_release() + assert release.version != "" + assert release.app_version == "" + + +# ============================================================================ +# CLI Integration (argparse simulation) +# ============================================================================ + + +class TestReleaseCli: + """Verify the CLI integration paths work via the module directly.""" + + def test_deploy_records_release(self, tmp_path): + """Simulate what 'ao deploy --image-digest X manifest.json' does.""" + manifest = tmp_path / "manifest.json" + manifest.write_text(json.dumps({"service": "agent-worker", "replicas": 3})) + + mgr = ReleaseManager() + with open(manifest) as f: + config = json.load(f) + + release = mgr.create_release( + image_digest="sha256:cli-deploy-test", + config_snapshot=config, + app_version="1.0.0", + description="Deploy from manifest.json", + ) + assert release.image_digest == "sha256:cli-deploy-test" + assert release.config_snapshot == {"service": "agent-worker", "replicas": 3} + assert release.version == "1.0.0" + + def test_rollback_via_cli_path(self, tmp_path): + """Simulate what 'ao release rollback --confirm VERSION' does.""" + db_path = tmp_path / "releases.json" + + mgr = ReleaseManager() + mgr.create_release( + image_digest="sha256:v1", + config_snapshot={"env": "prod"}, + app_version="1.0.0", + ) + mgr.create_release( + image_digest="sha256:v2", + config_snapshot={"env": "staging"}, + app_version="2.0.0", + ) + + # Persist + with open(db_path, "w") as f: + json.dump(mgr.to_dict(), f, default=str) + + # Load and rollback (simulating CLI) + with open(db_path) as f: + loaded = ReleaseManager.from_dict(json.load(f)) + + result = loaded.rollback("1.0.0") + assert result["version"] == "1.0.0" + assert result["config_snapshot"]["env"] == "prod" + assert result["image_digest"] == "sha256:v1" + + # Persist rollback state + with open(db_path, "w") as f: + json.dump(loaded.to_dict(), f, default=str) + + # Reload and verify + with open(db_path) as f: + final = ReleaseManager.from_dict(json.load(f)) + assert final.count() == 2 + # Original releases are preserved + assert final.get_release("1.0.0").config_snapshot["env"] == "prod" diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 538d23b86..57973e9f8 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,5 +1,14 @@ +"""Tests for the TaskScheduler — including retry metadata bounding.""" + +import asyncio + import pytest -from src.orchestrator.scheduler import TaskScheduler + +from src.orchestrator.scheduler import ( + DEFAULT_MAX_RETRIES, + MAX_RETRY_METADATA, + TaskScheduler, +) class TestTaskScheduler: @@ -12,7 +21,6 @@ def test_enqueue_task(self): def test_dequeue_task(self): self.scheduler.enqueue({"type": "test", "payload": {"data": 1}}) - import asyncio task = asyncio.run(self.scheduler.dequeue()) assert task is not None assert task["type"] == "test" @@ -20,136 +28,179 @@ def test_dequeue_task(self): def test_enqueue_multiple_priorities(self): self.scheduler.enqueue({"type": "low"}, priority=1) self.scheduler.enqueue({"type": "high"}, priority=10) - import asyncio task = asyncio.run(self.scheduler.dequeue()) assert task["type"] == "high" def test_complete_task(self): self.scheduler.enqueue({"type": "test"}) - import asyncio task = asyncio.run(self.scheduler.dequeue()) assert self.scheduler.complete(task["id"]) def test_fail_task_with_retry(self): self.scheduler.enqueue({"type": "test"}) - import asyncio task = asyncio.run(self.scheduler.dequeue()) assert self.scheduler.fail(task["id"]) -# 2019-01-09T19:07:03 update - -# 2019-02-18T12:30:02 update - -# 2019-04-11T16:04:51 update - -# 2019-04-17T16:25:46 update - -# 2019-05-24T19:32:13 update - -# 2019-07-02T12:54:25 update - -# 2019-07-03T20:37:00 update - -# 2019-08-21T19:37:17 update - -# 2019-10-18T10:30:31 update - -# 2019-10-25T09:01:38 update - -# 2019-10-29T12:59:34 update - -# 2019-11-05T10:07:06 update - -# 2019-11-11T10:43:52 update - -# 2020-01-17T13:40:02 update - -# 2020-02-07T14:06:34 update - -# 2020-04-03T08:53:40 update - -# 2020-04-06T19:36:29 update - -# 2020-05-12T11:51:05 update - -# 2020-08-17T08:37:15 update - -# 2020-09-15T10:39:38 update - -# 2020-10-06T11:26:19 update - -# 2020-10-21T13:32:43 update - -# 2020-12-14T18:18:36 update - -# 2020-12-23T17:15:03 update - -# 2021-01-25T16:29:00 update - -# 2021-02-23T11:23:50 update - -# 2021-03-19T12:21:19 update - -# 2021-07-29T18:48:25 update -# 2021-08-25T12:46:58 update +class TestWorkflowRemovalRace: + """Regression tests for bounty #3977: workflow removal race guard.""" -# 2021-09-09T16:27:13 update - -# 2021-12-16T12:05:30 update - -# 2022-05-07T14:05:12 update - -# 2022-07-18T20:52:29 update - -# 2022-07-31T18:42:26 update - -# 2022-09-09T13:10:08 update - -# 2023-01-04T15:16:57 update - -# 2023-01-17T14:49:04 update - -# 2023-02-15T13:51:30 update - -# 2023-03-08T09:15:53 update - -# 2023-03-23T16:32:20 update - -# 2023-03-28T09:32:01 update - -# 2023-05-05T17:28:22 update - -# 2023-06-01T08:13:52 update - -# 2023-06-20T09:58:10 update - -# 2023-07-04T16:14:34 update - -# 2023-07-17T20:49:40 update - -# 2023-12-26T11:49:18 update + def setup_method(self): + self.scheduler = TaskScheduler(max_retries=3) + + # ── Deterministic regression test covering the workflow removal race trigger ── + + def test_dequeue_rejects_deleted_run(self): + """dequeue returns None for tasks whose run has been deleted before dispatch.""" + self.scheduler.enqueue({"type": "race-test", "payload": {}}, queue="default") + self.scheduler.enqueue({"type": "normal", "payload": {}}, queue="default") + + # Drain one task and mark it deleted + task1 = asyncio.run(self.scheduler.dequeue()) + assert task1 is not None + self.scheduler.mark_run_deleted(task1["id"]) + + # Re-enqueue it — should be blocked when dequeued + self.scheduler.enqueue(task1, queue="default") + + # normal task should still come through + normal = asyncio.run(self.scheduler.dequeue()) + assert normal is not None + assert normal["type"] == "normal" + + def test_scheduled_task_skipped_if_run_deleted(self): + """Scheduled task promotion is skipped when the run has been deleted.""" + task_id = self.scheduler.schedule( + {"type": "deferred"}, delay=0.001, queue="test" + ) + self.scheduler.mark_run_deleted(task_id) + + # Small sleep so the scheduled task expires + import time + time.sleep(0.005) + + # dequeue should not return the deleted task + result = asyncio.run(self.scheduler.dequeue(queue="test")) + assert result is None, "deleted scheduled task should not be returned" + + def test_mark_run_deleted_idempotent(self): + """Calling mark_run_deleted multiple times is safe.""" + self.scheduler.mark_run_deleted("run-1") + self.scheduler.mark_run_deleted("run-1") # no error + # Task associated with that run should be rejected + self.scheduler.enqueue({"id": "run-1-task", "type": "doomed", "run_id": "run-1"}, queue="default") + result = asyncio.run(self.scheduler.dequeue()) + assert result is None, "task for deleted run should be rejected" + + def test_normal_workflow_unaffected(self): + """Normal workflow without deletion proceeds normally.""" + self.scheduler.enqueue({"type": "healthy", "payload": {}}, queue="default") + task = asyncio.run(self.scheduler.dequeue()) + assert task is not None + assert task["type"] == "healthy" + self.scheduler.complete(task["id"]) + assert task["id"] not in self.scheduler._in_flight -# 2024-05-27T11:00:06 update + def test_multiple_deleted_runs_isolated(self): + """Deleting one run does not block tasks for other runs.""" + self.scheduler.enqueue({"id": "alive-task", "type": "good"}, queue="default") + self.scheduler.enqueue({"id": "dead-task", "type": "bad"}, queue="default") + self.scheduler.mark_run_deleted("dead-task") -# 2024-07-04T08:53:03 update + task = asyncio.run(self.scheduler.dequeue()) + assert task is not None + assert task["id"] == "alive-task" -# 2024-07-18T16:19:02 update + # Second dequeue should return None since dead-task is blocked + task2 = asyncio.run(self.scheduler.dequeue()) + assert task2 is None -# 2024-08-07T09:35:35 update -# 2024-08-22T14:32:14 update +class TestRetryMetadataBounding: + """Regression tests for bounty #3947: bound retry metadata growth.""" -# 2025-05-20T14:19:23 update + def setup_method(self): + self.scheduler = TaskScheduler(max_retries=3) -# 2025-07-17T17:54:48 update + # ── Deterministic regression test covering the repeated-failures trigger ── -# 2025-07-28T13:06:30 update + def test_repeated_failures_eventually_rejected(self): + """Task with repeated failures eventually goes to dead letter.""" + self.scheduler.enqueue({"type": "repeated-fail"}) + task = asyncio.run(self.scheduler.dequeue()) + task_id = task["id"] + + # Simulate repeated failures up to max_retries + for attempt in range(1, DEFAULT_MAX_RETRIES + 1): + should_retry = self.scheduler.fail(task_id) + if attempt < DEFAULT_MAX_RETRIES: + assert should_retry is True, ( + f"Expected retry on attempt {attempt}/{DEFAULT_MAX_RETRIES}" + ) + task = asyncio.run(self.scheduler.dequeue()) + assert task is not None + task_id = task["id"] + else: + assert should_retry is False, ( + "Expected permanent failure on last attempt" + ) + + # Dead letter should hold the task + assert task_id in self.scheduler._dead_letter + + def test_retry_metadata_does_not_grow_unbounded(self): + """retry count is bounded by MAX_RETRY_METADATA and cannot overflow.""" + task = {"type": "growth-test", "id": "overflow-task", "retries": MAX_RETRY_METADATA - 2} + self.scheduler._in_flight["overflow-task"] = task + + # Can still be capped and rejected + assert self.scheduler.fail("overflow-task") is False, ( + "Task at metadata cap should be rejected" + ) + assert task["retries"] <= MAX_RETRY_METADATA, ( + f"Retry count {task['retries']} exceeded hard cap {MAX_RETRY_METADATA}" + ) + + def test_enqueue_rejects_exhausted_retry_metadata(self): + """enqueue returns None for tasks already past the hard cap.""" + task = {"type": "dead", "retries": MAX_RETRY_METADATA + 5} + result = self.scheduler.enqueue(task) + assert result is None, "enqueue should reject task past hard cap" + + def test_idempotent_fail_on_exhausted_task(self): + """Calling fail() multiple times on same exhausted task is idempotent.""" + task = {"retries": MAX_RETRY_METADATA, "id": "idempotent-test"} + self.scheduler._in_flight["idempotent-test"] = task + + # First call + assert self.scheduler.fail("idempotent-test") is False + assert task["retries"] == MAX_RETRY_METADATA # not incremented past cap + + # Second call — task no longer in-flight, should be safe no-op + assert self.scheduler.fail("idempotent-test") is False + + def test_dead_letter_isolation(self): + """Permanently failed tasks end up in dead letter, not back in queue.""" + self.scheduler.enqueue({"type": "doomed"}) + task = asyncio.run(self.scheduler.dequeue()) + task_id = task["id"] -# 2025-12-22T19:05:25 update + # Fail once — gets re-enqueued + assert self.scheduler.fail(task_id) is True -# 2026-01-08T18:43:02 update + # Dequeue and fail again — exhausts retries + task = asyncio.run(self.scheduler.dequeue()) + assert task is not None + task_id = task["id"] + assert self.scheduler.fail(task_id) is True -# 2026-01-12T16:53:28 update + # Dequeue and fail a third time — should hit max_retries + task = asyncio.run(self.scheduler.dequeue()) + assert task is not None + task_id = task["id"] + assert self.scheduler.fail(task_id) is False # permanently failed -# 2026-04-16T16:58:23 update + # Queue should be empty + assert asyncio.run(self.scheduler.dequeue()) is None + # Dead letter has the task + assert task_id in self.scheduler._dead_letter