Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions providers/common/ai/docs/operators/agent.rst
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,30 @@ fails mid-run (network error, timeout, transient API failure), a plain retry
re-executes every LLM call and tool call from scratch -- repeating work that
already succeeded and incurring additional cost.

Setting ``durable=True`` caches each LLM response and tool result to
ObjectStorage as it completes. On retry, completed steps are replayed from the
cache and only the remaining steps run against the live model and tools. The
cache is deleted after successful completion.
Setting ``durable=True`` caches each LLM response and tool result as it
completes. On retry, completed steps are replayed from the cache and only the
remaining steps run against the live model and tools. The cache is deleted
after successful completion.

Durable execution only helps when the task has retries configured. Without
retries there is nothing to replay.

**Configuration**

Set the cache location in ``airflow.cfg``. The task raises ``ValueError`` at
runtime if ``durable=True`` and the option is missing.
On **Airflow >= 3.3** the cache is stored in the
:doc:`task state store <apache-airflow:core-concepts/task-state-store>`,
scoped to the task instance. No configuration is required; the store handles
persistence across retries.

By default each cached step is written to the Airflow metadata database. Model
responses and large tool results can be sizable, so for agents with large
payloads configure ``[workers] state_store_backend`` to offload step values to
external storage (e.g. object storage) instead of the metadata database; the
provider then stores only a reference in the database.

On **Airflow < 3.3** the cache is persisted to ObjectStorage and the location
must be set in ``airflow.cfg``. The task raises ``ValueError`` at runtime if
``durable=True`` and the option is missing.

.. code-block:: ini

Expand Down Expand Up @@ -251,10 +263,10 @@ cache:

**How it works**

1. On first execution, each LLM response and tool result is saved to a JSON
file as the agent progresses, together with a fingerprint of the request
that produced it (model, message history, settings, and tools for LLM
steps; tool name, arguments, and call id for tool steps).
1. On first execution, each LLM response and tool result is saved as the agent
progresses, together with a fingerprint of the request that produced it
(model, message history, settings, and tools for LLM steps; tool name,
arguments, and call id for tool steps).
2. If the task fails and Airflow retries it, completed steps are loaded from
the cache and returned without calling the model or tool. Steps not yet in
the cache proceed normally.
Expand All @@ -266,29 +278,31 @@ cache:
an LLM step produces fresh tool call ids, so tool results recorded under
the old conversation no longer match. A changed agent costs a re-run; it
never replays responses that belong to a different conversation.
4. After successful completion, the cache file is deleted.
4. After successful completion, the cached steps are deleted.

Replay verification compares the **requests** sent to models and tools, not
the code behind them. Editing a tool's implementation between attempts does
not invalidate an already-cached result for an identical call, and pointing
``llm_conn_id`` at a different endpoint serving the same model name does not
invalidate cached responses -- delete the cache file to force a fully fresh
run.
invalidate cached responses -- clear the cache to force a fully fresh run.

After the run, a single INFO summary line reports how many steps were
replayed vs executed fresh. Per-step detail is available at DEBUG level.

The cache file is named ``{dag_id}_{task_id}_{run_id}.json`` (with
``_{map_index}`` appended for mapped tasks) and stored under the configured
``durable_cache_path``. To force a completely fresh run, delete the cache file
for that task.
The cache is scoped to a single task instance (DAG id, run id, task id, and
map index), so each run replays only its own steps. On Airflow >= 3.3 the cache
lives in the task state store and is removed when the DAG run is cleaned up; on
Airflow < 3.3 it is a JSON file named ``{dag_id}_{task_id}_{run_id}.json`` (with
``_{map_index}`` appended for mapped tasks) under the configured
``durable_cache_path``.

.. note::

Runs that fail permanently (exhaust all retries) leave their cache file
behind. These orphaned files do not affect future DAG runs (each run gets
its own file) but will consume storage. Clean them up periodically or add
a lifecycle policy to the storage backend.
Runs that fail permanently (exhaust all retries) leave their cached steps
behind. These do not affect future DAG runs (each run is scoped separately).
On Airflow >= 3.3 they are reclaimed when the DAG run is removed; on Airflow
< 3.3 the orphaned JSON files consume storage until cleaned up, so add a
lifecycle policy to the storage backend or remove them periodically.

**Side effects and idempotency**

Expand Down Expand Up @@ -443,9 +457,10 @@ Parameters
prone to runaway tool loops, so ``tool_calls_limit`` is a useful guardrail.
See :ref:`howto/operator:llm` for an example. Default ``None``.
- ``durable``: When ``True``, enables step-level caching of model responses and
tool results via ObjectStorage. On retry, cached steps are replayed instead of
re-executing expensive LLM calls. Requires the ``[common.ai] durable_cache_path``
config option to be set. Default ``False``.
tool results. On retry, cached steps are replayed instead of re-executing
expensive LLM calls. On Airflow >= 3.3 the cache uses the task state store (no
configuration needed); on older cores it requires the ``[common.ai]
durable_cache_path`` config option to be set. Default ``False``.
- ``code_mode``: When ``True``, wraps the agent's tools in a single ``run_code``
tool that the model drives by writing Python, executed in the Monty sandbox.
Requires the ``code-mode`` extra. Default ``False``. See :ref:`code-mode`.
Expand Down
18 changes: 10 additions & 8 deletions providers/common/ai/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,17 @@ config:
durable_cache_path:
description: |
ObjectStorage URI used to persist per-step caches when running
``AgentOperator`` / ``@task.agent`` with ``durable=True``. Each task
execution writes a single JSON file under this path containing its
cached model responses and tool results, so that on retry the agent
can replay completed steps instead of re-issuing LLM calls and tool
invocations. The file is deleted on successful task completion.
``AgentOperator`` / ``@task.agent`` with ``durable=True`` on Airflow
**< 3.3**. Each task execution writes a single JSON file under this
path containing its cached model responses and tool results, so that
on retry the agent can replay completed steps instead of re-issuing
LLM calls and tool invocations. The file is deleted on successful task
completion.

Required when ``durable=True`` is used. Any scheme supported by
``airflow.sdk.ObjectStoragePath`` is accepted (``file://``, ``s3://``,
``gs://``, ``azure://``, ...).
Required for ``durable=True`` only on Airflow < 3.3. On Airflow >= 3.3
the cache is stored in the AIP-103 task state store and this option is
ignored. Any scheme supported by ``airflow.sdk.ObjectStoragePath`` is
accepted (``file://``, ``s3://``, ``gs://``, ``azure://``, ...).
version_added: 0.1.0
type: string
example: "file:///tmp/airflow_durable_cache"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Shared interface for durable execution storage backends."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

if TYPE_CHECKING:
from pydantic_ai.messages import ModelResponse

# Marks a stored entry as a cached tool result; lets ``load_tool_result``
# tell a cached ``None`` apart from a missing entry. Single source of truth so
# the two backends cannot drift on the envelope shape.
TOOL_RESULT_SENTINEL = "__durable_cached__"

# Prefix for durable cache keys. On the task state store backend (>= 3.3) the
# cache shares the task instance's key namespace with anything user code writes
# via ``context["task_state_store"]``; the reserved prefix keeps durable steps
# from colliding with user keys. No ``/`` -- task state store keys are a single,
# un-encoded URL path segment.
DURABLE_KEY_PREFIX = "__commonai_durable__"


@runtime_checkable
class DurableStorageProtocol(Protocol):
"""
Persistence contract shared by the durable execution storage backends.

Implemented by both :class:`~airflow.providers.common.ai.durable.storage.DurableStorage`
(ObjectStorage, Airflow < 3.3) and
:class:`~airflow.providers.common.ai.durable.task_state_store.TaskStateStoreDurableStorage`
(AIP-103 task state store, Airflow >= 3.3). ``CachingModel`` and
``CachingToolset`` depend on this interface, not a concrete backend.
"""

def save_model_response(self, key: str, response: ModelResponse, *, fingerprint: str | None) -> None: ...

def load_model_response(self, key: str) -> tuple[ModelResponse | None, str | None]: ...

def save_tool_result(self, key: str, result: Any, *, fingerprint: str | None) -> None: ...

def load_tool_result(self, key: str) -> tuple[bool, Any, str | None]: ...

def cleanup(self) -> None: ...
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import structlog
from pydantic_ai.models.wrapper import WrapperModel

from airflow.providers.common.ai.durable.base import DURABLE_KEY_PREFIX
from airflow.providers.common.ai.durable.fingerprint import fingerprint_model_request

log = structlog.get_logger(logger_name="task")
Expand All @@ -33,8 +34,8 @@
from pydantic_ai.models import ModelRequestParameters
from pydantic_ai.settings import ModelSettings

from airflow.providers.common.ai.durable.base import DurableStorageProtocol
from airflow.providers.common.ai.durable.step_counter import DurableStepCounter
from airflow.providers.common.ai.durable.storage import DurableStorage


@dataclass(init=False)
Expand All @@ -51,14 +52,14 @@ class CachingModel(WrapperModel):
discarded and the step re-runs live.
"""

storage: DurableStorage = field(repr=False)
storage: DurableStorageProtocol = field(repr=False)
counter: DurableStepCounter = field(repr=False)

def __init__(
self,
wrapped: Any,
*,
storage: DurableStorage,
storage: DurableStorageProtocol,
counter: DurableStepCounter,
) -> None:
super().__init__(wrapped)
Expand All @@ -72,7 +73,7 @@ async def request(
model_request_parameters: ModelRequestParameters,
) -> ModelResponse:
step = self.counter.next_step()
key = f"model_step_{step}"
key = f"{DURABLE_KEY_PREFIX}model_step_{step}"
# Fingerprint the *prepared* request, not the raw arguments. Concrete
# models call ``prepare_request()`` at the start of ``request()`` to merge
# their model-level ``settings`` and apply profile-specific transforms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import structlog
from pydantic_ai.toolsets.wrapper import WrapperToolset

from airflow.providers.common.ai.durable.base import DURABLE_KEY_PREFIX
from airflow.providers.common.ai.durable.fingerprint import fingerprint_tool_call

if TYPE_CHECKING:
from pydantic_ai.toolsets.abstract import ToolsetTool

from airflow.providers.common.ai.durable.base import DurableStorageProtocol
from airflow.providers.common.ai.durable.step_counter import DurableStepCounter
from airflow.providers.common.ai.durable.storage import DurableStorage

log = structlog.get_logger(logger_name="task")

Expand All @@ -53,7 +54,7 @@ class CachingToolset(WrapperToolset[Any]):
executing their synchronous preamble in creation order).
"""

storage: DurableStorage = field(repr=False)
storage: DurableStorageProtocol = field(repr=False)
counter: DurableStepCounter = field(repr=False)

async def call_tool(
Expand All @@ -66,7 +67,7 @@ async def call_tool(
# Grab step index BEFORE any await -- ensures deterministic ordering
# even when multiple tool calls run concurrently via asyncio.gather.
step = self.counter.next_step()
key = f"tool_step_{step}"
key = f"{DURABLE_KEY_PREFIX}tool_step_{step}"
fingerprint = fingerprint_tool_call(name, tool_args, ctx.tool_call_id)

found, cached, cached_fingerprint = self.storage.load_tool_result(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
import structlog
from pydantic_ai.messages import ModelMessagesTypeAdapter, ModelResponse

log = structlog.get_logger(logger_name="task")

# Sentinel to distinguish "cached None" from "no cache entry" for tool results.
_SENTINEL = "__durable_cached__"
# Shared with the task state store backend so the envelope shape cannot drift.
from airflow.providers.common.ai.durable.base import TOOL_RESULT_SENTINEL as _SENTINEL

log = structlog.get_logger(logger_name="task")

SECTION = "common.ai"

Expand Down
Loading
Loading