From 33ba157da73f1e9a676b1db62c8be5e1c05291de Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Mon, 30 Mar 2026 22:47:58 +0300 Subject: [PATCH 1/2] handle rate limiting of k8s api server in k8s executor --- .../executors/kubernetes_executor.py | 29 +++- .../executors/test_kubernetes_executor.py | 127 ++++++++++++++++-- 2 files changed, 140 insertions(+), 16 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 8c3722d2630b6..a578704fef6df 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -31,7 +31,7 @@ import time from collections import Counter, defaultdict from contextlib import suppress -from datetime import datetime +from datetime import datetime, timedelta from queue import Empty, Queue from typing import TYPE_CHECKING, Any @@ -116,6 +116,7 @@ def __init__(self, *args, **kwargs): "kubernetes_executor", "task_publish_max_retries", fallback=0 ) self.completed: set[KubernetesResults] = set() + self.create_pods_after: datetime | None = None def _list_pods(self, query_kwargs): query_kwargs["header_params"] = { @@ -313,6 +314,12 @@ def sync(self) -> None: from kubernetes.client.rest import ApiException + if self.create_pods_after and self.create_pods_after > datetime.now(): + self.log.warning("Skipping pod creation due to kubernetes rate limit") + return + + self.create_pods_after = None + with contextlib.suppress(Empty): for _ in range(self.kube_config.worker_pods_creation_batch_size): task = self.task_queue.get_nowait() @@ -339,15 +346,21 @@ def sync(self) -> None: # Use the body directly as the message instead. body = {"message": e.body} + headers = e.headers retries = self.task_publish_retries[key] # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries + # In case of a rate limit, wait and do not create new pods for "Retry-After" seconds + can_retry_publish = ( + self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries + ) message = body.get("message", "") if ( (str(e.status) == "403" and "exceeded quota" in message) or (str(e.status) == "409" and "object has been modified" in message) or (str(e.status) == "410" and "too old resource version" in message) or str(e.status) == "500" - ) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries): + or str(e.status) == "429" + ) and can_retry_publish: self.log.warning( "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s", self.task_publish_retries[key] + 1, @@ -356,8 +369,20 @@ def sync(self) -> None: e.reason, message, ) + self.task_queue.put(task) self.task_publish_retries[key] = retries + 1 + + if str(e.status) == "429": + self.create_pods_after = datetime.now() + timedelta( + seconds=int(headers.get("Retry-After", "0")) + ) + self.log.warning( + "Got rate limit from k8s api, skipping pod creation until %s", + self.create_pods_after, + ) + # stop pod creation to stop api requests + break else: self.log.error("Pod creation failed with reason %r. Failing task", e.reason) key = task.key diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 0afb2836e96ed..22c4097a719c4 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -20,7 +20,7 @@ import re import string import time -from datetime import datetime +from datetime import datetime, timedelta from unittest import mock import pytest @@ -275,13 +275,14 @@ def test_resource_version_singleton(self): AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" ) @pytest.mark.parametrize( - ("response", "task_publish_max_retries", "should_requeue", "task_expected_state"), + ("response", "task_publish_max_retries", "should_requeue", "task_expected_state", "retry_delay"), [ pytest.param( HTTPResponse(body='{"message": "any message"}', status=400), 0, False, State.FAILED, + None, id="400 BadRequest", ), pytest.param( @@ -289,6 +290,7 @@ def test_resource_version_singleton(self): 1, False, State.FAILED, + None, id="400 BadRequest (task_publish_max_retries=1)", ), pytest.param( @@ -296,6 +298,7 @@ def test_resource_version_singleton(self): 0, False, State.FAILED, + None, id="403 Forbidden (permission denied)", ), pytest.param( @@ -303,6 +306,7 @@ def test_resource_version_singleton(self): 1, False, State.FAILED, + None, id="403 Forbidden (permission denied) (task_publish_max_retries=1)", ), pytest.param( @@ -315,6 +319,7 @@ def test_resource_version_singleton(self): 0, False, State.FAILED, + None, id="403 Forbidden (exceeded quota)", ), pytest.param( @@ -327,6 +332,7 @@ def test_resource_version_singleton(self): 1, True, State.SUCCESS, + None, id="403 Forbidden (exceeded quota) (task_publish_max_retries=1) (retry succeeded)", ), pytest.param( @@ -339,6 +345,7 @@ def test_resource_version_singleton(self): 1, True, State.FAILED, + None, id="403 Forbidden (exceeded quota) (task_publish_max_retries=1) (retry failed)", ), pytest.param( @@ -346,6 +353,7 @@ def test_resource_version_singleton(self): 0, False, State.FAILED, + None, id="404 Not Found", ), pytest.param( @@ -353,6 +361,7 @@ def test_resource_version_singleton(self): 1, False, State.FAILED, + None, id="404 Not Found (task_publish_max_retries=1)", ), pytest.param( @@ -360,6 +369,7 @@ def test_resource_version_singleton(self): 0, False, State.FAILED, + None, id="422 Unprocessable Entity", ), pytest.param( @@ -367,6 +377,7 @@ def test_resource_version_singleton(self): 1, False, State.FAILED, + None, id="422 Unprocessable Entity (task_publish_max_retries=1)", ), pytest.param( @@ -374,6 +385,7 @@ def test_resource_version_singleton(self): 0, False, State.FAILED, + None, id="12345 fake-unhandled-reason", ), pytest.param( @@ -381,6 +393,7 @@ def test_resource_version_singleton(self): 1, False, State.FAILED, + None, id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry succeeded)", ), pytest.param( @@ -388,6 +401,7 @@ def test_resource_version_singleton(self): 1, False, State.FAILED, + None, id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry failed)", ), pytest.param( @@ -398,6 +412,7 @@ def test_resource_version_singleton(self): 1, True, State.SUCCESS, + None, id="409 conflict", ), pytest.param( @@ -408,27 +423,27 @@ def test_resource_version_singleton(self): 1, True, State.SUCCESS, + None, id="410 gone", ), pytest.param( - HTTPResponse(body="Too many requests, please try again later.", status=429), - 0, - False, - State.FAILED, - id="429 Too Many Requests (non-JSON body)", - ), - pytest.param( - HTTPResponse(body="Too many requests, please try again later.", status=429), + HTTPResponse( + body="Too many requests, please try again later.", + status=429, + headers={"Retry-After": "3"}, + ), 1, - False, - State.FAILED, - id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)", + True, + State.SUCCESS, + 3, + id="429 Too Many Requests (non-JSON body) (requeued after retry delay)", ), pytest.param( HTTPResponse(body="", status=429), 0, False, State.FAILED, + None, id="429 Too Many Requests (empty body)", ), pytest.param( @@ -439,6 +454,7 @@ def test_resource_version_singleton(self): 1, True, State.SUCCESS, + None, id="500 Internal Server Error (webhook failure)", ), pytest.param( @@ -449,6 +465,7 @@ def test_resource_version_singleton(self): 1, True, State.FAILED, + None, id="500 Internal Server Error (webhook failure) (retry failed)", ), ], @@ -463,6 +480,7 @@ def test_run_next_exception_requeue( task_publish_max_retries, should_requeue, task_expected_state, + retry_delay: int | None, data_file, ): """ @@ -520,11 +538,15 @@ def test_run_next_exception_requeue( assert not kubernetes_executor.task_queue.empty() # Disable the ApiException - if task_expected_state == State.SUCCESS: + if task_expected_state == State.SUCCESS or task_expected_state == State.QUEUED: mock_kube_client.create_namespaced_pod.side_effect = None # Execute the task without errors should empty the queue mock_kube_client.create_namespaced_pod.reset_mock() + + if retry_delay: + time.sleep(retry_delay + 1) + kubernetes_executor.sync() assert mock_kube_client.create_namespaced_pod.called assert kubernetes_executor.task_queue.empty() @@ -536,6 +558,82 @@ def test_run_next_exception_requeue( finally: kubernetes_executor.end() + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_skip_pod_creation_on_create_pods_after( + self, + mock_get_kube_client, + mock_kubernetes_job_watcher, + data_file, + ): + """ + Skip pod creation when create_pods_after is in the future + """ + + template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() + # A mock kube_client that throws errors when making a pod + mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", autospec=True) + mock_kube_client.create_namespaced_pod = mock.MagicMock( + side_effect=ApiException( + http_resp=HTTPResponse( + body="Too many requests, please try again later.", + status=429, + headers={"Retry-After": "999999"}, + ), + ) + ) + mock_get_kube_client.return_value = mock_kube_client + mock_api_client = mock.MagicMock() + mock_api_client.sanitize_for_serialization.return_value = {} + mock_kube_client.api_client = mock_api_client + config = { + ("kubernetes_executor", "pod_template_file"): template_file, + } + with conf_vars(config): + kubernetes_executor = self.kubernetes_executor + kubernetes_executor.task_publish_max_retries = 1 + kubernetes_executor.start() + try: + try_number = 1 + task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + ) + kubernetes_executor.sync() + + # There should be only one request to create pod which fails + assert mock_kube_client.create_namespaced_pod.call_count == 1 + # The task is queued + assert not kubernetes_executor.task_queue.empty() + + # sync multiple times to make sure that no other pod is trying to be created + kubernetes_executor.sync() + kubernetes_executor.sync() + kubernetes_executor.sync() + + assert not kubernetes_executor.task_queue.empty() + assert mock_kube_client.create_namespaced_pod.call_count == 1 + + kubernetes_executor.create_pods_after = datetime.now() - timedelta(hours=1) + + mock_kube_client.create_namespaced_pod.side_effect = None + mock_kube_client.create_namespaced_pod.reset_mock() + + kubernetes_executor.sync() + + assert mock_kube_client.create_namespaced_pod.call_count == 1 + + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.QUEUED + + finally: + kubernetes_executor.end() + @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" ) @@ -595,6 +693,7 @@ def test_run_next_pod_reconciliation_error( mock_api_client = mock.MagicMock() mock_api_client.sanitize_for_serialization.return_value = {} mock_kube_client.api_client = mock_api_client + config = {("kubernetes_executor", "pod_template_file"): template_file} with conf_vars(config): kubernetes_executor = self.kubernetes_executor From de936edf5f972009050ef29c46785ac3dedf7b08 Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Tue, 31 Mar 2026 13:30:52 +0300 Subject: [PATCH 2/2] fix mypy --- .../executors/test_kubernetes_executor.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 22c4097a719c4..74fb8206c7a74 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -509,11 +509,11 @@ def test_run_next_exception_requeue( template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() # A mock kube_client that throws errors when making a pod mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", autospec=True) - mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=ApiException(http_resp=response)) + mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=ApiException(http_resp=response)) # type: ignore[attr-defined] mock_get_kube_client.return_value = mock_kube_client mock_api_client = mock.MagicMock() mock_api_client.sanitize_for_serialization.return_value = {} - mock_kube_client.api_client = mock_api_client + mock_kube_client.api_client = mock_api_client # type: ignore[attr-defined] config = { ("kubernetes_executor", "pod_template_file"): template_file, } @@ -532,23 +532,23 @@ def test_run_next_exception_requeue( ) kubernetes_executor.sync() - assert mock_kube_client.create_namespaced_pod.call_count == 1 + assert mock_kube_client.create_namespaced_pod.call_count == 1 # type: ignore[attr-defined] if should_requeue: assert not kubernetes_executor.task_queue.empty() # Disable the ApiException if task_expected_state == State.SUCCESS or task_expected_state == State.QUEUED: - mock_kube_client.create_namespaced_pod.side_effect = None + mock_kube_client.create_namespaced_pod.side_effect = None # type: ignore[attr-defined] # Execute the task without errors should empty the queue - mock_kube_client.create_namespaced_pod.reset_mock() + mock_kube_client.create_namespaced_pod.reset_mock() # type: ignore[attr-defined] if retry_delay: time.sleep(retry_delay + 1) kubernetes_executor.sync() - assert mock_kube_client.create_namespaced_pod.called + assert mock_kube_client.create_namespaced_pod.called # type: ignore[attr-defined] assert kubernetes_executor.task_queue.empty() if task_expected_state != State.SUCCESS: assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state @@ -576,7 +576,7 @@ def test_skip_pod_creation_on_create_pods_after( template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() # A mock kube_client that throws errors when making a pod mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", autospec=True) - mock_kube_client.create_namespaced_pod = mock.MagicMock( + mock_kube_client.create_namespaced_pod = mock.MagicMock( # type: ignore[attr-defined] side_effect=ApiException( http_resp=HTTPResponse( body="Too many requests, please try again later.", @@ -588,7 +588,7 @@ def test_skip_pod_creation_on_create_pods_after( mock_get_kube_client.return_value = mock_kube_client mock_api_client = mock.MagicMock() mock_api_client.sanitize_for_serialization.return_value = {} - mock_kube_client.api_client = mock_api_client + mock_kube_client.api_client = mock_api_client # type: ignore[attr-defined] config = { ("kubernetes_executor", "pod_template_file"): template_file, } @@ -607,7 +607,7 @@ def test_skip_pod_creation_on_create_pods_after( kubernetes_executor.sync() # There should be only one request to create pod which fails - assert mock_kube_client.create_namespaced_pod.call_count == 1 + assert mock_kube_client.create_namespaced_pod.call_count == 1 # type: ignore[attr-defined] # The task is queued assert not kubernetes_executor.task_queue.empty() @@ -617,7 +617,7 @@ def test_skip_pod_creation_on_create_pods_after( kubernetes_executor.sync() assert not kubernetes_executor.task_queue.empty() - assert mock_kube_client.create_namespaced_pod.call_count == 1 + assert mock_kube_client.create_namespaced_pod.call_count == 1 # type: ignore[attr-defined] kubernetes_executor.create_pods_after = datetime.now() - timedelta(hours=1)