Two-token mechanism for task execution to prevent token expiration while tasks wait in executor queues #60108

Open
anishgirianish wants to merge 7 commits into apache:main from anishgirianish:fix/token-expiration-worker

@anishgirianish
Contributor

@anishgirianish anishgirianish commented Jan 4, 2026


Summary

Tasks waiting in executor queues (Celery, Kubernetes) can have their JWT tokens expire before execution starts, causing auth failures on the Execution API. This is a real problem in production: when queues back up or workers are slow to pick up tasks, the original short-lived token expires and the worker gets a 403 when it finally tries to start the task.

Fixes: #53713
Related: #59553

Approach

Two-token mechanism: a long-lived workload token (24h default, configurable) travels with the task through the queue, and a short-lived execution token is issued when the task actually starts running.

The workload token carries a scope: "workload" claim and is restricted to the /run endpoint only, enforced via FastAPI SecurityScopes and a custom ExecutionAPIRoute. When /run succeeds, it returns an execution token via X-Execution-Token header. The SDK client picks it up and uses it for all subsequent API calls. The existing JWTReissueMiddleware handles refreshing execution tokens near expiry and skips workload tokens.
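
To make the flow above concrete, here is a minimal standard-library sketch of the two tokens. It is not the PR's `JWTGenerator`: the helper names `make_token` and `read_claims`, the demo secret, the `ti-123` subject and the 600-second execution lifetime are all assumptions for illustration. Only the `scope` claim values and the 24h workload default come from the description.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET = b"demo-secret"  # hypothetical signing key, for illustration only


def _b64(data: bytes) -> str:
    """URL-safe base64 without padding, as used in JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def make_token(sub: str, scope: str, valid_for: float, now: Optional[float] = None) -> str:
    """Build an HS256-signed JWT carrying sub, scope, iat and exp claims."""
    now = time.time() if now is None else now
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    claims = {"sub": sub, "scope": scope, "iat": int(now), "exp": int(now + valid_for)}
    payload = _b64(json.dumps(claims).encode())
    sig = _b64(hmac.new(SECRET, f"{header}.{payload}".encode(), hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"


def read_claims(token: str, now: Optional[float] = None) -> dict:
    """Verify the signature and expiry, then return the claims."""
    now = time.time() if now is None else now
    header, payload, sig = token.split(".")
    expected = _b64(hmac.new(SECRET, f"{header}.{payload}".encode(), hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):
        raise ValueError("bad signature")
    padded = payload + "=" * (-len(payload) % 4)
    claims = json.loads(base64.urlsafe_b64decode(padded))
    if claims["exp"] <= now:
        raise ValueError("token expired")
    return claims


# Scheduler side: long-lived token that travels with the queued workload.
workload_token = make_token("ti-123", "workload", valid_for=86400, now=1_000_000)
# /run side: short-lived token issued once the task actually starts (assumed 600 s).
execution_token = make_token("ti-123", "execution", valid_for=600, now=1_050_000)
```

In this sketch the workload token is still valid after roughly 14 hours in the queue, while the execution token issued at start time expires 600 seconds later unless it is refreshed.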

For dag.test() / InProcessExecutionAPI, auth is bypassed and a stub JWTGenerator with a random secret is used so no signing key configuration is needed.

New config: execution_api.jwt_workload_token_expiration_time (default 86400s)
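
As a rough illustration of the new option, the snippet below parses a hypothetical `airflow.cfg` fragment with Python's plain `configparser` (not Airflow's own `conf` object). Only the section name, option name and 86400-second default are taken from this PR; the 172800 value is an example override.

```python
import configparser

# Hypothetical airflow.cfg fragment raising the workload token lifetime to 48 h.
CFG_TEXT = """
[execution_api]
jwt_workload_token_expiration_time = 172800
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# Fall back to the PR's stated default (86400 s = 24 h) when the option is unset.
workload_ttl = parser.getint(
    "execution_api", "jwt_workload_token_expiration_time", fallback=86400
)
print(f"workload tokens valid for {workload_ttl / 3600:.0f} hours")
```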

Built on @ashb's SecurityScopes foundation.

Security considerations

Even if a workload token is intercepted, it can only call /run, which already guards against running a task more than once (returns 409 if the task isn't in QUEUED/RESTARTING state). All other endpoints reject workload tokens; they require execution scope. The execution token issued by /run is short-lived and automatically refreshed, keeping the existing security posture for all API calls during task execution.
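
The two guards described above can be sketched in plain Python as follows. This is a simulation, not the FastAPI `SecurityScopes` wiring from the PR: `authorize`, `run_task`, the `Result` type and the example paths are hypothetical names, and the sketch assumes every token carries an explicit `scope` claim.

```python
from dataclasses import dataclass

RUNNABLE_STATES = {"queued", "restarting"}


@dataclass
class Result:
    status: int
    detail: str = ""


def authorize(claims: dict, path: str) -> Result:
    """Workload tokens may only reach /run; everything else needs execution scope."""
    scope = claims.get("scope")
    if scope == "workload":
        ok = path.endswith("/run")
    else:
        ok = scope == "execution"
    return Result(200) if ok else Result(403, "Invalid token scope")


def run_task(claims: dict, ti_state: dict) -> Result:
    """Simulate the /run call: scope check first, then the state guard."""
    auth = authorize(claims, "/task-instances/ti-123/run")  # hypothetical path
    if auth.status != 200:
        return auth
    if ti_state["state"] not in RUNNABLE_STATES:
        # A replayed workload token lands here once the task has started.
        return Result(409, "task instance is not in a runnable state")
    ti_state["state"] = "running"
    return Result(200)


ti = {"state": "queued"}
stolen = {"scope": "workload"}
first = run_task(stolen, ti)   # succeeds and moves the task to "running"
replay = run_task(stolen, ti)  # rejected by the state guard
other = authorize(stolen, "/connections/my_conn")  # rejected by the scope check
```

Under these assumptions, an intercepted workload token gets at most one successful `/run` call and nothing else.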

Testing

Tested end-to-end with CeleryExecutor in Breeze: triggered a DAG and confirmed that tasks completed successfully with the token swap happening transparently. Unit tests cover token generation, scope enforcement (accepted on /run, rejected elsewhere), invalid scope handling, execution token header in response, SDK client token swap and priority, and registry teardown to prevent test pollution.


@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:task-sdk labels Jan 4, 2026
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from b183c74 to 9c31417 Compare January 4, 2026 21:05
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch 3 times, most recently from c707ddc to 4ef9dfe Compare January 4, 2026 22:45
@eladkal eladkal added this to the Airflow 3.1.6 milestone Jan 6, 2026
@tirkarthi
Contributor

As I understand it, this was removed in #55506 in favor of a middleware that refreshes the token. Are you running an instance with execution api only separately with api-server? Could this middleware approach be extended for task-sdk calls too?

cc: @vincbeck @pierrejeambrun

@anishgirianish
Contributor Author

Hi @tirkarthi,
Thanks for pointing out the middleware approach from #55506 - that's helpful context.

I took a stab at extending that pattern in #60197, handling expired tokens transparently in JWTBearer + middleware so no client-side changes are needed. Would love your thoughts on it.

Totally happy to go with whichever approach the team feels is better!

cc: @vincbeck @pierrejeambrun

@vincbeck
Contributor

vincbeck commented Jan 7, 2026

> Hi @tirkarthi, Thanks for pointing out the middleware approach from #55506 - that's helpful context.
>
> I took a stab at extending that pattern in #60197, handling expired tokens transparently in JWTBearer + middleware so no client-side changes are needed. Would love your thoughts on it.
>
> Totally happy to go with whichever approach the team feels is better!
>
> cc: @vincbeck @pierrejeambrun

Would love to hear @ashb's or @amoghrajesh's opinion on this one

Member

@ashb ashb left a comment

We can't do this approach. It lets any Execution API token be resurrected which fundamentally breaks lots of security assumptions -- it amounts to having tokens not expire. That is bad.

Instead, what we should do is generate a new token (i.e. one with an extra/different set of JWT claims) that is only valid for the /run endpoint and valid for longer (say 24 hours, make it configurable), and this is what gets sent in the workload.

The run endpoint would then set the header to give the running task a "short lived" token (the one we have right now, basically) that is usable on the rest of the Execution API. This approach is safer as the existing controls in the /run endpoint already prevent a task being run more than once, which should also protect against "resurrecting" an expired token and using it to access things like connections etc. And we should validate that the token used on all endpoints but run is explicitly lacking this new claim.

Member

@ashb ashb left a comment

Much better approach, and on the right track, thanks.

Some changes though:

  • "queue" is not the right thing to use, as these tokens could be used for executing other workloads soon (for instance we have already talked about wanting Dag level callbacks to be executed on the workers, not in the dag processor, which would be done by having a new type from the ExecuteTaskWorkload).

    so maybe we have "scope": "ExecuteTaskWorkload"?

  • A little bit of refactoring is needed before we are ready to merge this.

@ashb ashb self-requested a review January 9, 2026 12:09
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from e7e3ae1 to e879863 Compare January 9, 2026 23:52
@anishgirianish anishgirianish changed the title from "Add token refresh mechanism for Execution API (#59553)" to "Two-token mechanism for task execution to prevent token expiration while tasks wait in executor queues (#59553)" Jan 10, 2026
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from b511b8f to 57ac225 Compare January 10, 2026 07:07
Contributor

Copilot AI left a comment

Pull request overview

Implements a two-token flow for the Execution API to avoid auth failures when tasks sit in executor queues long enough for their original JWT to expire.

Changes:

  • Add workload token generation for task workloads (long-lived) and issue an execution token (short-lived) from PATCH /run via X-Execution-Token.
  • Update the Task SDK client to swap its bearer token when X-Execution-Token is received (and prioritize it over Refreshed-API-Token).
  • Add configuration for workload-token expiration and expand tests/mocks to cover the new token behavior.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| task-sdk/src/airflow/sdk/api/client.py | Swaps client auth when X-Execution-Token header is returned by the server. |
| task-sdk/tests/task_sdk/api/test_client.py | Adds tests validating execution-token swap and precedence over refreshed tokens. |
| airflow-core/src/airflow/api_fastapi/auth/tokens.py | Adds generate_workload_token() and adds a valid_for override to JWT generation. |
| airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py | Allows workload tokens on /run and returns X-Execution-Token on success. |
| airflow-core/src/airflow/executors/workloads/base.py | Switches workload token generation to the new workload-scoped token method. |
| airflow-core/src/airflow/config_templates/config.yml | Introduces execution_api.jwt_workload_token_expiration_time with a 24h default. |
| airflow-core/tests/unit/api_fastapi/auth/test_tokens.py | Adds tests for workload token scope and JWT valid_for override. |
| airflow-core/tests/unit/api_fastapi/execution_api/conftest.py | Registers a mocked JWTGenerator for execution API tests. |
| airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py | Adds a test asserting /run returns X-Execution-Token. |
| airflow-core/tests/unit/jobs/test_scheduler_job.py | Extends JWT generator mocks to include generate_workload_token(). |
| devel-common/src/tests_common/test_utils/mock_executor.py | Extends JWT generator mock to include generate_workload_token(). |

@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch 8 times, most recently from ae3141d to e1f8725 Compare March 19, 2026 23:20
@anishgirianish anishgirianish requested a review from kaxil March 20, 2026 18:15
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from e1f8725 to ff8f59c Compare March 20, 2026 18:16
@anishgirianish
Contributor Author

Hi @kaxil @ashb, thank you very much for the detailed review. I have addressed the feedback in the latest push. I would like to request a re-review whenever you get a chance. Thank you so much.

Seconds until workload JWT tokens expire. These long-lived tokens are sent
with task workloads to executors and can only call the /run endpoint.
Set long enough to cover maximum expected queue wait time.
version_added: 3.2.0
Member

I don't think we'll merge this for 3.2.0, so

Suggested change
- version_added: 3.2.0
+ version_added: 3.2.1

Contributor

+1

assert resp.status_code == 403
assert "Invalid token scope" in resp.json()["detail"]

def test_workload_scope_accepted_on_run_endpoint(
Member

Maybe we already have a test for this, but I think the negative test is important too -- that the workload scoped token is not accepted on other endpoints. You don't have to exhaustively check all endpoints, but testing at least one other to ensure it gives a 4xx is worthwhile I think

Member

I also think we should check this once or twice on other routers too -- the workload scoped tokens should not work anywhere but the /run TI endpoint.

Comment on lines +955 to +958
if new_token := response.headers.get("X-Execution-Token"):
    log.debug("Received execution token, swapping auth")
    self.auth = BearerAuth(new_token)
elif new_token := response.headers.get("Refreshed-API-Token"):
Member

Nit: do we even need a new header? Couldn't we use Refreshed-API-Token in both cases? Also if you do think a new header is worth it then remove the X- prefix -- that is not recommended by HTTP standards anymore.

Contributor

I agree too.
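
For readers following this thread, the precedence in the diff under discussion can be sketched without an HTTP library. `SketchClient` and `handle_response_headers` are hypothetical names, and a plain dict stands in for real response headers (which are case-insensitive, unlike a dict). The header names are the ones in the reviewed diff; they may change if the suggestion above is adopted.

```python
class SketchClient:
    """Stand-in for the SDK client; holds only the current bearer token."""

    def __init__(self, token: str) -> None:
        self.token = token

    def handle_response_headers(self, headers: dict) -> None:
        # The execution token issued by /run takes priority over a refreshed token.
        if new_token := headers.get("X-Execution-Token"):
            self.token = new_token
        elif new_token := headers.get("Refreshed-API-Token"):
            self.token = new_token
        # No token header: keep using the current token.


client = SketchClient("workload-token")
# Response to /run: both headers present, the execution token wins.
client.handle_response_headers(
    {"X-Execution-Token": "exec-1", "Refreshed-API-Token": "refreshed-1"}
)
after_run = client.token
# A later response carrying only a refreshed token.
client.handle_response_headers({"Refreshed-API-Token": "refreshed-2"})
after_refresh = client.token
```

If a single header were used for both cases, as suggested above, the two branches would collapse into one.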

Contributor

@amoghrajesh amoghrajesh left a comment

Mostly looks good now, just a few basic questions / feedback; otherwise I am good.

Comment on lines +295 to +297
generator: JWTGenerator = services.get(JWTGenerator)
execution_token = generator.generate(extras={"sub": str(task_instance_id)})
response.headers["X-Execution-Token"] = execution_token
Contributor

If services.get(JWTGenerator) raises or fails for whatever reason, or if generate raises, the API would return 500 here, but we committed the TI as running earlier. To avoid this, I suggest moving this up into the try/except itself.

Member

I don't think it can ever fail really.

)

generator: JWTGenerator = services.get(JWTGenerator)
execution_token = generator.generate(extras={"sub": str(task_instance_id)})
Contributor

Do we need to set the scope as execution here to be explicit?

@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from ff8f59c to 4afd940 Compare March 26, 2026 00:18
"exp": 9999999999,
"iat": 1000000000,
}
lifespan.registry.register_value(JWTValidator, validator)
Member

@kaxil kaxil Mar 26, 2026

This JWTValidator registration is dead code -- the client fixture's mock_jwt_bearer overrides _jwt_bearer via FastAPI dependency overrides, so FastAPI never calls the real _jwt_bearer (which would use JWTValidator from the registry). Every request through client gets scope: "execution" regardless of what's registered here.

The test passes because execution-scoped tokens are allowed on /run, not because workload-scoped tokens are. To actually test workload token acceptance, the test needs to either:

  1. Remove the _jwt_bearer dependency override for this test and let the real auth flow use this JWTValidator, or
  2. Override mock_jwt_bearer to return TIToken(..., claims={..., "scope": "workload"}) instead of the conftest's hardcoded "scope": "execution".

@kaxil kaxil modified the milestones: Airflow 3.1.9, Airflow 3.2.1 Mar 26, 2026

lifespan.registry.close()

exec_app.dependency_overrides.pop(_jwt_bearer, None)
Member

lifespan.registry.close() is new here (no other test file does this), and the registry is shared across all tests via cached_app. Closing it could break subsequent tests that try to look up services from the same registry. The existing pattern in other test files (e.g., test_task_instances.py, test_router.py) registers values on lifespan.registry without closing it afterward. I'd drop this close() call to match what the rest of the test suite does.

)

generator: JWTGenerator = services.get(JWTGenerator)
execution_token = generator.generate(extras={"sub": str(task_instance_id)})
Member

Token generation happens outside the try...except SQLAlchemyError block. If services.get(JWTGenerator) or generator.generate() raises (missing service, crypto error, etc.), the client gets a raw 500 with no useful detail. Worth wrapping this in its own try/except, or at minimum a log line, so operators can tell the difference between "database error" and "token generation failed".
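
One possible shape for this suggestion, as a standalone sketch: `issue_execution_token` and `TokenGenerationError` are hypothetical names, and `generate` stands in for `generator.generate`. How the route should translate the error into an HTTP response is left open here.

```python
import logging

log = logging.getLogger("run_endpoint_sketch")


class TokenGenerationError(RuntimeError):
    """Hypothetical error type separating token failures from database failures."""


def issue_execution_token(generate, task_instance_id: str) -> str:
    """Call `generate` and re-raise any failure with a distinct, logged cause."""
    try:
        return generate(extras={"sub": task_instance_id})
    except Exception as exc:
        # Log with traceback so operators can tell this apart from a database error.
        log.exception("Execution token generation failed for TI %s", task_instance_id)
        raise TokenGenerationError(
            f"token generation failed for {task_instance_id}"
        ) from exc


def working_generate(extras: dict) -> str:
    return "tok-" + extras["sub"]


def broken_generate(extras: dict) -> str:
    raise KeyError("no signing key configured")


token = issue_execution_token(working_generate, "ti-1")
try:
    issue_execution_token(broken_generate, "ti-1")
except TokenGenerationError as err:
    failure_cause = err.__cause__  # the original KeyError is preserved
```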


kid: str = attrs.field(default=attrs.Factory(_generate_kid, takes_self=True))
valid_for: float
workload_valid_for: float = attrs.field(
Member

The workload_valid_for default reads from config via _conf_factory, and _jwt_generator() in app.py also reads the same config key and passes it explicitly. The explicit kwarg takes precedence, so the default factory never runs in production. Having two code paths that reference the same config key is easy to get out of sync -- consider dropping the attrs default (make it required like valid_for) and always passing it explicitly, or drop the explicit kwarg in _jwt_generator() and let the default handle it.
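
The first option (a required field that is always passed explicitly) might look like the sketch below. It uses `dataclasses` as a stand-in for `attrs`, and `GeneratorSketch`, `build_generator`, the dict-based config and the `jwt_expiration_time` key are assumptions for illustration rather than the PR's code.

```python
from dataclasses import dataclass


@dataclass
class GeneratorSketch:
    valid_for: float
    # Required, like valid_for: no config-reading default that could drift out of sync.
    workload_valid_for: float


def build_generator(conf: dict) -> GeneratorSketch:
    """Single place where config keys are read (stand-in for _jwt_generator())."""
    return GeneratorSketch(
        valid_for=conf["jwt_expiration_time"],
        workload_valid_for=conf["jwt_workload_token_expiration_time"],
    )


generator = build_generator(
    {"jwt_expiration_time": 600.0, "jwt_workload_token_expiration_time": 86400.0}
)
```

With no default on the field, forgetting to pass `workload_valid_for` fails at construction time instead of silently reading the config through a second code path.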

# wait times so avoid refreshing them.
if claims.get("scope") == "workload":
    return response

Member

The early return for workload tokens skips the refresh logic (correct), but it also skips the except block below. If avalidated_claims raises for a workload token, execution falls into the outer except and the response still gets returned (with a warning log). Might be worth a comment clarifying that workload token validation errors are handled by the outer catch.

# Cadwyn's versioned sub-apps don't inherit the main app's state,
# so lookups raise ServiceNotFoundError. This registry provides
# services needed by routes called during dag.test().
#
Member

The stub JWTGenerator uses secrets.token_urlsafe(32) as the secret key, so a new key is generated every time InProcessExecutionAPI.app is accessed. Since app is a cached_property, the key is stable for the lifetime of the object. But the token generated here by ti_run won't be validated by anything (since _jwt_bearer is also overridden with always_allow), so this stub only exists to satisfy the services.get(JWTGenerator) call. A brief comment noting that these tokens are never validated in dag.test() mode would help future readers.

@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from 4afd940 to ec7e290 Compare March 26, 2026 03:38
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from ec7e290 to b6e5713 Compare March 26, 2026 04:34
Development

Successfully merging this pull request may close these issues.

ExecuteTask activity token can expire before the task starts running

10 participants