Two-token mechanism for task execution to prevent token expiration while tasks wait in executor queues#60108
Conversation
b183c74 to
9c31417
Compare
c707ddc to
4ef9dfe
Compare
|
As per my understanding this was removed in #55506 to use a middleware that refreshes token. Are you running an instance with execution api only separately with api-server? Could this middleware approach be extended for task-sdk calls too? |
|
Hi @tirkarthi, I took a stab at extending that pattern in #60197, handling expired tokens transparently in JWTBearer + middleware so no client-side changes are needed. Would love your thoughts on it. Totally happy to go with whichever approach the team feels is better! |
Would love to hear @ashb or @amoghrajesh 's opinion on this one |
ashb
left a comment
There was a problem hiding this comment.
We can't do this approach. It lets any Execution API token be resurrected which fundamentally breaks lots of security assumptions -- it amounts to having tokens not expire. That is bad.
Instead what we should do is generate a new token (i.e. ones with extra/different set of JWT claims) that is only valid for the /run endpoint and valid for longer (say 24hours, make it configurable) and this is what gets sent in the workload.
The run endpoint then would set the header to give the running task a "short lived" token (the one we have right now basically) that is usable on the rest of the Execution API. This approach is safer as the existing controls in the /run endpoint already prevent a task being run one than once, which should also prevent against "resurrecting" an expired token and using it to access things like connections etc. And we should validate that the token used on all endpoints but run is explicitly lacking this new claim.
4ef9dfe to
b32da6b
Compare
14a516a to
5915391
Compare
ashb
left a comment
There was a problem hiding this comment.
Much better approach, and on the right track, thanks.
Some changes though:
-
"queue" is not the right thing to use, as these tokens could be used for executing other workloads soon (for instance we have already talked about wanting Dag level callbacks to be executed on the workers, not in the dag processor, which would be done by having a new type from the ExecuteTaskWorkload).
so maybe we have
"scope": "ExecuteTaskWorkload"? -
A little bit of refactoring is needed before we are ready to merge this.
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Outdated
Show resolved
Hide resolved
e7e3ae1 to
e879863
Compare
b511b8f to
57ac225
Compare
ae3141d to
e1f8725
Compare
e1f8725 to
ff8f59c
Compare
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
Show resolved
Hide resolved
amoghrajesh
left a comment
There was a problem hiding this comment.
Mostly looks good now, just a few basic qns / feedback otherwise I am good.
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Outdated
Show resolved
Hide resolved
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Outdated
Show resolved
Hide resolved
ff8f59c to
4afd940
Compare
| "exp": 9999999999, | ||
| "iat": 1000000000, | ||
| } | ||
| lifespan.registry.register_value(JWTValidator, validator) |
There was a problem hiding this comment.
This JWTValidator registration is dead code -- the client fixture's mock_jwt_bearer overrides _jwt_bearer via FastAPI dependency overrides, so FastAPI never calls the real _jwt_bearer (which would use JWTValidator from the registry). Every request through client gets scope: "execution" regardless of what's registered here.
The test passes because execution-scoped tokens are allowed on /run, not because workload-scoped tokens are. To actually test workload token acceptance, the test needs to either:
- Remove the
_jwt_bearerdependency override for this test and let the real auth flow use thisJWTValidator, or - Override
mock_jwt_bearerto returnTIToken(..., claims={..., "scope": "workload"})instead of the conftest's hardcoded"scope": "execution".
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Outdated
Show resolved
Hide resolved
|
|
||
| kid: str = attrs.field(default=attrs.Factory(_generate_kid, takes_self=True)) | ||
| valid_for: float | ||
| workload_valid_for: float = attrs.field( |
There was a problem hiding this comment.
The workload_valid_for default reads from config via _conf_factory, and _jwt_generator() in app.py also reads the same config key and passes it explicitly. The explicit kwarg takes precedence, so the default factory never runs in production. Having two code paths that reference the same config key is easy to get out of sync -- consider dropping the attrs default (make it required like valid_for) and always passing it explicitly, or drop the explicit kwarg in _jwt_generator() and let the default handle it.
9fc3fa3 to
bb96119
Compare
bb96119 to
9eaf6dd
Compare
Summary
Tasks waiting in executor queues (Celery, Kubernetes) can have their JWT tokens expire before execution starts, causing auth failures on the Execution API. This is a real problem in production, when queues back up or workers are slow to pick up tasks, the original short-lived token expires and the worker gets a 403 when it finally tries to start the task.
Fixes: #53713
Related: #59553
closes: #62129
Approach
Two-token mechanism: a long-lived workload token (24h default, configurable) travels with the task through the queue, and a short-lived execution token is issued when the task actually starts running.
The workload token carries a scope: "workload" claim and is restricted to the /run endpoint only, enforced via FastAPI SecurityScopes and a custom ExecutionAPIRoute. When /run succeeds, it returns an execution token via X-Execution-Token header. The SDK client picks it up and uses it for all subsequent API calls. The existing JWTReissueMiddleware handles refreshing execution tokens near expiry and skips workload tokens.
For dag.test() / InProcessExecutionAPI, auth is bypassed and a stub JWTGenerator with a random secret is used so no signing key configuration is needed.
New config: execution_api.jwt_workload_token_expiration_time (default 86400s)
Built on @ashb's SecurityScopes foundation.
Security considerations
Even if a workload token is intercepted, it can only call /run which already guards against running a task more than once (returns 409 if the task isn't in QUEUED/RESTARTING state). All other endpoints reject workload tokens , they require execution scope. The execution token issued by /run is short-lived and automatically refreshed, keeping the existing security posture for all API calls during task execution.
Testing
Tested end-to-end with CeleryExecutor in Breeze, triggered a DAG, confirmed tasks completed successfully with the token swap happening transparently. Unit tests cover token generation, scope enforcement (accepted on /run, rejected elsewhere), invalid scope handling, execution token header in response, SDK client token swap and priority, and registry teardown to prevent test pollution.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.