Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions migrations/versions/005_session_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Add session-based event routing tables.

This migration adds two tables to support session-based conversation reuse for
event-triggered automations:

1. ``automation_sessions`` — tracks active sessions (sandbox + session key).
An ACTIVE session routes incoming events to its running sandbox rather than
creating a new run for each event.

2. ``pending_session_events`` — queues events when a session's sandbox is alive
or when a sandbox has died and ``on_sandbox_death`` is set to "queue"/"restart".

Cross-database compatible: works with both PostgreSQL and SQLite.

Revision ID: 005
Revises: 004
Create Date: 2026-05-15
"""

from collections.abc import Sequence

from alembic import op
from sqlalchemy import JSON, Column, DateTime, String, Uuid, text


revision: str = "005"
down_revision: str = "004"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
# 1. Create automation_sessions table
op.create_table(
"automation_sessions",
Column("id", Uuid, primary_key=True),
Column("automation_id", Uuid, nullable=False),
Column("session_key", String(255), nullable=False),
Column("run_id", Uuid, nullable=False),
# Sandbox identifier (populated by the dispatcher after sandbox creation)
Column("sandbox_id", String(255), nullable=True),
# Status: ACTIVE, EXPIRED, or DEAD
# Using String instead of Enum for cross-database compatibility
Column("status", String(20), nullable=False, server_default="ACTIVE"),
Column(
"started_at",
DateTime(timezone=True),
nullable=False,
server_default=text("CURRENT_TIMESTAMP"),
),
# Pre-computed expiry deadline: started_at + session_timeout_seconds
Column("expires_at", DateTime(timezone=True), nullable=False),
Column(
"last_event_at",
DateTime(timezone=True),
nullable=False,
server_default=text("CURRENT_TIMESTAMP"),
),
)
op.create_index(
"ix_automation_sessions_automation_id",
"automation_sessions",
["automation_id"],
)
op.create_index(
"ix_automation_sessions_status",
"automation_sessions",
["status"],
)
# Compound index for the primary lookup pattern:
# SELECT ... WHERE automation_id = ? AND session_key = ? AND status = 'ACTIVE'
op.create_index(
"ix_session_lookup",
"automation_sessions",
["automation_id", "session_key", "status"],
)

# 2. Create pending_session_events table
op.create_table(
"pending_session_events",
Column("id", Uuid, primary_key=True),
Column("automation_id", Uuid, nullable=False),
Column("session_key", String(255), nullable=False),
# Event payload (same format as automation_runs.event_payload)
Column("event_payload", JSON, nullable=False),
Column(
"created_at",
DateTime(timezone=True),
nullable=False,
server_default=text("CURRENT_TIMESTAMP"),
),
)
op.create_index(
"ix_pending_session_events_automation_id",
"pending_session_events",
["automation_id"],
)
op.create_index(
"ix_pending_session_events_session_key",
"pending_session_events",
["session_key"],
)
# Compound index for fetching queued events for a specific session
op.create_index(
"ix_pending_session_events_lookup",
"pending_session_events",
["automation_id", "session_key"],
)


def downgrade() -> None:
op.drop_table("pending_session_events")
op.drop_table("automation_sessions")
71 changes: 71 additions & 0 deletions openhands/automation/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
mark_run_terminal,
update_sandbox_id,
)
from openhands.automation.utils.session import create_session, extract_session_key
from openhands.automation.utils.tarball_validation import (
is_http_url,
parse_internal_upload_id,
Expand Down Expand Up @@ -114,6 +115,72 @@ async def _poll_pending_runs(
return list(result.scalars().all())


async def _maybe_create_session(
run: AutomationRun,
sandbox_id: str | None,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Create an AutomationSession if the run's trigger has session config.

Called after a run is successfully dispatched. If the trigger has a
``session`` configuration, extract the session key from the event payload
and record the session so subsequent events are routed here.

Failures are logged but do not affect the run (best-effort).
"""
from openhands.automation.schemas import EventTrigger

automation = run.automation
if not automation:
return

try:
trigger_data = automation.trigger
if trigger_data.get("type") != "event":
return

trigger = EventTrigger.model_validate(trigger_data)
if not trigger.session:
return

session_cfg = trigger.session
event_payload = run.event_payload or {}
session_key = extract_session_key(session_cfg.key_expr, event_payload)

if session_key is None:
logger.warning(
"Could not extract session key for run %s "
"using expr=%r; session not created",
run.id,
session_cfg.key_expr,
)
return

async with session_factory() as db_session:
session_record = await create_session(
automation_id=automation.id,
session_key=session_key,
run_id=run.id,
session_timeout_seconds=session_cfg.session_timeout_seconds,
db_session=db_session,
)
if sandbox_id:
session_record.sandbox_id = sandbox_id
await db_session.commit()

logger.info(
"Created session key=%s for run %s automation %s",
session_key,
run.id,
automation.id,
)
except Exception:
logger.exception(
"Failed to create session for run %s; continuing without session",
run.id,
)


async def _execute_run(
run: AutomationRun,
settings: ServiceSettings,
Expand Down Expand Up @@ -268,6 +335,10 @@ async def _fail(error: str, disable: bool = False) -> None:
if result.success:
if ctx.sandbox_id:
await update_sandbox_id(session_factory, run.id, ctx.sandbox_id)
# If this run was triggered by an event with session config, create a
# session record so subsequent events with the same session key are
# routed to this sandbox instead of starting a new run.
await _maybe_create_session(run, ctx.sandbox_id, session_factory)
logger.info(
"Automation dispatched successfully, waiting for callback",
extra=_log_ctx(sandbox_id=ctx.sandbox_id),
Expand Down
66 changes: 57 additions & 9 deletions openhands/automation/event_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
from openhands.automation.event_schemas import WebhookEvent, parse_event
from openhands.automation.schemas import EventResponse
from openhands.automation.trigger_matcher import matches_trigger
from openhands.automation.utils.session import (
extract_session_key,
get_active_session,
queue_pending_event,
)
from openhands.automation.utils.webhook import (
create_automation_run,
get_event_automations,
Expand Down Expand Up @@ -151,33 +156,75 @@ async def receive_event(
org_id,
)

# 6. Find matching automations
# 6. Find matching automations (preserve trigger alongside each automation)
automations = await get_event_automations(org_id, source, session)
matched_automations = []
matched: list[tuple] = [] # (Automation, EventTrigger)

for automation, trigger in automations:
# Match trigger against webhook payload using JMESPath filter
if matches_trigger(trigger, source, event.event_key, webhook_payload):
matched_automations.append(automation)
matched.append((automation, trigger))

logger.info(
"Event matched %d/%d automations for org=%s",
len(matched_automations),
len(matched),
len(automations),
org_id,
)

# 7. Create PENDING runs for matched automations
# For Pydantic-parsed events (GitHub), use model_dump() for typed fields
# For custom webhooks, use the webhook payload directly
# 7. Create PENDING runs or queue events for matched automations.
# For Pydantic-parsed events (GitHub), use model_dump() for typed fields.
# For custom webhooks, use the webhook payload directly.
event_payload = (
event.model_dump(mode="json")
if isinstance(event, BaseModel)
else webhook_payload
)

run_ids: list[str] = []
for automation in matched_automations:
events_queued: int = 0

for automation, trigger in matched:
session_cfg = trigger.session

if session_cfg:
# Session mode: route to existing session or start a new one
session_key = extract_session_key(session_cfg.key_expr, event_payload)

if session_key is None:
logger.warning(
"Could not extract session key for automation %s "
"using expr=%r; falling back to new run",
automation.id,
session_cfg.key_expr,
)
else:
active_session = await get_active_session(
automation.id, session_key, session
)

if active_session is not None:
# Route to existing session — queue event for the running sandbox
await queue_pending_event(
automation.id, session_key, event_payload, session
)
events_queued += 1
logger.info(
"Event queued to existing session key=%s "
"automation=%s session_id=%s",
session_key,
automation.id,
active_session.id,
)
continue # Skip new run creation

# No active session — fall through to create a new run below
logger.info(
"No active session for key=%s automation=%s; creating new run",
session_key,
automation.id,
)

run = await create_automation_run(
automation, session, event_payload=event_payload
)
Expand All @@ -187,6 +234,7 @@ async def receive_event(

return EventResponse(
received=True,
matched=len(matched_automations),
matched=len(matched),
runs_created=run_ids,
events_queued=events_queued,
)
Loading
Loading