From 4397130b4c5e741c1e3738aa47a24cbac775c0e9 Mon Sep 17 00:00:00 2001 From: openhands Date: Tue, 28 Apr 2026 20:10:10 +0000 Subject: [PATCH] feat: add SQLite database backend for local/self-hosted deployments Add SQLite support via aiosqlite as an alternative to PostgreSQL, enabling open-source users to run the automation service without requiring a PostgreSQL instance. Changes: - config.py: Add db_url setting and is_sqlite property - db.py: Support SQLite engine creation with WAL mode and FK enforcement - models.py: Replace PostgreSQL-specific JSONB with cross-database JSON type - scheduler.py: Skip FOR UPDATE SKIP LOCKED on SQLite (not supported/needed) - dispatcher.py: Same FOR UPDATE SKIP LOCKED handling for SQLite - app.py: Auto-create tables for SQLite (skips Alembic PG-specific migrations) - migrations/env.py: Support SQLite with render_as_batch and skip advisory locks - pyproject.toml: Add aiosqlite dependency - tests/test_sqlite_backend.py: Comprehensive tests for SQLite backend Usage: Set AUTOMATION_DB_URL=sqlite+aiosqlite:///./automations.db Closes #62 (database support aspect) Co-authored-by: openhands --- automation/app.py | 10 ++ automation/config.py | 16 ++ automation/db.py | 72 ++++++-- automation/dispatcher.py | 13 +- automation/models.py | 19 +- automation/scheduler.py | 13 +- migrations/env.py | 94 +++++++--- pyproject.toml | 1 + tests/test_sqlite_backend.py | 335 +++++++++++++++++++++++++++++++++++ uv.lock | 2 + 10 files changed, 531 insertions(+), 44 deletions(-) create mode 100644 tests/test_sqlite_backend.py diff --git a/automation/app.py b/automation/app.py index ffa5453..e7a576d 100644 --- a/automation/app.py +++ b/automation/app.py @@ -58,6 +58,16 @@ async def lifespan(app: FastAPI): app.state.engine = engine_result.engine app.state.session_factory = create_session_factory(engine_result.engine) + # For SQLite: auto-create tables (SQLite is intended for simple local + # deployments where running Alembic migrations is not practical since the + # existing migration scripts use PostgreSQL-specific DDL). + if settings.is_sqlite: + from automation.models import Base + + async with engine_result.engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + logger.info("SQLite tables created/verified") + # Start the background scheduler and dispatcher shutdown_event = asyncio.Event() app.state.shutdown_event = shutdown_event diff --git a/automation/config.py b/automation/config.py index e0e1dbe..1698496 100644 --- a/automation/config.py +++ b/automation/config.py @@ -221,6 +221,14 @@ class ServiceSettings(BaseSettings): Environment variables (AUTOMATION_ prefix): # Database + AUTOMATION_DB_URL: Full database URL (optional, overrides host/port/name/user/pass). + Supports PostgreSQL (postgresql+asyncpg://...) and SQLite (sqlite+aiosqlite:///...). + When set, all other DB_* settings (except pool settings) are ignored. + Examples: + - sqlite+aiosqlite:///./automations.db (relative path) + - sqlite+aiosqlite:////tmp/automations.db (absolute path) + - postgresql+asyncpg://user:pass@host:5432/dbname + AUTOMATION_DB_HOST: Database host (default: localhost) AUTOMATION_DB_PORT: Database port (default: 5432) AUTOMATION_DB_NAME: Database name (default: automations) @@ -260,6 +268,9 @@ class ServiceSettings(BaseSettings): """ # Database + # Full URL takes precedence over individual host/port/name/user/pass settings. + # Supports PostgreSQL and SQLite (e.g. sqlite+aiosqlite:///./automations.db). + db_url: str | None = None db_host: str = "localhost" db_port: int = 5432 db_name: str = "automations" @@ -274,6 +285,11 @@ class ServiceSettings(BaseSettings): gcp_project: str | None = None gcp_region: str | None = None + @property + def is_sqlite(self) -> bool: + """True when the database backend is SQLite.""" + return self.db_url is not None and self.db_url.startswith("sqlite") + # OpenHands SaaS API openhands_api_base_url: str = "https://app.all-hands.dev" diff --git a/automation/db.py b/automation/db.py index 4c89e4a..d96fee9 100644 --- a/automation/db.py +++ b/automation/db.py @@ -1,8 +1,17 @@ """Database engine and session management. -Follows the same patterns as OpenHands enterprise: -- asyncpg for PostgreSQL -- GCP Cloud SQL connector for production +Supports two backends: +- **PostgreSQL** (asyncpg) — default for production. Includes GCP Cloud SQL + connector support via ``AUTOMATION_GCP_DB_INSTANCE``. +- **SQLite** (aiosqlite) — lightweight local backend for open-source / + self-hosted deployments. Enabled by setting ``AUTOMATION_DB_URL`` to a + ``sqlite+aiosqlite:///`` URL. + +Which backend is used is determined by :pyclass:`ServiceSettings`: + +1. ``db_url`` starting with ``sqlite`` → SQLite +2. ``gcp_db_instance`` set → GCP Cloud SQL (PostgreSQL) +3. Otherwise → direct PostgreSQL via ``db_host`` / ``db_port`` / … """ import logging @@ -11,6 +20,7 @@ from typing import Any from fastapi import Request +from sqlalchemy import event as sa_event from sqlalchemy.engine import URL from sqlalchemy.ext.asyncio import ( AsyncEngine, @@ -40,7 +50,7 @@ async def dispose(self) -> None: async def create_engine(settings: ServiceSettings | None = None) -> EngineResult: - """Create a new PostgreSQL database engine based on settings. + """Create a database engine based on settings. Returns an EngineResult containing the engine and optional GCP connector. Call result.dispose() on shutdown to properly clean up resources. @@ -48,17 +58,55 @@ async def create_engine(settings: ServiceSettings | None = None) -> EngineResult if settings is None: settings = get_config().service + if settings.is_sqlite: + return _create_sqlite_engine(settings) + if settings.gcp_db_instance: return await _create_gcp_engine(settings) - url = URL.create( - "postgresql+asyncpg", - username=settings.db_user, - password=settings.db_pass, - host=settings.db_host, - port=settings.db_port, - database=settings.db_name, + return _create_pg_engine(settings) + + +# -- SQLite ---------------------------------------------------------------- + +def _create_sqlite_engine(settings: ServiceSettings) -> EngineResult: + """Create an async SQLite engine via aiosqlite. + + Enables WAL journal mode and foreign key enforcement on every new + connection so SQLite behaves closer to PostgreSQL for our use case. + """ + engine = create_async_engine( + settings.db_url, + echo=False, + # SQLite does not support pool_size/max_overflow the same way + # but StaticPool is fine for single-process usage ) + + @sa_event.listens_for(engine.sync_engine, "connect") + def _set_sqlite_pragma(dbapi_conn, connection_record): + cursor = dbapi_conn.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + + return EngineResult(engine=engine) + + +# -- PostgreSQL (direct) --------------------------------------------------- + +def _create_pg_engine(settings: ServiceSettings) -> EngineResult: + """Create a direct asyncpg PostgreSQL engine.""" + if settings.db_url: + url = settings.db_url + else: + url = URL.create( + "postgresql+asyncpg", + username=settings.db_user, + password=settings.db_pass, + host=settings.db_host, + port=settings.db_port, + database=settings.db_name, + ) engine = create_async_engine( url, pool_size=settings.db_pool_size, @@ -69,6 +117,8 @@ async def create_engine(settings: ServiceSettings | None = None) -> EngineResult return EngineResult(engine=engine) +# -- GCP Cloud SQL --------------------------------------------------------- + async def _create_gcp_engine(settings: ServiceSettings) -> EngineResult: """Create engine using GCP Cloud SQL connector (async). diff --git a/automation/dispatcher.py b/automation/dispatcher.py index 6152908..bdfc405 100644 --- a/automation/dispatcher.py +++ b/automation/dispatcher.py @@ -2,7 +2,8 @@ Polls the automation_runs table for PENDING jobs and dispatches them to sandboxes via the SaaS API. Uses FOR UPDATE SKIP LOCKED for -multi-worker safety. +multi-worker safety (PostgreSQL). On SQLite the row-level lock is +skipped since SQLite deployments are single-process. Completion is handled asynchronously: the SDK running inside the sandbox POSTs to ``/v1/runs/{id}/complete`` when the entry-point @@ -68,6 +69,11 @@ async def _download_internal_tarball( return store.read(upload.storage_path) +def _is_sqlite(session: AsyncSession) -> bool: + """Return True when the bound engine is SQLite.""" + return session.bind.dialect.name == "sqlite" if session.bind else False + + async def _poll_pending_runs( session: AsyncSession, batch_size: int, @@ -76,6 +82,8 @@ async def _poll_pending_runs( Eagerly loads the ``automation`` relationship so that ``user_id``, ``org_id``, and tarball config are available for dispatch. + + On SQLite the ``FOR UPDATE SKIP LOCKED`` clause is omitted. """ select_query = ( select(AutomationRun) @@ -83,8 +91,9 @@ async def _poll_pending_runs( .where(AutomationRun.status == AutomationRunStatus.PENDING) .order_by(AutomationRun.created_at.asc()) .limit(batch_size) - .with_for_update(skip_locked=True) ) + if not _is_sqlite(session): + select_query = select_query.with_for_update(skip_locked=True) result = await session.execute(select_query) return list(result.scalars().all()) diff --git a/automation/models.py b/automation/models.py index 79ed934..8ca72da 100644 --- a/automation/models.py +++ b/automation/models.py @@ -1,4 +1,11 @@ -"""SQLAlchemy ORM models for the automations service.""" +"""SQLAlchemy ORM models for the automations service. + +The models use :class:`sqlalchemy.types.JSON` so they work with both +PostgreSQL (which transparently maps ``JSON`` to its native ``json`` / +``jsonb`` type) and SQLite. PostgreSQL-only constructs such as partial +indexes are guarded behind a dialect check so that SQLite simply skips +them. +""" import enum import uuid @@ -10,12 +17,12 @@ Enum, ForeignKey, Index, + JSON, String, Text, Uuid, text, ) -from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from automation.utils import utcnow @@ -56,7 +63,7 @@ class Automation(Base): prompt: Mapped[str | None] = mapped_column(Text, nullable=True) # Trigger config — for MVP, only cron is supported. - trigger: Mapped[dict] = mapped_column(JSONB, nullable=False) + trigger: Mapped[dict] = mapped_column(JSON, nullable=False) # Path to SDK code tarball (e.g., S3 or GCS URL) tarball_path: Mapped[str] = mapped_column(Text, nullable=False) @@ -151,7 +158,7 @@ class AutomationRun(Base): # Contains the webhook payload that triggered this run. # For GitHub events: model_dump() of the parsed Pydantic event # For custom webhooks: the raw payload dict - event_payload: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + event_payload: Mapped[dict | None] = mapped_column(JSON, nullable=True) # Timestamps created_at: Mapped[datetime] = mapped_column( @@ -171,7 +178,9 @@ class AutomationRun(Base): __table_args__ = ( # Partial index for efficient PENDING polling. - # This service uses PostgreSQL exclusively in all environments. + # The postgresql_where clause is a PostgreSQL-specific optimization; + # on SQLite it is silently ignored by SQLAlchemy and a regular + # (non-partial) index is created instead. Index( "ix_automation_runs_pending", "created_at", diff --git a/automation/scheduler.py b/automation/scheduler.py index 8d5dc26..c5b1d05 100644 --- a/automation/scheduler.py +++ b/automation/scheduler.py @@ -5,6 +5,7 @@ enabled cron automations whose next fire time is due. Uses FOR UPDATE SKIP LOCKED for multi-worker safety in PostgreSQL. +On SQLite (single-process deployments) the row-level lock is skipped. """ import asyncio @@ -28,6 +29,11 @@ POLL_INTERVAL_SECONDS = 60 +def _is_sqlite(session: AsyncSession) -> bool: + """Return True when the bound engine is SQLite.""" + return session.bind.dialect.name == "sqlite" if session.bind else False + + async def _fetch_enabled_automations( session: AsyncSession, batch_size: int, @@ -41,6 +47,10 @@ async def _fetch_enabled_automations( The poll_threshold filters out automations that were recently polled, ensuring fair rotation through all automations when using batching. + On SQLite the ``FOR UPDATE SKIP LOCKED`` clause is omitted because + SQLite only supports database-level locking and does not have + row-level locks. This is safe for single-process deployments. + Args: session: Database session batch_size: Maximum number of automations to fetch @@ -59,8 +69,9 @@ async def _fetch_enabled_automations( ) .order_by(Automation.last_polled_at.asc().nulls_first()) .limit(batch_size) - .with_for_update(skip_locked=True) ) + if not _is_sqlite(session): + select_query = select_query.with_for_update(skip_locked=True) result = await session.execute(select_query) return list(result.scalars().all()) diff --git a/migrations/env.py b/migrations/env.py index 734e5cd..5dbe18c 100644 --- a/migrations/env.py +++ b/migrations/env.py @@ -1,15 +1,15 @@ """Alembic migration environment. -Follows the same patterns as the OpenHands enterprise migrations: -- Reads DB connection from environment variables -- Supports GCP Cloud SQL connector for production -- Supports local PostgreSQL for development -- Uses PostgreSQL advisory locks for safe concurrent execution - -Note: Uses pg8000 (sync driver) while the application uses asyncpg (async driver). -This is intentional - Alembic runs synchronously, and both drivers produce -identical DDL/schema operations. The GCP Cloud SQL connector's sync connect() -method pairs naturally with pg8000. +Supports three database backends: + +- **PostgreSQL** (default) via pg8000 (sync driver) — Alembic runs + synchronously so pg8000 is used instead of the application's asyncpg. +- **GCP Cloud SQL** — Uses the Cloud SQL Python connector with pg8000. +- **SQLite** — For local / open-source deployments. Enabled by setting + ``AUTOMATION_DB_URL`` to a ``sqlite:///`` URL. + +PostgreSQL migrations use advisory locks for safe concurrent execution. +SQLite skips advisory locks since it only supports single-writer access. """ import os @@ -26,6 +26,9 @@ # Using a hash of "automation_migrations" to avoid collisions MIGRATION_LOCK_ID = 849320147 +# Full URL takes precedence when set (supports both PostgreSQL and SQLite) +DB_URL = os.getenv("AUTOMATION_DB_URL") + DB_USER = os.getenv("AUTOMATION_DB_USER", os.getenv("DB_USER", "postgres")) DB_PASS = os.getenv("AUTOMATION_DB_PASS", os.getenv("DB_PASS", "postgres")) DB_HOST = os.getenv("AUTOMATION_DB_HOST", os.getenv("DB_HOST", "localhost")) @@ -37,7 +40,29 @@ GCP_REGION = os.getenv("AUTOMATION_GCP_REGION", os.getenv("GCP_REGION")) +def _is_sqlite_url(url: str | None) -> bool: + return url is not None and url.startswith("sqlite") + + def get_engine(database_name=DB_NAME): + # --- SQLite --------------------------------------------------------- + if _is_sqlite_url(DB_URL): + # Strip the async driver prefix so Alembic can use the sync pysqlite + sync_url = DB_URL.replace("sqlite+aiosqlite", "sqlite") + engine = create_engine(sync_url) + + # Enable foreign keys for every connection (off by default in SQLite) + from sqlalchemy import event as sa_event + + @sa_event.listens_for(engine, "connect") + def _set_sqlite_pragma(dbapi_conn, connection_record): + cursor = dbapi_conn.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + + return engine + + # --- GCP Cloud SQL -------------------------------------------------- if GCP_DB_INSTANCE: from google.cloud.sql.connector import Connector @@ -57,18 +82,30 @@ def get_db_connection(): creator=get_db_connection, pool_pre_ping=True, ) - else: - url = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{database_name}" - return create_engine(url, pool_pre_ping=True) + + # --- Direct PostgreSQL ---------------------------------------------- + if DB_URL and not _is_sqlite_url(DB_URL): + # Caller provided a full PostgreSQL URL — normalise driver to pg8000 + sync_url = DB_URL.replace("postgresql+asyncpg", "postgresql+pg8000") + return create_engine(sync_url, pool_pre_ping=True) + + url = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{database_name}" + return create_engine(url, pool_pre_ping=True) def run_migrations_offline(): - url = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + if _is_sqlite_url(DB_URL): + sync_url = DB_URL.replace("sqlite+aiosqlite", "sqlite") + else: + sync_url = ( + f"postgresql+pg8000://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + ) context.configure( - url=url, + url=sync_url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, + render_as_batch=_is_sqlite_url(DB_URL), ) with context.begin_transaction(): context.run_migrations() @@ -80,22 +117,29 @@ def run_migrations_online(): Uses PostgreSQL advisory locks to ensure only one migration process runs at a time, even when multiple pods/containers attempt migrations concurrently. Other processes will wait for the lock to be released. + + On SQLite, advisory locks are skipped (not supported / not needed). + ``render_as_batch`` is enabled so ALTER TABLE operations work within + SQLite's limited DDL capabilities. """ engine = get_engine() - # Use engine.begin() for auto-commit behavior (required in SQLAlchemy 2.0) - # Note: Do NOT use context.begin_transaction() here - engine.begin() already - # provides transaction management. Nesting transactions causes issues where - # only the first migration gets committed. + is_sqlite = _is_sqlite_url(DB_URL) + with engine.begin() as connection: - # Acquire advisory lock - blocks until lock is available - # This ensures only one migration runs at a time across all pods - connection.execute(text(f"SELECT pg_advisory_lock({MIGRATION_LOCK_ID})")) + if not is_sqlite: + connection.execute(text(f"SELECT pg_advisory_lock({MIGRATION_LOCK_ID})")) try: - context.configure(connection=connection, target_metadata=target_metadata) + context.configure( + connection=connection, + target_metadata=target_metadata, + render_as_batch=is_sqlite, + ) context.run_migrations() finally: - # Release the lock so other waiting processes can proceed - connection.execute(text(f"SELECT pg_advisory_unlock({MIGRATION_LOCK_ID})")) + if not is_sqlite: + connection.execute( + text(f"SELECT pg_advisory_unlock({MIGRATION_LOCK_ID})") + ) if context.is_offline_mode(): diff --git a/pyproject.toml b/pyproject.toml index 3d3ee66..0927f5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = [ + "aiosqlite>=0.20", "alembic>=1.14", "asyncpg>=0.30", "boto3>=1.35", diff --git a/tests/test_sqlite_backend.py b/tests/test_sqlite_backend.py new file mode 100644 index 0000000..086312f --- /dev/null +++ b/tests/test_sqlite_backend.py @@ -0,0 +1,335 @@ +"""Tests for SQLite database backend support. + +Verifies that the automation service works correctly with SQLite as the +database backend, enabling local/open-source deployments without PostgreSQL. +""" + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from automation.config import Settings +from automation.db import EngineResult, create_engine as create_db_engine +from automation.models import ( + Automation, + AutomationRun, + AutomationRunStatus, + Base, + CustomWebhook, + TarballUpload, + UploadStatus, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def sqlite_url(tmp_path): + """Return a temporary SQLite database URL.""" + db_path = tmp_path / "test_automations.db" + return f"sqlite+aiosqlite:///{db_path}" + + +@pytest.fixture +async def sqlite_engine(sqlite_url): + """Create an async SQLite engine and initialise the schema.""" + engine = create_async_engine(sqlite_url, echo=False) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + yield engine + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await engine.dispose() + + +@pytest.fixture +async def sqlite_session_factory(sqlite_engine): + """Create an async session factory backed by SQLite.""" + return async_sessionmaker( + sqlite_engine, class_=AsyncSession, expire_on_commit=False + ) + + +@pytest.fixture +async def sqlite_session(sqlite_session_factory): + """Yield a single async SQLite session.""" + async with sqlite_session_factory() as session: + yield session + + +# --------------------------------------------------------------------------- +# Config tests +# --------------------------------------------------------------------------- + + +class TestSQLiteConfig: + """Settings.is_sqlite correctly detects SQLite URLs.""" + + def test_is_sqlite_with_sqlite_url(self): + settings = Settings(db_url="sqlite+aiosqlite:///./automations.db") + assert settings.is_sqlite is True + + def test_is_sqlite_with_postgres_url(self): + settings = Settings(db_url="postgresql+asyncpg://u:p@host/db") + assert settings.is_sqlite is False + + def test_is_sqlite_with_no_url(self): + settings = Settings(db_url=None) + assert settings.is_sqlite is False + + +# --------------------------------------------------------------------------- +# Engine creation tests +# --------------------------------------------------------------------------- + + +class TestSQLiteEngine: + """create_engine() returns a working SQLite engine.""" + + async def test_create_sqlite_engine(self, sqlite_url): + settings = Settings(db_url=sqlite_url) + result = await create_db_engine(settings) + assert isinstance(result, EngineResult) + assert result.connector is None + # Verify the engine actually works + async with result.engine.begin() as conn: + row = await conn.execute(text("SELECT 1")) + assert row.scalar() == 1 + await result.dispose() + + async def test_sqlite_wal_mode_enabled(self, sqlite_url): + """WAL journal mode is set on new connections.""" + settings = Settings(db_url=sqlite_url) + result = await create_db_engine(settings) + async with result.engine.begin() as conn: + row = await conn.execute(text("PRAGMA journal_mode")) + mode = row.scalar() + assert mode == "wal" + await result.dispose() + + async def test_sqlite_foreign_keys_enabled(self, sqlite_url): + """Foreign key enforcement is turned on.""" + settings = Settings(db_url=sqlite_url) + result = await create_db_engine(settings) + async with result.engine.begin() as conn: + row = await conn.execute(text("PRAGMA foreign_keys")) + assert row.scalar() == 1 + await result.dispose() + + +# --------------------------------------------------------------------------- +# Model CRUD tests (verifies JSON columns, indexes, etc. work on SQLite) +# --------------------------------------------------------------------------- + + +class TestSQLiteModels: + """Basic CRUD operations work against an SQLite backend.""" + + async def test_create_automation(self, sqlite_session: AsyncSession): + """Can insert and retrieve an Automation with a JSON trigger.""" + automation = Automation( + user_id=uuid.uuid4(), + org_id=uuid.uuid4(), + name="Test Automation", + trigger={"type": "cron", "schedule": "0 9 * * *"}, + tarball_path="oh-internal://abc", + entrypoint="python main.py", + ) + sqlite_session.add(automation) + await sqlite_session.commit() + await sqlite_session.refresh(automation) + + assert automation.id is not None + assert automation.trigger["type"] == "cron" + assert automation.trigger["schedule"] == "0 9 * * *" + + async def test_create_run(self, sqlite_session: AsyncSession): + """Can insert an AutomationRun linked to an Automation.""" + automation = Automation( + user_id=uuid.uuid4(), + org_id=uuid.uuid4(), + name="Run Test", + trigger={"type": "cron", "schedule": "*/5 * * * *"}, + tarball_path="oh-internal://def", + entrypoint="python main.py", + ) + sqlite_session.add(automation) + await sqlite_session.flush() + + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.PENDING, + ) + sqlite_session.add(run) + await sqlite_session.commit() + await sqlite_session.refresh(run) + + assert run.id is not None + assert run.status == AutomationRunStatus.PENDING + assert run.automation_id == automation.id + + async def test_run_with_event_payload(self, sqlite_session: AsyncSession): + """JSON event_payload column works on SQLite.""" + automation = Automation( + user_id=uuid.uuid4(), + org_id=uuid.uuid4(), + name="Event Test", + trigger={"type": "event", "source": "github"}, + tarball_path="oh-internal://ghi", + entrypoint="python main.py", + ) + sqlite_session.add(automation) + await sqlite_session.flush() + + payload = {"action": "opened", "number": 42, "nested": {"key": "val"}} + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.RUNNING, + event_payload=payload, + ) + sqlite_session.add(run) + await sqlite_session.commit() + await sqlite_session.refresh(run) + + assert run.event_payload == payload + assert run.event_payload["nested"]["key"] == "val" + + async def test_create_tarball_upload(self, sqlite_session: AsyncSession): + """TarballUpload model works on SQLite.""" + upload = TarballUpload( + user_id=uuid.uuid4(), + org_id=uuid.uuid4(), + name="test-tarball", + status=UploadStatus.COMPLETED, + size_bytes=1024, + storage_path="uploads/test.tar.gz", + ) + sqlite_session.add(upload) + await sqlite_session.commit() + await sqlite_session.refresh(upload) + + assert upload.id is not None + assert upload.status == UploadStatus.COMPLETED + + async def test_create_custom_webhook(self, sqlite_session: AsyncSession): + """CustomWebhook model works on SQLite.""" + webhook = CustomWebhook( + org_id=uuid.uuid4(), + name="Test Webhook", + source="stripe", + webhook_secret="whsec_test123", + ) + sqlite_session.add(webhook) + await sqlite_session.commit() + await sqlite_session.refresh(webhook) + + assert webhook.id is not None + assert webhook.source == "stripe" + + +# --------------------------------------------------------------------------- +# Scheduler compatibility (FOR UPDATE SKIP LOCKED is skipped on SQLite) +# --------------------------------------------------------------------------- + + +class TestSQLiteScheduler: + """Scheduler polling works on SQLite without FOR UPDATE SKIP LOCKED.""" + + async def test_poll_and_schedule(self, sqlite_session_factory): + """poll_and_schedule creates PENDING runs on SQLite.""" + from automation.scheduler import poll_and_schedule + + # Create a due automation + now = datetime.now(timezone.utc) + async with sqlite_session_factory() as session: + automation = Automation( + user_id=uuid.uuid4(), + org_id=uuid.uuid4(), + name="Scheduler Test", + trigger={ + "type": "cron", + "schedule": "* * * * *", + "timezone": "UTC", + }, + tarball_path="oh-internal://test", + entrypoint="python main.py", + enabled=True, + last_triggered_at=now - timedelta(hours=1), + last_polled_at=None, + ) + session.add(automation) + await session.commit() + + runs = await poll_and_schedule(sqlite_session_factory) + assert len(runs) >= 1 + assert all(r.status == AutomationRunStatus.PENDING for r in runs) + + async def test_fetch_enabled_automations_no_lock(self, sqlite_session_factory): + """_fetch_enabled_automations works on SQLite (no row locking).""" + from automation.scheduler import _fetch_enabled_automations + + now = datetime.now(timezone.utc) + async with sqlite_session_factory() as session: + automation = Automation( + user_id=uuid.uuid4(), + org_id=uuid.uuid4(), + name="Fetch Test", + trigger={"type": "cron", "schedule": "0 0 * * *", "timezone": "UTC"}, + tarball_path="oh-internal://test", + entrypoint="python main.py", + enabled=True, + last_polled_at=None, + ) + session.add(automation) + await session.commit() + + async with sqlite_session_factory() as session: + poll_threshold = now - timedelta(seconds=60) + automations = await _fetch_enabled_automations( + session, batch_size=10, poll_threshold=poll_threshold + ) + assert len(automations) >= 1 + + +# --------------------------------------------------------------------------- +# Dispatcher compatibility +# --------------------------------------------------------------------------- + + +class TestSQLiteDispatcher: + """Dispatcher polling works on SQLite without FOR UPDATE SKIP LOCKED.""" + + async def test_poll_pending_runs_no_lock(self, sqlite_session_factory): + """_poll_pending_runs works on SQLite (no row locking).""" + from automation.dispatcher import _poll_pending_runs + + async with sqlite_session_factory() as session: + automation = Automation( + user_id=uuid.uuid4(), + org_id=uuid.uuid4(), + name="Dispatcher Test", + trigger={"type": "cron", "schedule": "0 0 * * *"}, + tarball_path="oh-internal://test", + entrypoint="python main.py", + enabled=True, + ) + session.add(automation) + await session.flush() + + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.PENDING, + ) + session.add(run) + await session.commit() + + async with sqlite_session_factory() as session: + runs = await _poll_pending_runs(session, batch_size=10) + assert len(runs) == 1 + assert runs[0].status == AutomationRunStatus.PENDING diff --git a/uv.lock b/uv.lock index 7cf1ba5..59ade2a 100644 --- a/uv.lock +++ b/uv.lock @@ -2161,6 +2161,7 @@ name = "openhands-automation" version = "0.1.0" source = { editable = "." } dependencies = [ + { name = "aiosqlite" }, { name = "alembic" }, { name = "asyncpg" }, { name = "boto3" }, @@ -2198,6 +2199,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "aiosqlite", specifier = ">=0.20" }, { name = "alembic", specifier = ">=1.14" }, { name = "asyncpg", specifier = ">=0.30" }, { name = "boto3", specifier = ">=1.35" },