diff --git a/pyproject.toml b/pyproject.toml index c8fa96a8d..e52ad5ead 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,13 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"] pydantic = ["pydantic>=2.0.0,<3"] openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"] google-adk = ["google-adk>=1.27.0,<2"] +lambda-worker-otel = [ + "opentelemetry-api>=1.11.1,<2", + "opentelemetry-sdk>=1.11.1,<2", + "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", + "opentelemetry-semantic-conventions>=0.40b0,<1", + "opentelemetry-sdk-extension-aws>=2.0.0,<3", +] aioboto3 = [ "aioboto3>=10.4.0", "types-aioboto3[s3]>=10.4.0", @@ -69,6 +76,9 @@ dev = [ "googleapis-common-protos==1.70.0", "pytest-rerunfailures>=16.1", "moto[s3,server]>=5", + "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", + "opentelemetry-semantic-conventions>=0.40b0,<1", + "opentelemetry-sdk-extension-aws>=2.0.0,<3", ] [tool.poe.tasks] diff --git a/temporalio/client.py b/temporalio/client.py index cc2750ec6..9e5e3494e 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -2802,6 +2802,29 @@ async def get_worker_task_reachability( ) +class ClientConnectConfig(TypedDict, total=False): + """TypedDict of keyword arguments for :py:meth:`Client.connect`.""" + + target_host: str + namespace: str + api_key: str | None + data_converter: temporalio.converter.DataConverter + plugins: Sequence[Plugin] + interceptors: Sequence[Interceptor] + default_workflow_query_reject_condition: ( + temporalio.common.QueryRejectCondition | None + ) + tls: bool | TLSConfig | None + retry_config: RetryConfig | None + keep_alive_config: KeepAliveConfig | None + rpc_metadata: Mapping[str, str | bytes] + identity: str | None + lazy: bool + runtime: temporalio.runtime.Runtime | None + http_connect_proxy_config: HttpConnectProxyConfig | None + header_codec_behavior: HeaderCodecBehavior + + class ClientConfig(TypedDict, total=False): """TypedDict of config originally passed to 
:py:meth:`Client`.""" diff --git a/temporalio/contrib/aws/__init__.py b/temporalio/contrib/aws/__init__.py new file mode 100644 index 000000000..a8b8c648f --- /dev/null +++ b/temporalio/contrib/aws/__init__.py @@ -0,0 +1 @@ +"""AWS integrations for Temporal SDK.""" diff --git a/temporalio/contrib/aws/lambda_worker/README.md b/temporalio/contrib/aws/lambda_worker/README.md new file mode 100644 index 000000000..f9166b13d --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/README.md @@ -0,0 +1,104 @@ +# lambda_worker + +A wrapper for running [Temporal](https://temporal.io) workers inside AWS Lambda. A single +`run_worker` call handles the full per-invocation lifecycle: connecting to the Temporal server, +creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully shutting down before +the invocation deadline. + +## Quick start + +```python +# handler.py +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker + +from my_workflows import MyWorkflow +from my_activities import my_activity + + +def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + +lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, +) +``` + +## Configuration + +Client connection settings (address, namespace, TLS, API key) are loaded +automatically from a TOML config file and/or environment variables via +`temporalio.envconfig`. The config file is resolved in order: + +1. `TEMPORAL_CONFIG_FILE` env var, if set. +2. `temporal.toml` in `$LAMBDA_TASK_ROOT` (typically `/var/task`). +3. `temporal.toml` in the current working directory. + +The file is optional -- if absent, only environment variables are used. 
+ +The configure callback receives a `LambdaWorkerConfig` dataclass with fields +pre-populated with Lambda-appropriate defaults. Override any field directly in +the callback. The `task_queue` key in `worker_config` is pre-populated from the +`TEMPORAL_TASK_QUEUE` environment variable if set. + +## Lambda-tuned worker defaults + +The package applies conservative concurrency limits suited to Lambda's resource +constraints: + +| Setting | Default | +| --- | --- | +| `max_concurrent_activities` | 2 | +| `max_concurrent_workflow_tasks` | 10 | +| `max_concurrent_local_activities` | 2 | +| `max_concurrent_nexus_tasks` | 5 | +| `workflow_task_poller_behavior` | `SimpleMaximum(2)` | +| `activity_task_poller_behavior` | `SimpleMaximum(1)` | +| `nexus_task_poller_behavior` | `SimpleMaximum(1)` | +| `graceful_shutdown_timeout` | 5 seconds | +| `max_cached_workflows` | 30 | +| `disable_eager_activity_execution` | Always `True` | + +Worker Deployment Versioning is always enabled. + +## Observability + +Metrics and tracing are opt-in. The `otel` module provides convenience helpers +for AWS Distro for OpenTelemetry (ADOT): + +```python +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker +from temporalio.contrib.aws.lambda_worker.otel import apply_defaults, OtelOptions + + +def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + apply_defaults(config, OtelOptions()) + + +lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, +) +``` + +You can also use `build_metrics_telemetry_config` or `apply_tracing` individually. 
+ +If you use OTEL, you can use +[ADOT](https://aws-otel.github.io/docs/getting-started/lambda/lambda-python) +(the AWS Distro For OpenTelemetry) to automatically integrate with AWS +observability functionality. Namely, you will want to add the Lambda layer in +the aforementioned link. We'll handle setting up the SDK for you. diff --git a/temporalio/contrib/aws/lambda_worker/__init__.py b/temporalio/contrib/aws/lambda_worker/__init__.py new file mode 100644 index 000000000..11f748c9b --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/__init__.py @@ -0,0 +1,49 @@ +"""A wrapper for running Temporal workers inside AWS Lambda. + +A single :py:func:`run_worker` call handles the full per-invocation lifecycle: connecting to the +Temporal server, creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully +shutting down before the invocation deadline. + +Quick start:: + + from temporalio.common import WorkerDeploymentVersion + from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, + ) + +Configuration +------------- +Client connection settings (address, namespace, TLS, API key) are loaded automatically from a TOML +config file and/or environment variables via :py:mod:`temporalio.envconfig`. The config file is +resolved in order: + +1. ``TEMPORAL_CONFIG_FILE`` env var, if set. +2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``). +3. ``temporal.toml`` in the current working directory. + +The file is optional -- if absent, only environment variables are used. 
+ +The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with fields pre-populated +with Lambda-appropriate defaults. Override any field directly in the callback. The ``task_queue`` +key in ``worker_config`` is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if +set. +""" + +from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig +from temporalio.contrib.aws.lambda_worker._run_worker import run_worker + +__all__ = [ + "LambdaWorkerConfig", + "run_worker", +] diff --git a/temporalio/contrib/aws/lambda_worker/_configure.py b/temporalio/contrib/aws/lambda_worker/_configure.py new file mode 100644 index 000000000..dd1657f6d --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_configure.py @@ -0,0 +1,72 @@ +"""Configuration for the Lambda worker.""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from datetime import timedelta + +from temporalio.client import ClientConnectConfig +from temporalio.worker import WorkerConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class LambdaWorkerConfig: + """Passed to the configure callback of :py:func:`run_worker`. + + Fields are pre-populated with Lambda-appropriate defaults before the configure callback is + invoked; the callback may read and override any of them. + + Use ``worker_config`` to set task queue, register workflows/activities, and tune worker options. + The ``task_queue`` key is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if + set. + + Attributes: + client_connect_config: Keyword arguments that will be passed to + :py:meth:`temporalio.client.Client.connect`. Pre-populated from the + config file / environment variables via envconfig, with Lambda + defaults applied. 
+ worker_config: Keyword arguments that will be passed to the + :py:class:`temporalio.worker.Worker` constructor (the ``client`` + key is managed internally). Pre-populated with Lambda-appropriate + defaults (low concurrency, eager activities disabled) and + ``task_queue`` from ``TEMPORAL_TASK_QUEUE`` if set. + shutdown_deadline_buffer: How long before the Lambda invocation + deadline the worker begins its shutdown sequence (worker drain + + shutdown hooks). Pre-populated to + ``graceful_shutdown_timeout + 2s``. If you change + ``graceful_shutdown_timeout`` in ``worker_config``, adjust this + accordingly. + shutdown_hooks: Functions called at the end of each Lambda invocation, + after the worker has stopped. Run in list order. Each may be sync + or async. Use this to flush telemetry providers or release other + per-process resources. + """ + + client_connect_config: ClientConnectConfig = field( + default_factory=ClientConnectConfig + ) + worker_config: WorkerConfig = field(default_factory=WorkerConfig) + shutdown_deadline_buffer: timedelta = field( + default_factory=lambda: timedelta(seconds=7) + ) + shutdown_hooks: list[Callable[[], Awaitable[None] | None]] = field( + default_factory=list + ) + + +async def _run_shutdown_hooks( # type:ignore[reportUnusedFunction] + config: LambdaWorkerConfig, +) -> None: + """Run all registered shutdown hooks in order, logging errors.""" + for fn in config.shutdown_hooks: + try: + result = fn() + if asyncio.iscoroutine(result): + await result + except Exception as e: + logger.error(f"shutdown hook error: {e}") diff --git a/temporalio/contrib/aws/lambda_worker/_defaults.py b/temporalio/contrib/aws/lambda_worker/_defaults.py new file mode 100644 index 000000000..1b93e3407 --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_defaults.py @@ -0,0 +1,84 @@ +"""Lambda-tuned defaults for Temporal worker and client configuration.""" + +from __future__ import annotations + +import os +from collections.abc import Callable +from datetime 
import timedelta +from pathlib import Path + +from temporalio.worker import PollerBehaviorSimpleMaximum, WorkerConfig + +# ---- Lambda-tuned worker defaults ---- +# Conservative concurrency limits suited to Lambda's resource constraints. + +DEFAULT_MAX_CONCURRENT_ACTIVITIES: int = 2 +DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS: int = 10 +DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES: int = 2 +DEFAULT_MAX_CONCURRENT_NEXUS_TASKS: int = 5 +DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: timedelta = timedelta(seconds=5) +DEFAULT_SHUTDOWN_HOOK_BUFFER: timedelta = timedelta(seconds=2) +DEFAULT_MAX_CACHED_WORKFLOWS: int = 30 + +DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=2) +DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1) +DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1) + +# ---- Environment variable names ---- +ENV_TASK_QUEUE = "TEMPORAL_TASK_QUEUE" +ENV_LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" +ENV_CONFIG_FILE = "TEMPORAL_CONFIG_FILE" +DEFAULT_CONFIG_FILE = "temporal.toml" + + +def apply_lambda_worker_defaults(config: WorkerConfig) -> None: + """Apply Lambda-appropriate defaults to worker config. + + Only sets values that have not already been set (i.e. are absent from *config*). + ``disable_eager_activity_execution`` is always set to ``True``. 
+ """ + config.setdefault("max_concurrent_activities", DEFAULT_MAX_CONCURRENT_ACTIVITIES) + config.setdefault( + "max_concurrent_workflow_tasks", DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + config.setdefault( + "max_concurrent_local_activities", DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES + ) + config.setdefault("max_concurrent_nexus_tasks", DEFAULT_MAX_CONCURRENT_NEXUS_TASKS) + config.setdefault("graceful_shutdown_timeout", DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT) + config.setdefault("max_cached_workflows", DEFAULT_MAX_CACHED_WORKFLOWS) + config.setdefault( + "workflow_task_poller_behavior", DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR + ) + config.setdefault( + "activity_task_poller_behavior", DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR + ) + config.setdefault("nexus_task_poller_behavior", DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR) + # Always disable eager activities in Lambda. + config["disable_eager_activity_execution"] = True + + +def build_lambda_identity(request_id: str, function_arn: str) -> str: + """Build a worker identity string from the Lambda invocation context. + + Format: ``<request_id>@<function_arn>``. + """ + return f"{request_id or 'unknown'}@{function_arn or 'unknown'}" + + +def lambda_default_config_file_path( + getenv: Callable[[str], str] = os.environ.get, # type: ignore[assignment] +) -> Path: + """Return the config file path for a Lambda environment. + + Resolution order: + + 1. ``TEMPORAL_CONFIG_FILE`` env var, if set. + 2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``). + 3. ``temporal.toml`` in the current working directory. + """ + config_file = getenv(ENV_CONFIG_FILE) + if config_file: + return Path(config_file) + root = getenv(ENV_LAMBDA_TASK_ROOT) or "." 
+ return Path(root) / DEFAULT_CONFIG_FILE diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py new file mode 100644 index 000000000..6a2cc75a3 --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -0,0 +1,259 @@ +from __future__ import annotations + +import asyncio +import logging +import os +import sys +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Any + +import temporalio.client +import temporalio.worker +from temporalio.client import ClientConnectConfig +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker._defaults import ( + DEFAULT_SHUTDOWN_HOOK_BUFFER, + apply_lambda_worker_defaults, + build_lambda_identity, + lambda_default_config_file_path, +) +from temporalio.envconfig import ClientConfigProfile +from temporalio.worker import WorkerConfig, WorkerDeploymentConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class _WorkerDeps: + """External dependencies injected for testability.""" + + connect: Callable[..., Awaitable[temporalio.client.Client]] = field( + default_factory=lambda: temporalio.client.Client.connect + ) + create_worker: Callable[..., temporalio.worker.Worker] = field( + default_factory=lambda: temporalio.worker.Worker + ) + load_config: Callable[[], ClientConfigProfile] | None = None + getenv: Callable[[str], str | None] = field(default_factory=lambda: os.environ.get) + extract_lambda_ctx: Callable[[Any], tuple[str, str] | None] | None = None + + +def _default_load_config(getenv: Callable[[str], str | None]) -> ClientConfigProfile: + config_path = lambda_default_config_file_path(getenv) # type: ignore[arg-type] + return ClientConfigProfile.load(config_source=config_path) + + +def 
_default_extract_lambda_ctx( + lambda_context: Any, +) -> tuple[str, str] | None: + """Extract (request_id, function_arn) from a Lambda context object.""" + if lambda_context is None: + return None + request_id = getattr(lambda_context, "aws_request_id", None) + function_arn = getattr(lambda_context, "invoked_function_arn", None) + if request_id is not None and function_arn is not None: + return (request_id, function_arn) + return None + + +def run_worker( + version: WorkerDeploymentVersion, + configure: Callable[[LambdaWorkerConfig], None], +) -> Callable[[Any, Any], None]: + """Create a Temporal worker Lambda handler. + + Calls the *configure* callback to collect workflow/activity registrations and option overrides, + then returns a Lambda handler function. On each invocation the handler connects to the Temporal + server, starts a worker with Lambda-tuned defaults, polls for tasks until the invocation + deadline approaches, and then gracefully shuts down. + + The *version* parameter identifies this worker's deployment version. ``run_worker`` always + enables Worker Deployment Versioning (``use_worker_versioning=True``). To provide a default + versioning behavior for workflows that do not specify one at registration time, set + ``deployment_config`` in ``worker_config`` in the configure callback. + + The returned handler has the signature ``handler(event, context)`` and should be set as your + Lambda function's handler entry point. + + Args: + version: The worker deployment version. Required. + configure: A callback that receives a :py:class:`LambdaWorkerConfig` + (pre-populated with Lambda defaults) and configures workflows, + activities, and options on it. + + Returns: + A Lambda handler function. 
+ + Example:: + + from temporalio.common import WorkerDeploymentVersion + from temporalio.contrib.aws.lambda_worker import ( + LambdaWorkerConfig, + run_worker, + ) + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, + ) + """ + deps = _WorkerDeps() + try: + return _run_worker_internal(version, configure, deps) + except Exception as e: + logger.error(f"fatal error running lambda worker: {e}") + sys.exit(1) + + +def _run_worker_internal( + version: WorkerDeploymentVersion, + configure: Callable[[LambdaWorkerConfig], None], + deps: _WorkerDeps, +) -> Callable[[Any, Any], None]: + """Core logic with injected dependencies for testability.""" + if not version.deployment_name or not version.build_id: + raise ValueError( + "version is required (deployment_name and build_id must be set)" + ) + + # Load client config from envconfig / TOML. + load_config = deps.load_config or (lambda: _default_load_config(deps.getenv)) + profile = load_config() + connect_config: ClientConnectConfig = {**profile.to_client_connect_config()} + + # Build worker config with Lambda defaults. + worker_config: WorkerConfig = {} + apply_lambda_worker_defaults(worker_config) + + # Always enable deployment versioning. + worker_config["deployment_config"] = WorkerDeploymentConfig( + version=version, + use_worker_versioning=True, + ) + + # Calculate default shutdown buffer. + graceful_timeout = worker_config.get( + "graceful_shutdown_timeout", timedelta(seconds=5) + ) + shutdown_buffer = graceful_timeout + DEFAULT_SHUTDOWN_HOOK_BUFFER + + # Pre-populate config with defaults. 
+ config = LambdaWorkerConfig( + client_connect_config=connect_config, + worker_config=worker_config, + shutdown_deadline_buffer=shutdown_buffer, + ) + + # Pre-populate task queue from environment if available. + env_tq = deps.getenv("TEMPORAL_TASK_QUEUE") + if env_tq: + config.worker_config["task_queue"] = env_tq + + # Call user configure callback with pre-populated config. + configure(config) + + # Validate task queue. + if not config.worker_config.get("task_queue"): + raise ValueError( + "task queue not configured: set " + 'worker_config["task_queue"] or the ' + "TEMPORAL_TASK_QUEUE environment variable" + ) + + extract_lambda_ctx = deps.extract_lambda_ctx or _default_extract_lambda_ctx + + def _handler(_event: Any, lambda_context: Any) -> None: + asyncio.run( + _invocation_handler( + lambda_context=lambda_context, + config=config, + deps=deps, + extract_lambda_ctx=extract_lambda_ctx, + ) + ) + + return _handler + + +async def _invocation_handler( + *, + lambda_context: Any, + config: LambdaWorkerConfig, + deps: _WorkerDeps, + extract_lambda_ctx: Callable[[Any], tuple[str, str] | None], +) -> None: + """Handle a single Lambda invocation.""" + shutdown_buffer = config.shutdown_deadline_buffer + + # Check deadline feasibility. + remaining_ms_fn = getattr(lambda_context, "get_remaining_time_in_millis", None) + deadline_available = remaining_ms_fn is not None + if deadline_available: + assert remaining_ms_fn is not None + remaining = timedelta(milliseconds=remaining_ms_fn()) + work_time = remaining - shutdown_buffer + if work_time <= timedelta(seconds=1): + raise RuntimeError( + f"Lambda timeout is too short: {remaining.total_seconds():.1f}s " + f"remaining but {shutdown_buffer.total_seconds():.1f}s is " + f"reserved for shutdown, leaving no time for work. 
" + f"Increase the function timeout or decrease the shutdown " + f"deadline buffer" + ) + elif work_time < timedelta(seconds=5): + logger.warning( + "Lambda timeout leaves less than 5s for work after " + "shutdown buffer; consider increasing the function " + "timeout or decreasing the shutdown deadline buffer " + "(work_time=%s, shutdown_buffer=%s)", + work_time, + shutdown_buffer, + ) + + # Build per-invocation connect kwargs with identity from Lambda context. + invocation_connect_kwargs: ClientConnectConfig = {**config.client_connect_config} + if "identity" not in invocation_connect_kwargs: + ctx_info = extract_lambda_ctx(lambda_context) + if ctx_info is not None: + request_id, function_arn = ctx_info + invocation_connect_kwargs["identity"] = build_lambda_identity( + request_id, function_arn + ) + + # Connect to Temporal. + client = await deps.connect(**invocation_connect_kwargs) + + # Create the worker. + worker = deps.create_worker(client, **config.worker_config) + + # Run the worker until the deadline approaches or context is done. + if deadline_available: + assert remaining_ms_fn is not None + work_time_secs = ( + timedelta(milliseconds=remaining_ms_fn()) - shutdown_buffer + ).total_seconds() + if work_time_secs > 0: + try: + await asyncio.wait_for(worker.run(), timeout=work_time_secs) + except asyncio.TimeoutError: + pass + else: + # No deadline - run until cancelled. + await worker.run() + + # Run shutdown hooks after worker has stopped. + await _run_shutdown_hooks(config) diff --git a/temporalio/contrib/aws/lambda_worker/otel.py b/temporalio/contrib/aws/lambda_worker/otel.py new file mode 100644 index 000000000..216e80f48 --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/otel.py @@ -0,0 +1,241 @@ +"""OpenTelemetry helpers for Temporal workers running inside AWS Lambda. 
+ +Use :py:func:`apply_defaults` inside a :py:func:`run_worker` configure callback for a +batteries-included setup that creates an OTel collector exporter and tracing plugin, suitable +for use with the AWS Distro for OpenTelemetry (ADOT) Lambda layer. + +Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` individually if you only +need one. +""" + +from __future__ import annotations + +import logging +import os +from dataclasses import dataclass, field +from datetime import timedelta + +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.semconv.attributes.service_attributes import SERVICE_NAME +from opentelemetry.trace import get_tracer_provider, set_tracer_provider + +from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider +from temporalio.runtime import OpenTelemetryConfig, Runtime, TelemetryConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class OtelOptions: + """Options for :py:func:`apply_defaults`. + + Attributes: + metric_periodicity: How often the Core SDK exports metrics to the + collector. Defaults to 10 seconds. Set this shorter than your + Lambda timeout to ensure at least one export per invocation. + service_name: OTel service name resource attribute. If empty, + falls back to ``OTEL_SERVICE_NAME``, then + ``AWS_LAMBDA_FUNCTION_NAME``, then + ``"temporal-lambda-worker"``. + collector_endpoint: OTLP collector endpoint (e.g. + ``"http://localhost:4317"``). If empty, falls back to + ``OTEL_EXPORTER_OTLP_ENDPOINT``, then + ``"http://localhost:4317"``. 
+ """ + + metric_periodicity: timedelta = field(default_factory=lambda: timedelta(seconds=10)) + service_name: str = "" + collector_endpoint: str = "" + + +def _resolve_service_name(options: OtelOptions) -> str: + service_name = options.service_name + if not service_name: + service_name = os.environ.get("OTEL_SERVICE_NAME", "") + if not service_name: + service_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "") + if not service_name: + service_name = "temporal-lambda-worker" + return service_name + + +def _resolve_endpoint(options: OtelOptions) -> str: + endpoint = options.collector_endpoint + if not endpoint: + endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "") + if not endpoint: + endpoint = "http://localhost:4317" + return endpoint + + +def apply_defaults( + config: LambdaWorkerConfig, + options: OtelOptions | None = None, +) -> None: + """Configure OTel metrics and tracing with AWS Lambda defaults. + + Sets up Core SDK metrics export via a :py:class:`temporalio.runtime.Runtime` with an + :py:class:`temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds the + :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` for distributed tracing with + workflow sandbox passthrough. + + Creates a replay-safe ``TracerProvider`` (with X-Ray ID generator and OTLP gRPC exporter if + available) and sets it as the global OpenTelemetry tracer provider. The + :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` uses the global provider, so + it must be set before the worker starts. + + The collector endpoint defaults to ``http://localhost:4317``, which is the endpoint expected by + the ADOT collector Lambda layer. + + Registers a per-invocation ``ForceFlush`` shutdown hook for the global ``TracerProvider`` so + pending traces are exported before each Lambda invocation completes. + + Metrics are exported on the ``metric_periodicity`` interval by the runtime's internal thread. 
+ There is no explicit flush API for these metrics; set ``metric_periodicity`` short enough to + ensure at least one export per invocation. + + Args: + config: The :py:class:`LambdaWorkerConfig` to configure. + options: Optional overrides for service name, endpoint, etc. + """ + if options is None: + options = OtelOptions() + + endpoint = _resolve_endpoint(options) + service_name = _resolve_service_name(options) + + telemetry_config = build_metrics_telemetry_config( + endpoint=endpoint, + service_name=service_name, + metric_periodicity=options.metric_periodicity, + ) + runtime = Runtime(telemetry=telemetry_config) + config.client_connect_config["runtime"] = runtime + + resource = Resource.create({SERVICE_NAME: service_name}) + + # Try to use X-Ray ID generator if available. + try: + from opentelemetry.sdk.extension.aws.trace import ( # type: ignore[reportMissingTypeStubs] + AwsXRayIdGenerator, + ) + + tracer_provider = create_tracer_provider( + resource=resource, id_generator=AwsXRayIdGenerator() + ) + except ImportError: + logger.warning( + "opentelemetry-sdk-extension-aws is not installed; " + "X-Ray trace ID generation is disabled. " + "Install the 'lambda-worker-otel' extra for full ADOT support." + ) + tracer_provider = create_tracer_provider(resource=resource) + + # Use OTLP gRPC exporter if available, otherwise skip trace export. + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + tracer_provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True)) + ) + except ImportError: + logger.warning( + "opentelemetry-exporter-otlp-proto-grpc is not installed; " + "traces will not be exported to the OTLP collector. " + "Install the 'lambda-worker-otel' extra for full ADOT support." + ) + + # Set as global so the OpenTelemetryPlugin picks it up. 
+ set_tracer_provider(tracer_provider) + + apply_tracing(config) + + +def build_metrics_telemetry_config( + *, + endpoint: str = "", + service_name: str = "", + metric_periodicity: timedelta | None = None, +) -> TelemetryConfig: + """Build a :py:class:`temporalio.runtime.TelemetryConfig` for OTel metrics. + + Returns a ``TelemetryConfig`` with :py:class:`temporalio.runtime.OpenTelemetryConfig` metrics + pointed at the given OTLP collector endpoint. Use this when you need to compose metrics config + with other telemetry settings (e.g. custom logging) into your own + :py:class:`temporalio.runtime.Runtime`. + + Core SDK metrics are exported on the ``metric_periodicity`` interval by the runtime's internal + thread. There is no explicit flush API; set ``metric_periodicity`` short enough to ensure at + least one export per Lambda invocation. + + Example:: + + telemetry = build_metrics_telemetry_config( + endpoint="http://localhost:4317", + service_name="my-service", + ) + # Customize further: + telemetry_config = dataclasses.replace( + telemetry, logging=my_logging_config + ) + runtime = Runtime(telemetry=telemetry_config) + config.client_connect_config["runtime"] = runtime + + Args: + endpoint: OTLP collector endpoint. Defaults to + ``http://localhost:4317``. + service_name: OTel service name. Used as a global tag. + metric_periodicity: How often metrics are exported. + + Returns: + A ``TelemetryConfig`` ready to pass to + :py:class:`temporalio.runtime.Runtime`. + """ + if not endpoint: + endpoint = "http://localhost:4317" + + otel_config = OpenTelemetryConfig( + url=endpoint, + metric_periodicity=metric_periodicity, + ) + + global_tags: dict[str, str] = {} + if service_name: + global_tags["service_name"] = service_name + + return TelemetryConfig( + metrics=otel_config, + global_tags=global_tags, + ) + + +def apply_tracing(config: LambdaWorkerConfig) -> None: + """Configure only OTel tracing (no metrics) on the Lambda worker config. 
+ + Adds an :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` to + ``config.worker_config["plugins"]``. The plugin uses the global + ``TracerProvider`` set via ``opentelemetry.trace.set_tracer_provider``. + Ensure your provider is set globally before the worker starts. + + Also registers a ``ForceFlush`` shutdown hook that flushes the global + ``TracerProvider`` (if it supports ``force_flush``). + + Args: + config: The :py:class:`LambdaWorkerConfig` to configure. + """ + plugin = OpenTelemetryPlugin() + plugins = list(config.worker_config.get("plugins", [])) + plugins.append(plugin) + config.worker_config["plugins"] = plugins + + async def _flush() -> None: + provider = get_tracer_provider() + flush = getattr(provider, "force_flush", None) + if flush is not None: + flush() + + config.shutdown_hooks.append(_flush) diff --git a/temporalio/contrib/aws/s3driver/_driver.py b/temporalio/contrib/aws/s3driver/_driver.py index 481e3a9d4..3b9858813 100644 --- a/temporalio/contrib/aws/s3driver/_driver.py +++ b/temporalio/contrib/aws/s3driver/_driver.py @@ -64,7 +64,7 @@ def __init__( Args: client: An :class:`S3StorageDriverClient` implementation. Use - :func:`~temporalio.contrib.aws.s3driver.aioboto3.new_aioboto3_client` to + :func:`temporalio.contrib.aws.s3driver.aioboto3.new_aioboto3_client` to wrap an aioboto3 S3 client. bucket: S3 bucket name, access point ARN, or a callable that accepts ``(StorageDriverStoreContext, Payload)`` and returns @@ -73,7 +73,7 @@ def __init__( driver_name: Name of this driver instance. Defaults to ``"aws.s3driver"``. Override when registering multiple S3StorageDriver instances with distinct configurations - under the same :attr:`~temporalio.extstore.Options.drivers` list. + under the same ``temporalio.extstore.Options.drivers`` list. max_payload_size: Maximum serialized payload size in bytes that the driver will accept. Defaults to 52428800 (50 MiB). 
Raise this value if your workload requires larger payloads; lower it to @@ -105,7 +105,7 @@ async def store( context: StorageDriverStoreContext, payloads: Sequence[Payload], ) -> list[StorageDriverClaim]: - """Stores payloads in S3 and returns a :class:`~temporalio.extstore.DriverClaim` for each one. + """Stores payloads in S3 and returns a ``temporalio.extstore.DriverClaim`` for each one. Payloads are keyed by their SHA-256 hash, so identical serialized bytes share the same S3 object. Deduplication is best-effort because the same @@ -190,7 +190,7 @@ async def retrieve( context: StorageDriverRetrieveContext, # noqa: ARG002 claims: Sequence[StorageDriverClaim], ) -> list[Payload]: - """Retrieves payloads from S3 for the given :class:`~temporalio.extstore.DriverClaim` list.""" + """Retrieves payloads from S3 for the given ``temporalio.extstore.DriverClaim`` list.""" async def _download(claim: StorageDriverClaim) -> Payload: bucket = claim.claim_data["bucket"] diff --git a/tests/contrib/aws/lambda_worker/__init__.py b/tests/contrib/aws/lambda_worker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/contrib/aws/lambda_worker/test_lambda_worker.py b/tests/contrib/aws/lambda_worker/test_lambda_worker.py new file mode 100644 index 000000000..178e078ac --- /dev/null +++ b/tests/contrib/aws/lambda_worker/test_lambda_worker.py @@ -0,0 +1,522 @@ +"""Tests for temporalio.contrib.aws.lambda_worker.""" + +from __future__ import annotations + +from datetime import timedelta +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker._defaults import ( + DEFAULT_MAX_CACHED_WORKFLOWS, + DEFAULT_MAX_CONCURRENT_ACTIVITIES, + DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES, + 
DEFAULT_MAX_CONCURRENT_NEXUS_TASKS, + DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS, + apply_lambda_worker_defaults, + build_lambda_identity, + lambda_default_config_file_path, +) +from temporalio.contrib.aws.lambda_worker._run_worker import ( + _run_worker_internal, + _WorkerDeps, +) +from temporalio.envconfig import ClientConfigProfile +from temporalio.worker import WorkerConfig + +TEST_VERSION = WorkerDeploymentVersion( + deployment_name="test-deployment", + build_id="test-build", +) + + +# ---- LambdaWorkerConfig tests ---- + + +class TestLambdaWorkerConfig: + def test_worker_config_task_queue(self) -> None: + config = LambdaWorkerConfig() + assert config.worker_config.get("task_queue") is None + config.worker_config["task_queue"] = "my-queue" + assert config.worker_config["task_queue"] == "my-queue" + + def test_worker_config_workflows(self) -> None: + config = LambdaWorkerConfig() + + class FakeWorkflow: + pass + + config.worker_config["workflows"] = [FakeWorkflow] + assert FakeWorkflow in config.worker_config["workflows"] + + def test_worker_config_activities(self) -> None: + config = LambdaWorkerConfig() + + def fake_activity() -> None: + pass + + config.worker_config["activities"] = [fake_activity] + assert fake_activity in config.worker_config["activities"] + + def test_client_connect_config_directly_modifiable(self) -> None: + config = LambdaWorkerConfig() + config.client_connect_config["namespace"] = "custom-ns" + assert config.client_connect_config["namespace"] == "custom-ns" + + def test_worker_config_directly_modifiable(self) -> None: + config = LambdaWorkerConfig() + config.worker_config["max_concurrent_activities"] = 42 + assert config.worker_config["max_concurrent_activities"] == 42 + + def test_shutdown_deadline_buffer(self) -> None: + config = LambdaWorkerConfig() + config.shutdown_deadline_buffer = timedelta(seconds=5) + assert config.shutdown_deadline_buffer == timedelta(seconds=5) + + def test_shutdown_hooks_list(self) -> None: + config = 
LambdaWorkerConfig() + fn = MagicMock() + config.shutdown_hooks.append(fn) + assert fn in config.shutdown_hooks + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_in_order(self) -> None: + config = LambdaWorkerConfig() + order: list[str] = [] + config.shutdown_hooks.append(lambda: order.append("first")) + config.shutdown_hooks.append(lambda: order.append("second")) + await _run_shutdown_hooks(config) + assert order == ["first", "second"] + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_async(self) -> None: + config = LambdaWorkerConfig() + called = False + + async def async_hook() -> None: + nonlocal called + called = True + + config.shutdown_hooks.append(async_hook) + await _run_shutdown_hooks(config) + assert called + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_error_continues(self) -> None: + config = LambdaWorkerConfig() + second_called = False + + def failing_hook() -> None: + raise RuntimeError("flush failed") + + def second_hook() -> None: + nonlocal second_called + second_called = True + + config.shutdown_hooks.append(failing_hook) + config.shutdown_hooks.append(second_hook) + await _run_shutdown_hooks(config) + assert second_called + + def test_is_dataclass(self) -> None: + import dataclasses + + assert dataclasses.is_dataclass(LambdaWorkerConfig) + + def test_default_field_independence(self) -> None: + """Each instance gets its own mutable containers.""" + a = LambdaWorkerConfig() + b = LambdaWorkerConfig() + a.worker_config["max_concurrent_activities"] = 99 + assert "max_concurrent_activities" not in b.worker_config + + +# ---- Defaults tests ---- + + +class TestDefaults: + def test_apply_lambda_worker_defaults(self) -> None: + config: WorkerConfig = {} + apply_lambda_worker_defaults(config) + assert ( + config.get("max_concurrent_activities") == DEFAULT_MAX_CONCURRENT_ACTIVITIES + ) + assert ( + config.get("max_concurrent_workflow_tasks") + == DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + assert ( + 
config.get("max_concurrent_local_activities") + == DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES + ) + assert ( + config.get("max_concurrent_nexus_tasks") + == DEFAULT_MAX_CONCURRENT_NEXUS_TASKS + ) + assert config.get("max_cached_workflows") == DEFAULT_MAX_CACHED_WORKFLOWS + assert config.get("disable_eager_activity_execution") is True + + def test_apply_lambda_worker_defaults_preserves_existing(self) -> None: + config: WorkerConfig = { + "max_concurrent_activities": 50, + "graceful_shutdown_timeout": timedelta(seconds=10), + } + apply_lambda_worker_defaults(config) + assert config.get("max_concurrent_activities") == 50 + assert config.get("graceful_shutdown_timeout") == timedelta(seconds=10) + assert config.get("disable_eager_activity_execution") is True + + def test_build_lambda_identity(self) -> None: + assert ( + build_lambda_identity("req-123", "arn:aws:lambda:us-east-1:123:function:f") + == "req-123@arn:aws:lambda:us-east-1:123:function:f" + ) + + def test_build_lambda_identity_empty(self) -> None: + assert build_lambda_identity("", "") == "unknown@unknown" + + def test_lambda_default_config_file_path_env_var(self) -> None: + env = {"TEMPORAL_CONFIG_FILE": "/custom/path.toml"} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == Path("/custom/path.toml") + ) + + def test_lambda_default_config_file_path_lambda_root(self) -> None: + env = {"LAMBDA_TASK_ROOT": "/var/task"} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == Path("/var/task/temporal.toml") + ) + + def test_lambda_default_config_file_path_cwd(self) -> None: + env: dict[str, str] = {} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == Path("temporal.toml") + ) + + +# ---- RunWorker tests ---- + + +def _make_lambda_context( + *, + remaining_ms: int = 3_600_000, + request_id: str = "req-123", + function_arn: str = "arn:aws:lambda:us-east-1:123:function:my-func", +) -> Any: + """Create a mock Lambda context 
object.""" + ctx = MagicMock() + ctx.get_remaining_time_in_millis.return_value = remaining_ms + ctx.aws_request_id = request_id + ctx.invoked_function_arn = function_arn + return ctx + + +def _make_test_deps( + *, + connect_kwargs_capture: list[dict[str, Any]] | None = None, + worker_kwargs_capture: list[dict[str, Any]] | None = None, +) -> _WorkerDeps: + """Create test deps with mocked connect and worker.""" + mock_client = MagicMock() + mock_worker = MagicMock() + mock_worker.run = AsyncMock() + + async def fake_connect(**kwargs: Any) -> Any: + if connect_kwargs_capture is not None: + connect_kwargs_capture.append(kwargs) + return mock_client + + def fake_create_worker(_client: Any, **kwargs: Any) -> Any: + if worker_kwargs_capture is not None: + worker_kwargs_capture.append(kwargs) + return mock_worker + + return _WorkerDeps( + connect=fake_connect, + create_worker=fake_create_worker, + load_config=lambda: ClientConfigProfile(), + getenv={"TEMPORAL_TASK_QUEUE": "test-queue"}.get, # type: ignore[arg-type] + extract_lambda_ctx=lambda ctx: ( + ctx.aws_request_id, + ctx.invoked_function_arn, + ) + if hasattr(ctx, "aws_request_id") + else None, + ) + + +class TestRunWorkerInternal: + def test_returns_handler(self) -> None: + deps = _make_test_deps() + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + assert callable(handler) + + def test_success(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["workflows"] = [type("FakeWf", (), {})] + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + + def test_configure_callback_error(self) -> None: + deps = _make_test_deps() + + def bad_configure(_config: LambdaWorkerConfig) -> None: + raise RuntimeError("bad config") + + with pytest.raises(RuntimeError, match="bad config"): + _run_worker_internal(TEST_VERSION, bad_configure, deps) + + def test_missing_task_queue(self) -> None: + deps 
= _make_test_deps() + deps.getenv = lambda _: None # type: ignore[assignment] + with pytest.raises(ValueError, match="task queue not configured"): + _run_worker_internal(TEST_VERSION, lambda config: None, deps) + + def test_missing_version(self) -> None: + deps = _make_test_deps() + with pytest.raises(ValueError, match="version is required"): + _run_worker_internal( + WorkerDeploymentVersion(deployment_name="", build_id=""), + lambda config: None, + deps, + ) + + def test_user_overrides_applied(self) -> None: + connect_capture: list[dict[str, Any]] = [] + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps( + connect_kwargs_capture=connect_capture, + worker_kwargs_capture=worker_capture, + ) + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "user-queue" + config.client_connect_config["namespace"] = "custom-ns" + config.worker_config["max_concurrent_activities"] = 99 + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + + assert connect_capture[0]["namespace"] == "custom-ns" + assert worker_capture[0]["max_concurrent_activities"] == 99 + + def test_lambda_defaults_applied(self) -> None: + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(worker_kwargs_capture=worker_capture) + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler({}, _make_lambda_context()) + + kwargs = worker_capture[0] + assert kwargs["max_concurrent_activities"] == DEFAULT_MAX_CONCURRENT_ACTIVITIES + assert ( + kwargs["max_concurrent_workflow_tasks"] + == DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + assert kwargs["disable_eager_activity_execution"] is True + dc = kwargs["deployment_config"] + assert dc.use_worker_versioning is True + assert dc.version == TEST_VERSION + + def test_identity_from_lambda_context(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + handler = 
_run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler( + {}, + _make_lambda_context( + request_id="req-abc-123", + function_arn="arn:aws:lambda:us-east-1:123456:function:my-func", + ), + ) + + assert ( + connect_capture[0]["identity"] + == "req-abc-123@arn:aws:lambda:us-east-1:123456:function:my-func" + ) + + def test_identity_user_override_wins(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + + def configure(config: LambdaWorkerConfig) -> None: + config.client_connect_config["identity"] = "my-custom-identity" + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert connect_capture[0]["identity"] == "my-custom-identity" + + def test_identity_no_lambda_context(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + deps.extract_lambda_ctx = lambda ctx: None + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler({}, MagicMock(spec=[])) + assert "identity" not in connect_capture[0] + + def test_shutdown_hooks_called(self) -> None: + deps = _make_test_deps() + shutdown_called = False + + def configure(config: LambdaWorkerConfig) -> None: + def hook() -> None: + nonlocal shutdown_called + shutdown_called = True + + config.shutdown_hooks.append(hook) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert shutdown_called + + def test_shutdown_hooks_called_per_invocation(self) -> None: + deps = _make_test_deps() + shutdown_count = 0 + + def configure(config: LambdaWorkerConfig) -> None: + def hook() -> None: + nonlocal shutdown_count + shutdown_count += 1 + + config.shutdown_hooks.append(hook) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, 
_make_lambda_context()) + assert shutdown_count == 3 + + def test_shutdown_hooks_multiple_funcs_order(self) -> None: + deps = _make_test_deps() + order: list[str] = [] + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_hooks.append(lambda: order.append("first")) + config.shutdown_hooks.append(lambda: order.append("second")) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert order == ["first", "second"] + + def test_shutdown_hooks_error_continues(self) -> None: + deps = _make_test_deps() + second_called = False + + def configure(config: LambdaWorkerConfig) -> None: + def failing() -> None: + raise RuntimeError("flush failed") + + def second() -> None: + nonlocal second_called + second_called = True + + config.shutdown_hooks.append(failing) + config.shutdown_hooks.append(second) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert second_called + + def test_tight_deadline_raises_error(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_deadline_buffer = timedelta(milliseconds=1500) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with pytest.raises(RuntimeError, match="Lambda timeout is too short"): + handler({}, _make_lambda_context(remaining_ms=2000)) + + def test_tight_deadline_logs_warning(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_deadline_buffer = timedelta(milliseconds=500) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with patch( + "temporalio.contrib.aws.lambda_worker._run_worker.logger" + ) as mock_logger: + handler({}, _make_lambda_context(remaining_ms=2000)) + mock_logger.warning.assert_called_once() + assert "less than 5s" in mock_logger.warning.call_args[0][0] + + def test_per_invocation_lifecycle(self) -> None: + """Each invocation 
creates its own client and worker.""" + connect_count = 0 + deps = _make_test_deps() + original_connect = deps.connect + + async def counting_connect(**kwargs: Any) -> Any: + nonlocal connect_count + connect_count += 1 + return await original_connect(**kwargs) + + deps.connect = counting_connect + + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + assert connect_count == 3 + + def test_task_queue_from_config(self) -> None: + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(worker_kwargs_capture=worker_capture) + deps.getenv = lambda _: None # type: ignore[assignment] + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "explicit-queue" + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert worker_capture[0]["task_queue"] == "explicit-queue" + + def test_task_queue_pre_populated_from_env(self) -> None: + """Task queue is pre-populated from TEMPORAL_TASK_QUEUE env var.""" + deps = _make_test_deps() + task_queues: list[str | None] = [] + + def configure(config: LambdaWorkerConfig) -> None: + task_queues.append(config.worker_config.get("task_queue")) + + _run_worker_internal(TEST_VERSION, configure, deps) + assert task_queues[0] == "test-queue" + + def test_config_pre_populated_with_defaults(self) -> None: + """Configure callback receives pre-populated LambdaWorkerConfig.""" + deps = _make_test_deps() + captured: list[LambdaWorkerConfig] = [] + + def configure(config: LambdaWorkerConfig) -> None: + captured.append(config) + + _run_worker_internal(TEST_VERSION, configure, deps) + wc = captured[0].worker_config + assert wc.get("max_concurrent_activities") == DEFAULT_MAX_CONCURRENT_ACTIVITIES + assert wc.get("disable_eager_activity_execution") is True + dc = wc.get("deployment_config") + assert dc is not None + assert 
dc.use_worker_versioning is True + assert dc.version == TEST_VERSION + + def test_no_deadline_runs_until_complete(self) -> None: + """When no deadline is available, worker runs until it completes.""" + deps = _make_test_deps() + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + ctx = MagicMock(spec=["aws_request_id", "invoked_function_arn"]) + ctx.aws_request_id = "req-123" + ctx.invoked_function_arn = "arn:aws:lambda:us-east-1:123:function:f" + handler({}, ctx) diff --git a/tests/contrib/aws/lambda_worker/test_otel.py b/tests/contrib/aws/lambda_worker/test_otel.py new file mode 100644 index 000000000..97f3d9647 --- /dev/null +++ b/tests/contrib/aws/lambda_worker/test_otel.py @@ -0,0 +1,167 @@ +"""Tests for temporalio.contrib.aws.lambda_worker.otel.""" + +from __future__ import annotations + +from datetime import timedelta +from unittest.mock import patch + +import pytest + +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker.otel import ( + OtelOptions, + apply_defaults, + apply_tracing, + build_metrics_telemetry_config, +) +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin +from temporalio.runtime import OpenTelemetryConfig, TelemetryConfig + + +class TestApplyTracing: + def test_adds_plugin(self) -> None: + config = LambdaWorkerConfig() + apply_tracing(config) + plugins = config.worker_config.get("plugins", []) + assert len(plugins) == 1 + assert isinstance(plugins[0], OpenTelemetryPlugin) + + def test_appends_to_existing_plugins(self) -> None: + config = LambdaWorkerConfig() + existing = OpenTelemetryPlugin() + config.worker_config["plugins"] = [existing] + apply_tracing(config) + plugins = config.worker_config["plugins"] + assert len(plugins) == 2 + assert plugins[0] is existing + + def test_registers_flush_shutdown_hook(self) -> None: + config = LambdaWorkerConfig() + apply_tracing(config) + assert 
len(config.shutdown_hooks) == 1 + + @pytest.mark.asyncio + async def test_shutdown_hook_flushes(self) -> None: + config = LambdaWorkerConfig() + apply_tracing(config) + # Should not raise even with the default noop global provider. + await _run_shutdown_hooks(config) + + +class TestBuildMetricsTelemetryConfig: + def test_returns_telemetry_config(self) -> None: + tc = build_metrics_telemetry_config(endpoint="http://localhost:4317") + assert isinstance(tc, TelemetryConfig) + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.url == "http://localhost:4317" + + def test_default_endpoint(self) -> None: + tc = build_metrics_telemetry_config() + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.url == "http://localhost:4317" + + def test_service_name_as_global_tag(self) -> None: + tc = build_metrics_telemetry_config(service_name="my-svc") + assert tc.global_tags.get("service_name") == "my-svc" + + def test_no_service_name_no_tag(self) -> None: + tc = build_metrics_telemetry_config() + assert "service_name" not in tc.global_tags + + def test_metric_periodicity(self) -> None: + tc = build_metrics_telemetry_config(metric_periodicity=timedelta(seconds=30)) + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.metric_periodicity == timedelta(seconds=30) + + def test_composable_with_custom_runtime(self) -> None: + """User can compose the returned config into a custom Runtime.""" + import dataclasses + + tc = build_metrics_telemetry_config(endpoint="http://localhost:4317") + custom_tc = dataclasses.replace(tc, logging=None) + assert custom_tc.logging is None + assert isinstance(custom_tc.metrics, OpenTelemetryConfig) + + +class TestApplyDefaults: + def test_configures_metrics_and_tracing(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config, OtelOptions(collector_endpoint="http://localhost:4317")) + + # Metrics: runtime should be set. 
+ assert "runtime" in config.client_connect_config + # Tracing: plugin should be added. + plugins = config.worker_config.get("plugins", []) + assert len(plugins) == 1 + assert isinstance(plugins[0], OpenTelemetryPlugin) + # Shutdown hook for tracer flush. + assert len(config.shutdown_hooks) == 1 + + def test_sets_global_tracer_provider(self) -> None: + from opentelemetry.trace import get_tracer_provider + + from temporalio.contrib.opentelemetry._tracer_provider import ( + ReplaySafeTracerProvider, + ) + + config = LambdaWorkerConfig() + apply_defaults(config) + provider = get_tracer_provider() + assert isinstance(provider, ReplaySafeTracerProvider) + + def test_service_name_from_options(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config, OtelOptions(service_name="my-service")) + assert "runtime" in config.client_connect_config + + def test_service_name_from_env(self) -> None: + config = LambdaWorkerConfig() + with patch.dict("os.environ", {"OTEL_SERVICE_NAME": "env-service"}): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_service_name_from_lambda_function_name(self) -> None: + config = LambdaWorkerConfig() + with patch.dict( + "os.environ", + {"AWS_LAMBDA_FUNCTION_NAME": "my-lambda"}, + clear=True, + ): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_endpoint_from_env(self) -> None: + config = LambdaWorkerConfig() + with patch.dict( + "os.environ", + {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://custom:4317"}, + ): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_default_options_used_when_none(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config) + assert "runtime" in config.client_connect_config + assert len(config.shutdown_hooks) == 1 + + +class TestOtelOptions: + def test_defaults(self) -> None: + opts = OtelOptions() + assert opts.service_name == "" + assert opts.collector_endpoint == "" + assert 
opts.metric_periodicity == timedelta(seconds=10) + + def test_custom_values(self) -> None: + opts = OtelOptions( + service_name="svc", + collector_endpoint="http://host:4317", + metric_periodicity=timedelta(seconds=30), + ) + assert opts.service_name == "svc" + assert opts.collector_endpoint == "http://host:4317" + assert opts.metric_periodicity == timedelta(seconds=30) diff --git a/tests/test_client.py b/tests/test_client.py index b8bebdaf7..530c166f0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1552,6 +1552,21 @@ def test_fork_create_client( self.run(mp_fork_ctx) +def test_client_connect_config_matches_connect_params(): + """ClientConnectConfig TypedDict keys must match Client.connect kwargs.""" + import inspect + + from temporalio.client import Client, ClientConnectConfig + + connect_params = set(inspect.signature(Client.connect).parameters.keys()) - {"cls"} + config_keys = set(ClientConnectConfig.__annotations__.keys()) + assert config_keys == connect_params, ( + f"ClientConnectConfig is out of sync with Client.connect. " + f"Missing from config: {connect_params - config_keys}. " + f"Extra in config: {config_keys - connect_params}." 
+ ) + + class TestForkUseClient(_TestFork): async def coro(self): await self._client.start_workflow( diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 4c76a7ba9..4aa366735 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1543,3 +1543,18 @@ async def test_continue_as_new_with_version_upgrade( # Expect workflow to return "v2.0", indicating that it continued-as-new and completed on v2 result = await handle.result() assert result == "v2.0" + + +def test_worker_config_matches_init_params(): + """WorkerConfig TypedDict keys must match Worker.__init__ kwargs.""" + import inspect + + from temporalio.worker import Worker, WorkerConfig + + init_params = set(inspect.signature(Worker.__init__).parameters.keys()) - {"self"} + config_keys = set(WorkerConfig.__annotations__.keys()) + assert config_keys == init_params, ( + f"WorkerConfig is out of sync with Worker.__init__. " + f"Missing from config: {init_params - config_keys}. " + f"Extra in config: {config_keys - init_params}." 
+ ) diff --git a/uv.lock b/uv.lock index c63faefad..8e1bc6b60 100644 --- a/uv.lock +++ b/uv.lock @@ -1768,7 +1768,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/38/3f/9859f655d11901e7b2996c6e3d33e0caa9a1d4572c3bc61ed0faa64b2f4c/greenlet-3.3.2-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:9bc885b89709d901859cf95179ec9f6bb67a3d2bb1f0e88456461bd4b7f8fd0d", size = 277747, upload-time = "2026-02-20T20:16:21.325Z" }, { url = "https://files.pythonhosted.org/packages/fb/07/cb284a8b5c6498dbd7cba35d31380bb123d7dceaa7907f606c8ff5993cbf/greenlet-3.3.2-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b568183cf65b94919be4438dc28416b234b678c608cafac8874dfeeb2a9bbe13", size = 579202, upload-time = "2026-02-20T20:47:28.955Z" }, { url = "https://files.pythonhosted.org/packages/ed/45/67922992b3a152f726163b19f890a85129a992f39607a2a53155de3448b8/greenlet-3.3.2-cp310-cp310-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:527fec58dc9f90efd594b9b700662ed3fb2493c2122067ac9c740d98080a620e", size = 590620, upload-time = "2026-02-20T20:55:55.581Z" }, - { url = "https://files.pythonhosted.org/packages/03/5f/6e2a7d80c353587751ef3d44bb947f0565ec008a2e0927821c007e96d3a7/greenlet-3.3.2-cp310-cp310-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:508c7f01f1791fbc8e011bd508f6794cb95397fdb198a46cb6635eb5b78d85a7", size = 602132, upload-time = "2026-02-20T21:02:43.261Z" }, { url = "https://files.pythonhosted.org/packages/ad/55/9f1ebb5a825215fadcc0f7d5073f6e79e3007e3282b14b22d6aba7ca6cb8/greenlet-3.3.2-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ad0c8917dd42a819fe77e6bdfcb84e3379c0de956469301d9fd36427a1ca501f", size = 591729, upload-time = "2026-02-20T20:20:58.395Z" }, { url = "https://files.pythonhosted.org/packages/24/b4/21f5455773d37f94b866eb3cf5caed88d6cea6dd2c6e1f9c34f463cba3ec/greenlet-3.3.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = 
"sha256:97245cc10e5515dbc8c3104b2928f7f02b6813002770cfaffaf9a6e0fc2b94ef", size = 1551946, upload-time = "2026-02-20T20:49:31.102Z" }, { url = "https://files.pythonhosted.org/packages/00/68/91f061a926abead128fe1a87f0b453ccf07368666bd59ffa46016627a930/greenlet-3.3.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:8c1fdd7d1b309ff0da81d60a9688a8bd044ac4e18b250320a96fc68d31c209ca", size = 1618494, upload-time = "2026-02-20T20:21:06.541Z" }, @@ -1776,7 +1775,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f3/47/16400cb42d18d7a6bb46f0626852c1718612e35dcb0dffa16bbaffdf5dd2/greenlet-3.3.2-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:c56692189a7d1c7606cb794be0a8381470d95c57ce5be03fb3d0ef57c7853b86", size = 278890, upload-time = "2026-02-20T20:19:39.263Z" }, { url = "https://files.pythonhosted.org/packages/a3/90/42762b77a5b6aa96cd8c0e80612663d39211e8ae8a6cd47c7f1249a66262/greenlet-3.3.2-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1ebd458fa8285960f382841da585e02201b53a5ec2bac6b156fc623b5ce4499f", size = 581120, upload-time = "2026-02-20T20:47:30.161Z" }, { url = "https://files.pythonhosted.org/packages/bf/6f/f3d64f4fa0a9c7b5c5b3c810ff1df614540d5aa7d519261b53fba55d4df9/greenlet-3.3.2-cp311-cp311-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:a443358b33c4ec7b05b79a7c8b466f5d275025e750298be7340f8fc63dff2a55", size = 594363, upload-time = "2026-02-20T20:55:56.965Z" }, - { url = "https://files.pythonhosted.org/packages/9c/8b/1430a04657735a3f23116c2e0d5eb10220928846e4537a938a41b350bed6/greenlet-3.3.2-cp311-cp311-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:4375a58e49522698d3e70cc0b801c19433021b5c37686f7ce9c65b0d5c8677d2", size = 605046, upload-time = "2026-02-20T21:02:45.234Z" }, { url = "https://files.pythonhosted.org/packages/72/83/3e06a52aca8128bdd4dcd67e932b809e76a96ab8c232a8b025b2850264c5/greenlet-3.3.2-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = 
"sha256:8e2cd90d413acbf5e77ae41e5d3c9b3ac1d011a756d7284d7f3f2b806bbd6358", size = 594156, upload-time = "2026-02-20T20:20:59.955Z" }, { url = "https://files.pythonhosted.org/packages/70/79/0de5e62b873e08fe3cef7dbe84e5c4bc0e8ed0c7ff131bccb8405cd107c8/greenlet-3.3.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:442b6057453c8cb29b4fb36a2ac689382fc71112273726e2423f7f17dc73bf99", size = 1554649, upload-time = "2026-02-20T20:49:32.293Z" }, { url = "https://files.pythonhosted.org/packages/5a/00/32d30dee8389dc36d42170a9c66217757289e2afb0de59a3565260f38373/greenlet-3.3.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:45abe8eb6339518180d5a7fa47fa01945414d7cca5ecb745346fc6a87d2750be", size = 1619472, upload-time = "2026-02-20T20:21:07.966Z" }, @@ -1785,7 +1783,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ea/ab/1608e5a7578e62113506740b88066bf09888322a311cff602105e619bd87/greenlet-3.3.2-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:ac8d61d4343b799d1e526db579833d72f23759c71e07181c2d2944e429eb09cd", size = 280358, upload-time = "2026-02-20T20:17:43.971Z" }, { url = "https://files.pythonhosted.org/packages/a5/23/0eae412a4ade4e6623ff7626e38998cb9b11e9ff1ebacaa021e4e108ec15/greenlet-3.3.2-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3ceec72030dae6ac0c8ed7591b96b70410a8be370b6a477b1dbc072856ad02bd", size = 601217, upload-time = "2026-02-20T20:47:31.462Z" }, { url = "https://files.pythonhosted.org/packages/f8/16/5b1678a9c07098ecb9ab2dd159fafaf12e963293e61ee8d10ecb55273e5e/greenlet-3.3.2-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:a2a5be83a45ce6188c045bcc44b0ee037d6a518978de9a5d97438548b953a1ac", size = 611792, upload-time = "2026-02-20T20:55:58.423Z" }, - { url = "https://files.pythonhosted.org/packages/5c/c5/cc09412a29e43406eba18d61c70baa936e299bc27e074e2be3806ed29098/greenlet-3.3.2-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = 
"sha256:ae9e21c84035c490506c17002f5c8ab25f980205c3e61ddb3a2a2a2e6c411fcb", size = 626250, upload-time = "2026-02-20T21:02:46.596Z" }, { url = "https://files.pythonhosted.org/packages/50/1f/5155f55bd71cabd03765a4aac9ac446be129895271f73872c36ebd4b04b6/greenlet-3.3.2-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:43e99d1749147ac21dde49b99c9abffcbc1e2d55c67501465ef0930d6e78e070", size = 613875, upload-time = "2026-02-20T20:21:01.102Z" }, { url = "https://files.pythonhosted.org/packages/fc/dd/845f249c3fcd69e32df80cdab059b4be8b766ef5830a3d0aa9d6cad55beb/greenlet-3.3.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:4c956a19350e2c37f2c48b336a3afb4bff120b36076d9d7fb68cb44e05d95b79", size = 1571467, upload-time = "2026-02-20T20:49:33.495Z" }, { url = "https://files.pythonhosted.org/packages/2a/50/2649fe21fcc2b56659a452868e695634722a6655ba245d9f77f5656010bf/greenlet-3.3.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:6c6f8ba97d17a1e7d664151284cb3315fc5f8353e75221ed4324f84eb162b395", size = 1640001, upload-time = "2026-02-20T20:21:09.154Z" }, @@ -1794,7 +1791,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ac/48/f8b875fa7dea7dd9b33245e37f065af59df6a25af2f9561efa8d822fde51/greenlet-3.3.2-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:aa6ac98bdfd716a749b84d4034486863fd81c3abde9aa3cf8eff9127981a4ae4", size = 279120, upload-time = "2026-02-20T20:19:01.9Z" }, { url = "https://files.pythonhosted.org/packages/49/8d/9771d03e7a8b1ee456511961e1b97a6d77ae1dea4a34a5b98eee706689d3/greenlet-3.3.2-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ab0c7e7901a00bc0a7284907273dc165b32e0d109a6713babd04471327ff7986", size = 603238, upload-time = "2026-02-20T20:47:32.873Z" }, { url = "https://files.pythonhosted.org/packages/59/0e/4223c2bbb63cd5c97f28ffb2a8aee71bdfb30b323c35d409450f51b91e3e/greenlet-3.3.2-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = 
"sha256:d248d8c23c67d2291ffd47af766e2a3aa9fa1c6703155c099feb11f526c63a92", size = 614219, upload-time = "2026-02-20T20:55:59.817Z" }, - { url = "https://files.pythonhosted.org/packages/94/2b/4d012a69759ac9d77210b8bfb128bc621125f5b20fc398bce3940d036b1c/greenlet-3.3.2-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:ccd21bb86944ca9be6d967cf7691e658e43417782bce90b5d2faeda0ff78a7dd", size = 628268, upload-time = "2026-02-20T21:02:48.024Z" }, { url = "https://files.pythonhosted.org/packages/7a/34/259b28ea7a2a0c904b11cd36c79b8cef8019b26ee5dbe24e73b469dea347/greenlet-3.3.2-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b6997d360a4e6a4e936c0f9625b1c20416b8a0ea18a8e19cabbefc712e7397ab", size = 616774, upload-time = "2026-02-20T20:21:02.454Z" }, { url = "https://files.pythonhosted.org/packages/0a/03/996c2d1689d486a6e199cb0f1cf9e4aa940c500e01bdf201299d7d61fa69/greenlet-3.3.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:64970c33a50551c7c50491671265d8954046cb6e8e2999aacdd60e439b70418a", size = 1571277, upload-time = "2026-02-20T20:49:34.795Z" }, { url = "https://files.pythonhosted.org/packages/d9/c4/2570fc07f34a39f2caf0bf9f24b0a1a0a47bc2e8e465b2c2424821389dfc/greenlet-3.3.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:1a9172f5bf6bd88e6ba5a84e0a68afeac9dc7b6b412b245dd64f52d83c81e55b", size = 1640455, upload-time = "2026-02-20T20:21:10.261Z" }, @@ -1803,7 +1799,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3f/ae/8bffcbd373b57a5992cd077cbe8858fff39110480a9d50697091faea6f39/greenlet-3.3.2-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:8d1658d7291f9859beed69a776c10822a0a799bc4bfe1bd4272bb60e62507dab", size = 279650, upload-time = "2026-02-20T20:18:00.783Z" }, { url = "https://files.pythonhosted.org/packages/d1/c0/45f93f348fa49abf32ac8439938726c480bd96b2a3c6f4d949ec0124b69f/greenlet-3.3.2-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = 
"sha256:18cb1b7337bca281915b3c5d5ae19f4e76d35e1df80f4ad3c1a7be91fadf1082", size = 650295, upload-time = "2026-02-20T20:47:34.036Z" }, { url = "https://files.pythonhosted.org/packages/b3/de/dd7589b3f2b8372069ab3e4763ea5329940fc7ad9dcd3e272a37516d7c9b/greenlet-3.3.2-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:c2e47408e8ce1c6f1ceea0dffcdf6ebb85cc09e55c7af407c99f1112016e45e9", size = 662163, upload-time = "2026-02-20T20:56:01.295Z" }, - { url = "https://files.pythonhosted.org/packages/cd/ac/85804f74f1ccea31ba518dcc8ee6f14c79f73fe36fa1beba38930806df09/greenlet-3.3.2-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:e3cb43ce200f59483eb82949bf1835a99cf43d7571e900d7c8d5c62cdf25d2f9", size = 675371, upload-time = "2026-02-20T21:02:49.664Z" }, { url = "https://files.pythonhosted.org/packages/d2/d8/09bfa816572a4d83bccd6750df1926f79158b1c36c5f73786e26dbe4ee38/greenlet-3.3.2-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:63d10328839d1973e5ba35e98cccbca71b232b14051fd957b6f8b6e8e80d0506", size = 664160, upload-time = "2026-02-20T20:21:04.015Z" }, { url = "https://files.pythonhosted.org/packages/48/cf/56832f0c8255d27f6c35d41b5ec91168d74ec721d85f01a12131eec6b93c/greenlet-3.3.2-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:8e4ab3cfb02993c8cc248ea73d7dae6cec0253e9afa311c9b37e603ca9fad2ce", size = 1619181, upload-time = "2026-02-20T20:49:36.052Z" }, { url = "https://files.pythonhosted.org/packages/0a/23/b90b60a4aabb4cec0796e55f25ffbfb579a907c3898cd2905c8918acaa16/greenlet-3.3.2-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:94ad81f0fd3c0c0681a018a976e5c2bd2ca2d9d94895f23e7bb1af4e8af4e2d5", size = 1687713, upload-time = "2026-02-20T20:21:11.684Z" }, @@ -1812,7 +1807,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/98/6d/8f2ef704e614bcf58ed43cfb8d87afa1c285e98194ab2cfad351bf04f81e/greenlet-3.3.2-cp314-cp314t-macosx_11_0_universal2.whl", hash = 
"sha256:e26e72bec7ab387ac80caa7496e0f908ff954f31065b0ffc1f8ecb1338b11b54", size = 286617, upload-time = "2026-02-20T20:19:29.856Z" }, { url = "https://files.pythonhosted.org/packages/5e/0d/93894161d307c6ea237a43988f27eba0947b360b99ac5239ad3fe09f0b47/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8b466dff7a4ffda6ca975979bab80bdadde979e29fc947ac3be4451428d8b0e4", size = 655189, upload-time = "2026-02-20T20:47:35.742Z" }, { url = "https://files.pythonhosted.org/packages/f5/2c/d2d506ebd8abcb57386ec4f7ba20f4030cbe56eae541bc6fd6ef399c0b41/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:b8bddc5b73c9720bea487b3bffdb1840fe4e3656fba3bd40aa1489e9f37877ff", size = 658225, upload-time = "2026-02-20T20:56:02.527Z" }, - { url = "https://files.pythonhosted.org/packages/d1/67/8197b7e7e602150938049d8e7f30de1660cfb87e4c8ee349b42b67bdb2e1/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:59b3e2c40f6706b05a9cd299c836c6aa2378cabe25d021acd80f13abf81181cf", size = 666581, upload-time = "2026-02-20T21:02:51.526Z" }, { url = "https://files.pythonhosted.org/packages/8e/30/3a09155fbf728673a1dea713572d2d31159f824a37c22da82127056c44e4/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b26b0f4428b871a751968285a1ac9648944cea09807177ac639b030bddebcea4", size = 657907, upload-time = "2026-02-20T20:21:05.259Z" }, { url = "https://files.pythonhosted.org/packages/f3/fd/d05a4b7acd0154ed758797f0a43b4c0962a843bedfe980115e842c5b2d08/greenlet-3.3.2-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:1fb39a11ee2e4d94be9a76671482be9398560955c9e568550de0224e41104727", size = 1618857, upload-time = "2026-02-20T20:49:37.309Z" }, { url = "https://files.pythonhosted.org/packages/6f/e1/50ee92a5db521de8f35075b5eff060dd43d39ebd46c2181a2042f7070385/greenlet-3.3.2-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = 
"sha256:20154044d9085151bc309e7689d6f7ba10027f8f5a8c0676ad398b951913d89e", size = 1680010, upload-time = "2026-02-20T20:21:13.427Z" }, @@ -3379,6 +3373,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/13/b4ef09837409a777f3c0af2a5b4ba9b7af34872bc43609dda0c209e4060d/opentelemetry_exporter_otlp_proto_common-1.37.0-py3-none-any.whl", hash = "sha256:53038428449c559b0c564b8d718df3314da387109c4d36bd1b94c9a641b0292e", size = 18359, upload-time = "2025-09-11T10:28:44.939Z" }, ] +[[package]] +name = "opentelemetry-exporter-otlp-proto-grpc" +version = "1.37.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "googleapis-common-protos" }, + { name = "grpcio" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/11/4ad0979d0bb13ae5a845214e97c8d42da43980034c30d6f72d8e0ebe580e/opentelemetry_exporter_otlp_proto_grpc-1.37.0.tar.gz", hash = "sha256:f55bcb9fc848ce05ad3dd954058bc7b126624d22c4d9e958da24d8537763bec5", size = 24465, upload-time = "2025-09-11T10:29:04.172Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/39/17/46630b74751031a658706bef23ac99cdc2953cd3b2d28ec90590a0766b3e/opentelemetry_exporter_otlp_proto_grpc-1.37.0-py3-none-any.whl", hash = "sha256:aee5104835bf7993b7ddaaf380b6467472abaedb1f1dbfcc54a52a7d781a3890", size = 19305, upload-time = "2025-09-11T10:28:45.776Z" }, +] + [[package]] name = "opentelemetry-exporter-otlp-proto-http" version = "1.37.0" @@ -3453,6 +3465,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9f/62/9f4ad6a54126fb00f7ed4bb5034964c6e4f00fcd5a905e115bd22707e20d/opentelemetry_sdk-1.37.0-py3-none-any.whl", hash = "sha256:8f3c3c22063e52475c5dbced7209495c2c16723d016d39287dfc215d1771257c", size = 131941, upload-time = "2025-09-11T10:28:57.83Z" }, ] +[[package]] +name 
= "opentelemetry-sdk-extension-aws" +version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f5/b3/825c93fe4c238845f1356297abea33d03b2adaafb5ae98fc257b394de124/opentelemetry_sdk_extension_aws-2.1.0.tar.gz", hash = "sha256:ff68ddecc1910f62c019d22ec0f7461713ead7f662d6a2304d4089c1a0b20416", size = 16334, upload-time = "2024-12-24T15:01:57.387Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/02/61/47a6a43b7935d54b5734fbf3fb0357dd5a7d0dfaa9677b7318518fe8d507/opentelemetry_sdk_extension_aws-2.1.0-py3-none-any.whl", hash = "sha256:c7cf6efc275d2c24108a468d954287ce5aab9733bac816a080cfb3117374e63a", size = 18776, upload-time = "2024-12-24T15:01:56.053Z" }, +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.58b0" @@ -4801,6 +4825,13 @@ google-adk = [ grpc = [ { name = "grpcio" }, ] +lambda-worker-otel = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-sdk" }, + { name = "opentelemetry-sdk-extension-aws" }, + { name = "opentelemetry-semantic-conventions" }, +] openai-agents = [ { name = "mcp" }, { name = "openai-agents" }, @@ -4828,6 +4859,9 @@ dev = [ { name = "openai-agents", extra = ["litellm"], marker = "python_full_version < '3.14'" }, { name = "openinference-instrumentation-google-adk" }, { name = "openinference-instrumentation-openai-agents" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-sdk-extension-aws" }, + { name = "opentelemetry-semantic-conventions" }, { name = "psutil" }, { name = "pydocstyle" }, { name = "pydoctor" }, @@ -4851,8 +4885,13 @@ requires-dist = [ { name = "mcp", marker = "extra == 'openai-agents'", specifier = ">=1.9.4,<2" }, { name = "nexus-rpc", specifier = "==1.4.0" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.3,<0.7" }, + { name = 
"opentelemetry-api", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-exporter-otlp-proto-grpc", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk-extension-aws", marker = "extra == 'lambda-worker-otel'", specifier = ">=2.0.0,<3" }, + { name = "opentelemetry-semantic-conventions", marker = "extra == 'lambda-worker-otel'", specifier = ">=0.40b0,<1" }, { name = "protobuf", specifier = ">=3.20,<7.0.0" }, { name = "pydantic", marker = "extra == 'pydantic'", specifier = ">=2.0.0,<3" }, { name = "python-dateutil", marker = "python_full_version < '3.11'", specifier = ">=2.8.2,<3" }, @@ -4860,7 +4899,7 @@ requires-dist = [ { name = "types-protobuf", specifier = ">=3.20" }, { name = "typing-extensions", specifier = ">=4.2.0,<5" }, ] -provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "aioboto3"] +provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "lambda-worker-otel", "aioboto3"] [package.metadata.requires-dev] dev = [ @@ -4877,6 +4916,9 @@ dev = [ { name = "openai-agents", extras = ["litellm"], marker = "python_full_version < '3.14'", specifier = ">=0.3,<0.7" }, { name = "openinference-instrumentation-google-adk", specifier = ">=0.1.8" }, { name = "openinference-instrumentation-openai-agents", specifier = ">=0.1.0" }, + { name = "opentelemetry-exporter-otlp-proto-grpc", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk-extension-aws", specifier = ">=2.0.0,<3" }, + { name = "opentelemetry-semantic-conventions", specifier = ">=0.40b0,<1" }, { name = "psutil", specifier = ">=5.9.3,<6" }, { name = "pydocstyle", 
specifier = ">=6.3.0,<7" }, { name = "pydoctor", specifier = ">=25.10.1,<26" },