Skip to content

Use task state store for common.ai durable execution on Airflow 3.3+#68926

Open
kaxil wants to merge 2 commits into
apache:mainfrom
astronomer:common-ai-durable-task-state-store
Open

Use task state store for common.ai durable execution on Airflow 3.3+#68926
kaxil wants to merge 2 commits into
apache:mainfrom
astronomer:common-ai-durable-task-state-store

Conversation

@kaxil

@kaxil kaxil commented Jun 24, 2026

Copy link
Copy Markdown
Member

durable=True on AgentOperator / @task.agent caches each model response and tool
result so a retry replays completed steps instead of re-running expensive LLM calls.
Until now that cache was always an ObjectStorage JSON file, which required
[common.ai] durable_cache_path to be set.

The durable cache is per-task-instance, per-run, and cleared on success, which is exactly
the scope of the AIP-103 task state store.
On Airflow >= 3.3 the cache now lives there:

  • No durable_cache_path configuration needed.
  • Large step payloads offload automatically when [workers] state_store_backend is set.
  • Cached steps are deleted on success, and reclaimed by the DAG-run cascade if a run fails permanently.

On Airflow < 3.3 the ObjectStorage backend is unchanged and selected automatically.

Design

  • Both backends implement a shared DurableStorageProtocol, so CachingModel and
    CachingToolset depend on the interface, not a concrete backend.
  • AgentOperator._build_durable_storage picks the backend by Airflow version.
  • Durable keys use a reserved prefix (__commonai_durable__...) so they cannot collide
    with keys user code writes via context["task_state_store"] in the same task instance.
  • Entries are written with NEVER_EXPIRE so a retry can replay them regardless of
    retry_delay or the global retention config; cleanup() removes only the keys this run touched.

Gotchas

  • On a default 3.3+ install (no state_store_backend), cached step values are stored in
    the metadata database. For agents with large responses, configure
    [workers] state_store_backend to offload them to external storage.
  • A tool passed via agent_params={"tools": [...]} is not wrapped by the caching toolset
    (only toolsets= are), so only its model steps replay. This is existing behavior, unchanged here.

Verification

Verified end-to-end on a real Airflow 3.3 standalone: a durable AgentOperator whose tool
fails on the first attempt replays the cached model step from the task state store on the
retry, and a direct round-trip task confirms cache, replay, and cleanup against the live
Execution API and metadata DB.

ui_02_durable_agent_logs ui_01_grid

durable=True caching is per-task-instance, per-run, and cleared on success --
exactly the scope of the AIP-103 task state store. On Airflow >= 3.3 the cache
now lives there instead of an ObjectStorage JSON file, so durable execution no
longer needs the [common.ai] durable_cache_path config, and large step payloads
are offloaded automatically through the configured worker state store backend.

The ObjectStorage backend is kept as the fallback for Airflow < 3.3, selected at
runtime by the operator. Both backends share one storage interface, so the
caching model/toolset wrappers are unchanged.
@kaxil kaxil requested a review from gopidesupavan as a code owner June 24, 2026 00:40
@kaxil kaxil requested a review from amoghrajesh June 24, 2026 00:41
@kaxil kaxil changed the title Use task state store for common.ai durable execution on Airflow 3.3+ Use task state store for common.ai durable execution on Airflow 3.3+ Jun 24, 2026
…pelling

Three CI failures on this branch, all in the durable execution code:

- Compat 3.0.6/3.1.8/3.2.2: ``test_task_state_store`` used
  ``pytest.importorskip("airflow.sdk.execution_time.context")`` to skip on
  pre-3.3 cores, but that module exists on older cores -- only the
  ``NEVER_EXPIRE`` name it pulls in (via ``task_state_store``) is 3.3+. Collection
  blew up before the skip could fire. Gate on ``AIRFLOW_V_3_3_PLUS`` instead,
  matching how ``AgentOperator._build_durable_storage`` guards the import.

- Non-DB / Low-dep core: ``test_providers_modules_should_have_tests`` flagged the
  new ``durable/base.py`` as having no test. Add ``test_base.py`` covering the
  reserved-key invariants and the ``DurableStorageProtocol`` runtime contract.

- Docs spellcheck: "sizeable" -> "sizable" in the agent operator docs.

@amoghrajesh amoghrajesh left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!


This module is imported only on Airflow >= 3.3 (see
``AgentOperator._build_durable_storage``); ``NEVER_EXPIRE`` does not exist on
older cores.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
older cores.
older airflow versions.

Comment on lines +560 to +584
@patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", True)
def test_build_durable_storage_uses_task_state_store_on_3_3(self):
"""On Airflow >= 3.3 the cache lives in the task state store -- no durable_cache_path needed."""
from airflow.providers.common.ai.durable.task_state_store import TaskStateStoreDurableStorage

accessor = MagicMock()
op = AgentOperator(task_id="t", prompt="p", llm_conn_id="c", durable=True)

storage = op._build_durable_storage({"task_state_store": accessor})

assert isinstance(storage, TaskStateStoreDurableStorage)
assert storage._store is accessor

@patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", False)
def test_build_durable_storage_falls_back_to_object_storage_below_3_3(self):
"""On Airflow < 3.3 the cache falls back to the ObjectStorage backend."""
from airflow.providers.common.ai.durable.storage import DurableStorage

ti = MagicMock(dag_id="d", task_id="t", run_id="r", map_index=-1)
op = AgentOperator(task_id="t", prompt="p", llm_conn_id="c", durable=True)

storage = op._build_durable_storage({"task_instance": ti})

assert isinstance(storage, DurableStorage)
assert storage._cache_id == "d_t_r"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it better to just run the respective tests on their versions? ie: skip test_build_durable_storage_uses_task_state_store_on_3_3 for < 3.3?

"""On Airflow >= 3.3 the cache lives in the task state store -- no durable_cache_path needed."""
from airflow.providers.common.ai.durable.task_state_store import TaskStateStoreDurableStorage

accessor = MagicMock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a TaskStateStoreAccessor spec

"""On Airflow < 3.3 the cache falls back to the ObjectStorage backend."""
from airflow.providers.common.ai.durable.storage import DurableStorage

ti = MagicMock(dag_id="d", task_id="t", run_id="r", map_index=-1)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a spec here and elsewhere where its used like this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file verifies the protocol with two artificial backends (_CompleteBackend, _PartialBackend) but never checks that the real DurableStorage (the ObjectStorage backend) satisfies DurableStorageProtocol. Since it's @runtime_checkable, we could do that to catch any future method signature drift.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants