Use task state store for common.ai durable execution on Airflow 3.3+ #68926
Open
kaxil wants to merge 2 commits into
durable=True caching is per-task-instance, per-run, and cleared on success -- exactly the scope of the AIP-103 task state store. On Airflow >= 3.3 the cache now lives there instead of an ObjectStorage JSON file, so durable execution no longer needs the [common.ai] durable_cache_path config, and large step payloads are offloaded automatically through the configured worker state store backend. The ObjectStorage backend is kept as the fallback for Airflow < 3.3, selected at runtime by the operator. Both backends share one storage interface, so the caching model/toolset wrappers are unchanged.
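The runtime selection described above can be sketched roughly as follows. This is a minimal illustration, not the provider's code: the two backend classes, the `build_durable_storage` function, and the dict-shaped task instance are hypothetical stand-ins, and the `d_t_r` cache id layout is inferred only from the unmapped-task assertion in this PR's tests.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TaskStateStoreBackendSketch:
    """Hypothetical stand-in for the Airflow >= 3.3 backend."""

    store: Any


@dataclass
class ObjectStorageBackendSketch:
    """Hypothetical stand-in for the Airflow < 3.3 fallback backend."""

    cache_id: str


def build_durable_storage(context: dict, *, airflow_v_3_3_plus: bool):
    """Pick a backend from a version flag (assumed shape, not the real method)."""
    if airflow_v_3_3_plus:
        # The task state store accessor comes from the task context.
        return TaskStateStoreBackendSketch(store=context["task_state_store"])
    # Fallback: derive a cache id from the task instance identity.
    ti = context["task_instance"]
    return ObjectStorageBackendSketch(
        cache_id=f"{ti['dag_id']}_{ti['task_id']}_{ti['run_id']}"
    )
```

Because both sketches are returned from one function, callers never branch on the Airflow version themselves, which is the property the shared storage interface is meant to preserve.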
…pelling
Three CI failures on this branch, all in the durable execution code:
- Compat 3.0.6/3.1.8/3.2.2: ``test_task_state_store`` used
``pytest.importorskip("airflow.sdk.execution_time.context")`` to skip on
pre-3.3 cores, but that module exists on older cores -- only the
``NEVER_EXPIRE`` name it pulls in (via ``task_state_store``) is 3.3+. Collection
blew up before the skip could fire. Gate on ``AIRFLOW_V_3_3_PLUS`` instead,
matching how ``AgentOperator._build_durable_storage`` guards the import.
- Non-DB / Low-dep core: ``test_providers_modules_should_have_tests`` flagged the
new ``durable/base.py`` as having no test. Add ``test_base.py`` covering the
reserved-key invariants and the ``DurableStorageProtocol`` runtime contract.
- Docs spellcheck: "sizeable" -> "sizable" in the agent operator docs.
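The first failure comes down to a module existing while one of its names does not. A minimal sketch of that situation, using a fake module registered under the hypothetical name `fake_old_core_context` rather than any real Airflow module:

```python
import importlib
import sys
import types

# Hypothetical stand-in for an older core: the module exists, NEVER_EXPIRE does not.
sys.modules["fake_old_core_context"] = types.ModuleType("fake_old_core_context")


def module_exists(name: str) -> bool:
    """Roughly what a module-level import-or-skip guard checks."""
    try:
        importlib.import_module(name)
    except ImportError:
        return False
    return True


def name_exists(module: str, attr: str) -> bool:
    """What the code under test actually needs: a specific name in the module."""
    return hasattr(importlib.import_module(module), attr)


# The module-level guard passes, so nothing is skipped ...
module_ok = module_exists("fake_old_core_context")
# ... but the name the test ultimately imports is missing.
name_ok = name_exists("fake_old_core_context", "NEVER_EXPIRE")
```

Gating on an explicit version flag avoids this mismatch because the condition no longer depends on which names a given module happens to export.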
amoghrajesh
approved these changes
Jun 24, 2026
    This module is imported only on Airflow >= 3.3 (see
    ``AgentOperator._build_durable_storage``); ``NEVER_EXPIRE`` does not exist on
    older cores.
Contributor
Suggested change
    - older cores.
    + older airflow versions.
Comment on lines +560 to +584
    @patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", True)
    def test_build_durable_storage_uses_task_state_store_on_3_3(self):
        """On Airflow >= 3.3 the cache lives in the task state store -- no durable_cache_path needed."""
        from airflow.providers.common.ai.durable.task_state_store import TaskStateStoreDurableStorage

        accessor = MagicMock()
        op = AgentOperator(task_id="t", prompt="p", llm_conn_id="c", durable=True)

        storage = op._build_durable_storage({"task_state_store": accessor})

        assert isinstance(storage, TaskStateStoreDurableStorage)
        assert storage._store is accessor

    @patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", False)
    def test_build_durable_storage_falls_back_to_object_storage_below_3_3(self):
        """On Airflow < 3.3 the cache falls back to the ObjectStorage backend."""
        from airflow.providers.common.ai.durable.storage import DurableStorage

        ti = MagicMock(dag_id="d", task_id="t", run_id="r", map_index=-1)
        op = AgentOperator(task_id="t", prompt="p", llm_conn_id="c", durable=True)

        storage = op._build_durable_storage({"task_instance": ti})

        assert isinstance(storage, DurableStorage)
        assert storage._cache_id == "d_t_r"
Contributor
Isn't it better to just run the respective tests on their own versions, i.e. skip test_build_durable_storage_uses_task_state_store_on_3_3 for < 3.3?
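The suggestion above, running each test only on the versions it targets, might look like the sketch below. It uses the standard library's `unittest` skip decorators so the example is self-contained; in a pytest suite `pytest.mark.skipif` would play the same role. The hard-coded `AIRFLOW_V_3_3_PLUS` value and the test bodies are placeholders, not the PR's tests.

```python
import unittest

# Assumption: stand-in for the provider's version flag, hard-coded for the sketch.
AIRFLOW_V_3_3_PLUS = False


class TestBuildDurableStorageSketch(unittest.TestCase):
    @unittest.skipUnless(AIRFLOW_V_3_3_PLUS, "task state store needs Airflow >= 3.3")
    def test_uses_task_state_store(self):
        # Placeholder body; the real test would build the operator here.
        self.assertTrue(AIRFLOW_V_3_3_PLUS)

    @unittest.skipIf(AIRFLOW_V_3_3_PLUS, "ObjectStorage fallback is for Airflow < 3.3")
    def test_falls_back_to_object_storage(self):
        # Placeholder body; the real test would assert on the fallback backend.
        self.assertFalse(AIRFLOW_V_3_3_PLUS)


# Run the two tests programmatically and collect the outcome.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestBuildDurableStorageSketch)
result = unittest.TestResult()
suite.run(result)
```

The trade-off against patching the flag is coverage: skipping exercises only the branch that matches the installed core, while patching exercises both branches on every core as long as the imports succeed.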
        """On Airflow >= 3.3 the cache lives in the task state store -- no durable_cache_path needed."""
        from airflow.providers.common.ai.durable.task_state_store import TaskStateStoreDurableStorage

        accessor = MagicMock()
Contributor
nit: add a TaskStateStoreAccessor spec
        """On Airflow < 3.3 the cache falls back to the ObjectStorage backend."""
        from airflow.providers.common.ai.durable.storage import DurableStorage

        ti = MagicMock(dag_id="d", task_id="t", run_id="r", map_index=-1)
Contributor
nit: add a spec here and elsewhere where it's used like this
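For context on the two spec nits, here is a small sketch of what `MagicMock(spec=...)` changes. `AccessorSketch` is a hypothetical class invented for the example; it does not reflect the real accessor's methods.

```python
from unittest.mock import MagicMock


class AccessorSketch:
    """Hypothetical stand-in for the class being mocked."""

    def get(self, key):
        ...

    def set(self, key, value):
        ...


loose = MagicMock()
strict = MagicMock(spec=AccessorSketch)

# An unspecced mock accepts any attribute, so a typo goes unnoticed.
loose.gett("k")

# A specced mock rejects attributes the real class does not define.
try:
    strict.gett("k")
except AttributeError:
    typo_caught = True
else:
    typo_caught = False

# A specced mock also passes isinstance checks against the spec class.
looks_like_accessor = isinstance(strict, AccessorSketch)
```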
Contributor
This file verifies the protocol with two artificial backends (_CompleteBackend, _PartialBackend) but never checks that the real DurableStorage (the ObjectStorage backend) satisfies DurableStorageProtocol. Since it's @runtime_checkable, we could do that to catch any future method signature drift.
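A sketch of the kind of check being proposed, with one caveat worth keeping in mind: `isinstance` against a `@runtime_checkable` protocol verifies only that the methods exist, not that their signatures match. The protocol and its method names (`load`, `save`) are hypothetical; only `cleanup()` is named in this PR.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class StorageProtocolSketch(Protocol):
    """Hypothetical protocol; method names other than cleanup are assumptions."""

    def load(self, key: str) -> Any: ...
    def save(self, key: str, value: Any) -> None: ...
    def cleanup(self) -> None: ...


class CompleteBackend:
    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def load(self, key: str) -> Any:
        return self._data.get(key)

    def save(self, key: str, value: Any) -> None:
        self._data[key] = value

    def cleanup(self) -> None:
        self._data.clear()


class PartialBackend:
    """Missing save() and cleanup(), so it does not satisfy the protocol."""

    def load(self, key: str) -> Any:
        return None


class DriftedBackend(CompleteBackend):
    """save() has the wrong signature, yet the runtime check still passes."""

    def save(self, key: str) -> None:  # type: ignore[override]
        pass


complete_ok = isinstance(CompleteBackend(), StorageProtocolSketch)
partial_ok = isinstance(PartialBackend(), StorageProtocolSketch)
drifted_ok = isinstance(DriftedBackend(), StorageProtocolSketch)
```

An `isinstance` assertion on the real backend would therefore catch a removed or renamed method; catching a changed parameter list would need a static type check or an explicit `inspect.signature` comparison.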
durable=True on AgentOperator / @task.agent caches each model response and tool result so a retry replays completed steps instead of re-running expensive LLM calls.
Until now that cache was always an ObjectStorage JSON file, which required [common.ai] durable_cache_path to be set.
The durable cache is per-task-instance, per-run, and cleared on success, which is exactly the scope of the AIP-103 task state store.
On Airflow >= 3.3 the cache now lives there:
- No durable_cache_path configuration needed.
- Large step payloads are offloaded automatically when [workers] state_store_backend is set.
On Airflow < 3.3 the ObjectStorage backend is unchanged and selected automatically.
Design
- Both backends implement DurableStorageProtocol, so CachingModel and CachingToolset depend on the interface, not a concrete backend.
- AgentOperator._build_durable_storage picks the backend by Airflow version.
- Cache keys use a reserved prefix (__commonai_durable__...) so they cannot collide with keys user code writes via context["task_state_store"] in the same task instance.
- Keys are written with NEVER_EXPIRE so a retry can replay them regardless of retry_delay or the global retention config; cleanup() removes only the keys this run touched.
Gotchas
- By default (no [workers] state_store_backend), cached step values are stored in the metadata database. For agents with large responses, configure [workers] state_store_backend to offload them to external storage.
- A tool passed via agent_params={"tools": [...]} is not wrapped by the caching toolset (only toolsets= are), so only its model steps replay. This is existing behavior, unchanged here.
Verification
Verified end-to-end on a real Airflow 3.3 standalone: a durable AgentOperator whose tool fails on the first attempt replays the cached model step from the task state store on the retry, and a direct round-trip task confirms cache, replay, and cleanup against the live Execution API and metadata DB.
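The key-namespacing and cleanup rules from the Design notes can be illustrated with an in-memory sketch. Only the `__commonai_durable__` prefix comes from this PR; the class, its method names, the key layout after the prefix, and the plain dict standing in for the task state store are all assumptions made for the example.

```python
from typing import Any

# Prefix taken from the PR description; everything after it is illustrative.
RESERVED_PREFIX = "__commonai_durable__"


class PrefixedCacheSketch:
    """Hypothetical cache that namespaces its keys inside a shared store."""

    def __init__(self, store: dict[str, Any]) -> None:
        self._store = store
        self._touched: set[str] = set()

    def _key(self, step: str) -> str:
        return f"{RESERVED_PREFIX}{step}"

    def save(self, step: str, value: Any) -> None:
        key = self._key(step)
        self._store[key] = value
        self._touched.add(key)

    def load(self, step: str) -> Any:
        key = self._key(step)
        if key in self._store:
            # A replayed key counts as touched by this run.
            self._touched.add(key)
        return self._store.get(key)

    def cleanup(self) -> None:
        # Remove only the keys this run touched; user keys are left alone.
        for key in self._touched:
            self._store.pop(key, None)
        self._touched.clear()


# Usage: user code and the cache share one store without colliding.
shared_store: dict[str, Any] = {"model_step_0": "written by user code"}
cache = PrefixedCacheSketch(shared_store)
cache.save("model_step_0", "cached model response")
replayed = cache.load("model_step_0")
cache.cleanup()
```

After `cleanup()` the shared store holds only the user's own `model_step_0` entry, which is the isolation the reserved prefix is meant to provide.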