Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 58 additions & 21 deletions openhands/automation/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import logging
from datetime import datetime, timedelta

from pydantic import ValidationError
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from openhands.automation.db import using_sqlite
from openhands.automation.models import Automation, AutomationRun
from openhands.automation.utils import is_automation_due, utcnow
from openhands.automation.utils.run import create_pending_run
from openhands.automation.schemas import TriggerAdapter
from openhands.automation.utils import utcnow


logger = logging.getLogger("automation.scheduler")
Expand Down Expand Up @@ -74,6 +75,60 @@ async def _fetch_enabled_automations(
return list(result.scalars().all())


async def _create_pending_runs(
session: AsyncSession,
automations: list[Automation],
now: datetime,
) -> list[AutomationRun]:
"""Ask each automation's trigger to create a PENDING run if it's due.

Each automation's ``trigger`` JSON is parsed into a typed model
(``CronTrigger``/``EventTrigger``/``GithubTrigger``) and its
:meth:`_TriggerBase.create_pending_run` is awaited. A return value of
``None`` means "not due"; any other return is appended to the result.

Calls run **sequentially** because they share a single
:class:`AsyncSession` (SQLAlchemy async sessions are not safe for
concurrent use). Triggers that need to fan out external I/O — e.g.
:class:`~openhands.automation.schemas.GithubTrigger` polling multiple
repos — do so internally.

Per-trigger exceptions are logged and skipped so one bad trigger cannot
starve the rest of the batch.
"""
created: list[AutomationRun] = []
for automation in automations:
try:
trigger = TriggerAdapter.validate_python(automation.trigger)
except ValidationError:
logger.exception("Invalid trigger config for automation %s", automation.id)
continue

try:
run = await trigger.create_pending_run(session, automation, now)
except Exception:
logger.exception(
"Trigger %s raised while processing automation %s",
type(trigger).__name__,
automation.id,
)
continue

if run is None:
continue

created.append(run)
logger.info(
"Created pending run: run_id=%s automation_id=%s name=%s trigger_type=%s",
run.id,
automation.id,
automation.name,
automation.trigger.get("type"),
)

return created


async def poll_and_schedule(
session_factory: async_sessionmaker[AsyncSession],
batch_size: int = DEFAULT_BATCH_SIZE,
Expand Down Expand Up @@ -117,25 +172,7 @@ async def poll_and_schedule(
for automation in automations:
automation.last_polled_at = now

due_automations = [a for a in automations if is_automation_due(a, now)]

for automation in due_automations:
try:
run = await create_pending_run(session, automation)
created_runs.append(run)
logger.info(
"Created pending run: run_id=%s automation_id=%s "
"name=%s schedule=%s",
run.id,
automation.id,
automation.name,
automation.trigger.get("schedule"),
)
except Exception:
logger.exception(
"Failed to create run for automation %s",
automation.id,
)
created_runs.extend(await _create_pending_runs(session, automations, now))

# Always commit to release row locks from FOR UPDATE SKIP LOCKED,
# even if no runs were created
Expand Down
Loading
Loading