From 2c7045f080f4cd2c147cf820715c0f77920e1f56 Mon Sep 17 00:00:00 2001 From: evertonsa Date: Mon, 30 Mar 2026 23:21:12 +0200 Subject: [PATCH 1/6] feat: grafana loki backend --- providers/grafana/LICENSE | 0 providers/grafana/NOTICE | 0 providers/grafana/README.rst | 51 ++++ providers/grafana/provider.yaml | 22 ++ providers/grafana/pyproject.toml | 62 ++++ .../src/airflow/providers/grafana/__init__.py | 1 + .../providers/grafana/get_provider_info.py | 42 +++ .../providers/grafana/loki/__init__.py | 1 + .../providers/grafana/loki/log/__init__.py | 1 + .../grafana/loki/log/loki_task_handler.py | 265 ++++++++++++++++++ .../loki/log/test_loki_task_handler.py | 79 ++++++ pyproject.toml | 16 +- uv.lock | 62 +++- 13 files changed, 590 insertions(+), 12 deletions(-) create mode 100644 providers/grafana/LICENSE create mode 100644 providers/grafana/NOTICE create mode 100644 providers/grafana/README.rst create mode 100644 providers/grafana/provider.yaml create mode 100644 providers/grafana/pyproject.toml create mode 100644 providers/grafana/src/airflow/providers/grafana/__init__.py create mode 100644 providers/grafana/src/airflow/providers/grafana/get_provider_info.py create mode 100644 providers/grafana/src/airflow/providers/grafana/loki/__init__.py create mode 100644 providers/grafana/src/airflow/providers/grafana/loki/log/__init__.py create mode 100644 providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py create mode 100644 providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py diff --git a/providers/grafana/LICENSE b/providers/grafana/LICENSE new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/providers/grafana/NOTICE b/providers/grafana/NOTICE new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/providers/grafana/README.rst b/providers/grafana/README.rst new file mode 100644 index 0000000000000..dad54d315f0c4 --- /dev/null +++ b/providers/grafana/README.rst @@ -0,0 +1,51 @@ +==================================================== +apache-airflow-providers-grafana +==================================================== + +Content +------- + +This package provides integrations with Grafana ecosystem, specifically Loki for remote task logging in Airflow. + +Architecture and Design Decisions +--------------------------------- + +Loki Logging Backend +~~~~~~~~~~~~~~~~~~~~ + +The Loki logging backend for Apache Airflow is engineered to scale massively without degrading the performance of your Grafana Loki cluster. It achieves this by carefully sidestepping **cardinality explosions** and leveraging modern TSDB (Time Series Database) features like **Bloom filters**. + +The Cardinality Trap +^^^^^^^^^^^^^^^^^^^^ + +Unlike Elasticsearch, which indexes every field by default, Grafana Loki relies on a minimalistic index. In Loki, an *Index Stream* is created for every unique combination of labels. + +- **Good Labels**: ``{job="airflow_tasks", dag_id="my_dag"}`` (Yields a small, stable number of streams—typically one per DAG). +- **Bad Labels**: ``{job="airflow_tasks", dag_id="my_dag", task_id="extract", run_id="scheduled__2023...", try_number="1"}`` + +If high-cardinality metadata like ``run_id``, ``task_id``, and ``try_number`` are used as Loki labels, Airflow will generate an infinitely growing number of unique index streams. This triggers a cardinality explosion: Loki's ingesters run out of memory tracking millions of short-lived streams, the global index inflates, and search performance collapses. + +The Solution: JSON payloads and TSDB Bloom Filters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To keep the index small and performant, the Airflow Loki Provider drops high-cardinality identifiers from the labels entirely. Instead, it embeds them directly into the JSON log payload prior to uploading chunks to the Loki Push API: + +.. code-block:: json + + {"event": "Fetching data...", "task_id": "extract", "run_id": "scheduled__...", "try_number": "1", "map_index": "-1"} + +When the Airflow UI requests logs for a specific task run, the ``LokiTaskHandler`` constructs an optimized LogQL query utilizing the ``| json`` parser: + +.. code-block:: text + + {job="airflow_tasks", dag_id="my_dag"} | json | task_id="extract" | try_number="1" + +Behind the scenes, Loki's modern TSDB engine automatically builds mathematical **Bloom filters** for the structured data inside the log chunks during ingestion. + +When executing the query, Loki instantly resolves the ``{job="airflow_tasks", dag_id="my_dag"}`` stream. Then, before downloading or decompressing any log chunk from object storage, Loki consults the chunk's Bloom filter: + +1. *Are the strings ``"extract"`` and ``"1"`` anywhere within this block?* +2. The Bloom filter answers "No" with 100% certainty in microseconds, allowing Loki to skip the entire block of data. +3. Loki only decompresses and evaluates the JSON parser on the specific chunks that mathematically *must* contain the target task's logs. + +**Operating Philosophy**: By embedding dynamic metadata into the JSON payload rather than stream labels, this provider guarantees full-text indexing performance while keeping Grafana Loki infrastructure costs near zero, ensuring stability regardless of how many millions of tasks Airflow executes. diff --git a/providers/grafana/provider.yaml b/providers/grafana/provider.yaml new file mode 100644 index 0000000000000..335dabd8fb01d --- /dev/null +++ b/providers/grafana/provider.yaml @@ -0,0 +1,22 @@ +package-name: apache-airflow-providers-grafana +name: Grafana +description: | + `Grafana `__ +state: ready +source-date-epoch: 1714241088 +versions: + - 1.0.0 + +dependencies: + - apache-airflow>=2.11.0 + - requests>=2.27.0 + - tenacity>=8.0.0 + +integrations: + - integration-name: Grafana Loki + external-doc-url: https://grafana.com/oss/loki/ + logo: /integration-logos/grafana/Loki.png + tags: [logging] + +logging: + - airflow.providers.grafana.loki.log.loki_task_handler.LokiTaskHandler diff --git a/providers/grafana/pyproject.toml b/providers/grafana/pyproject.toml new file mode 100644 index 0000000000000..6911727466f57 --- /dev/null +++ b/providers/grafana/pyproject.toml @@ -0,0 +1,62 @@ +[build-system] +requires = ["flit_core==3.12.0"] +build-backend = "flit_core.buildapi" + +[project] +name = "apache-airflow-providers-grafana" +version = "1.0.0" +description = "Provider package apache-airflow-providers-grafana for Apache Airflow" +readme = "README.rst" +license = "Apache-2.0" +license-files = ['LICENSE', 'NOTICE'] +authors = [ + {name="Apache Software Foundation", email="dev@airflow.apache.org"}, +] +maintainers = [ + {name="Apache Software Foundation", email="dev@airflow.apache.org"}, +] +keywords = [ "airflow-provider", "grafana", "loki", "airflow", "integration" ] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Environment :: Console", + "Environment :: Web Environment", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "Framework :: Apache Airflow", + "Framework :: Apache Airflow :: Provider", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +requires-python = ">=3.10" + +dependencies = [ + "apache-airflow>=2.11.0", + "apache-airflow-providers-common-compat>=1.12.0", + "requests>=2.27.0", + "tenacity>=8.0.0", +] + +[dependency-groups] +dev = [ + "apache-airflow", + "apache-airflow-task-sdk", + "apache-airflow-devel-common", + "apache-airflow-providers-common-compat", +] + +docs = [ + "apache-airflow-devel-common[docs]" +] + +[tool.uv.sources] +apache-airflow = {workspace = true} +apache-airflow-devel-common = {workspace = true} +apache-airflow-task-sdk = {workspace = true} + +[project.entry-points."apache_airflow_provider"] +provider_info = "airflow.providers.grafana.get_provider_info:get_provider_info" + +[tool.flit.module] +name = "airflow.providers.grafana" diff --git a/providers/grafana/src/airflow/providers/grafana/__init__.py b/providers/grafana/src/airflow/providers/grafana/__init__.py new file mode 100644 index 0000000000000..8364f889b8cd4 --- /dev/null +++ b/providers/grafana/src/airflow/providers/grafana/__init__.py @@ -0,0 +1 @@ +# Licensed to the Apache Software Foundation (ASF) under one diff --git a/providers/grafana/src/airflow/providers/grafana/get_provider_info.py b/providers/grafana/src/airflow/providers/grafana/get_provider_info.py new file mode 100644 index 0000000000000..8a5207d5ffcad --- /dev/null +++ b/providers/grafana/src/airflow/providers/grafana/get_provider_info.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! + +# IF YOU WANT TO MODIFY THIS FILE EXCEPT DEPENDENCIES, YOU SHOULD MODIFY THE TEMPLATE +# `get_provider_info_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY + + +def get_provider_info(): + return { + "package-name": "apache-airflow-providers-grafana", + "name": "Grafana", + "description": "`Grafana `__\n", + "state": "ready", + "source-date-epoch": 1714241088, + "versions": ["1.0.0"], + "integrations": [ + { + "integration-name": "Grafana Loki", + "external-doc-url": "https://grafana.com/oss/loki/", + "logo": "/integration-logos/grafana/Loki.png", + "tags": ["logging"], + } + ], + "logging": ["airflow.providers.grafana.loki.log.loki_task_handler.LokiTaskHandler"], + "dependencies": ["apache-airflow>=2.11.0", "requests>=2.27.0", "tenacity>=8.0.0"], + } diff --git a/providers/grafana/src/airflow/providers/grafana/loki/__init__.py b/providers/grafana/src/airflow/providers/grafana/loki/__init__.py new file mode 100644 index 0000000000000..8364f889b8cd4 --- /dev/null +++ b/providers/grafana/src/airflow/providers/grafana/loki/__init__.py @@ -0,0 +1 @@ +# Licensed to the Apache Software Foundation (ASF) under one diff --git a/providers/grafana/src/airflow/providers/grafana/loki/log/__init__.py b/providers/grafana/src/airflow/providers/grafana/loki/log/__init__.py new file mode 100644 index 0000000000000..8364f889b8cd4 --- /dev/null +++ b/providers/grafana/src/airflow/providers/grafana/loki/log/__init__.py @@ -0,0 +1 @@ +# Licensed to the Apache Software Foundation (ASF) under one diff --git a/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py b/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py new file mode 100644 index 0000000000000..98ac01a8b58d5 --- /dev/null +++ b/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py @@ -0,0 +1,265 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file + +from __future__ import annotations + +import json +import logging +import os +import sys +import time +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import attrs +import pendulum +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +# Attempt to load standard log structures according to Airflow 3 requirements +from airflow.providers.common.compat.sdk import conf +from airflow.utils.log.file_task_handler import FileTaskHandler +from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin + +# Try mapping for StructuredLogMessage available in 3.x +try: + from airflow.utils.log.file_task_handler import StructuredLogMessage +except ImportError: + StructuredLogMessage = dict # Fallback for compilation matching + +# Try loading version compat constants +try: + from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS +except ImportError: + AIRFLOW_V_3_0_PLUS = True + AIRFLOW_V_3_2_PLUS = True + +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance + from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI + from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, LogSourceInfo + + +def _render_log_labels(ti) -> dict[str, str]: + """ + Helper to extract low-cardinality labels for Loki streams. + High-cardinality fields (like task_id, run_id) are omitted here + to prevent stream explosion and will be indexed via Bloom filters instead. + """ + return { + "job": "airflow_tasks", + "dag_id": ti.dag_id, + } + +@attrs.define(kw_only=True) +class LokiRemoteLogIO(LoggingMixin): + """ + Handles the actual communication with Loki API. + Used by Task Supervisor to bulk-upload logs and by UI to read remote logs. + """ + host: str = "http://localhost:3100" + base_log_folder: Path = attrs.field(converter=Path) + delete_local_copy: bool = False + + @property + def session(self) -> requests.Session: + if not hasattr(self, "_session"): + self._session = requests.Session() + # Implementing Retries, Jitter, and Exponential Backoff via urllib3's Retry + retries = Retry( + total=5, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["GET", "POST"] + ) + # Efficient scaling with TCP connection pooling + adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=100) + self._session.mount("http://", adapter) + self._session.mount("https://", adapter) + return self._session + + def upload(self, path: os.PathLike | str, ti: RuntimeTI): + """Called by Airflow Task Supervisor after task finishes (or during) to push logs.""" + path = Path(path) + local_loc = path if path.is_absolute() else self.base_log_folder.joinpath(path) + + if not local_loc.is_file(): + return + + # Read the raw JSON log lines produced by the Airflow 3 Task Supervisor + raw_logs = local_loc.read_text().splitlines() + + # Prepare the payload for Loki (Loki Push API) + labels = _render_log_labels(ti) + values = [] + for line in raw_logs: + if not line.strip(): + continue + + try: + # Log line content from Task Supervisor + log_data = json.loads(line) + + # Inject high-cardinality contextual fields into the JSON payload. + log_data["task_id"] = ti.task_id + log_data["run_id"] = getattr(ti, "run_id", "") + log_data["try_number"] = str(ti.try_number) + log_data["map_index"] = str(getattr(ti, "map_index", -1)) + + # Loki expects Timestamp in nanoseconds as string + timestamp_ns = str(int(time.time() * 1e9)) + values.append([timestamp_ns, json.dumps(log_data)]) + except Exception: + pass + + payload = { + "streams": [ + { + "stream": labels, + "values": values + } + ] + } + + # Push to Loki using configured reliable session + try: + resp = self.session.post(f"{self.host}/loki/api/v1/push", json=payload, timeout=(3.0, 15.0)) + resp.raise_for_status() + + # Clean up local file just like ElasticsearchRemoteLogIO does + if self.delete_local_copy: + import shutil + shutil.rmtree(local_loc.parent, ignore_errors=True) + except Exception as e: + self.log.exception("Failed to upload logs to Loki: %s", e) + + def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]: + """Fetch logs from Loki using LogQL for streaming or retrieval.""" + labels = _render_log_labels(ti) + + # 1. Base stream selector (hits low-cardinality index) + stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in labels.items()]) + "}" + + # 2. Line filters (leveraging Loki Bloom filters) + run_id = getattr(ti, "run_id", "") + try_num = str(ti.try_number) + map_idx = str(getattr(ti, "map_index", -1)) + + # Utilizing Loki's `| json` parser and exact match filters for maximum TSDB optimization + logQL = ( + f"{stream_selector} " + f'| json ' + f'| task_id="{ti.task_id}" ' + f'| run_id="{run_id}" ' + f'| try_number="{try_num}" ' + f'| map_index="{map_idx}"' + ) + + # Query Loki API using configured reliable session + resp = self.session.get(f"{self.host}/loki/api/v1/query_range", params={"query": logQL}, timeout=(3.0, 15.0)) + + message = [] + if resp.ok: + data = resp.json().get("data", {}).get("result", []) + for stream in data: + for val in stream.get("values", []): + # parse the underlying JSON structured log we uploaded + log_entry = json.loads(val[1]) + message.append(json.dumps(log_entry)) + + return ["loki-remote"], message + + +class LokiTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): + """ + The main logging handler injected into Airflow configuration. + """ + LOG_NAME = "Loki" + + def __init__(self, base_log_folder: str, host: str, frontend: str = "", **kwargs): + super().__init__(base_log_folder=base_log_folder, **kwargs) + self.host = host + self.frontend = frontend + self.io = LokiRemoteLogIO( + host=self.host, + base_log_folder=base_log_folder, + delete_local_copy=kwargs.get("delete_local_copy", False), + ) + + # Register Remote Log IO globally for Airflow 3 Task Supervisor + if AIRFLOW_V_3_0_PLUS: + if AIRFLOW_V_3_2_PLUS: + try: + from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log + if get_remote_task_log() is None: + _ActiveLoggingConfig.set(self.io, None) + except ImportError: + pass + else: + try: + import airflow.logging_config as alc + if getattr(alc, "REMOTE_TASK_LOG", None) is None: + alc.REMOTE_TASK_LOG = self.io + except ImportError: + pass + + def _read( + self, ti: TaskInstance, try_number: int, metadata: LogMetadata | None = None + ) -> tuple[list[Any] | str, dict[str, Any]]: + """ + Implementation of the log read handler invoked by the Web UI. + Returns a list of StructuredLogMessage objects in Airflow 3+. + """ + metadata = metadata or {"offset": 0} + + headers, messages = self.io.read("", ti) + + structured_messages = [] + structured_messages.append(StructuredLogMessage(event="::group::Loki logs", sources=headers)) + for msg in messages: + try: + log_data = json.loads(msg) + if "event" not in log_data and "message" in log_data: + log_data["event"] = log_data.pop("message") + structured_messages.append(StructuredLogMessage(**log_data)) + except Exception: + structured_messages.append(StructuredLogMessage(event=msg)) + + structured_messages.append(StructuredLogMessage(event="::endgroup::")) + + # Mark end of log if task is done and no more records + metadata["end_of_log"] = True + + return structured_messages, metadata + + @property + def supports_external_link(self) -> bool: + """Let Airflow API Server know if we can return a link to Grafana.""" + return bool(self.frontend) + + def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: + """ + Used by `airflow-api-server` when users request the external log URL. + Constructs a direct link to Grafana Explorer view for these logs. + """ + if not self.frontend: + return "" + + import urllib.parse + + labels = _render_log_labels(task_instance) + stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in labels.items()]) + "}" + logQL = ( + f'{stream_selector} ' + f'| json ' + f'| task_id="{task_instance.task_id}" ' + f'| try_number="{try_number}" ' + ) + if hasattr(task_instance, "run_id"): + logQL += f'| run_id="{task_instance.run_id}" ' + + params = urllib.parse.urlencode({"left": json.dumps(["now-1h", "now", "Loki", {"expr": logQL}])}) + + grafana_url = self.frontend if self.frontend.endswith("/") else self.frontend + "/" + return f"{grafana_url}explore?{params}" diff --git a/providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py b/providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py new file mode 100644 index 0000000000000..eced9a22f72c2 --- /dev/null +++ b/providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging +import json +import uuid +import datetime +from logging import LogRecord +from unittest.mock import MagicMock + +import pytest +import requests_mock + +from airflow.providers.grafana.loki.log.loki_task_handler import LokiRemoteLogIO, LokiTaskHandler + +@pytest.fixture +def mock_runtime_ti(): + ti = MagicMock() + ti.dag_id = "test_dag" + ti.task_id = "test_task" + ti.run_id = "test_run" + ti.try_number = 1 + ti.map_index = -1 + return ti + +@pytest.fixture +def tmp_log_folder(tmp_path): + log_dir = tmp_path / "logs" + log_dir.mkdir() + return log_dir + +def test_loki_upload_deletes_local_copy_on_success(tmp_log_folder, mock_runtime_ti): + # Mocking a JSON log output exactly like the Task SDK + log_file = tmp_log_folder / "test.log" + log_file.write_text(json.dumps({"event": "hello world"})) + + io = LokiRemoteLogIO(host="http://localhost:3100", base_log_folder=tmp_log_folder, delete_local_copy=True) + + with requests_mock.Mocker() as m: + m.post("http://localhost:3100/loki/api/v1/push", status_code=204) + io.upload(log_file, mock_runtime_ti) + + # Verify the file was cleaned up correctly after successful push + assert not log_file.exists() + +def test_loki_bloom_filter_labels_and_payload(tmp_log_folder, mock_runtime_ti): + # Validates that streams are not polluted with high cardinality labels + log_file = tmp_log_folder / "test2.log" + log_file.write_text(json.dumps({"event": "another line"})) + + io = LokiRemoteLogIO(host="http://localhost:3100", base_log_folder=tmp_log_folder) + + with requests_mock.Mocker() as m: + m.post("http://localhost:3100/loki/api/v1/push", status_code=204) + io.upload(log_file, mock_runtime_ti) + + assert m.called + request_body = json.loads(m.last_request.text) + stream_labels = request_body["streams"][0]["stream"] + + # Crucial Assertion: Stream labels strictly avoid `task_id` and `try_number` + assert stream_labels == {"job": "airflow_tasks", "dag_id": "test_dag"} + + # Crucial Assertion: TSDB JSON log values contain the task_id explicitly embedded + log_json = json.loads(request_body["streams"][0]["values"][0][1]) + assert log_json["task_id"] == "test_task" + assert log_json["try_number"] == "1" + +def test_read_generates_optimized_logql(tmp_log_folder, mock_runtime_ti): + io = LokiRemoteLogIO(host="http://localhost:3100", base_log_folder=tmp_log_folder) + + with requests_mock.Mocker() as m: + # Mock Loki response + m.get("http://localhost:3100/loki/api/v1/query_range", json={"data": {"result": []}}) + io.read("", mock_runtime_ti) + + assert m.called + # Crucial Assertion: The generated LogQL strictly utilizes JSON Bloom Filters mapping to the embedded keys + query_param = m.last_request.qs["query"][0] + assert query_param == '{job="airflow_tasks",dag_id="test_dag"} | json | task_id="test_task" | run_id="test_run" | try_number="1" | map_index="-1"' diff --git a/pyproject.toml b/pyproject.toml index 03c4241557b4d..a58542d67e9d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,7 +108,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-amazon>=9.0.0" ] "apache.cassandra" = [ - "apache-airflow-providers-apache-cassandra>=3.7.0; python_version !=\"3.14\"" + "apache-airflow-providers-apache-cassandra>=3.7.0" ] "apache.drill" = [ "apache-airflow-providers-apache-drill>=2.8.1" @@ -168,7 +168,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-celery>=3.8.3" ] "cloudant" = [ - "apache-airflow-providers-cloudant>=4.0.1" + "apache-airflow-providers-cloudant>=4.0.1; python_version !=\"3.9\"" ] "cncf.kubernetes" = [ "apache-airflow-providers-cncf-kubernetes>=9.0.0" @@ -236,6 +236,9 @@ apache-airflow = "airflow.__main__:main" "google" = [ "apache-airflow-providers-google>=10.24.0" ] +"grafana" = [ + "apache-airflow-providers-grafana>=1.0.0" # Set from local provider pyproject.toml +] "grpc" = [ "apache-airflow-providers-grpc>=3.7.0" ] @@ -398,7 +401,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-airbyte>=5.0.0", "apache-airflow-providers-alibaba>=3.0.0", "apache-airflow-providers-amazon>=9.0.0", - "apache-airflow-providers-apache-cassandra>=3.7.0; python_version !=\"3.14\"", + "apache-airflow-providers-apache-cassandra>=3.7.0", "apache-airflow-providers-apache-drill>=2.8.1", "apache-airflow-providers-apache-druid>=3.12.0", "apache-airflow-providers-apache-flink>=1.6.0", @@ -418,7 +421,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-asana>=2.7.0", "apache-airflow-providers-atlassian-jira>=2.7.1", "apache-airflow-providers-celery>=3.8.3", - "apache-airflow-providers-cloudant>=4.0.1", + "apache-airflow-providers-cloudant>=4.0.1; python_version !=\"3.9\"", "apache-airflow-providers-cncf-kubernetes>=9.0.0", "apache-airflow-providers-cohere>=1.4.0", "apache-airflow-providers-common-ai>=0.0.1", # Set from local provider pyproject.toml @@ -441,6 +444,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-git>=0.0.2", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-github>=2.8.0", "apache-airflow-providers-google>=10.24.0", + "apache-airflow-providers-grafana>=1.0.0", # Set from local provider pyproject.toml "apache-airflow-providers-grpc>=3.7.0", "apache-airflow-providers-hashicorp>=4.0.0", "apache-airflow-providers-http>=4.13.2", @@ -1142,6 +1146,8 @@ mypy_path = [ "$MYPY_CONFIG_FILE_DIR/providers/github/tests", "$MYPY_CONFIG_FILE_DIR/providers/google/src", "$MYPY_CONFIG_FILE_DIR/providers/google/tests", + "$MYPY_CONFIG_FILE_DIR/providers/grafana/src", + "$MYPY_CONFIG_FILE_DIR/providers/grafana/tests", "$MYPY_CONFIG_FILE_DIR/providers/grpc/src", "$MYPY_CONFIG_FILE_DIR/providers/grpc/tests", "$MYPY_CONFIG_FILE_DIR/providers/hashicorp/src", @@ -1428,6 +1434,7 @@ apache-airflow-providers-ftp = { workspace = true } apache-airflow-providers-git = { workspace = true } apache-airflow-providers-github = { workspace = true } apache-airflow-providers-google = { workspace = true } +apache-airflow-providers-grafana = { workspace = true } apache-airflow-providers-grpc = { workspace = true } apache-airflow-providers-hashicorp = { workspace = true } apache-airflow-providers-http = { workspace = true } @@ -1561,6 +1568,7 @@ members = [ "providers/git", "providers/github", "providers/google", + "providers/grafana", "providers/grpc", "providers/hashicorp", "providers/http", diff --git a/uv.lock b/uv.lock index 0eb3bb23bd68f..860e2eae13e80 100644 --- a/uv.lock +++ b/uv.lock @@ -12,7 +12,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-03-25T09:25:36.898664504Z" +exclude-newer = "2026-03-26T21:12:11.341378713Z" exclude-newer-span = "P4D" [manifest] @@ -75,6 +75,7 @@ members = [ "apache-airflow-providers-git", "apache-airflow-providers-github", "apache-airflow-providers-google", + "apache-airflow-providers-grafana", "apache-airflow-providers-grpc", "apache-airflow-providers-hashicorp", "apache-airflow-providers-http", @@ -826,7 +827,7 @@ all = [ { name = "apache-airflow-providers-airbyte" }, { name = "apache-airflow-providers-alibaba" }, { name = "apache-airflow-providers-amazon", extra = ["aiobotocore", "python3-saml", "s3fs"] }, - { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*'" }, + { name = "apache-airflow-providers-apache-cassandra" }, { name = "apache-airflow-providers-apache-drill" }, { name = "apache-airflow-providers-apache-druid" }, { name = "apache-airflow-providers-apache-flink" }, @@ -869,6 +870,7 @@ all = [ { name = "apache-airflow-providers-git" }, { name = "apache-airflow-providers-github" }, { name = "apache-airflow-providers-google" }, + { name = "apache-airflow-providers-grafana" }, { name = "apache-airflow-providers-grpc" }, { name = "apache-airflow-providers-hashicorp" }, { name = "apache-airflow-providers-http" }, @@ -945,7 +947,7 @@ apache-atlas = [ { name = "atlasclient" }, ] apache-cassandra = [ - { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*'" }, + { name = "apache-airflow-providers-apache-cassandra" }, ] apache-drill = [ { name = "apache-airflow-providers-apache-drill" }, @@ -1091,6 +1093,9 @@ google-auth = [ { name = "apache-airflow-providers-fab" }, { name = "authlib" }, ] +grafana = [ + { name = "apache-airflow-providers-grafana" }, +] graphviz = [ { name = "apache-airflow-core", extra = ["graphviz"] }, ] @@ -1348,8 +1353,8 @@ requires-dist = [ { name = "apache-airflow-providers-amazon", extras = ["aiobotocore"], marker = "extra == 'aiobotocore'", editable = "providers/amazon" }, { name = "apache-airflow-providers-amazon", extras = ["python3-saml"], marker = "extra == 'amazon-aws-auth'", editable = "providers/amazon" }, { name = "apache-airflow-providers-amazon", extras = ["s3fs"], marker = "extra == 's3fs'", editable = "providers/amazon" }, - { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*' and extra == 'all'", editable = "providers/apache/cassandra" }, - { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*' and extra == 'apache-cassandra'", editable = "providers/apache/cassandra" }, + { name = "apache-airflow-providers-apache-cassandra", marker = "extra == 'all'", editable = "providers/apache/cassandra" }, + { name = "apache-airflow-providers-apache-cassandra", marker = "extra == 'apache-cassandra'", editable = "providers/apache/cassandra" }, { name = "apache-airflow-providers-apache-drill", marker = "extra == 'all'", editable = "providers/apache/drill" }, { name = "apache-airflow-providers-apache-drill", marker = "extra == 'apache-drill'", editable = "providers/apache/drill" }, { name = "apache-airflow-providers-apache-druid", marker = "extra == 'all'", editable = "providers/apache/druid" }, @@ -1389,8 +1394,8 @@ requires-dist = [ { name = "apache-airflow-providers-atlassian-jira", marker = "extra == 'atlassian-jira'", editable = "providers/atlassian/jira" }, { name = "apache-airflow-providers-celery", marker = "extra == 'all'", editable = "providers/celery" }, { name = "apache-airflow-providers-celery", marker = "extra == 'celery'", editable = "providers/celery" }, - { name = "apache-airflow-providers-cloudant", marker = "extra == 'all'", editable = "providers/cloudant" }, - { name = "apache-airflow-providers-cloudant", marker = "extra == 'cloudant'", editable = "providers/cloudant" }, + { name = "apache-airflow-providers-cloudant", marker = "python_full_version != '3.9.*' and extra == 'all'", editable = "providers/cloudant" }, + { name = "apache-airflow-providers-cloudant", marker = "python_full_version != '3.9.*' and extra == 'cloudant'", editable = "providers/cloudant" }, { name = "apache-airflow-providers-cncf-kubernetes", marker = "extra == 'all'", editable = "providers/cncf/kubernetes" }, { name = "apache-airflow-providers-cncf-kubernetes", marker = "extra == 'cncf-kubernetes'", editable = "providers/cncf/kubernetes" }, { name = "apache-airflow-providers-cohere", marker = "extra == 'all'", editable = "providers/cohere" }, @@ -1439,6 +1444,8 @@ requires-dist = [ { name = "apache-airflow-providers-github", marker = "extra == 'github'", editable = "providers/github" }, { name = "apache-airflow-providers-google", marker = "extra == 'all'", editable = "providers/google" }, { name = "apache-airflow-providers-google", marker = "extra == 'google'", editable = "providers/google" }, + { name = "apache-airflow-providers-grafana", marker = "extra == 'all'", editable = "providers/grafana" }, + { name = "apache-airflow-providers-grafana", marker = "extra == 'grafana'", editable = "providers/grafana" }, { name = "apache-airflow-providers-grpc", marker = "extra == 'all'", editable = "providers/grpc" }, { name = "apache-airflow-providers-grpc", marker = "extra == 'grpc'", editable = "providers/grpc" }, { name = "apache-airflow-providers-hashicorp", marker = "extra == 'all'", editable = "providers/hashicorp" }, @@ -1553,7 +1560,7 @@ requires-dist = [ { name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.30.0" }, { name = "uv", marker = "extra == 'uv'", specifier = ">=0.11.1" }, ] -provides-extras = ["all-core", "async", "graphviz", "gunicorn", "kerberos", "memray", "otel", "statsd", "all-task-sdk", "airbyte", "alibaba", "amazon", "apache-cassandra", "apache-drill", "apache-druid", "apache-flink", "apache-hdfs", "apache-hive", "apache-iceberg", "apache-impala", "apache-kafka", "apache-kylin", "apache-livy", "apache-pig", "apache-pinot", "apache-spark", "apache-tinkerpop", "apprise", "arangodb", "asana", "atlassian-jira", "celery", "cloudant", "cncf-kubernetes", "cohere", "common-ai", "common-compat", "common-io", "common-messaging", "common-sql", "databricks", "datadog", "dbt-cloud", "dingding", "discord", "docker", "edge3", "elasticsearch", "exasol", "fab", "facebook", "ftp", "git", "github", "google", "grpc", "hashicorp", "http", "imap", "influxdb", "informatica", "jdbc", "jenkins", "keycloak", "microsoft-azure", "microsoft-mssql", "microsoft-psrp", "microsoft-winrm", "mongo", "mysql", "neo4j", "odbc", "openai", "openfaas", "openlineage", "opensearch", "opsgenie", "oracle", "pagerduty", "papermill", "pgvector", "pinecone", "postgres", "presto", "qdrant", "redis", "salesforce", "samba", "segment", "sendgrid", "sftp", "singularity", "slack", "smtp", "snowflake", "sqlite", "ssh", "standard", "tableau", "telegram", "teradata", "trino", "vertica", "weaviate", "yandex", "ydb", "zendesk", "all", "aiobotocore", "apache-atlas", "apache-webhdfs", "amazon-aws-auth", "cloudpickle", "github-enterprise", "google-auth", "ldap", "pandas", "polars", "rabbitmq", "sentry", "s3fs", "uv"] +provides-extras = ["all-core", "async", "graphviz", "gunicorn", "kerberos", "memray", "otel", "statsd", "all-task-sdk", "airbyte", "alibaba", "amazon", "apache-cassandra", "apache-drill", "apache-druid", "apache-flink", "apache-hdfs", "apache-hive", "apache-iceberg", "apache-impala", "apache-kafka", "apache-kylin", "apache-livy", "apache-pig", "apache-pinot", "apache-spark", "apache-tinkerpop", "apprise", "arangodb", "asana", "atlassian-jira", "celery", "cloudant", "cncf-kubernetes", "cohere", "common-ai", "common-compat", "common-io", "common-messaging", "common-sql", "databricks", "datadog", "dbt-cloud", "dingding", "discord", "docker", "edge3", "elasticsearch", "exasol", "fab", "facebook", "ftp", "git", "github", "google", "grafana", "grpc", "hashicorp", "http", "imap", "influxdb", "informatica", "jdbc", "jenkins", "keycloak", "microsoft-azure", "microsoft-mssql", "microsoft-psrp", "microsoft-winrm", "mongo", "mysql", "neo4j", "odbc", "openai", "openfaas", "openlineage", "opensearch", "opsgenie", "oracle", "pagerduty", "papermill", "pgvector", "pinecone", "postgres", "presto", "qdrant", "redis", "salesforce", "samba", "segment", "sendgrid", "sftp", "singularity", "slack", "smtp", "snowflake", "sqlite", "ssh", "standard", "tableau", "telegram", "teradata", "trino", "vertica", "weaviate", "yandex", "ydb", "zendesk", "all", "aiobotocore", "apache-atlas", "apache-webhdfs", "amazon-aws-auth", "cloudpickle", "github-enterprise", "google-auth", "ldap", "pandas", "polars", "rabbitmq", "sentry", "s3fs", "uv"] [package.metadata.requires-dev] dev = [ @@ -5169,6 +5176,45 @@ dev = [ ] docs = [{ name = "apache-airflow-devel-common", extras = ["docs"], editable = "devel-common" }] +[[package]] +name = "apache-airflow-providers-grafana" +version = "1.0.0" +source = { editable = "providers/grafana" } +dependencies = [ + { name = "apache-airflow" }, + { name = "apache-airflow-providers-common-compat" }, + { name = "requests" }, + { name = "tenacity" }, +] + +[package.dev-dependencies] +dev = [ + { name = "apache-airflow" }, + { name = "apache-airflow-devel-common" }, + { name = "apache-airflow-providers-common-compat" }, + { name = "apache-airflow-task-sdk" }, +] +docs = [ + { name = "apache-airflow-devel-common", extra = ["docs"] }, +] + +[package.metadata] +requires-dist = [ + { name = "apache-airflow", editable = "." }, + { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, + { name = "requests", specifier = ">=2.27.0" }, + { name = "tenacity", specifier = ">=8.0.0" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "apache-airflow", editable = "." }, + { name = "apache-airflow-devel-common", editable = "devel-common" }, + { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, + { name = "apache-airflow-task-sdk", editable = "task-sdk" }, +] +docs = [{ name = "apache-airflow-devel-common", extras = ["docs"], editable = "devel-common" }] + [[package]] name = "apache-airflow-providers-grpc" version = "3.9.3" From 83641854b23ddf7693e06e4bf7db006bc86c1af5 Mon Sep 17 00:00:00 2001 From: evertonsa Date: Mon, 30 Mar 2026 23:45:41 +0200 Subject: [PATCH 2/6] missing file --- .../airflow_local_settings.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 06639e0e85545..88a9cb32993ca 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -281,6 +281,36 @@ def _default_conn_name_from(mod_path, hook_name): ) ) remote_task_handler_kwargs = {} + elif remote_base_log_folder.startswith("loki://"): + from airflow.providers.grafana.loki.log.loki_task_handler import LokiRemoteLogIO + + url_parts = urlsplit(remote_base_log_folder) + loki_host = f"http://{url_parts.netloc}" + if url_parts.port: + pass # netloc already includes port + + REMOTE_TASK_LOG = LokiRemoteLogIO( + **( + { + "base_log_folder": BASE_LOG_FOLDER, + "host": loki_host, + "delete_local_copy": delete_local_copy, + } + | remote_task_handler_kwargs + ) + ) + # Configure logging dictionary to use LokiTaskHandler for the webserver reads + LOKI_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { + "task": { + "class": "airflow.providers.grafana.loki.log.loki_task_handler.LokiTaskHandler", + "formatter": "airflow", + "base_log_folder": BASE_LOG_FOLDER, + "host": loki_host, + "frontend": conf.get("logging", "loki_frontend_url", fallback=""), + }, + } + DEFAULT_LOGGING_CONFIG["handlers"].update(LOKI_REMOTE_HANDLERS) + remote_task_handler_kwargs = {} elif ELASTICSEARCH_HOST: from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO From 9afe5a9c53a8e38d79c3a37f2a0d13577ae1a7cc Mon Sep 17 00:00:00 2001 From: evertonsa Date: Thu, 2 Apr 2026 00:21:56 +0200 Subject: [PATCH 3/6] feat: chunk before pushing --- providers/grafana/README.rst | 7 + .../grafana/loki/log/loki_task_handler.py | 153 +++++++++++------- 2 files changed, 99 insertions(+), 61 deletions(-) diff --git a/providers/grafana/README.rst b/providers/grafana/README.rst index dad54d315f0c4..f23dec39b3693 100644 --- a/providers/grafana/README.rst +++ b/providers/grafana/README.rst @@ -49,3 +49,10 @@ When executing the query, Loki instantly resolves the ``{job="airflow_tasks", da 3. Loki only decompresses and evaluates the JSON parser on the specific chunks that mathematically *must* contain the target task's logs. **Operating Philosophy**: By embedding dynamic metadata into the JSON payload rather than stream labels, this provider guarantees full-text indexing performance while keeping Grafana Loki infrastructure costs near zero, ensuring stability regardless of how many millions of tasks Airflow executes. + +Upload Chunking Strategy +^^^^^^^^^^^^^^^^^^^^^^^^ + +Instead of streaming logs line-by-line via HTTP (which would bottleneck the Airflow worker) or sending gigantic multi-gigabyte files in a single request (which causes worker Out-Of-Memory crashes and reverse-proxy ``413 Entity Too Large`` HTTP rejections), the provider natively chunks log payloads by **Byte Size**. + +Every time a task finishes, the ``LokiRemoteLogIO`` handler iterates over the local structured JSON file and calculates the string payload length incrementally. Once reaching the standard Promtail sweet spot of ``1 MiB`` per-batch, the payload is immediately POSTed to the Loki API. This guarantees optimal network throughput and 100% compliance with default ingestion limits without sacrificing worker stability. diff --git a/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py b/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py index 98ac01a8b58d5..c0e51dce9fcba 100644 --- a/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py +++ b/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py @@ -18,7 +18,6 @@ from urllib3.util.retry import Retry # Attempt to load standard log structures according to Airflow 3 requirements -from airflow.providers.common.compat.sdk import conf from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin @@ -61,6 +60,7 @@ class LokiRemoteLogIO(LoggingMixin): host: str = "http://localhost:3100" base_log_folder: Path = attrs.field(converter=Path) delete_local_copy: bool = False + processors: list = attrs.field(factory=list) @property def session(self) -> requests.Session: @@ -87,52 +87,76 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI): if not local_loc.is_file(): return - # Read the raw JSON log lines produced by the Airflow 3 Task Supervisor - raw_logs = local_loc.read_text().splitlines() - - # Prepare the payload for Loki (Loki Push API) labels = _render_log_labels(ti) values = [] - for line in raw_logs: - if not line.strip(): - continue - + payload_size = 0 + MAX_PAYLOAD_SIZE = 1048576 # 1 MiB chunking as per Promtail limits + + def _push_chunk(): + if not values: + return True + payload = { + "streams": [ + { + "stream": labels, + "values": values + } + ] + } try: - # Log line content from Task Supervisor - log_data = json.loads(line) - - # Inject high-cardinality contextual fields into the JSON payload. - log_data["task_id"] = ti.task_id - log_data["run_id"] = getattr(ti, "run_id", "") - log_data["try_number"] = str(ti.try_number) - log_data["map_index"] = str(getattr(ti, "map_index", -1)) + resp = self.session.post(f"{self.host}/loki/api/v1/push", json=payload, timeout=(3.0, 15.0)) + resp.raise_for_status() + return True + except Exception as e: + self.log.exception("Failed to upload chunk of logs to Loki: %s", e) + return False - # Loki expects Timestamp in nanoseconds as string - timestamp_ns = str(int(time.time() * 1e9)) - values.append([timestamp_ns, json.dumps(log_data)]) - except Exception: - pass - - payload = { - "streams": [ - { - "stream": labels, - "values": values - } - ] - } - - # Push to Loki using configured reliable session - try: - resp = self.session.post(f"{self.host}/loki/api/v1/push", json=payload, timeout=(3.0, 15.0)) - resp.raise_for_status() - - # Clean up local file just like ElasticsearchRemoteLogIO does - if self.delete_local_copy: + has_error = False + + with open(local_loc, "r") as f: + for line in f: + if not line.strip(): + continue + + try: + # Log line content from Task Supervisor + log_data = json.loads(line) + + # Inject high-cardinality contextual fields into the JSON payload. + log_data["task_id"] = ti.task_id + log_data["run_id"] = getattr(ti, "run_id", "") + log_data["try_number"] = str(ti.try_number) + log_data["map_index"] = str(getattr(ti, "map_index", -1)) + + # Loki expects Timestamp in nanoseconds as string + timestamp_ns = str(int(time.time() * 1e9)) + log_str = json.dumps(log_data) + values.append([timestamp_ns, log_str]) + + # Estimate the byte size of this entry in the payload + payload_size += len(timestamp_ns) + len(log_str) + 10 # 10 bytes overhead per value + + if payload_size >= MAX_PAYLOAD_SIZE: + if not _push_chunk(): + has_error = True + values.clear() + payload_size = 0 + + except Exception: + pass + + # Push any remaining logs + if values: + if not _push_chunk(): + has_error = True + + # Clean up local file just like ElasticsearchRemoteLogIO does if fully successful + if self.delete_local_copy and not has_error: + try: import shutil shutil.rmtree(local_loc.parent, ignore_errors=True) - except Exception as e: - self.log.exception("Failed to upload logs to Loki: %s", e) + except Exception: + pass def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]: """Fetch logs from Loki using LogQL for streaming or retrieval.""" @@ -177,6 +201,10 @@ class LokiTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): """ LOG_NAME = "Loki" + @property + def log_name(self) -> str: + return self.LOG_NAME + def __init__(self, base_log_folder: str, host: str, frontend: str = "", **kwargs): super().__init__(base_log_folder=base_log_folder, **kwargs) self.host = host @@ -191,9 +219,13 @@ def __init__(self, base_log_folder: str, host: str, frontend: str = "", **kwargs if AIRFLOW_V_3_0_PLUS: if AIRFLOW_V_3_2_PLUS: try: - from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log - if get_remote_task_log() is None: - _ActiveLoggingConfig.set(self.io, None) + from airflow.logging_config import _ActiveLoggingConfig + try: + from airflow.logging_config import get_remote_task_log + if callable(get_remote_task_log) and get_remote_task_log() is None: + _ActiveLoggingConfig.set(self.io, None) + except ImportError: + pass except ImportError: pass else: @@ -204,34 +236,33 @@ def __init__(self, base_log_folder: str, host: str, frontend: str = "", **kwargs except ImportError: pass + def _read_remote_logs(self, ti: TaskInstance, try_number: int, metadata: dict | None = None) -> tuple[list[str], list[str]]: + """ + Called by Airflow 3.x FileTaskHandler._read to fetch remote logs. + Airflow 3 native FileTaskHandler manages interleaving these with locally streaming worker logs. + """ + return self.io.read("", ti) + def _read( self, ti: TaskInstance, try_number: int, metadata: LogMetadata | None = None ) -> tuple[list[Any] | str, dict[str, Any]]: """ Implementation of the log read handler invoked by the Web UI. - Returns a list of StructuredLogMessage objects in Airflow 3+. + In Airflow 3+, we defer to the super() class so it can serve logs from the active worker + and intelligently interleave them with `_read_remote_logs`. """ + if AIRFLOW_V_3_0_PLUS: + return super()._read(ti, try_number, metadata) + + # Fallback for Airflow 2.x metadata = metadata or {"offset": 0} - headers, messages = self.io.read("", ti) - structured_messages = [] - structured_messages.append(StructuredLogMessage(event="::group::Loki logs", sources=headers)) - for msg in messages: - try: - log_data = json.loads(msg) - if "event" not in log_data and "message" in log_data: - log_data["event"] = log_data.pop("message") - structured_messages.append(StructuredLogMessage(**log_data)) - except Exception: - structured_messages.append(StructuredLogMessage(event=msg)) - - structured_messages.append(StructuredLogMessage(event="::endgroup::")) - - # Mark end of log if task is done and no more records + # Build raw messages (no StructuredLogMessage required in Airflow 2) + log_str = "\n".join(messages) metadata["end_of_log"] = True - return structured_messages, metadata + return [log_str], metadata @property def supports_external_link(self) -> bool: From fef8830e6b4b4838262d1cb9599ec04e69830734 Mon Sep 17 00:00:00 2001 From: evertonsa Date: Thu, 2 Apr 2026 19:56:16 +0200 Subject: [PATCH 4/6] feat: pr reviews --- .../grafana/loki/log/loki_task_handler.py | 197 +++++++----------- .../loki/log/test_loki_task_handler.py | 59 +++--- 2 files changed, 114 insertions(+), 142 deletions(-) diff --git a/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py b/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py index c0e51dce9fcba..bcf0a09cd145b 100644 --- a/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py +++ b/providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py @@ -4,45 +4,38 @@ from __future__ import annotations import json -import logging import os -import sys +import shutil import time +import urllib.parse from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import attrs -import pendulum import requests +from packaging.version import Version from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -# Attempt to load standard log structures according to Airflow 3 requirements -from airflow.utils.log.file_task_handler import FileTaskHandler -from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin +import airflow -# Try mapping for StructuredLogMessage available in 3.x -try: - from airflow.utils.log.file_task_handler import StructuredLogMessage -except ImportError: - StructuredLogMessage = dict # Fallback for compilation matching +if Version(airflow.__version__) < Version("3.2.0.dev0"): + raise RuntimeError("The Grafana provider requires Apache Airflow 3.2.0+") -# Try loading version compat constants -try: - from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS -except ImportError: - AIRFLOW_V_3_0_PLUS = True - AIRFLOW_V_3_2_PLUS = True +# Attempt to load standard log structures according to Airflow 3 requirements +from airflow.utils.log.file_task_handler import FileTaskHandler +from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo def _render_log_labels(ti) -> dict[str, str]: """ - Helper to extract low-cardinality labels for Loki streams. + Extract low-cardinality labels for Loki streams. + High-cardinality fields (like task_id, run_id) are omitted here to prevent stream explosion and will be indexed via Bloom filters instead. """ @@ -51,17 +44,20 @@ def _render_log_labels(ti) -> dict[str, str]: "dag_id": ti.dag_id, } + @attrs.define(kw_only=True) class LokiRemoteLogIO(LoggingMixin): """ - Handles the actual communication with Loki API. + Handle the actual communication with Loki API. + Used by Task Supervisor to bulk-upload logs and by UI to read remote logs. """ + host: str = "http://localhost:3100" base_log_folder: Path = attrs.field(converter=Path) delete_local_copy: bool = False processors: list = attrs.field(factory=list) - + @property def session(self) -> requests.Session: if not hasattr(self, "_session"): @@ -71,16 +67,16 @@ def session(self) -> requests.Session: total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], - allowed_methods=["GET", "POST"] + allowed_methods=["GET", "POST"], ) - # Efficient scaling with TCP connection pooling + # Efficient scaling with TCP connection pooling adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=100) self._session.mount("http://", adapter) self._session.mount("https://", adapter) return self._session def upload(self, path: os.PathLike | str, ti: RuntimeTI): - """Called by Airflow Task Supervisor after task finishes (or during) to push logs.""" + """Push logs directly from the running Task Supervisor process.""" path = Path(path) local_loc = path if path.is_absolute() else self.base_log_folder.joinpath(path) @@ -95,14 +91,7 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI): def _push_chunk(): if not values: return True - payload = { - "streams": [ - { - "stream": labels, - "values": values - } - ] - } + payload = {"streams": [{"stream": labels, "values": values}]} try: resp = self.session.post(f"{self.host}/loki/api/v1/push", json=payload, timeout=(3.0, 15.0)) resp.raise_for_status() @@ -112,8 +101,10 @@ def _push_chunk(): return False has_error = False + # Calculate a single stable ns wall-clock timestamp per file upload mapping + stable_timestamp_ns = str(time.time_ns()) - with open(local_loc, "r") as f: + with open(local_loc) as f: for line in f: if not line.strip(): continue @@ -121,20 +112,20 @@ def _push_chunk(): try: # Log line content from Task Supervisor log_data = json.loads(line) - + # Inject high-cardinality contextual fields into the JSON payload. log_data["task_id"] = ti.task_id log_data["run_id"] = getattr(ti, "run_id", "") log_data["try_number"] = str(ti.try_number) log_data["map_index"] = str(getattr(ti, "map_index", -1)) - # Loki expects Timestamp in nanoseconds as string - timestamp_ns = str(int(time.time() * 1e9)) log_str = json.dumps(log_data) - values.append([timestamp_ns, log_str]) - + values.append([stable_timestamp_ns, log_str]) + # Estimate the byte size of this entry in the payload - payload_size += len(timestamp_ns) + len(log_str) + 10 # 10 bytes overhead per value + payload_size += ( + len(stable_timestamp_ns) + len(log_str) + 10 + ) # 10 bytes overhead per value if payload_size >= MAX_PAYLOAD_SIZE: if not _push_chunk(): @@ -142,8 +133,12 @@ def _push_chunk(): values.clear() payload_size = 0 - except Exception: - pass + except json.JSONDecodeError as err: + self.log.debug("Loki upload skipped invalid JSON log line: %s", err) + has_error = True + except Exception as err: + self.log.exception("Unexpected error parsing log line for Loki upload: %s", err) + has_error = True # Push any remaining logs if values: @@ -153,36 +148,36 @@ def _push_chunk(): # Clean up local file just like ElasticsearchRemoteLogIO does if fully successful if self.delete_local_copy and not has_error: try: - import shutil shutil.rmtree(local_loc.parent, ignore_errors=True) - except Exception: - pass + except Exception as e: + self.log.debug("Failed to delete local copy after Loki upload: %s", e) - def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]: + def read(self, relative_path: str, try_number: int, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]: """Fetch logs from Loki using LogQL for streaming or retrieval.""" labels = _render_log_labels(ti) - + # 1. Base stream selector (hits low-cardinality index) stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in labels.items()]) + "}" - + # 2. Line filters (leveraging Loki Bloom filters) run_id = getattr(ti, "run_id", "") - try_num = str(ti.try_number) map_idx = str(getattr(ti, "map_index", -1)) - + # Utilizing Loki's `| json` parser and exact match filters for maximum TSDB optimization logQL = ( f"{stream_selector} " - f'| json ' + f"| json " f'| task_id="{ti.task_id}" ' f'| run_id="{run_id}" ' - f'| try_number="{try_num}" ' + f'| try_number="{try_number}" ' f'| map_index="{map_idx}"' ) - + # Query Loki API using configured reliable session - resp = self.session.get(f"{self.host}/loki/api/v1/query_range", params={"query": logQL}, timeout=(3.0, 15.0)) - + resp = self.session.get( + f"{self.host}/loki/api/v1/query_range", params={"query": logQL}, timeout=(3.0, 15.0) + ) + message = [] if resp.ok: data = resp.json().get("data", {}).get("result", []) @@ -191,14 +186,14 @@ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMe # parse the underlying JSON structured log we uploaded log_entry = json.loads(val[1]) message.append(json.dumps(log_entry)) - - return ["loki-remote"], message + + # Return structured LogSourceInfo and the list of log messages + return {"source": "loki-remote"}, message class LokiTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): - """ - The main logging handler injected into Airflow configuration. - """ + """The main logging handler injected into Airflow configuration.""" + LOG_NAME = "Loki" @property @@ -214,55 +209,29 @@ def __init__(self, base_log_folder: str, host: str, frontend: str = "", **kwargs base_log_folder=base_log_folder, delete_local_copy=kwargs.get("delete_local_copy", False), ) - + # Register Remote Log IO globally for Airflow 3 Task Supervisor - if AIRFLOW_V_3_0_PLUS: - if AIRFLOW_V_3_2_PLUS: - try: - from airflow.logging_config import _ActiveLoggingConfig - try: - from airflow.logging_config import get_remote_task_log - if callable(get_remote_task_log) and get_remote_task_log() is None: - _ActiveLoggingConfig.set(self.io, None) - except ImportError: - pass - except ImportError: - pass - else: - try: - import airflow.logging_config as alc - if getattr(alc, "REMOTE_TASK_LOG", None) is None: - alc.REMOTE_TASK_LOG = self.io - except ImportError: - pass - - def _read_remote_logs(self, ti: TaskInstance, try_number: int, metadata: dict | None = None) -> tuple[list[str], list[str]]: - """ - Called by Airflow 3.x FileTaskHandler._read to fetch remote logs. - Airflow 3 native FileTaskHandler manages interleaving these with locally streaming worker logs. - """ - return self.io.read("", ti) + try: + from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log + + if callable(get_remote_task_log) and get_remote_task_log() is None: + _ActiveLoggingConfig.set(self.io, None) + except ImportError: + pass - def _read( - self, ti: TaskInstance, try_number: int, metadata: LogMetadata | None = None - ) -> tuple[list[Any] | str, dict[str, Any]]: + def _read_remote_logs( + self, ti: TaskInstance, try_number: int, metadata: dict | None = None + ) -> tuple[list[str], list[str]]: """ - Implementation of the log read handler invoked by the Web UI. - In Airflow 3+, we defer to the super() class so it can serve logs from the active worker - and intelligently interleave them with `_read_remote_logs`. + Fetch remote logs for Airflow 3.x FileTaskHandler._read. + + Airflow 3 native FileTaskHandler manages interleaving these with locally streaming worker logs. """ - if AIRFLOW_V_3_0_PLUS: - return super()._read(ti, try_number, metadata) - - # Fallback for Airflow 2.x - metadata = metadata or {"offset": 0} - headers, messages = self.io.read("", ti) - - # Build raw messages (no StructuredLogMessage required in Airflow 2) - log_str = "\n".join(messages) - metadata["end_of_log"] = True - - return [log_str], metadata + if hasattr(ti, "log_relative_path"): + path = ti.log_relative_path(try_number=try_number) + else: + path = "" + return self.io.read(path, try_number, ti) @property def supports_external_link(self) -> bool: @@ -271,26 +240,20 @@ def supports_external_link(self) -> bool: def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: """ - Used by `airflow-api-server` when users request the external log URL. + Return the external log URL when requested by users. + Constructs a direct link to Grafana Explorer view for these logs. """ if not self.frontend: return "" - - import urllib.parse - + labels = _render_log_labels(task_instance) stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in labels.items()]) + "}" - logQL = ( - f'{stream_selector} ' - f'| json ' - f'| task_id="{task_instance.task_id}" ' - f'| try_number="{try_number}" ' - ) + logQL = f'{stream_selector} | json | task_id="{task_instance.task_id}" | try_number="{try_number}" ' if hasattr(task_instance, "run_id"): logQL += f'| run_id="{task_instance.run_id}" ' - + params = urllib.parse.urlencode({"left": json.dumps(["now-1h", "now", "Loki", {"expr": logQL}])}) - + grafana_url = self.frontend if self.frontend.endswith("/") else self.frontend + "/" return f"{grafana_url}explore?{params}" diff --git a/providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py b/providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py index eced9a22f72c2..5863c1f47ed4f 100644 --- a/providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py +++ b/providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py @@ -1,26 +1,29 @@ from __future__ import annotations -import logging import json -import uuid -import datetime -from logging import LogRecord -from unittest.mock import MagicMock import pytest import requests_mock -from airflow.providers.grafana.loki.log.loki_task_handler import LokiRemoteLogIO, LokiTaskHandler +from airflow.providers.grafana.loki.log.loki_task_handler import LokiRemoteLogIO + + +class StubRuntimeTI: + def __init__(self): + self.dag_id = "test_dag" + self.task_id = "test_task" + self.run_id = "test_run" + self.try_number = 1 + self.map_index = -1 + + def log_relative_path(self, try_number=1): + return f"test_dag/test_task/test_run/{try_number}.log" + @pytest.fixture def mock_runtime_ti(): - ti = MagicMock() - ti.dag_id = "test_dag" - ti.task_id = "test_task" - ti.run_id = "test_run" - ti.try_number = 1 - ti.map_index = -1 - return ti + return StubRuntimeTI() + @pytest.fixture def tmp_log_folder(tmp_path): @@ -28,52 +31,58 @@ def tmp_log_folder(tmp_path): log_dir.mkdir() return log_dir + def test_loki_upload_deletes_local_copy_on_success(tmp_log_folder, mock_runtime_ti): # Mocking a JSON log output exactly like the Task SDK log_file = tmp_log_folder / "test.log" log_file.write_text(json.dumps({"event": "hello world"})) - + io = LokiRemoteLogIO(host="http://localhost:3100", base_log_folder=tmp_log_folder, delete_local_copy=True) - + with requests_mock.Mocker() as m: m.post("http://localhost:3100/loki/api/v1/push", status_code=204) io.upload(log_file, mock_runtime_ti) - + # Verify the file was cleaned up correctly after successful push assert not log_file.exists() + def test_loki_bloom_filter_labels_and_payload(tmp_log_folder, mock_runtime_ti): # Validates that streams are not polluted with high cardinality labels log_file = tmp_log_folder / "test2.log" log_file.write_text(json.dumps({"event": "another line"})) - + io = LokiRemoteLogIO(host="http://localhost:3100", base_log_folder=tmp_log_folder) - + with requests_mock.Mocker() as m: m.post("http://localhost:3100/loki/api/v1/push", status_code=204) io.upload(log_file, mock_runtime_ti) - + assert m.called request_body = json.loads(m.last_request.text) stream_labels = request_body["streams"][0]["stream"] - + # Crucial Assertion: Stream labels strictly avoid `task_id` and `try_number` assert stream_labels == {"job": "airflow_tasks", "dag_id": "test_dag"} - + # Crucial Assertion: TSDB JSON log values contain the task_id explicitly embedded log_json = json.loads(request_body["streams"][0]["values"][0][1]) assert log_json["task_id"] == "test_task" assert log_json["try_number"] == "1" + def test_read_generates_optimized_logql(tmp_log_folder, mock_runtime_ti): io = LokiRemoteLogIO(host="http://localhost:3100", base_log_folder=tmp_log_folder) - + with requests_mock.Mocker() as m: # Mock Loki response m.get("http://localhost:3100/loki/api/v1/query_range", json={"data": {"result": []}}) - io.read("", mock_runtime_ti) - + io.read("", 1, mock_runtime_ti) + assert m.called # Crucial Assertion: The generated LogQL strictly utilizes JSON Bloom Filters mapping to the embedded keys query_param = m.last_request.qs["query"][0] - assert query_param == '{job="airflow_tasks",dag_id="test_dag"} | json | task_id="test_task" | run_id="test_run" | try_number="1" | map_index="-1"' + assert ( + query_param + == '{job="airflow_tasks",dag_id="test_dag"} | json | task_id="test_task" | run_id="test_run" | try_number="1" | map_index="-1"' + ) From 58c0b1ee4d241916aecef8be5de85dbe1f2335ba Mon Sep 17 00:00:00 2001 From: evertonsa Date: Thu, 2 Apr 2026 20:25:55 +0200 Subject: [PATCH 5/6] fix: revert wrong changes --- pyproject.toml | 4 +- uv.lock | 110 ++++++++++++++++++++++++++----------------------- 2 files changed, 60 insertions(+), 54 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bc66ebcf645ad..f26e54af57a81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,7 +108,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-amazon>=9.0.0" ] "apache.cassandra" = [ - "apache-airflow-providers-apache-cassandra>=3.7.0" + "apache-airflow-providers-apache-cassandra>=3.7.0; python_version !=\"3.14\"", ] "apache.drill" = [ "apache-airflow-providers-apache-drill>=2.8.1" @@ -168,7 +168,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-celery>=3.8.3" ] "cloudant" = [ - "apache-airflow-providers-cloudant>=4.0.1; python_version !=\"3.9\"" + "apache-airflow-providers-cloudant>=4.0.1", ] "cncf.kubernetes" = [ "apache-airflow-providers-cncf-kubernetes>=9.0.0" diff --git a/uv.lock b/uv.lock index 860e2eae13e80..b4a4ad0e0fbc8 100644 --- a/uv.lock +++ b/uv.lock @@ -12,7 +12,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-03-26T21:12:11.341378713Z" +exclude-newer = "2026-03-29T18:25:19.975988101Z" exclude-newer-span = "P4D" [manifest] @@ -947,7 +947,7 @@ apache-atlas = [ { name = "atlasclient" }, ] apache-cassandra = [ - { name = "apache-airflow-providers-apache-cassandra" }, + { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*'" }, ] apache-drill = [ { name = "apache-airflow-providers-apache-drill" }, @@ -1353,8 +1353,8 @@ requires-dist = [ { name = "apache-airflow-providers-amazon", extras = ["aiobotocore"], marker = "extra == 'aiobotocore'", editable = "providers/amazon" }, { name = "apache-airflow-providers-amazon", extras = ["python3-saml"], marker = "extra == 'amazon-aws-auth'", editable = "providers/amazon" }, { name = "apache-airflow-providers-amazon", extras = ["s3fs"], marker = "extra == 's3fs'", editable = "providers/amazon" }, + { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*' and extra == 'apache-cassandra'", editable = "providers/apache/cassandra" }, { name = "apache-airflow-providers-apache-cassandra", marker = "extra == 'all'", editable = "providers/apache/cassandra" }, - { name = "apache-airflow-providers-apache-cassandra", marker = "extra == 'apache-cassandra'", editable = "providers/apache/cassandra" }, { name = "apache-airflow-providers-apache-drill", marker = "extra == 'all'", editable = "providers/apache/drill" }, { name = "apache-airflow-providers-apache-drill", marker = "extra == 'apache-drill'", editable = "providers/apache/drill" }, { name = "apache-airflow-providers-apache-druid", marker = "extra == 'all'", editable = "providers/apache/druid" }, @@ -1395,7 +1395,7 @@ requires-dist = [ { name = "apache-airflow-providers-celery", marker = "extra == 'all'", editable = "providers/celery" }, { name = "apache-airflow-providers-celery", marker = "extra == 'celery'", editable = "providers/celery" }, { name = "apache-airflow-providers-cloudant", marker = "python_full_version != '3.9.*' and extra == 'all'", editable = "providers/cloudant" }, - { name = "apache-airflow-providers-cloudant", marker = "python_full_version != '3.9.*' and extra == 'cloudant'", editable = "providers/cloudant" }, + { name = "apache-airflow-providers-cloudant", marker = "extra == 'cloudant'", editable = "providers/cloudant" }, { name = "apache-airflow-providers-cncf-kubernetes", marker = "extra == 'all'", editable = "providers/cncf/kubernetes" }, { name = "apache-airflow-providers-cncf-kubernetes", marker = "extra == 'cncf-kubernetes'", editable = "providers/cncf/kubernetes" }, { name = "apache-airflow-providers-cohere", marker = "extra == 'all'", editable = "providers/cohere" }, @@ -1558,7 +1558,7 @@ requires-dist = [ { name = "cloudpickle", marker = "extra == 'cloudpickle'", specifier = ">=2.2.1" }, { name = "python-ldap", marker = "extra == 'ldap'", specifier = ">=3.4.4" }, { name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.30.0" }, - { name = "uv", marker = "extra == 'uv'", specifier = ">=0.11.1" }, + { name = "uv", marker = "extra == 'uv'", specifier = ">=0.11.2" }, ] provides-extras = ["all-core", "async", "graphviz", "gunicorn", "kerberos", "memray", "otel", "statsd", "all-task-sdk", "airbyte", "alibaba", "amazon", "apache-cassandra", "apache-drill", "apache-druid", "apache-flink", "apache-hdfs", "apache-hive", "apache-iceberg", "apache-impala", "apache-kafka", "apache-kylin", "apache-livy", "apache-pig", "apache-pinot", "apache-spark", "apache-tinkerpop", "apprise", "arangodb", "asana", "atlassian-jira", "celery", "cloudant", "cncf-kubernetes", "cohere", "common-ai", "common-compat", "common-io", "common-messaging", "common-sql", "databricks", "datadog", "dbt-cloud", "dingding", "discord", "docker", "edge3", "elasticsearch", "exasol", "fab", "facebook", "ftp", "git", "github", "google", "grafana", "grpc", "hashicorp", "http", "imap", "influxdb", "informatica", "jdbc", "jenkins", "keycloak", "microsoft-azure", "microsoft-mssql", "microsoft-psrp", "microsoft-winrm", "mongo", "mysql", "neo4j", "odbc", "openai", "openfaas", "openlineage", "opensearch", "opsgenie", "oracle", "pagerduty", "papermill", "pgvector", "pinecone", "postgres", "presto", "qdrant", "redis", "salesforce", "samba", "segment", "sendgrid", "sftp", "singularity", "slack", "smtp", "snowflake", "sqlite", "ssh", "standard", "tableau", "telegram", "teradata", "trino", "vertica", "weaviate", "yandex", "ydb", "zendesk", "all", "aiobotocore", "apache-atlas", "apache-webhdfs", "amazon-aws-auth", "cloudpickle", "github-enterprise", "google-auth", "ldap", "pandas", "polars", "rabbitmq", "sentry", "s3fs", "uv"] @@ -2374,7 +2374,7 @@ requires-dist = [ { name = "rich", specifier = ">=13.6.0" }, { name = "rich-click", marker = "extra == 'devscripts'", specifier = ">=1.9.7" }, { name = "rich-click", marker = "extra == 'docs'", specifier = ">=1.9.7" }, - { name = "ruff", specifier = "==0.15.7" }, + { name = "ruff", specifier = "==0.15.8" }, { name = "semver", specifier = ">=3.0.2" }, { name = "semver", marker = "extra == 'devscripts'", specifier = ">=3.0.2" }, { name = "setuptools", marker = "extra == 'docs'", specifier = "<82.0.0" }, @@ -2734,7 +2734,7 @@ docs = [ [package.metadata] requires-dist = [ - { name = "aiobotocore", extras = ["boto3"], marker = "extra == 'aiobotocore'", specifier = ">=2.26.0" }, + { name = "aiobotocore", marker = "extra == 'aiobotocore'", specifier = ">=2.26.0" }, { name = "apache-airflow", editable = "." }, { name = "apache-airflow-providers-apache-hive", marker = "extra == 'apache-hive'", editable = "providers/apache/hive" }, { name = "apache-airflow-providers-cncf-kubernetes", marker = "extra == 'cncf-kubernetes'", editable = "providers/cncf/kubernetes" }, @@ -3865,7 +3865,7 @@ docs = [{ name = "apache-airflow-devel-common", extras = ["docs"], editable = "d [[package]] name = "apache-airflow-providers-common-ai" -version = "0.0.1" +version = "0.1.0" source = { editable = "providers/common/ai" } dependencies = [ { name = "apache-airflow" }, @@ -4672,6 +4672,9 @@ dependencies = [ kerberos = [ { name = "kerberos" }, ] +oauth = [ + { name = "authlib" }, +] [package.dev-dependencies] dev = [ @@ -4679,6 +4682,7 @@ dev = [ { name = "apache-airflow-devel-common" }, { name = "apache-airflow-providers-common-compat" }, { name = "apache-airflow-task-sdk" }, + { name = "authlib" }, { name = "kerberos" }, { name = "requests-kerberos" }, ] @@ -4690,6 +4694,7 @@ docs = [ requires-dist = [ { name = "apache-airflow", editable = "." }, { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, + { name = "authlib", marker = "extra == 'oauth'", specifier = ">=1.0.0" }, { name = "blinker", specifier = ">=1.6.2" }, { name = "cachetools", specifier = ">=6.0" }, { name = "flask", specifier = ">=2.2.1" }, @@ -4709,7 +4714,7 @@ requires-dist = [ { name = "werkzeug", marker = "python_full_version >= '3.14'", specifier = ">=3.1.6" }, { name = "wtforms", specifier = ">=3.0" }, ] -provides-extras = ["kerberos"] +provides-extras = ["kerberos", "oauth"] [package.metadata.requires-dev] dev = [ @@ -4717,6 +4722,7 @@ dev = [ { name = "apache-airflow-devel-common", editable = "devel-common" }, { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, { name = "apache-airflow-task-sdk", editable = "task-sdk" }, + { name = "authlib", specifier = ">=1.0.0" }, { name = "kerberos", specifier = ">=1.3.0" }, { name = "requests-kerberos", specifier = ">=0.14.0" }, ] @@ -19539,27 +19545,27 @@ wheels = [ [[package]] name = "ruff" -version = "0.15.7" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/a1/22/9e4f66ee588588dc6c9af6a994e12d26e19efbe874d1a909d09a6dac7a59/ruff-0.15.7.tar.gz", hash = "sha256:04f1ae61fc20fe0b148617c324d9d009b5f63412c0b16474f3d5f1a1a665f7ac", size = 4601277, upload-time = "2026-03-19T16:26:22.605Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/41/2f/0b08ced94412af091807b6119ca03755d651d3d93a242682bf020189db94/ruff-0.15.7-py3-none-linux_armv6l.whl", hash = "sha256:a81cc5b6910fb7dfc7c32d20652e50fa05963f6e13ead3c5915c41ac5d16668e", size = 10489037, upload-time = "2026-03-19T16:26:32.47Z" }, - { url = "https://files.pythonhosted.org/packages/91/4a/82e0fa632e5c8b1eba5ee86ecd929e8ff327bbdbfb3c6ac5d81631bef605/ruff-0.15.7-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:722d165bd52403f3bdabc0ce9e41fc47070ac56d7a91b4e0d097b516a53a3477", size = 10955433, upload-time = "2026-03-19T16:27:00.205Z" }, - { url = "https://files.pythonhosted.org/packages/ab/10/12586735d0ff42526ad78c049bf51d7428618c8b5c467e72508c694119df/ruff-0.15.7-py3-none-macosx_11_0_arm64.whl", hash = "sha256:7fbc2448094262552146cbe1b9643a92f66559d3761f1ad0656d4991491af49e", size = 10269302, upload-time = "2026-03-19T16:26:26.183Z" }, - { url = "https://files.pythonhosted.org/packages/eb/5d/32b5c44ccf149a26623671df49cbfbd0a0ae511ff3df9d9d2426966a8d57/ruff-0.15.7-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b39329b60eba44156d138275323cc726bbfbddcec3063da57caa8a8b1d50adf", size = 10607625, upload-time = "2026-03-19T16:27:03.263Z" }, - { url = "https://files.pythonhosted.org/packages/5d/f1/f0001cabe86173aaacb6eb9bb734aa0605f9a6aa6fa7d43cb49cbc4af9c9/ruff-0.15.7-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:87768c151808505f2bfc93ae44e5f9e7c8518943e5074f76ac21558ef5627c85", size = 10324743, upload-time = "2026-03-19T16:27:09.791Z" }, - { url = "https://files.pythonhosted.org/packages/7a/87/b8a8f3d56b8d848008559e7c9d8bf367934d5367f6d932ba779456e2f73b/ruff-0.15.7-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fb0511670002c6c529ec66c0e30641c976c8963de26a113f3a30456b702468b0", size = 11138536, upload-time = "2026-03-19T16:27:06.101Z" }, - { url = "https://files.pythonhosted.org/packages/e4/f2/4fd0d05aab0c5934b2e1464784f85ba2eab9d54bffc53fb5430d1ed8b829/ruff-0.15.7-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e0d19644f801849229db8345180a71bee5407b429dd217f853ec515e968a6912", size = 11994292, upload-time = "2026-03-19T16:26:48.718Z" }, - { url = "https://files.pythonhosted.org/packages/64/22/fc4483871e767e5e95d1622ad83dad5ebb830f762ed0420fde7dfa9d9b08/ruff-0.15.7-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4806d8e09ef5e84eb19ba833d0442f7e300b23fe3f0981cae159a248a10f0036", size = 11398981, upload-time = "2026-03-19T16:26:54.513Z" }, - { url = "https://files.pythonhosted.org/packages/b0/99/66f0343176d5eab02c3f7fcd2de7a8e0dd7a41f0d982bee56cd1c24db62b/ruff-0.15.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dce0896488562f09a27b9c91b1f58a097457143931f3c4d519690dea54e624c5", size = 11242422, upload-time = "2026-03-19T16:26:29.277Z" }, - { url = "https://files.pythonhosted.org/packages/5d/3a/a7060f145bfdcce4c987ea27788b30c60e2c81d6e9a65157ca8afe646328/ruff-0.15.7-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:1852ce241d2bc89e5dc823e03cff4ce73d816b5c6cdadd27dbfe7b03217d2a12", size = 11232158, upload-time = "2026-03-19T16:26:42.321Z" }, - { url = "https://files.pythonhosted.org/packages/a7/53/90fbb9e08b29c048c403558d3cdd0adf2668b02ce9d50602452e187cd4af/ruff-0.15.7-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5f3e4b221fb4bd293f79912fc5e93a9063ebd6d0dcbd528f91b89172a9b8436c", size = 10577861, upload-time = "2026-03-19T16:26:57.459Z" }, - { url = "https://files.pythonhosted.org/packages/2f/aa/5f486226538fe4d0f0439e2da1716e1acf895e2a232b26f2459c55f8ddad/ruff-0.15.7-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:b15e48602c9c1d9bdc504b472e90b90c97dc7d46c7028011ae67f3861ceba7b4", size = 10327310, upload-time = "2026-03-19T16:26:35.909Z" }, - { url = "https://files.pythonhosted.org/packages/99/9e/271afdffb81fe7bfc8c43ba079e9d96238f674380099457a74ccb3863857/ruff-0.15.7-py3-none-musllinux_1_2_i686.whl", hash = "sha256:1b4705e0e85cedc74b0a23cf6a179dbb3df184cb227761979cc76c0440b5ab0d", size = 10840752, upload-time = "2026-03-19T16:26:45.723Z" }, - { url = "https://files.pythonhosted.org/packages/bf/29/a4ae78394f76c7759953c47884eb44de271b03a66634148d9f7d11e721bd/ruff-0.15.7-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:112c1fa316a558bb34319282c1200a8bf0495f1b735aeb78bfcb2991e6087580", size = 11336961, upload-time = "2026-03-19T16:26:39.076Z" }, - { url = "https://files.pythonhosted.org/packages/26/6b/8786ba5736562220d588a2f6653e6c17e90c59ced34a2d7b512ef8956103/ruff-0.15.7-py3-none-win32.whl", hash = "sha256:6d39e2d3505b082323352f733599f28169d12e891f7dd407f2d4f54b4c2886de", size = 10582538, upload-time = "2026-03-19T16:26:15.992Z" }, - { url = "https://files.pythonhosted.org/packages/2b/e9/346d4d3fffc6871125e877dae8d9a1966b254fbd92a50f8561078b88b099/ruff-0.15.7-py3-none-win_amd64.whl", hash = "sha256:4d53d712ddebcd7dace1bc395367aec12c057aacfe9adbb6d832302575f4d3a1", size = 11755839, upload-time = "2026-03-19T16:26:19.897Z" }, - { url = "https://files.pythonhosted.org/packages/8f/e8/726643a3ea68c727da31570bde48c7a10f1aa60eddd628d94078fec586ff/ruff-0.15.7-py3-none-win_arm64.whl", hash = "sha256:18e8d73f1c3fdf27931497972250340f92e8c861722161a9caeb89a58ead6ed2", size = 11023304, upload-time = "2026-03-19T16:26:51.669Z" }, +version = "0.15.8" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/14/b0/73cf7550861e2b4824950b8b52eebdcc5adc792a00c514406556c5b80817/ruff-0.15.8.tar.gz", hash = "sha256:995f11f63597ee362130d1d5a327a87cb6f3f5eae3094c620bcc632329a4d26e", size = 4610921, upload-time = "2026-03-26T18:39:38.675Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4a/92/c445b0cd6da6e7ae51e954939cb69f97e008dbe750cfca89b8cedc081be7/ruff-0.15.8-py3-none-linux_armv6l.whl", hash = "sha256:cbe05adeba76d58162762d6b239c9056f1a15a55bd4b346cfd21e26cd6ad7bc7", size = 10527394, upload-time = "2026-03-26T18:39:41.566Z" }, + { url = "https://files.pythonhosted.org/packages/eb/92/f1c662784d149ad1414cae450b082cf736430c12ca78367f20f5ed569d65/ruff-0.15.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:d3e3d0b6ba8dca1b7ef9ab80a28e840a20070c4b62e56d675c24f366ef330570", size = 10905693, upload-time = "2026-03-26T18:39:30.364Z" }, + { url = "https://files.pythonhosted.org/packages/ca/f2/7a631a8af6d88bcef997eb1bf87cc3da158294c57044aafd3e17030613de/ruff-0.15.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:6ee3ae5c65a42f273f126686353f2e08ff29927b7b7e203b711514370d500de3", size = 10323044, upload-time = "2026-03-26T18:39:33.37Z" }, + { url = "https://files.pythonhosted.org/packages/67/18/1bf38e20914a05e72ef3b9569b1d5c70a7ef26cd188d69e9ca8ef588d5bf/ruff-0.15.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fdce027ada77baa448077ccc6ebb2fa9c3c62fd110d8659d601cf2f475858d94", size = 10629135, upload-time = "2026-03-26T18:39:44.142Z" }, + { url = "https://files.pythonhosted.org/packages/d2/e9/138c150ff9af60556121623d41aba18b7b57d95ac032e177b6a53789d279/ruff-0.15.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:12e617fc01a95e5821648a6df341d80456bd627bfab8a829f7cfc26a14a4b4a3", size = 10348041, upload-time = "2026-03-26T18:39:52.178Z" }, + { url = "https://files.pythonhosted.org/packages/02/f1/5bfb9298d9c323f842c5ddeb85f1f10ef51516ac7a34ba446c9347d898df/ruff-0.15.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:432701303b26416d22ba696c39f2c6f12499b89093b61360abc34bcc9bf07762", size = 11121987, upload-time = "2026-03-26T18:39:55.195Z" }, + { url = "https://files.pythonhosted.org/packages/10/11/6da2e538704e753c04e8d86b1fc55712fdbdcc266af1a1ece7a51fff0d10/ruff-0.15.8-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d910ae974b7a06a33a057cb87d2a10792a3b2b3b35e33d2699fdf63ec8f6b17a", size = 11951057, upload-time = "2026-03-26T18:39:19.18Z" }, + { url = "https://files.pythonhosted.org/packages/83/f0/c9208c5fd5101bf87002fed774ff25a96eea313d305f1e5d5744698dc314/ruff-0.15.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2033f963c43949d51e6fdccd3946633c6b37c484f5f98c3035f49c27395a8ab8", size = 11464613, upload-time = "2026-03-26T18:40:06.301Z" }, + { url = "https://files.pythonhosted.org/packages/f8/22/d7f2fabdba4fae9f3b570e5605d5eb4500dcb7b770d3217dca4428484b17/ruff-0.15.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0f29b989a55572fb885b77464cf24af05500806ab4edf9a0fd8977f9759d85b1", size = 11257557, upload-time = "2026-03-26T18:39:57.972Z" }, + { url = "https://files.pythonhosted.org/packages/71/8c/382a9620038cf6906446b23ce8632ab8c0811b8f9d3e764f58bedd0c9a6f/ruff-0.15.8-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:ac51d486bf457cdc985a412fb1801b2dfd1bd8838372fc55de64b1510eff4bec", size = 11169440, upload-time = "2026-03-26T18:39:22.205Z" }, + { url = "https://files.pythonhosted.org/packages/4d/0d/0994c802a7eaaf99380085e4e40c845f8e32a562e20a38ec06174b52ef24/ruff-0.15.8-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:c9861eb959edab053c10ad62c278835ee69ca527b6dcd72b47d5c1e5648964f6", size = 10605963, upload-time = "2026-03-26T18:39:46.682Z" }, + { url = "https://files.pythonhosted.org/packages/19/aa/d624b86f5b0aad7cef6bbf9cd47a6a02dfdc4f72c92a337d724e39c9d14b/ruff-0.15.8-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8d9a5b8ea13f26ae90838afc33f91b547e61b794865374f114f349e9036835fb", size = 10357484, upload-time = "2026-03-26T18:39:49.176Z" }, + { url = "https://files.pythonhosted.org/packages/35/c3/e0b7835d23001f7d999f3895c6b569927c4d39912286897f625736e1fd04/ruff-0.15.8-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c2a33a529fb3cbc23a7124b5c6ff121e4d6228029cba374777bd7649cc8598b8", size = 10830426, upload-time = "2026-03-26T18:40:03.702Z" }, + { url = "https://files.pythonhosted.org/packages/f0/51/ab20b322f637b369383adc341d761eaaa0f0203d6b9a7421cd6e783d81b9/ruff-0.15.8-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:75e5cd06b1cf3f47a3996cfc999226b19aa92e7cce682dcd62f80d7035f98f49", size = 11345125, upload-time = "2026-03-26T18:39:27.799Z" }, + { url = "https://files.pythonhosted.org/packages/37/e6/90b2b33419f59d0f2c4c8a48a4b74b460709a557e8e0064cf33ad894f983/ruff-0.15.8-py3-none-win32.whl", hash = "sha256:bc1f0a51254ba21767bfa9a8b5013ca8149dcf38092e6a9eb704d876de94dc34", size = 10571959, upload-time = "2026-03-26T18:39:36.117Z" }, + { url = "https://files.pythonhosted.org/packages/1f/a2/ef467cb77099062317154c63f234b8a7baf7cb690b99af760c5b68b9ee7f/ruff-0.15.8-py3-none-win_amd64.whl", hash = "sha256:04f79eff02a72db209d47d665ba7ebcad609d8918a134f86cb13dd132159fc89", size = 11743893, upload-time = "2026-03-26T18:39:25.01Z" }, + { url = "https://files.pythonhosted.org/packages/15/e2/77be4fff062fa78d9b2a4dea85d14785dac5f1d0c1fb58ed52331f0ebe28/ruff-0.15.8-py3-none-win_arm64.whl", hash = "sha256:cf891fa8e3bb430c0e7fac93851a5978fc99c8fa2c053b57b118972866f8e5f2", size = 11048175, upload-time = "2026-03-26T18:40:01.06Z" }, ] [[package]] @@ -21809,28 +21815,28 @@ wheels = [ [[package]] name = "uv" -version = "0.11.1" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/2b/e9/691eb77e5e767cdec695db3f91ec259bbb66f9af7c86a8dbe462ef72a120/uv-0.11.1.tar.gz", hash = "sha256:8aa7e4983fabb06d0ba58e8b8c969d568ce495ad5f2f0426af97b55720f0dee1", size = 4007244, upload-time = "2026-03-24T23:14:18.269Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/16/f9/a95c44fba785c27a966087154a8f6825774d49a38b3c5cd35f80e07ca5ca/uv-0.11.1-py3-none-linux_armv6l.whl", hash = "sha256:424b5b412d37838ea6dc11962f037be98b92e83c6ec755509e2af8a4ca3fbf2a", size = 23320598, upload-time = "2026-03-24T23:13:44.998Z" }, - { url = "https://files.pythonhosted.org/packages/5d/de/b7e24956a2508debf2addefcad93c72165069370f914d90db6264e0cf96a/uv-0.11.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:c2133b0532af0217bf252d981bded8bff0c770f174f91f20655f88705f28c03f", size = 22832732, upload-time = "2026-03-24T23:13:33.677Z" }, - { url = "https://files.pythonhosted.org/packages/93/bd/1ac91bc704c22a427a44262f09e208ae897817a856d0e8dc0d60e4032e92/uv-0.11.1-py3-none-macosx_11_0_arm64.whl", hash = "sha256:1a7b74e5a15b9bc6e61ce807adeca5a2807f557d3f06a5586de1da309d844c1d", size = 21406409, upload-time = "2026-03-24T23:14:32.231Z" }, - { url = "https://files.pythonhosted.org/packages/34/1d/f767701e1160538d25ee6c1d49ce1e72442970b6658365afdd57339d10e0/uv-0.11.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:fb1f32ec6c7dffb7ae71afaf6bf1defca0bd20a73a25e61226210c0a3e8bb13d", size = 23154066, upload-time = "2026-03-24T23:14:07.334Z" }, - { url = "https://files.pythonhosted.org/packages/55/21/d2cfa3571557ba68ffd530656b1d7159fe59a6b01be94595351b1eec1c29/uv-0.11.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:0d5cf3c1c96f8afd67072d80479a58c2d69471916bac4ac36cc55f2aa025dc8e", size = 22922490, upload-time = "2026-03-24T23:13:25.83Z" }, - { url = "https://files.pythonhosted.org/packages/59/3c/68119f555b2ec152235951cc9aa0f40006c5f03d17c98adaab6a3d36d42b/uv-0.11.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:5829a254c64b19420b9e48186182d162b01f8da0130e770cbb8851fd138bb820", size = 22923054, upload-time = "2026-03-24T23:14:03.595Z" }, - { url = "https://files.pythonhosted.org/packages/70/ce/0df944835519372b1d698acaa388baa874cf69a6183b5f0980cb8855b81a/uv-0.11.1-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d4259027e80f4dcc9ae3dceddcd5407173d334484737166fc212e96bb760d6ea", size = 24576177, upload-time = "2026-03-24T23:14:25.263Z" }, - { url = "https://files.pythonhosted.org/packages/db/04/0076335413c618fe086e5a4762103634552e638a841e12a4bb8f5137d710/uv-0.11.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:b6169eb49d1d2b5df7a7079162e1242e49ad46c6590c55f05b182fa526963763", size = 25207026, upload-time = "2026-03-24T23:14:11.579Z" }, - { url = "https://files.pythonhosted.org/packages/bb/57/79c0479e12c2291ad9777be53d813957fa38283975b708eead8e855ba725/uv-0.11.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c96a7310a051b1013efffe082f31d718bce0538d4abc20a716d529bf226b7c44", size = 24393748, upload-time = "2026-03-24T23:13:48.553Z" }, - { url = "https://files.pythonhosted.org/packages/c3/25/9ef73c8b6ef04b0cead7d8f1547034568e3e58f3397b55b83167e587f84a/uv-0.11.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:41ccc438dbb905240a3630265feb25be1bda61656ec7c32682a83648a686f4aa", size = 24518525, upload-time = "2026-03-24T23:13:41.129Z" }, - { url = "https://files.pythonhosted.org/packages/a0/a3/035c7c2feb2139efb5d70f2e9f68912c34f7d92ee2429bacd708824483bb/uv-0.11.1-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:44f528ba3d66321cea829770982cccb14af142203e4e19d00ff0c23b28e3cd33", size = 23270167, upload-time = "2026-03-24T23:13:51.937Z" }, - { url = "https://files.pythonhosted.org/packages/25/59/2dd782b537bfd1e41cb06de4f4a529fe2f9bd10034fb3fcce225ec86c1a5/uv-0.11.1-py3-none-manylinux_2_31_riscv64.musllinux_1_1_riscv64.whl", hash = "sha256:4fcc3d5fdea24181d77e7765bf9d16cdd9803fd524820c62c66f91b2e2644d5b", size = 24011976, upload-time = "2026-03-24T23:13:37.402Z" }, - { url = "https://files.pythonhosted.org/packages/7b/f0/9983e6f31d495cc548f1e211cab5b89a3716f406a2d9d8134b8245ec103c/uv-0.11.1-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:5de9e43a32079b8d57093542b0cd8415adba5ed9944fa49076c0927f3ff927e1", size = 24029605, upload-time = "2026-03-24T23:14:28.819Z" }, - { url = "https://files.pythonhosted.org/packages/19/dc/9c59e803bfc1b9d6c4c4b7374689c688e9dc0a1ecc2375399d3a59fd4a58/uv-0.11.1-py3-none-musllinux_1_1_i686.whl", hash = "sha256:f13ae98a938effae5deb587a63e7e42f05d6ba9c1661903ef538e4e87b204f8c", size = 23702811, upload-time = "2026-03-24T23:14:21.207Z" }, - { url = "https://files.pythonhosted.org/packages/7d/77/b1cbfdac0b2dd3e7aa420e9dad1abe8badb47eabd8741a9993586b14f8dc/uv-0.11.1-py3-none-musllinux_1_1_x86_64.whl", hash = "sha256:57d38e8b6f6937e1521da568adf846bb89439c73e146e89a8ab2cfe7bb15657a", size = 24714239, upload-time = "2026-03-24T23:13:29.814Z" }, - { url = "https://files.pythonhosted.org/packages/e4/d3/94917751acbbb5e053cb366004ae8be3c9664f82aef7de54f55e38ec15cb/uv-0.11.1-py3-none-win32.whl", hash = "sha256:36f4552b24acaa4699b02baeb1bb928202bb98d426dcc5041ab7ebae082a6430", size = 22404606, upload-time = "2026-03-24T23:13:55.614Z" }, - { url = "https://files.pythonhosted.org/packages/aa/87/8dadfe03944a4a493cd58b6f4f13e5181069a0048aeb2fae7da2c587a542/uv-0.11.1-py3-none-win_amd64.whl", hash = "sha256:d6a1c4cdb1064e9ceaa59e89a7489dd196222a0b90cfb77ca37a909b5e024ea0", size = 24850092, upload-time = "2026-03-24T23:14:15.186Z" }, - { url = "https://files.pythonhosted.org/packages/38/1b/dad559273df0c8263533afa4a28570cf6804272f379df9830b528a9cf8bc/uv-0.11.1-py3-none-win_arm64.whl", hash = "sha256:3bc9632033c7a280342f9b304bd12eccb47d6965d50ea9ee57ecfaf4f1f393c4", size = 23376127, upload-time = "2026-03-24T23:13:59.59Z" }, +version = "0.11.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ba/9e/65dfeeafe5644a2e0bdd9dfdd4bdc37c87b06067fdff4596eeba0bc0f2f5/uv-0.11.2.tar.gz", hash = "sha256:ef226af1d814466df45dc8a746c5220a951643d0832296a00c30ac3db95a3a4c", size = 4010086, upload-time = "2026-03-26T21:22:13.185Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/29/6f/6469561a85b81d690ad63eac1135ce4d4f8269cb4fc92da20ff7efa5fa4f/uv-0.11.2-py3-none-linux_armv6l.whl", hash = "sha256:f27ca998085eb8dc095ff9d7568aa08d9ce7c0d2b74bd525da5cd2e5b7367b71", size = 23387567, upload-time = "2026-03-26T21:22:02.49Z" }, + { url = "https://files.pythonhosted.org/packages/27/2a/313b5de76e52cc75e38fd3e5f1644d6b16d4d4bdb9aaff8508ec955255ed/uv-0.11.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:00054a0041c25b3ec3d0f4f6221d3cbfda32e70f7d1c60bee36f1a9736f47b68", size = 22819340, upload-time = "2026-03-26T21:22:42.942Z" }, + { url = "https://files.pythonhosted.org/packages/3a/74/64ea01a48383748f0e1087e617fab0d88176f506fc47e3a18fb936a22a3d/uv-0.11.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:89972042233c90adf8b8150ec164444a4df41938739e5736773ac00870840887", size = 21425465, upload-time = "2026-03-26T21:22:05.232Z" }, + { url = "https://files.pythonhosted.org/packages/b6/85/d9d71a940e90d1ec130483a02d25711010609c613d245abd48ff14fdfd1d/uv-0.11.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:e1f98621b3ffd5dd40bec12bd716e67aec552a7978c7753b709206d7a0e4f93f", size = 23140501, upload-time = "2026-03-26T21:22:31.896Z" }, + { url = "https://files.pythonhosted.org/packages/59/4d/c25126473337acf071b0d572ff94fb6444364641b3d311568028349c964d/uv-0.11.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:66925ceb0e76826b5280937a93e31f0b093c9edfafbb52db7936595b1ef205b8", size = 23003445, upload-time = "2026-03-26T21:22:15.371Z" }, + { url = "https://files.pythonhosted.org/packages/5b/3e/1ef69d9fc88e04037ffebd5c41f70dadeb73021033ced57b2e186b23ac7c/uv-0.11.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a10911b6a555d31beb835653cedc0bc491b656e964d30be8eb9186f1fe0ef88c", size = 22989489, upload-time = "2026-03-26T21:22:26.226Z" }, + { url = "https://files.pythonhosted.org/packages/a0/04/0398b4a5be0f3dd07be80d31275754338ae8857f78309b9776ab854d0a85/uv-0.11.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2b8fa0a2ddc69c9ed373d72144b950ac2af81e3d95047c2d02564a8a03be538c", size = 24603289, upload-time = "2026-03-26T21:22:45.967Z" }, + { url = "https://files.pythonhosted.org/packages/e6/79/0388bbb629db283a883e4412d5f54cf62ec4b9f7bb6631781fbbb49c0792/uv-0.11.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:fbbd6e6e682b7f0bbdfff3348e580ea0fa58a07741e54cc8641b919bdf6f9128", size = 25218467, upload-time = "2026-03-26T21:22:20.701Z" }, + { url = "https://files.pythonhosted.org/packages/25/5c/725442191dee62e5b906576ed0ff432a1f2e3b38994c81e16156574e97ab/uv-0.11.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:8f9f3ac825561edec6494588d6aed7d3f4a08618b167eb256b4a9027b13304a6", size = 24418929, upload-time = "2026-03-26T21:22:23.446Z" }, + { url = "https://files.pythonhosted.org/packages/9f/6e/f49ca8ad037919e5d44a2070af3d369792be3419c594cfb92f4404ab7832/uv-0.11.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:be4bb136bbc8840ede58663e8ba5a9bbf3b5376f7f933f915df28d4078bb9095", size = 24586892, upload-time = "2026-03-26T21:22:18.044Z" }, + { url = "https://files.pythonhosted.org/packages/83/08/aff0a8098ac5946d195e67bf091d494f34c1009ea6e163d0c23e241527e1/uv-0.11.2-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:fea7efc97f9fcfb345e588c71fa56250c0db8c2bfd8d4e2cd4d21e1308c4e6ac", size = 23232598, upload-time = "2026-03-26T21:22:51.865Z" }, + { url = "https://files.pythonhosted.org/packages/1c/43/eced218d15f8ed58fbb081f0b826e4f016b501b50ec317ab6c331b60c15c/uv-0.11.2-py3-none-manylinux_2_31_riscv64.musllinux_1_1_riscv64.whl", hash = "sha256:b5529572ea7150311f5a17b5d09ef19781c2484932e14eed44a0c038f93ef722", size = 23998818, upload-time = "2026-03-26T21:22:49.097Z" }, + { url = "https://files.pythonhosted.org/packages/62/96/da68d159ba3f49a516796273463288b53d675675c5a0df71c14301ec4323/uv-0.11.2-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:0919096889e26d0edcbc731e95c4a4d1f47ef881fb46970cbf0800bf17d4840e", size = 24047673, upload-time = "2026-03-26T21:22:37.6Z" }, + { url = "https://files.pythonhosted.org/packages/62/be/db2400f4699717b4f34e036e7a1c54bc1f89c7c5b3303abc8d8a00664071/uv-0.11.2-py3-none-musllinux_1_1_i686.whl", hash = "sha256:7a05747eecca4534c284dbab213526468092317e8f6aec7a6c9f89ce3d1248d3", size = 23733334, upload-time = "2026-03-26T21:22:40.247Z" }, + { url = "https://files.pythonhosted.org/packages/29/27/4045960075f4898a44f092625e9f08ee8af4229be7df6ad487d58aa7d51e/uv-0.11.2-py3-none-musllinux_1_1_x86_64.whl", hash = "sha256:00cbf1829e158b053b0bdc675d9f9c13700b29be90a9bad966cc9b586c01265b", size = 24790898, upload-time = "2026-03-26T21:22:07.812Z" }, + { url = "https://files.pythonhosted.org/packages/e4/9d/7470f39bf72683f1908e7ba70f5379f14e4984c8e6a65f7563f3dfb19f13/uv-0.11.2-py3-none-win32.whl", hash = "sha256:a1b8a39b17cf9e3183a35a44dffa103c91c412f003569a210883ffb537c2c65d", size = 22516649, upload-time = "2026-03-26T21:22:34.806Z" }, + { url = "https://files.pythonhosted.org/packages/f6/a3/c88fa454a7c07785ce63e96b6c1c7b24b5abcb3a6afbc6ad8b29b9bc1a1d/uv-0.11.2-py3-none-win_amd64.whl", hash = "sha256:d4dbcecf6daca8605f46fba232f49e9b49d06ebe3b9cba5e59e608c5be03890e", size = 24989876, upload-time = "2026-03-26T21:22:28.917Z" }, + { url = "https://files.pythonhosted.org/packages/a2/50/fae409a028d87db02ffbf3a3b5ac39980fbeb3d9a0356f49943722b2cabb/uv-0.11.2-py3-none-win_arm64.whl", hash = "sha256:e5b8570e88af5073ce5aa5df4866484e69035a6e66caab8a5c51a988a989a467", size = 23450736, upload-time = "2026-03-26T21:22:10.838Z" }, ] [[package]] From 3780594bc171382d6abaf89dcf25373f14f026d2 Mon Sep 17 00:00:00 2001 From: evertonsa Date: Thu, 2 Apr 2026 20:30:36 +0200 Subject: [PATCH 6/6] fix: revert wrong changes --- pyproject.toml | 14 +++++++------- uv.lock | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f26e54af57a81..14cb2c5f91037 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,7 +108,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-amazon>=9.0.0" ] "apache.cassandra" = [ - "apache-airflow-providers-apache-cassandra>=3.7.0; python_version !=\"3.14\"", + "apache-airflow-providers-apache-cassandra>=3.7.0; python_version !=\"3.14\"" ] "apache.drill" = [ "apache-airflow-providers-apache-drill>=2.8.1" @@ -168,7 +168,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-celery>=3.8.3" ] "cloudant" = [ - "apache-airflow-providers-cloudant>=4.0.1", + "apache-airflow-providers-cloudant>=4.0.1" ] "cncf.kubernetes" = [ "apache-airflow-providers-cncf-kubernetes>=9.0.0" @@ -237,7 +237,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-google>=10.24.0" ] "grafana" = [ - "apache-airflow-providers-grafana>=1.0.0" # Set from local provider pyproject.toml + "apache-airflow-providers-grafana>=1.0.0" ] "grpc" = [ "apache-airflow-providers-grpc>=3.7.0" @@ -401,7 +401,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-airbyte>=5.0.0", "apache-airflow-providers-alibaba>=3.0.0", "apache-airflow-providers-amazon>=9.0.0", - "apache-airflow-providers-apache-cassandra>=3.7.0", + "apache-airflow-providers-apache-cassandra>=3.7.0; python_version !=\"3.14\"", "apache-airflow-providers-apache-drill>=2.8.1", "apache-airflow-providers-apache-druid>=3.12.0", "apache-airflow-providers-apache-flink>=1.6.0", @@ -421,7 +421,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-asana>=2.7.0", "apache-airflow-providers-atlassian-jira>=2.7.1", "apache-airflow-providers-celery>=3.8.3", - "apache-airflow-providers-cloudant>=4.0.1; python_version !=\"3.9\"", + "apache-airflow-providers-cloudant>=4.0.1", "apache-airflow-providers-cncf-kubernetes>=9.0.0", "apache-airflow-providers-cohere>=1.4.0", "apache-airflow-providers-common-ai>=0.1.0", # Set from local provider pyproject.toml @@ -444,7 +444,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-git>=0.0.2", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-github>=2.8.0", "apache-airflow-providers-google>=10.24.0", - "apache-airflow-providers-grafana>=1.0.0", # Set from local provider pyproject.toml + "apache-airflow-providers-grafana>=1.0.0", "apache-airflow-providers-grpc>=3.7.0", "apache-airflow-providers-hashicorp>=4.0.0", "apache-airflow-providers-http>=4.13.2", @@ -1622,4 +1622,4 @@ members = [ "providers/ydb", "providers/zendesk", # End of automatically generated provider workspace members -] +] \ No newline at end of file diff --git a/uv.lock b/uv.lock index b4a4ad0e0fbc8..4ec931eaea43a 100644 --- a/uv.lock +++ b/uv.lock @@ -12,7 +12,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-03-29T18:25:19.975988101Z" +exclude-newer = "2026-03-29T18:29:55.51454679Z" exclude-newer-span = "P4D" [manifest] @@ -827,7 +827,7 @@ all = [ { name = "apache-airflow-providers-airbyte" }, { name = "apache-airflow-providers-alibaba" }, { name = "apache-airflow-providers-amazon", extra = ["aiobotocore", "python3-saml", "s3fs"] }, - { name = "apache-airflow-providers-apache-cassandra" }, + { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*'" }, { name = "apache-airflow-providers-apache-drill" }, { name = "apache-airflow-providers-apache-druid" }, { name = "apache-airflow-providers-apache-flink" }, @@ -1353,8 +1353,8 @@ requires-dist = [ { name = "apache-airflow-providers-amazon", extras = ["aiobotocore"], marker = "extra == 'aiobotocore'", editable = "providers/amazon" }, { name = "apache-airflow-providers-amazon", extras = ["python3-saml"], marker = "extra == 'amazon-aws-auth'", editable = "providers/amazon" }, { name = "apache-airflow-providers-amazon", extras = ["s3fs"], marker = "extra == 's3fs'", editable = "providers/amazon" }, + { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*' and extra == 'all'", editable = "providers/apache/cassandra" }, { name = "apache-airflow-providers-apache-cassandra", marker = "python_full_version != '3.14.*' and extra == 'apache-cassandra'", editable = "providers/apache/cassandra" }, - { name = "apache-airflow-providers-apache-cassandra", marker = "extra == 'all'", editable = "providers/apache/cassandra" }, { name = "apache-airflow-providers-apache-drill", marker = "extra == 'all'", editable = "providers/apache/drill" }, { name = "apache-airflow-providers-apache-drill", marker = "extra == 'apache-drill'", editable = "providers/apache/drill" }, { name = "apache-airflow-providers-apache-druid", marker = "extra == 'all'", editable = "providers/apache/druid" }, @@ -1394,7 +1394,7 @@ requires-dist = [ { name = "apache-airflow-providers-atlassian-jira", marker = "extra == 'atlassian-jira'", editable = "providers/atlassian/jira" }, { name = "apache-airflow-providers-celery", marker = "extra == 'all'", editable = "providers/celery" }, { name = "apache-airflow-providers-celery", marker = "extra == 'celery'", editable = "providers/celery" }, - { name = "apache-airflow-providers-cloudant", marker = "python_full_version != '3.9.*' and extra == 'all'", editable = "providers/cloudant" }, + { name = "apache-airflow-providers-cloudant", marker = "extra == 'all'", editable = "providers/cloudant" }, { name = "apache-airflow-providers-cloudant", marker = "extra == 'cloudant'", editable = "providers/cloudant" }, { name = "apache-airflow-providers-cncf-kubernetes", marker = "extra == 'all'", editable = "providers/cncf/kubernetes" }, { name = "apache-airflow-providers-cncf-kubernetes", marker = "extra == 'cncf-kubernetes'", editable = "providers/cncf/kubernetes" },