Skip to content
Draft
162 changes: 81 additions & 81 deletions automation/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
"""FastAPI application entrypoint."""
"""FastAPI application with Temporal workflow execution.

This application uses Temporal for durable workflow execution:
- Temporal worker runs as a background task
- Temporal Schedules handle cron-based automation triggers
- Activities handle sandbox operations with automatic retries
- Workflows provide crash-proof execution guarantees
"""

import asyncio
import logging
Expand All @@ -9,122 +16,106 @@
from fastapi.responses import JSONResponse
from sqlalchemy import text

from automation.auth import create_http_client
from automation.config import get_settings
from automation.db import create_engine, create_session_factory
from automation.dispatcher import dispatcher_loop
from automation.logger import setup_all_loggers
from automation.preset_router import router as preset_router
from automation.router import router
from automation.scheduler import scheduler_loop
from automation.temporal.client import close_temporal_client, get_temporal_client
from automation.temporal.worker import create_worker
from automation.uploads import router as uploads_router
from automation.watchdog import watchdog_loop


logger = logging.getLogger("automation.app")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup/shutdown lifecycle with Temporal.

    Startup:
        * configure logging and quiet noisy third-party loggers,
        * create the database engine and session factory,
        * connect the Temporal client,
        * unless ``settings.skip_worker`` is set, start an in-process
          Temporal worker as a background task.

    Shutdown:
        * set the shutdown event and wait up to 10 seconds for the worker,
        * close the Temporal client,
        * dispose of the database engine.

    Teardown steps run in ``finally`` blocks so that a failure while
    connecting to Temporal, creating the worker, or waiting for the worker
    does not leave the Temporal client or database engine open.

    Raises:
        Exception: whatever ``get_temporal_client()`` raises is logged and
            re-raised, aborting startup.
    """
    settings = get_settings()

    # Apply structured logging
    setup_all_loggers()

    # Silence noisy loggers
    for noisy_logger in (
        "ddtrace",
        "httpx",
        "httpcore",
        "sqlalchemy.engine",
    ):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)

    logger.info("Starting OpenHands Automations Service")

    # Create database engine and session factory
    engine_result = await create_engine(settings)
    try:
        app.state.engine_result = engine_result
        app.state.engine = engine_result.engine
        app.state.session_factory = create_session_factory(engine_result.engine)

        # Initialize Temporal client
        try:
            temporal_client = await get_temporal_client()
        except Exception as e:
            logger.error("Failed to connect to Temporal: %s", e)
            raise
        app.state.temporal_client = temporal_client
        logger.info("Temporal client connected")

        try:
            # Start Temporal worker as background task (unless skip_worker is
            # set). When running with separate worker pods, skip_worker should
            # be True to avoid conflicts between ddtrace instrumentation and
            # Temporal's workflow sandbox.
            shutdown_event = asyncio.Event()
            app.state.shutdown_event = shutdown_event
            worker_task = None

            if not settings.skip_worker:
                worker = await create_worker(temporal_client, settings)
                worker_task = asyncio.create_task(
                    _run_worker_with_shutdown(worker, shutdown_event),
                    name="temporal-worker",
                )
                app.state.worker_task = worker_task
                logger.info("Temporal worker started")
            else:
                logger.info(
                    "Skipping in-process worker (AUTOMATION_SKIP_WORKER=true)"
                )

            try:
                yield
            finally:
                # Shutdown
                logger.info("Shutting down...")
                shutdown_event.set()

                # Wait for worker to stop (if we started one)
                if worker_task is not None:
                    try:
                        await asyncio.wait_for(worker_task, timeout=10.0)
                    # asyncio.TimeoutError is only an alias of the builtin
                    # TimeoutError from Python 3.11 on; name it explicitly so
                    # the timeout is also caught on older interpreters.
                    except asyncio.TimeoutError:
                        logger.warning("Worker did not stop in time, cancelling")
                        worker_task.cancel()
                        try:
                            await worker_task
                        except asyncio.CancelledError:
                            pass
        finally:
            # Close Temporal client
            await close_temporal_client()
    finally:
        # Close database
        await engine_result.dispose()
        logger.info("Automations service shut down")


async def _run_worker_with_shutdown(worker, shutdown_event: asyncio.Event):
    """Hold ``worker``'s async context open until shutdown is requested.

    Args:
        worker: Object used as an async context manager; it is entered on
            start and exited once the event fires.
        shutdown_event: Event whose ``set()`` releases this coroutine.
    """
    wait_for_stop = shutdown_event.wait
    async with worker:
        # Park here for as long as the worker context should stay active.
        await wait_for_stop()
        logger.info("Worker received shutdown signal")


def _build_cors_origins() -> list[str]:
"""Build the list of allowed CORS origins from settings."""
"""Build the list of allowed CORS origins."""
settings = get_settings()
origins = [o.strip() for o in settings.cors_origins.split(",") if o.strip()]
if not origins:
Expand All @@ -135,14 +126,10 @@ def _build_cors_origins() -> list[str]:
def _create_app() -> FastAPI:
    """Create and configure the FastAPI application.

    Returns:
        A ``FastAPI`` instance that uses ``lifespan`` for startup/shutdown
        and is mounted under ``settings.root_path``.
    """
    settings = get_settings()
    # root_path is derived from AUTOMATION_BASE_URL path component.
    # e.g., https://app.all-hands.dev/api/automation -> /api/automation
    return FastAPI(
        title="OpenHands Automations Service",
        description="Scheduled and event-driven automation execution using Temporal",
        version="0.2.0",
        lifespan=lifespan,
        root_path=settings.root_path,
    )
Expand All @@ -158,32 +145,45 @@ def _create_app() -> FastAPI:
allow_headers=["*"],
)

# Include uploads_router and preset_router BEFORE router to avoid route conflict.
# The main router has /v1/{automation_id} which would match /v1/uploads
# or /v1/preset/prompt and fail UUID validation if included first.
# Include routers (order matters - more specific routes first)
app.include_router(uploads_router)
app.include_router(preset_router)
app.include_router(router)


@app.get("/health")
async def health():
    """Liveness endpoint: always answers with a static OK payload."""
    payload = {"status": "ok"}
    return payload


@app.get("/ready")
async def readiness():
    """Readiness probe — checks DB and Temporal connectivity.

    Both dependencies are always checked so the response lists every failure,
    not just the first one.

    Returns:
        ``{"status": "ready"}`` when both checks pass; otherwise a 503
        ``JSONResponse`` whose body is
        ``{"status": "not_ready", "errors": [...]}``.
    """
    errors = []

    # Check database
    try:
        async with app.state.engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
    except Exception as e:
        logger.error("Database check failed: %s", e)
        errors.append("database unavailable")

    # Check Temporal
    try:
        client = app.state.temporal_client
        # Simple connectivity check - list workflows with limit 1
        async for _ in client.list_workflows(query="", page_size=1):
            break
    except Exception as e:
        logger.error("Temporal check failed: %s", e)
        errors.append("temporal unavailable")

    if errors:
        return JSONResponse(
            status_code=503,
            content={"status": "not_ready", "errors": errors},
        )

    return {"status": "ready"}
28 changes: 20 additions & 8 deletions automation/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,21 @@ class Settings(BaseSettings):
# OpenHands SaaS API
openhands_api_base_url: str = "https://app.all-hands.dev"

# Scheduler (polls automations table for due cron jobs)
scheduler_interval_seconds: int = 60

# Dispatcher (polls automation_runs table for pending jobs)
dispatcher_interval_seconds: int = 10

# Watchdog (scans for stale RUNNING runs past their timeout)
watchdog_interval_seconds: int = 60
# Temporal configuration
temporal_host: str = "localhost"
temporal_port: int = 7233
temporal_namespace: str = "default"
temporal_task_queue: str = "automations"
# For Temporal Cloud: set to True and provide TLS cert/key paths
temporal_tls_enabled: bool = False
temporal_tls_cert_path: str | None = None
temporal_tls_key_path: str | None = None
# Skip starting an in-process worker (use when running separate worker pods)
# This avoids conflicts between ddtrace and Temporal's workflow sandbox
skip_worker: bool = False
# Fast-fail mode: disable retries for faster test feedback
# When True, all activity retry policies use maximum_attempts=1
fast_fail: bool = False

# Service key for authenticating with the SaaS API to fetch per-user
# API keys (called by the dispatcher before each automation run).
Expand All @@ -56,6 +63,11 @@ class Settings(BaseSettings):

model_config = {"env_prefix": "AUTOMATION_"}

@property
def temporal_address(self) -> str:
    """Full Temporal server address in ``host:port`` form.

    A bare IPv6 literal in ``temporal_host`` (e.g. ``::1``) is wrapped in
    square brackets so the port separator stays unambiguous
    (``[::1]:7233``). Hostnames, IPv4 addresses and hosts that are already
    bracketed are used unchanged.
    """
    import ipaddress

    host = self.temporal_host
    try:
        is_bare_ipv6 = ipaddress.ip_address(host).version == 6
    except ValueError:
        # Not an IP literal (hostname, or an already-bracketed address).
        is_bare_ipv6 = False
    if is_bare_ipv6:
        host = f"[{host}]"
    return f"{host}:{self.temporal_port}"

@property
def resolved_base_url(self) -> str:
"""Public base URL with localhost fallback for dev."""
Expand Down
Loading
Loading