Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import time
from collections import Counter, defaultdict
from contextlib import suppress
from datetime import datetime
from datetime import datetime, timedelta
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -116,6 +116,7 @@ def __init__(self, *args, **kwargs):
"kubernetes_executor", "task_publish_max_retries", fallback=0
)
self.completed: set[KubernetesResults] = set()
self.create_pods_after: datetime | None = None

def _list_pods(self, query_kwargs):
query_kwargs["header_params"] = {
Expand Down Expand Up @@ -313,6 +314,12 @@ def sync(self) -> None:

from kubernetes.client.rest import ApiException

if self.create_pods_after and self.create_pods_after > datetime.now():
self.log.warning("Skipping pod creation due to kubernetes rate limit")
return

self.create_pods_after = None

with contextlib.suppress(Empty):
for _ in range(self.kube_config.worker_pods_creation_batch_size):
task = self.task_queue.get_nowait()
Expand All @@ -339,15 +346,21 @@ def sync(self) -> None:
# Use the body directly as the message instead.
body = {"message": e.body}

headers = e.headers
retries = self.task_publish_retries[key]
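# On retryable errors (exceeded quota, conflict, stale resource version, server error, or rate limit),
# requeue the task, subject to task_publish_max_retries.
# On a rate limit (429), additionally hold off creating new pods for the number of seconds
# given by the "Retry-After" response header.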
can_retry_publish = (
self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries
)
message = body.get("message", "")
if (
(str(e.status) == "403" and "exceeded quota" in message)
or (str(e.status) == "409" and "object has been modified" in message)
or (str(e.status) == "410" and "too old resource version" in message)
or str(e.status) == "500"
) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
or str(e.status) == "429"
) and can_retry_publish:
self.log.warning(
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
self.task_publish_retries[key] + 1,
Expand All @@ -356,8 +369,20 @@ def sync(self) -> None:
e.reason,
message,
)

self.task_queue.put(task)
self.task_publish_retries[key] = retries + 1

if str(e.status) == "429":
self.create_pods_after = datetime.now() + timedelta(
seconds=int(headers.get("Retry-After", "0"))
)
self.log.warning(
"Got rate limit from k8s api, skipping pod creation until %s",
self.create_pods_after,
)
# Stop creating pods for the rest of this batch so no further API requests are sent while rate limited
break
else:
self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
key = task.key
Expand Down
Loading
Loading