Rewrite automation service to use Temporal#35
Conversation
Replace legacy polling-based execution with Temporal workflows: ## Removed (Legacy Code) - scheduler.py - Cron polling loop (replaced by Temporal Schedules) - dispatcher.py - Pending run polling (replaced by Temporal Workflows) - watchdog.py - Stale run detection (replaced by Temporal timeouts) - execution.py - Sandbox lifecycle (moved to Temporal Activities) - utils/run.py, utils/cron.py, utils/api_key.py - Legacy utilities ## Added (Temporal Implementation) - temporal/types.py - Data classes for workflow inputs/outputs - temporal/activities.py - 6 activities for sandbox operations - temporal/workflows.py - AutomationWorkflow with crash-proof execution - temporal/schedules.py - Temporal Schedule management for cron jobs - temporal/client.py - Temporal client factory - temporal/worker.py - Worker setup ## Updated - app.py - FastAPI app now runs Temporal worker as background task - router.py - API routes use Temporal for workflow execution - config.py - Added Temporal connection settings ## Tests Added - tests/temporal/test_types.py - Data class tests - tests/temporal/test_schedules.py - Schedule CRUD tests - tests/temporal/test_activities.py - Activity tests with ActivityEnvironment - tests/temporal/test_workflows.py - Workflow tests (skipped by default) Co-authored-by: openhands <openhands@all-hands.dev>
|
🚀 Deploy Preview PR Created/Updated A deploy preview has been created/updated for this PR. Deploy PR: https://github.com/OpenHands/deploy/pull/3663 Once the deploy PR's CI passes, the automation service will be deployed to the feature environment. |
- Add mock_temporal_client fixture to conftest.py - Override get_client dependency for async_client tests - Add list_workflows mock for readiness check - Update TestAuthIntegration tests to use mock_temporal_client - Fix line length issues flagged by pre-commit Co-authored-by: openhands <openhands@all-hands.dev>
- Mock response should return {items: [{exit_code, stdout, stderr}]}
- Mock asyncio.sleep to avoid test timeout
- These tests were timing out because they were waiting for exit_code
Co-authored-by: openhands <openhands@all-hands.dev>
- test_ready_endpoint_db_unavailable: Check 'errors' array instead of 'error' - dispatch tests: Expect RUNNING status instead of PENDING - dispatch tests: Expect started_at to be set (workflow starts immediately) - Replace test_dispatch_updates_last_triggered_at with test_dispatch_creates_running_run - Fix execute_entrypoint tests with proper response structure and mock asyncio.sleep Co-authored-by: openhands <openhands@all-hands.dev>
…ctions The Temporal workflow sandbox restricts imports like httpx and urllib.request. The workflow was importing tarball_validation.py which transitively imported fastapi, sqlalchemy, and httpx through its dependency chain. This fix: 1. Creates tarball_url.py with pure URL parsing functions (no heavy deps) 2. Updates workflows.py to import from the lightweight module 3. Updates tarball_validation.py to re-export from tarball_url.py This allows the workflow code to use is_http_url() and parse_internal_upload_id() without triggering sandbox restriction errors. Co-authored-by: openhands <openhands@all-hands.dev>
When running with ddtrace-run, the import instrumentation conflicts with Temporal's workflow sandbox, causing circular import errors with beartype. This adds AUTOMATION_SKIP_WORKER config option that: - When true, the API server skips creating an in-process Temporal worker - Allows the API to run with ddtrace for HTTP tracing - The separate worker deployment handles workflow execution without ddtrace Co-authored-by: openhands <openhands@all-hands.dev>
The temporal package __init__.py was importing activities, workflows, and worker modules at package level. When Temporal's workflow sandbox loads the workflows module, it first loads the package's __init__.py, which triggered httpx import inside the sandbox (via activities.py). This fix removes those imports from __init__.py. All code already uses direct module imports (e.g., from automation.temporal.activities import ...) so this doesn't break any existing code. Co-authored-by: openhands <openhands@all-hands.dev>
The schedules module imports automation.models which uses SQLAlchemy,
causing sandbox validation to fail with:
TypeError: __annotations__ must be set to a dict object
Also added scripts/test_sandbox.py to verify sandbox validation locally
before deploying.
Co-authored-by: openhands <openhands@all-hands.dev>
When AUTOMATION_FAST_FAIL=true, all Temporal activity retry policies use maximum_attempts=1 instead of the default 3-5 attempts with backoff. This prevents long test durations when workflows fail - instead of waiting minutes for retries to exhaust, tests fail immediately. Production (default): Full retry policies for resilience - API key: 5 attempts, up to 60s backoff - Sandbox: 3 attempts, up to 2min backoff - Tarball: 3 attempts, up to 60s backoff - Cleanup: 3 attempts, up to 30s backoff Fast-fail mode: No retries - All policies: 1 attempt only Co-authored-by: openhands <openhands@all-hands.dev>
|
|
||
|
|
||
| @workflow.defn | ||
| class AutomationWorkflow: |
There was a problem hiding this comment.
Maybe call it AutomationExecutionWorkflow?
I'd imagine we will have some other workflows
jlav
left a comment
There was a problem hiding this comment.
Strongly recommend adding a docker compose setup that has the temporal dev server set up (and any other dependencies like postgres) for ease of local development: https://learn.temporal.io/getting_started/python/dev_environment/
Having the temporal UI available locally for debugging is super useful.
| logger.info("Downloading internal tarball: upload_id=%s", input.upload_id) | ||
|
|
||
| settings = get_settings() | ||
| engine_result = await create_engine(settings) |
There was a problem hiding this comment.
You can share a database connection pool across all your activities. Creating an engine here creates a new pool with new connections every time.
|
|
||
| try: | ||
| # 1. Get per-user API key | ||
| api_key = await workflow.execute_activity( |
There was a problem hiding this comment.
Activity output is stored in plain text in the temporal database, and is visible in the Temporal UI by default. Not a great place to store credentials.
Also, only return information from an activity that is guaranteed to be static until the end of the workflow execution. If these API keys are short lived and the workflow becomes long-lived, it's bound to fail permanently once the API Key expires.
Depending on the lifetime here, sometimes it's better to just re-retrieve info inside of individual activities. If it's long lived, you can look into data converters to encrypt the info: https://docs.temporal.io/default-custom-data-converters#default-data-converter
| if upload_id is None: | ||
| raise ValueError(f"Invalid tarball_path: {tarball_path}") | ||
|
|
||
| tarball_data = await workflow.execute_activity( |
There was a problem hiding this comment.
The largest factor in workflow throughput is the size of the activity payloads. These tarballs are going to be stored directly workflow history, inside of postgres. Postgres isn't a good place to store large binary data.
Factor this out to store tarballs somewhere externally and have the activity retrieve the contents directly.
EXTERNAL_MAX_FILESIZE is 100MB right now. The max per-event payload size defaults to 4MB in temporal, and the max history is 50MB with warnings emitted at 10MB. Increasing those defaults cripples performance, and right now any file larger than ~4MB would just fail altogether.
There was a problem hiding this comment.
Strongly recommend just biting the bullet and adding replay tests as early as possible. They'll catch non-determinism errors that you might introduce by modifying the workflow after it's already been deployed. When you make code changes to a workflow, if there are any active workflows when you deploy the change they are likely to fail if they aren't done in a backwards compatible way.
You can read more here on replay tests: https://docs.temporal.io/develop/python/testing-suite#replay
Summary
This PR rewrites the automation service to use Temporal for durable workflow execution, replacing the legacy polling-based scheduler/dispatcher/watchdog system.
Key Changes
Removed (Legacy Code)
scheduler.py- Cron polling loop → replaced by Temporal Schedulesdispatcher.py- Pending run polling → replaced by Temporal Workflowswatchdog.py- Stale run detection → replaced by Temporal timeoutsexecution.py- Sandbox lifecycle → moved to Temporal Activitiesutils/run.py,utils/cron.py,utils/api_key.py- Legacy utilitiesAdded (Temporal Implementation)
temporal/types.pytemporal/activities.pytemporal/workflows.pytemporal/schedules.pytemporal/client.pytemporal/worker.pyUpdated
app.py- FastAPI app now runs Temporal worker as background taskrouter.py- API routes use Temporal for workflow executionconfig.py- Added Temporal connection settingsBenefits of Temporal
Tests Added
test_types.pytest_schedules.pytest_activities.pytest_workflows.pyRunning Tests
Deployment Notes
Requires Temporal server. For Kubernetes deployment, Temporal can be deployed as pods. Configuration via environment variables:
This PR was created by an AI assistant (OpenHands) on behalf of the user.