Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions automation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ async def lifespan(app: FastAPI):
app.state.engine = engine_result.engine
app.state.session_factory = create_session_factory(engine_result.engine)

# For SQLite: auto-create tables (SQLite is intended for simple local
# deployments where running Alembic migrations is not practical since the
# existing migration scripts use PostgreSQL-specific DDL).
if settings.is_sqlite:
from automation.models import Base

async with engine_result.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("SQLite tables created/verified")

# Start the background scheduler and dispatcher
shutdown_event = asyncio.Event()
app.state.shutdown_event = shutdown_event
Expand Down
16 changes: 16 additions & 0 deletions automation/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ class ServiceSettings(BaseSettings):

Environment variables (AUTOMATION_ prefix):
# Database
AUTOMATION_DB_URL: Full database URL (optional, overrides host/port/name/user/pass).
Supports PostgreSQL (postgresql+asyncpg://...) and SQLite (sqlite+aiosqlite:///...).
When set, all other DB_* settings (except pool settings) are ignored.
Examples:
- sqlite+aiosqlite:///./automations.db (relative path)
- sqlite+aiosqlite:////tmp/automations.db (absolute path)
- postgresql+asyncpg://user:pass@host:5432/dbname

AUTOMATION_DB_HOST: Database host (default: localhost)
AUTOMATION_DB_PORT: Database port (default: 5432)
AUTOMATION_DB_NAME: Database name (default: automations)
Expand Down Expand Up @@ -260,6 +268,9 @@ class ServiceSettings(BaseSettings):
"""

# Database
# Full URL takes precedence over individual host/port/name/user/pass settings.
# Supports PostgreSQL and SQLite (e.g. sqlite+aiosqlite:///./automations.db).
db_url: str | None = None
db_host: str = "localhost"
db_port: int = 5432
db_name: str = "automations"
Expand All @@ -274,6 +285,11 @@ class ServiceSettings(BaseSettings):
gcp_project: str | None = None
gcp_region: str | None = None

@property
def is_sqlite(self) -> bool:
"""True when the database backend is SQLite."""
return self.db_url is not None and self.db_url.startswith("sqlite")

# OpenHands SaaS API
openhands_api_base_url: str = "https://app.all-hands.dev"

Expand Down
72 changes: 61 additions & 11 deletions automation/db.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
"""Database engine and session management.

Follows the same patterns as OpenHands enterprise:
- asyncpg for PostgreSQL
- GCP Cloud SQL connector for production
Supports two backends:
- **PostgreSQL** (asyncpg) — default for production. Includes GCP Cloud SQL
connector support via ``AUTOMATION_GCP_DB_INSTANCE``.
- **SQLite** (aiosqlite) — lightweight local backend for open-source /
self-hosted deployments. Enabled by setting ``AUTOMATION_DB_URL`` to a
``sqlite+aiosqlite:///`` URL.

Which backend is used is determined by :pyclass:`ServiceSettings`:

1. ``db_url`` starting with ``sqlite`` → SQLite
2. ``gcp_db_instance`` set → GCP Cloud SQL (PostgreSQL)
3. Otherwise → direct PostgreSQL via ``db_host`` / ``db_port`` / …
"""

import logging
Expand All @@ -11,6 +20,7 @@
from typing import Any

from fastapi import Request
from sqlalchemy import event as sa_event
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import (
AsyncEngine,
Expand Down Expand Up @@ -40,25 +50,63 @@ async def dispose(self) -> None:


async def create_engine(settings: ServiceSettings | None = None) -> EngineResult:
"""Create a new PostgreSQL database engine based on settings.
"""Create a database engine based on settings.

Returns an EngineResult containing the engine and optional GCP connector.
Call result.dispose() on shutdown to properly clean up resources.
"""
if settings is None:
settings = get_config().service

if settings.is_sqlite:
return _create_sqlite_engine(settings)

if settings.gcp_db_instance:
return await _create_gcp_engine(settings)

url = URL.create(
"postgresql+asyncpg",
username=settings.db_user,
password=settings.db_pass,
host=settings.db_host,
port=settings.db_port,
database=settings.db_name,
return _create_pg_engine(settings)


# -- SQLite ----------------------------------------------------------------

def _create_sqlite_engine(settings: ServiceSettings) -> EngineResult:
"""Create an async SQLite engine via aiosqlite.

Enables WAL journal mode and foreign key enforcement on every new
connection so SQLite behaves closer to PostgreSQL for our use case.
"""
engine = create_async_engine(
settings.db_url,
echo=False,
# SQLite does not support pool_size/max_overflow the same way
# but StaticPool is fine for single-process usage
)

@sa_event.listens_for(engine.sync_engine, "connect")
def _set_sqlite_pragma(dbapi_conn, connection_record):
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()

return EngineResult(engine=engine)


# -- PostgreSQL (direct) ---------------------------------------------------

def _create_pg_engine(settings: ServiceSettings) -> EngineResult:
"""Create a direct asyncpg PostgreSQL engine."""
if settings.db_url:
url = settings.db_url
else:
url = URL.create(
"postgresql+asyncpg",
username=settings.db_user,
password=settings.db_pass,
host=settings.db_host,
port=settings.db_port,
database=settings.db_name,
)
engine = create_async_engine(
url,
pool_size=settings.db_pool_size,
Expand All @@ -69,6 +117,8 @@ async def create_engine(settings: ServiceSettings | None = None) -> EngineResult
return EngineResult(engine=engine)


# -- GCP Cloud SQL ---------------------------------------------------------

async def _create_gcp_engine(settings: ServiceSettings) -> EngineResult:
"""Create engine using GCP Cloud SQL connector (async).

Expand Down
13 changes: 11 additions & 2 deletions automation/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

Polls the automation_runs table for PENDING jobs and dispatches them
to sandboxes via the SaaS API. Uses FOR UPDATE SKIP LOCKED for
multi-worker safety.
multi-worker safety (PostgreSQL). On SQLite the row-level lock is
skipped since SQLite deployments are single-process.

Completion is handled asynchronously: the SDK running inside the sandbox
POSTs to ``/v1/runs/{id}/complete`` when the entry-point
Expand Down Expand Up @@ -68,6 +69,11 @@ async def _download_internal_tarball(
return store.read(upload.storage_path)


def _is_sqlite(session: AsyncSession) -> bool:
"""Return True when the bound engine is SQLite."""
return session.bind.dialect.name == "sqlite" if session.bind else False


async def _poll_pending_runs(
session: AsyncSession,
batch_size: int,
Expand All @@ -76,15 +82,18 @@ async def _poll_pending_runs(

Eagerly loads the ``automation`` relationship so that ``user_id``,
``org_id``, and tarball config are available for dispatch.

On SQLite the ``FOR UPDATE SKIP LOCKED`` clause is omitted.
"""
select_query = (
select(AutomationRun)
.options(selectinload(AutomationRun.automation))
.where(AutomationRun.status == AutomationRunStatus.PENDING)
.order_by(AutomationRun.created_at.asc())
.limit(batch_size)
.with_for_update(skip_locked=True)
)
if not _is_sqlite(session):
select_query = select_query.with_for_update(skip_locked=True)
result = await session.execute(select_query)
return list(result.scalars().all())

Expand Down
19 changes: 14 additions & 5 deletions automation/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
"""SQLAlchemy ORM models for the automations service."""
"""SQLAlchemy ORM models for the automations service.

The models use :class:`sqlalchemy.types.JSON` so they work with both
PostgreSQL (which transparently maps ``JSON`` to its native ``json`` /
``jsonb`` type) and SQLite. PostgreSQL-only constructs such as partial
indexes are guarded behind a dialect check so that SQLite simply skips
them.
"""

import enum
import uuid
Expand All @@ -10,12 +17,12 @@
Enum,
ForeignKey,
Index,
JSON,
String,
Text,
Uuid,
text,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

from automation.utils import utcnow
Expand Down Expand Up @@ -56,7 +63,7 @@ class Automation(Base):
prompt: Mapped[str | None] = mapped_column(Text, nullable=True)

# Trigger config — for MVP, only cron is supported.
trigger: Mapped[dict] = mapped_column(JSONB, nullable=False)
trigger: Mapped[dict] = mapped_column(JSON, nullable=False)

# Path to SDK code tarball (e.g., S3 or GCS URL)
tarball_path: Mapped[str] = mapped_column(Text, nullable=False)
Expand Down Expand Up @@ -151,7 +158,7 @@ class AutomationRun(Base):
# Contains the webhook payload that triggered this run.
# For GitHub events: model_dump() of the parsed Pydantic event
# For custom webhooks: the raw payload dict
event_payload: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
event_payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)

# Timestamps
created_at: Mapped[datetime] = mapped_column(
Expand All @@ -171,7 +178,9 @@ class AutomationRun(Base):

__table_args__ = (
# Partial index for efficient PENDING polling.
# This service uses PostgreSQL exclusively in all environments.
# The postgresql_where clause is a PostgreSQL-specific optimization;
# on SQLite it is silently ignored by SQLAlchemy and a regular
# (non-partial) index is created instead.
Index(
"ix_automation_runs_pending",
"created_at",
Expand Down
13 changes: 12 additions & 1 deletion automation/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
enabled cron automations whose next fire time is due.

Uses FOR UPDATE SKIP LOCKED for multi-worker safety in PostgreSQL.
On SQLite (single-process deployments) the row-level lock is skipped.
"""

import asyncio
Expand All @@ -28,6 +29,11 @@
POLL_INTERVAL_SECONDS = 60


def _is_sqlite(session: AsyncSession) -> bool:
"""Return True when the bound engine is SQLite."""
return session.bind.dialect.name == "sqlite" if session.bind else False


async def _fetch_enabled_automations(
session: AsyncSession,
batch_size: int,
Expand All @@ -41,6 +47,10 @@ async def _fetch_enabled_automations(
The poll_threshold filters out automations that were recently polled,
ensuring fair rotation through all automations when using batching.

On SQLite the ``FOR UPDATE SKIP LOCKED`` clause is omitted because
SQLite only supports database-level locking and does not have
row-level locks. This is safe for single-process deployments.

Args:
session: Database session
batch_size: Maximum number of automations to fetch
Expand All @@ -59,8 +69,9 @@ async def _fetch_enabled_automations(
)
.order_by(Automation.last_polled_at.asc().nulls_first())
.limit(batch_size)
.with_for_update(skip_locked=True)
)
if not _is_sqlite(session):
select_query = select_query.with_for_update(skip_locked=True)

result = await session.execute(select_query)
return list(result.scalars().all())
Expand Down
Loading
Loading