diff --git a/examples/dataproc/using_wrapper.py b/examples/dataproc/using_wrapper.py index 23a53fa1..2810b785 100644 --- a/examples/dataproc/using_wrapper.py +++ b/examples/dataproc/using_wrapper.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import argparse +import grpc import json import logging import os @@ -14,11 +15,22 @@ def main(): logging.basicConfig(level=logging.INFO) arguments = parse_cmd() - if arguments.token: - sdk = yandexcloud.SDK(token=arguments.token, user_agent=USER_AGENT) - else: - with open(arguments.sa_json_path) as infile: - sdk = yandexcloud.SDK(service_account_key=json.load(infile), user_agent=USER_AGENT) + + operation_intercepter = yandexcloud.RetryInterceptor( + max_retry_count=15, + per_call_timeout=30, + back_off_func=yandexcloud.backoff_exponential_jittered_min_interval(), + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + grpc.StatusCode.DEADLINE_EXCEEDED, + ), + non_retriable_codes=() + ) + + sdk = yandexcloud.SDK(user_agent=USER_AGENT, operation_interceptor=operation_intercepter, **get_auth(arguments)) fill_missing_arguments(sdk, arguments) dataproc = sdk.wrappers.Dataproc( @@ -192,6 +204,14 @@ def parse_cmd(): return parser.parse_args() +def get_auth(arguments): + if arguments.token: + return {'token': arguments.token} + with open(arguments.sa_json_path) as infile: + sa = json.load(infile) + return {'service_account_key': sa} + + def fill_missing_arguments(sdk, arguments): if os.path.exists(os.path.expanduser(arguments.ssh_public_key)): with open(arguments.ssh_public_key) as infile: diff --git a/yandexcloud/__init__.py b/yandexcloud/__init__.py index 3243f2f9..f1da0c4b 100644 --- a/yandexcloud/__init__.py +++ b/yandexcloud/__init__.py @@ -3,6 +3,7 @@ # flake8: noqa from yandexcloud._auth_fabric import set_up_yc_api_endpoint from yandexcloud._backoff import ( + backoff_exponential_jittered_min_interval, backoff_exponential_with_jitter, backoff_linear_with_jitter, default_backoff, diff --git a/yandexcloud/_operation_waiter.py b/yandexcloud/_operation_waiter.py index a10a5750..ce6b0540 100644 --- a/yandexcloud/_operation_waiter.py +++ b/yandexcloud/_operation_waiter.py @@ -22,23 +22,23 @@ from yandexcloud._sdk import SDK -def operation_waiter(sdk: "SDK", operation_id: str, timeout: Optional[float]) -> "OperationWaiter": - retriable_codes = ( - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.RESOURCE_EXHAUSTED, - grpc.StatusCode.INTERNAL, - ) +def create_operation_interceptor() -> "RetryInterceptor": # withstand server downtime for ~3.4 minutes with an exponential backoff - retry_interceptor = RetryInterceptor( + return RetryInterceptor( max_retry_count=13, per_call_timeout=30, back_off_func=backoff_exponential_jittered_min_interval(), - retriable_codes=retriable_codes, - ) - operation_service = sdk.client( - OperationServiceStub, - interceptor=retry_interceptor, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + ), ) + + +def operation_waiter(sdk: "SDK", operation_id: str, timeout: Optional[float]) -> "OperationWaiter": + operation_interceptor = sdk._operation_interceptor or create_operation_interceptor() + operation_service = sdk.client(OperationServiceStub, interceptor=operation_interceptor) return OperationWaiter(operation_id, operation_service, timeout) diff --git a/yandexcloud/_retry_interceptor.py b/yandexcloud/_retry_interceptor.py index 3ffb77c9..d322676a 100644 --- a/yandexcloud/_retry_interceptor.py +++ b/yandexcloud/_retry_interceptor.py @@ -46,6 +46,7 @@ def __init__( self, max_retry_count: int = 0, retriable_codes: Iterable["grpc.StatusCode"] = _DEFAULT_RETRIABLE_CODES, + non_retriable_codes: Iterable["grpc.StatusCode"] = _NON_RETRIABLE_CODES, add_retry_count_to_header: bool = False, back_off_func: Optional[Callable[[int], float]] = None, per_call_timeout: Optional[float] = None, @@ -53,6 +54,7 @@ def __init__( # pylint: disable=super-init-not-called self.__max_retry_count = max_retry_count self.__retriable_codes = retriable_codes + self.__non_retriable_codes = non_retriable_codes self.__add_retry_count_to_header = add_retry_count_to_header self.__back_off_func = back_off_func self.__per_call_timeout = per_call_timeout @@ -94,7 +96,7 @@ def __deadline(timeout: Optional[float]) -> Optional[float]: return time.time() + timeout if timeout is not None else None def __is_retriable(self, error: "grpc.StatusCode") -> bool: - if error in self._NON_RETRIABLE_CODES: + if error in self.__non_retriable_codes: return False if error in self.__retriable_codes: diff --git a/yandexcloud/_sdk.py b/yandexcloud/_sdk.py index c48867c2..3d181327 100644 --- a/yandexcloud/_sdk.py +++ b/yandexcloud/_sdk.py @@ -40,6 +40,13 @@ def __init__( private_key: Optional[bytes] = None, certificate_chain: Optional[bytes] = None, retry_policy: Optional[_retry_policy.RetryPolicy] = None, + operation_interceptor: Union[ + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor, + None, + ] = None, **kwargs: str, ): """ @@ -65,6 +72,7 @@ def __init__( **kwargs, ) self._default_interceptor = interceptor + self._operation_interceptor = operation_interceptor self.helpers = _helpers.Helpers(self) self.wrappers = Wrappers(self)