feat: Add [logging] task_log_to_stdout config to forward task logs to worker stdout #64481

mtraynham wants to merge 2 commits into apache:main
Conversation
This does have the side-effect that all logs are tagged with
@mtraynham This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack.
Pull request overview
Adds a new [logging] task_log_to_stdout configuration option to allow forwarding task subprocess logs to the worker’s stdout (in addition to existing task log sinks), wiring it through both LocalExecutor and CeleryExecutor by passing subprocess_logs_to_stdout into the task supervisor.
Changes:
- Add `[logging] task_log_to_stdout` boolean config option (default `False`) to `config.yml`.
- Pass the config through to `supervise()` from `LocalExecutor` task execution.
- Pass the config through to `supervise()` from Celery's `execute_workload` task execution.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py | Wires the new config into Celery worker task execution by passing subprocess_logs_to_stdout to supervise(). |
| airflow-core/src/airflow/executors/local_executor.py | Wires the new config into LocalExecutor task execution by passing subprocess_logs_to_stdout to supervise(). |
| airflow-core/src/airflow/config_templates/config.yml | Documents the new [logging] task_log_to_stdout option in the generated config schema. |
```python
    token=workload.token,
    server=team_conf.get("core", "execution_api_server_url", fallback=default_execution_api_server),
    log_path=workload.log_path,
    subprocess_logs_to_stdout=team_conf.getboolean("logging", "task_log_to_stdout", fallback=False),
)
```
This new wiring is not covered by a regression/unit test. Please add a test that sets [logging] task_log_to_stdout to True/False (e.g., via conf_vars) and asserts _execute_work()/supervise() is called with subprocess_logs_to_stdout matching the config value, so future refactors don’t silently break stdout log forwarding.
```python
    token=workload.token,
    server=conf.get("core", "execution_api_server_url", fallback=default_execution_api_server),
    log_path=workload.log_path,
    subprocess_logs_to_stdout=conf.getboolean("logging", "task_log_to_stdout", fallback=False),
)
```
Please add/extend a unit test for execute_workload that toggles [logging] task_log_to_stdout and asserts supervise() receives subprocess_logs_to_stdout accordingly. Without this, the new config-to-supervisor plumbing is easy to regress (especially since supervise is mocked in existing tests).
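A sketch of the toggle-both-ways variant requested here, again with stdlib-only stand-ins (the real `execute_workload` and config helpers live in the Celery provider test suite; the fakes below only mirror the PR's wiring):

```python
from unittest import mock

class FakeConf:
    """Stand-in for airflow.configuration.conf, limited to getboolean."""
    def __init__(self, value: bool):
        self.value = value

    def getboolean(self, section, key, fallback=False):
        # Guard that the wiring reads exactly the new option.
        assert (section, key) == ("logging", "task_log_to_stdout")
        return self.value

def execute_workload(conf, supervise):
    # Mirrors the PR's wiring in celery_executor_utils.execute_workload.
    supervise(
        subprocess_logs_to_stdout=conf.getboolean(
            "logging", "task_log_to_stdout", fallback=False
        ),
    )

# Toggle the config both ways and assert supervise() sees the same value.
for configured in (True, False):
    supervise = mock.Mock()
    execute_workload(FakeConf(configured), supervise)
    assert supervise.call_args.kwargs["subprocess_logs_to_stdout"] is configured
```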
```yaml
task_log_to_stdout:
  description: |
    When True, task log messages are duplicated to the worker process's stdout
    in addition to the task log file (and any configured remote log handler).
    This is useful for container-based deployments where a log shipper (e.g.
    CloudWatch agent, Fluentd, Fluent Bit) captures stdout.
  version_added: 3.2.1
  type: boolean
  example: "True"
  default: "False"
```
This introduces a new user-facing config option; please add a Towncrier newsfragment in airflow-core/newsfragments/ describing [logging] task_log_to_stdout so it appears in the release notes (the PR template indicates this is expected for user-facing changes).
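Such a newsfragment might look like the sketch below, assuming the `significant` type and the `{pr_number}.significant.rst` naming mentioned in the PR template (here `64481.significant.rst`, from this PR's number); the exact type should follow the project's towncrier config.

```rst
Added ``[logging] task_log_to_stdout`` configuration option.

When enabled, task log messages are also forwarded to the worker
process's stdout, in addition to the task log file and any configured
remote log handler.
```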
Summary

- Adds a `[logging] task_log_to_stdout` boolean config option (default `False`) that forwards task subprocess log messages to the worker's stdout, in addition to the task log file and any configured remote log handler.
- Passes `subprocess_logs_to_stdout` to `supervise()` in both the `LocalExecutor` and `CeleryExecutor` call sites.

Motivation
In Airflow 3, tasks run in a forked subprocess managed by `WatchedSubprocess`. The subprocess uses structlog over a JSON socket channel back to the supervisor, which writes logs only to the task log file by default. The user's `LOGGING_CONFIG` dict (via `AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS`) is never applied inside the task subprocess, so the Airflow 2 pattern of adding a `StreamHandler` to the `airflow.task` logger no longer works (https://stackoverflow.com/a/52261504).

The `subprocess_logs_to_stdout` flag already exists on `WatchedSubprocess` and `supervise()`, but defaults to `False` and is not exposed as a config option. Neither `CeleryExecutor` nor `LocalExecutor` passes `True`. There is no way for users to enable it without monkey-patching.

Container-based deployments (ECS, Kubernetes, Cloud Run, etc.) commonly rely on stdout-based log collection (CloudWatch, Fluentd, Fluent Bit, Datadog, etc.). Without this option, task execution logs are invisible to these collectors.
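Once merged, the option would be set like any other Airflow config key, either in `airflow.cfg` or through the standard `AIRFLOW__SECTION__KEY` environment-variable mapping (a sketch based on that convention; the option name comes from this PR):

```ini
# airflow.cfg -- equivalent to exporting AIRFLOW__LOGGING__TASK_LOG_TO_STDOUT=True
[logging]
task_log_to_stdout = True
```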