From e29cfb01a602392d336c442424b644086baf7509 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 2 Apr 2026 11:44:31 -0700 Subject: [PATCH 1/6] Add contrib package for lambda workers --- pyproject.toml | 10 + temporalio/contrib/aws/__init__.py | 1 + .../contrib/aws/lambda_worker/README.md | 104 ++++ .../contrib/aws/lambda_worker/__init__.py | 53 ++ .../contrib/aws/lambda_worker/_configure.py | 89 +++ .../contrib/aws/lambda_worker/_defaults.py | 84 +++ .../contrib/aws/lambda_worker/_run_worker.py | 263 +++++++++ temporalio/contrib/aws/lambda_worker/otel.py | 225 ++++++++ tests/contrib/aws/lambda_worker/__init__.py | 0 .../aws/lambda_worker/test_lambda_worker.py | 536 ++++++++++++++++++ tests/contrib/aws/lambda_worker/test_otel.py | 163 ++++++ uv.lock | 56 +- 12 files changed, 1577 insertions(+), 7 deletions(-) create mode 100644 temporalio/contrib/aws/__init__.py create mode 100644 temporalio/contrib/aws/lambda_worker/README.md create mode 100644 temporalio/contrib/aws/lambda_worker/__init__.py create mode 100644 temporalio/contrib/aws/lambda_worker/_configure.py create mode 100644 temporalio/contrib/aws/lambda_worker/_defaults.py create mode 100644 temporalio/contrib/aws/lambda_worker/_run_worker.py create mode 100644 temporalio/contrib/aws/lambda_worker/otel.py create mode 100644 tests/contrib/aws/lambda_worker/__init__.py create mode 100644 tests/contrib/aws/lambda_worker/test_lambda_worker.py create mode 100644 tests/contrib/aws/lambda_worker/test_otel.py diff --git a/pyproject.toml b/pyproject.toml index c8fa96a8d..e52ad5ead 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,13 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"] pydantic = ["pydantic>=2.0.0,<3"] openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"] google-adk = ["google-adk>=1.27.0,<2"] +lambda-worker-otel = [ + "opentelemetry-api>=1.11.1,<2", + "opentelemetry-sdk>=1.11.1,<2", + "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", + "opentelemetry-semantic-conventions>=0.40b0,<1", + "opentelemetry-sdk-extension-aws>=2.0.0,<3", +] aioboto3 = [ "aioboto3>=10.4.0", "types-aioboto3[s3]>=10.4.0", @@ -69,6 +76,9 @@ dev = [ "googleapis-common-protos==1.70.0", "pytest-rerunfailures>=16.1", "moto[s3,server]>=5", + "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", + "opentelemetry-semantic-conventions>=0.40b0,<1", + "opentelemetry-sdk-extension-aws>=2.0.0,<3", ] [tool.poe.tasks] diff --git a/temporalio/contrib/aws/__init__.py b/temporalio/contrib/aws/__init__.py new file mode 100644 index 000000000..a8b8c648f --- /dev/null +++ b/temporalio/contrib/aws/__init__.py @@ -0,0 +1 @@ +"""AWS integrations for Temporal SDK.""" diff --git a/temporalio/contrib/aws/lambda_worker/README.md b/temporalio/contrib/aws/lambda_worker/README.md new file mode 100644 index 000000000..f9166b13d --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/README.md @@ -0,0 +1,104 @@ +# lambda_worker + +A wrapper for running [Temporal](https://temporal.io) workers inside AWS Lambda. A single +`run_worker` call handles the full per-invocation lifecycle: connecting to the Temporal server, +creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully shutting down before +the invocation deadline. + +## Quick start + +```python +# handler.py +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker + +from my_workflows import MyWorkflow +from my_activities import my_activity + + +def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + +lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, +) +``` + +## Configuration + +Client connection settings (address, namespace, TLS, API key) are loaded +automatically from a TOML config file and/or environment variables via +`temporalio.envconfig`. The config file is resolved in order: + +1. `TEMPORAL_CONFIG_FILE` env var, if set. +2. `temporal.toml` in `$LAMBDA_TASK_ROOT` (typically `/var/task`). +3. `temporal.toml` in the current working directory. + +The file is optional -- if absent, only environment variables are used. + +The configure callback receives a `LambdaWorkerConfig` dataclass with fields +pre-populated with Lambda-appropriate defaults. Override any field directly in +the callback. The `task_queue` key in `worker_config` is pre-populated from the +`TEMPORAL_TASK_QUEUE` environment variable if set. + +## Lambda-tuned worker defaults + +The package applies conservative concurrency limits suited to Lambda's resource +constraints: + +| Setting | Default | +| --- | --- | +| `max_concurrent_activities` | 2 | +| `max_concurrent_workflow_tasks` | 10 | +| `max_concurrent_local_activities` | 2 | +| `max_concurrent_nexus_tasks` | 5 | +| `workflow_task_poller_behavior` | `SimpleMaximum(2)` | +| `activity_task_poller_behavior` | `SimpleMaximum(1)` | +| `nexus_task_poller_behavior` | `SimpleMaximum(1)` | +| `graceful_shutdown_timeout` | 5 seconds | +| `max_cached_workflows` | 100 | +| `disable_eager_activity_execution` | Always `True` | + +Worker Deployment Versioning is always enabled. + +## Observability + +Metrics and tracing are opt-in. The `otel` module provides convenience helpers +for AWS Distro for OpenTelemetry (ADOT): + +```python +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker +from temporalio.contrib.aws.lambda_worker.otel import apply_defaults, OtelOptions + + +def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + apply_defaults(config, OtelOptions()) + + +lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, +) +``` + +You can also use `apply_metrics` or `apply_tracing` individually. + +If you use OTEL, you can use +[ADOT](https://aws-otel.github.io/docs/getting-started/lambda/lambda-python) +(the AWS Distro For OpenTelemetry) to automatically integrate with AWS +observability functionality. Namely, you will want to add the Lambda layer in +the aforementioned link. We'll handle setting up the SDK for you. diff --git a/temporalio/contrib/aws/lambda_worker/__init__.py b/temporalio/contrib/aws/lambda_worker/__init__.py new file mode 100644 index 000000000..25df52b9c --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/__init__.py @@ -0,0 +1,53 @@ +"""A wrapper for running Temporal workers inside AWS Lambda. + +A single :py:func:`run_worker` call handles the full per-invocation lifecycle: +connecting to the Temporal server, creating a worker with Lambda-tuned defaults, +polling for tasks, and gracefully shutting down before the invocation deadline. + +Quick start:: + + from temporalio.common import WorkerDeploymentVersion + from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, + ) + +Configuration +------------- +Client connection settings (address, namespace, TLS, API key) are loaded +automatically from a TOML config file and/or environment variables via +:py:mod:`temporalio.envconfig`. The config file is resolved in order: + +1. ``TEMPORAL_CONFIG_FILE`` env var, if set. +2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``). +3. ``temporal.toml`` in the current working directory. + +The file is optional -- if absent, only environment variables are used. + +The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with +fields pre-populated with Lambda-appropriate defaults. Override any field +directly in the callback. The ``task_queue`` key in ``worker_config`` is +pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if set. +""" + +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaClientConnectConfig, + LambdaWorkerConfig, +) +from temporalio.contrib.aws.lambda_worker._run_worker import run_worker + +__all__ = [ + "LambdaClientConnectConfig", + "LambdaWorkerConfig", + "run_worker", +] diff --git a/temporalio/contrib/aws/lambda_worker/_configure.py b/temporalio/contrib/aws/lambda_worker/_configure.py new file mode 100644 index 000000000..dc67c52fb --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_configure.py @@ -0,0 +1,89 @@ +"""Configuration for the Lambda worker.""" + +from __future__ import annotations + +import asyncio +import sys +from collections.abc import Awaitable, Callable, Sequence +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Any + +import temporalio.runtime +from temporalio.envconfig import ClientConnectConfig +from temporalio.worker import WorkerConfig + + +class LambdaClientConnectConfig(ClientConnectConfig, total=False): + """Keyword arguments for :py:meth:`temporalio.client.Client.connect`. + + Extends :py:class:`~temporalio.envconfig.ClientConnectConfig` with + additional keys that may be set by the Lambda worker or its OTel helpers. + """ + + identity: str + runtime: temporalio.runtime.Runtime + interceptors: Sequence[Any] + plugins: Sequence[Any] + + +@dataclass +class LambdaWorkerConfig: + """Passed to the configure callback of :py:func:`run_worker`. + + Fields are pre-populated with Lambda-appropriate defaults before the + configure callback is invoked; the callback may read and override any of + them. + + Use ``worker_config`` to set task queue, register workflows/activities, and + tune worker options. The ``task_queue`` key is pre-populated from the + ``TEMPORAL_TASK_QUEUE`` environment variable if set. + + Attributes: + client_connect_config: Keyword arguments that will be passed to + :py:meth:`temporalio.client.Client.connect`. Pre-populated from the + config file / environment variables via envconfig, with Lambda + defaults applied. + worker_config: Keyword arguments that will be passed to the + :py:class:`~temporalio.worker.Worker` constructor (the ``client`` + key is managed internally). Pre-populated with Lambda-appropriate + defaults (low concurrency, eager activities disabled) and + ``task_queue`` from ``TEMPORAL_TASK_QUEUE`` if set. + shutdown_deadline_buffer: How long before the Lambda invocation + deadline the worker begins its shutdown sequence (worker drain + + shutdown hooks). Pre-populated to + ``graceful_shutdown_timeout + 2s``. If you change + ``graceful_shutdown_timeout`` in ``worker_config``, adjust this + accordingly. + shutdown_hooks: Functions called at the end of each Lambda invocation, + after the worker has stopped. Run in list order. Each may be sync + or async. Use this to flush telemetry providers or release other + per-process resources. + """ + + client_connect_config: LambdaClientConnectConfig = field( + default_factory=LambdaClientConnectConfig + ) + worker_config: WorkerConfig = field(default_factory=WorkerConfig) + shutdown_deadline_buffer: timedelta = field( + default_factory=lambda: timedelta(seconds=7) + ) + shutdown_hooks: list[Callable[[], Awaitable[None] | None]] = field( + default_factory=list + ) + + +async def _run_shutdown_hooks( # type:ignore[reportUnusedFunction] + config: LambdaWorkerConfig, +) -> None: + """Run all registered shutdown hooks in order, logging errors.""" + for fn in config.shutdown_hooks: + try: + result = fn() + if asyncio.iscoroutine(result): + await result + except Exception as e: + print( + f"lambda_worker: shutdown hook error: {e}", + file=sys.stderr, + ) diff --git a/temporalio/contrib/aws/lambda_worker/_defaults.py b/temporalio/contrib/aws/lambda_worker/_defaults.py new file mode 100644 index 000000000..796f5b3d5 --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_defaults.py @@ -0,0 +1,84 @@ +"""Lambda-tuned defaults for Temporal worker and client configuration.""" + +from __future__ import annotations + +import os +from collections.abc import Callable +from datetime import timedelta +from pathlib import Path + +from temporalio.worker import PollerBehaviorSimpleMaximum, WorkerConfig + +# ---- Lambda-tuned worker defaults ---- +# Conservative concurrency limits suited to Lambda's resource constraints. + +DEFAULT_MAX_CONCURRENT_ACTIVITIES: int = 2 +DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS: int = 10 +DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES: int = 2 +DEFAULT_MAX_CONCURRENT_NEXUS_TASKS: int = 5 +DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: timedelta = timedelta(seconds=5) +DEFAULT_SHUTDOWN_HOOK_BUFFER: timedelta = timedelta(seconds=2) +DEFAULT_MAX_CACHED_WORKFLOWS: int = 100 + +DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=2) +DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1) +DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1) + +# ---- Environment variable names ---- +ENV_TASK_QUEUE = "TEMPORAL_TASK_QUEUE" +ENV_LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" +ENV_CONFIG_FILE = "TEMPORAL_CONFIG_FILE" +DEFAULT_CONFIG_FILE = "temporal.toml" + + +def apply_lambda_worker_defaults(config: WorkerConfig) -> None: + """Apply Lambda-appropriate defaults to worker config. + + Only sets values that have not already been set (i.e. are absent from *config*). + ``disable_eager_activity_execution`` is always set to ``True``. + """ + config.setdefault("max_concurrent_activities", DEFAULT_MAX_CONCURRENT_ACTIVITIES) + config.setdefault( + "max_concurrent_workflow_tasks", DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + config.setdefault( + "max_concurrent_local_activities", DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES + ) + config.setdefault("max_concurrent_nexus_tasks", DEFAULT_MAX_CONCURRENT_NEXUS_TASKS) + config.setdefault("graceful_shutdown_timeout", DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT) + config.setdefault("max_cached_workflows", DEFAULT_MAX_CACHED_WORKFLOWS) + config.setdefault( + "workflow_task_poller_behavior", DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR + ) + config.setdefault( + "activity_task_poller_behavior", DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR + ) + config.setdefault("nexus_task_poller_behavior", DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR) + # Always disable eager activities in Lambda. + config["disable_eager_activity_execution"] = True + + +def build_lambda_identity(request_id: str, function_arn: str) -> str: + """Build a worker identity string from the Lambda invocation context. + + Format: ``@``. + """ + return f"{request_id or 'unknown'}@{function_arn or 'unknown'}" + + +def lambda_default_config_file_path( + getenv: Callable[[str], str] = os.environ.get, # type: ignore[assignment] +) -> str: + """Return the config file path for a Lambda environment. + + Resolution order: + + 1. ``TEMPORAL_CONFIG_FILE`` env var, if set. + 2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``). + 3. ``temporal.toml`` in the current working directory. + """ + config_file = getenv(ENV_CONFIG_FILE) + if config_file: + return config_file + root = getenv(ENV_LAMBDA_TASK_ROOT) or "." + return str(Path(root) / DEFAULT_CONFIG_FILE) diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py new file mode 100644 index 000000000..de431abee --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -0,0 +1,263 @@ +"""Core run_worker implementation for Lambda.""" + +from __future__ import annotations + +import asyncio +import logging +import os +import sys +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Any + +import temporalio.client +import temporalio.worker +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaClientConnectConfig, + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker._defaults import ( + DEFAULT_SHUTDOWN_HOOK_BUFFER, + apply_lambda_worker_defaults, + build_lambda_identity, + lambda_default_config_file_path, +) +from temporalio.envconfig import ClientConfigProfile +from temporalio.worker import WorkerConfig, WorkerDeploymentConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class _WorkerDeps: + """External dependencies injected for testability.""" + + connect: Callable[..., Awaitable[temporalio.client.Client]] = field( + default_factory=lambda: temporalio.client.Client.connect + ) + create_worker: Callable[..., temporalio.worker.Worker] = field( + default_factory=lambda: temporalio.worker.Worker + ) + load_config: Callable[[], ClientConfigProfile] | None = None + getenv: Callable[[str], str | None] = field(default_factory=lambda: os.environ.get) + extract_lambda_ctx: Callable[[Any], tuple[str, str] | None] | None = None + + +def _default_load_config(getenv: Callable[[str], str | None]) -> ClientConfigProfile: + config_path = lambda_default_config_file_path(getenv) # type: ignore[arg-type] + return ClientConfigProfile.load(config_source=config_path) + + +def _default_extract_lambda_ctx( + lambda_context: Any, +) -> tuple[str, str] | None: + """Extract (request_id, function_arn) from a Lambda context object.""" + if lambda_context is None: + return None + request_id = getattr(lambda_context, "aws_request_id", None) + function_arn = getattr(lambda_context, "invoked_function_arn", None) + if request_id is not None and function_arn is not None: + return (request_id, function_arn) + return None + + +def run_worker( + version: WorkerDeploymentVersion, + configure: Callable[[LambdaWorkerConfig], None], +) -> Callable[[Any, Any], Awaitable[None]]: + """Create a Temporal worker Lambda handler. + + Calls the *configure* callback to collect workflow/activity registrations + and option overrides, then returns an async Lambda handler function. On + each invocation the handler connects to the Temporal server, starts a + worker with Lambda-tuned defaults, polls for tasks until the invocation + deadline approaches, and then gracefully shuts down. + + The *version* parameter identifies this worker's deployment version. + ``run_worker`` always enables Worker Deployment Versioning + (``use_worker_versioning=True``). To provide a default versioning behavior + for workflows that do not specify one at registration time, set + ``deployment_config`` in ``worker_config`` in the configure callback. + + The returned handler has the signature ``async handler(event, context)`` + and should be set as your Lambda function's handler entry point. + + Args: + version: The worker deployment version. Required. + configure: A callback that receives a :py:class:`LambdaWorkerConfig` + (pre-populated with Lambda defaults) and configures workflows, + activities, and options on it. + + Returns: + An async Lambda handler function. + + Example:: + + from temporalio.common import WorkerDeploymentVersion + from temporalio.contrib.aws.lambda_worker import ( + LambdaWorkerConfig, + run_worker, + ) + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, + ) + """ + deps = _WorkerDeps() + try: + return _run_worker_internal(version, configure, deps) + except Exception as e: + print(f"lambda_worker: fatal: {e}", file=sys.stderr) + sys.exit(1) + + +def _run_worker_internal( + version: WorkerDeploymentVersion, + configure: Callable[[LambdaWorkerConfig], None], + deps: _WorkerDeps, +) -> Callable[[Any, Any], Awaitable[None]]: + """Core logic with injected dependencies for testability.""" + if not version.deployment_name or not version.build_id: + raise ValueError( + "version is required (deployment_name and build_id must be set)" + ) + + # Load client config from envconfig / TOML. + load_config = deps.load_config or (lambda: _default_load_config(deps.getenv)) + profile = load_config() + connect_config: LambdaClientConnectConfig = {**profile.to_client_connect_config()} + + # Build worker config with Lambda defaults. + worker_config: WorkerConfig = {} + apply_lambda_worker_defaults(worker_config) + + # Always enable deployment versioning. + worker_config["deployment_config"] = WorkerDeploymentConfig( + version=version, + use_worker_versioning=True, + ) + + # Calculate default shutdown buffer. + graceful_timeout = worker_config.get( + "graceful_shutdown_timeout", timedelta(seconds=5) + ) + shutdown_buffer = graceful_timeout + DEFAULT_SHUTDOWN_HOOK_BUFFER + + # Pre-populate config with defaults. + config = LambdaWorkerConfig( + client_connect_config=connect_config, + worker_config=worker_config, + shutdown_deadline_buffer=shutdown_buffer, + ) + + # Pre-populate task queue from environment if available. + env_tq = deps.getenv("TEMPORAL_TASK_QUEUE") + if env_tq: + config.worker_config["task_queue"] = env_tq + + # Call user configure callback with pre-populated config. + configure(config) + + # Validate task queue. + if not config.worker_config.get("task_queue"): + raise ValueError( + "task queue not configured: set " + 'worker_config["task_queue"] or the ' + "TEMPORAL_TASK_QUEUE environment variable" + ) + + extract_lambda_ctx = deps.extract_lambda_ctx or _default_extract_lambda_ctx + + async def _handler(_event: Any, lambda_context: Any) -> None: + await _invocation_handler( + lambda_context=lambda_context, + config=config, + deps=deps, + extract_lambda_ctx=extract_lambda_ctx, + ) + + return _handler + + +async def _invocation_handler( + *, + lambda_context: Any, + config: LambdaWorkerConfig, + deps: _WorkerDeps, + extract_lambda_ctx: Callable[[Any], tuple[str, str] | None], +) -> None: + """Handle a single Lambda invocation.""" + shutdown_buffer = config.shutdown_deadline_buffer + + # Check deadline feasibility. + remaining_ms_fn = getattr(lambda_context, "get_remaining_time_in_millis", None) + deadline_available = remaining_ms_fn is not None + if deadline_available: + assert remaining_ms_fn is not None + remaining = timedelta(milliseconds=remaining_ms_fn()) + work_time = remaining - shutdown_buffer + if work_time <= timedelta(seconds=1): + raise RuntimeError( + f"Lambda timeout leaves almost no time for work " + f"(work_time={work_time}, shutdown_buffer={shutdown_buffer}); " + f"increase the function timeout or decrease the shutdown " + f"deadline buffer" + ) + elif work_time < timedelta(seconds=5): + logger.warning( + "Lambda timeout leaves less than 5s for work after " + "shutdown buffer; consider increasing the function " + "timeout or decreasing the shutdown deadline buffer " + "(work_time=%s, shutdown_buffer=%s)", + work_time, + shutdown_buffer, + ) + + # Build per-invocation connect kwargs with identity from Lambda context. + invocation_connect_kwargs: LambdaClientConnectConfig = { + **config.client_connect_config + } + if "identity" not in invocation_connect_kwargs: + ctx_info = extract_lambda_ctx(lambda_context) + if ctx_info is not None: + request_id, function_arn = ctx_info + invocation_connect_kwargs["identity"] = build_lambda_identity( + request_id, function_arn + ) + + # Connect to Temporal. + client = await deps.connect(**invocation_connect_kwargs) + + # Create the worker. + worker = deps.create_worker(client, **config.worker_config) + + # Run the worker until the deadline approaches or context is done. + if deadline_available: + assert remaining_ms_fn is not None + work_time_secs = ( + timedelta(milliseconds=remaining_ms_fn()) - shutdown_buffer + ).total_seconds() + if work_time_secs > 0: + try: + async with asyncio.timeout(work_time_secs): + await worker.run() + except TimeoutError: + pass + else: + # No deadline - run until cancelled. + await worker.run() + + # Run shutdown hooks after worker has stopped. + await _run_shutdown_hooks(config) diff --git a/temporalio/contrib/aws/lambda_worker/otel.py b/temporalio/contrib/aws/lambda_worker/otel.py new file mode 100644 index 000000000..4cb33ef6f --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/otel.py @@ -0,0 +1,225 @@ +"""OpenTelemetry helpers for Temporal workers running inside AWS Lambda. + +Use :py:func:`apply_defaults` inside a :py:func:`~lambda_worker.run_worker` +configure callback for a batteries-included setup that creates an OTel +collector exporter and tracing interceptor, suitable for use with the AWS +Distro for OpenTelemetry (ADOT) Lambda layer. + +Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` +individually if you only need one. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from datetime import timedelta + +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.semconv.attributes.service_attributes import SERVICE_NAME + +from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig +from temporalio.runtime import OpenTelemetryConfig, Runtime, TelemetryConfig + + +@dataclass +class OtelOptions: + """Options for :py:func:`apply_defaults`. + + Attributes: + metric_periodicity: How often the Core SDK exports metrics to the + collector. Defaults to 10 seconds. Set this shorter than your + Lambda timeout to ensure at least one export per invocation. + service_name: OTel service name resource attribute. If empty, + falls back to ``OTEL_SERVICE_NAME``, then + ``AWS_LAMBDA_FUNCTION_NAME``, then + ``"temporal-lambda-worker"``. + collector_endpoint: OTLP collector endpoint (e.g. + ``"http://localhost:4317"``). If empty, falls back to + ``OTEL_EXPORTER_OTLP_ENDPOINT``, then + ``"http://localhost:4317"``. + """ + + metric_periodicity: timedelta = field(default_factory=lambda: timedelta(seconds=10)) + service_name: str = "" + collector_endpoint: str = "" + + +def _resolve_service_name(options: OtelOptions) -> str: + service_name = options.service_name + if not service_name: + service_name = os.environ.get("OTEL_SERVICE_NAME", "") + if not service_name: + service_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "") + if not service_name: + service_name = "temporal-lambda-worker" + return service_name + + +def _resolve_endpoint(options: OtelOptions) -> str: + endpoint = options.collector_endpoint + if not endpoint: + endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "") + if not endpoint: + endpoint = "http://localhost:4317" + return endpoint + + +def apply_defaults( + config: LambdaWorkerConfig, + options: OtelOptions | None = None, +) -> None: + """Configure OTel metrics and tracing with AWS Lambda defaults. + + Sets up Core SDK metrics export via a :py:class:`~temporalio.runtime.Runtime` + with an :py:class:`~temporalio.runtime.OpenTelemetryConfig` pointing at the + OTLP collector, and adds a + :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` for + distributed tracing. + + The collector endpoint defaults to ``http://localhost:4317``, which is the + endpoint expected by the ADOT collector Lambda layer. + + Registers a per-invocation ``ForceFlush`` shutdown hook for the + ``TracerProvider`` so pending traces are exported before each Lambda + invocation completes. + + Core SDK metrics are exported on the ``metric_periodicity`` interval by + the runtime's internal thread. There is no explicit flush API for Core + metrics; set ``metric_periodicity`` short enough to ensure at least one + export per invocation. + + Args: + config: The :py:class:`LambdaWorkerConfig` to configure. + options: Optional overrides for service name, endpoint, etc. + """ + if options is None: + options = OtelOptions() + + endpoint = _resolve_endpoint(options) + service_name = _resolve_service_name(options) + + telemetry_config = build_metrics_telemetry_config( + endpoint=endpoint, + service_name=service_name, + metric_periodicity=options.metric_periodicity, + ) + runtime = Runtime(telemetry=telemetry_config) + config.client_connect_config["runtime"] = runtime + + resource = Resource.create({SERVICE_NAME: service_name}) + + # Try to use X-Ray ID generator if available. + try: + from opentelemetry.sdk.extension.aws.trace import ( # type: ignore[reportMissingTypeStubs] + AwsXRayIdGenerator, + ) + + tracer_provider = TracerProvider( + resource=resource, id_generator=AwsXRayIdGenerator() + ) + except ImportError: + tracer_provider = TracerProvider(resource=resource) + + # Use OTLP gRPC exporter if available, otherwise skip trace export. + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + tracer_provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True)) + ) + except ImportError: + pass + + apply_tracing(config, tracer_provider) + + +def build_metrics_telemetry_config( + *, + endpoint: str = "", + service_name: str = "", + metric_periodicity: timedelta | None = None, +) -> TelemetryConfig: + """Build a :py:class:`~temporalio.runtime.TelemetryConfig` for OTel metrics. + + Returns a ``TelemetryConfig`` with + :py:class:`~temporalio.runtime.OpenTelemetryConfig` metrics pointed at the + given OTLP collector endpoint. Use this when you need to compose metrics + config with other telemetry settings (e.g. custom logging) into your own + :py:class:`~temporalio.runtime.Runtime`. + + Core SDK metrics are exported on the ``metric_periodicity`` interval by + the runtime's internal thread. There is no explicit flush API; set + ``metric_periodicity`` short enough to ensure at least one export per + Lambda invocation. + + Example:: + + telemetry = build_metrics_telemetry_config( + endpoint="http://localhost:4317", + service_name="my-service", + ) + # Customize further: + telemetry_config = dataclasses.replace( + telemetry, logging=my_logging_config + ) + runtime = Runtime(telemetry=telemetry_config) + config.client_connect_config["runtime"] = runtime + + Args: + endpoint: OTLP collector endpoint. Defaults to + ``http://localhost:4317``. + service_name: OTel service name. Used as a global tag. + metric_periodicity: How often metrics are exported. + + Returns: + A ``TelemetryConfig`` ready to pass to + :py:class:`~temporalio.runtime.Runtime`. + """ + if not endpoint: + endpoint = "http://localhost:4317" + + otel_config = OpenTelemetryConfig( + url=endpoint, + metric_periodicity=metric_periodicity, + ) + + global_tags: dict[str, str] = {} + if service_name: + global_tags["service_name"] = service_name + + return TelemetryConfig( + metrics=otel_config, + global_tags=global_tags, + ) + + +def apply_tracing( + config: LambdaWorkerConfig, + tracer_provider: TracerProvider, +) -> None: + """Configure only OTel tracing (no metrics) on the Lambda worker config. + + Adds a :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` to + ``config.client_connect_config["interceptors"]`` and registers a + ``ForceFlush`` shutdown hook for the provider. + + Args: + config: The :py:class:`LambdaWorkerConfig` to configure. + tracer_provider: The ``TracerProvider`` to use for tracing. + """ + from temporalio.contrib.opentelemetry import TracingInterceptor + + interceptor = TracingInterceptor(tracer=tracer_provider.get_tracer("temporal-sdk")) + interceptors = list(config.client_connect_config.get("interceptors", [])) + interceptors.append(interceptor) + config.client_connect_config["interceptors"] = interceptors + + async def _flush() -> None: + tracer_provider.force_flush() + + config.shutdown_hooks.append(_flush) diff --git a/tests/contrib/aws/lambda_worker/__init__.py b/tests/contrib/aws/lambda_worker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/contrib/aws/lambda_worker/test_lambda_worker.py b/tests/contrib/aws/lambda_worker/test_lambda_worker.py new file mode 100644 index 000000000..1d0f3cb11 --- /dev/null +++ b/tests/contrib/aws/lambda_worker/test_lambda_worker.py @@ -0,0 +1,536 @@ +"""Tests for temporalio.contrib.aws.lambda_worker.""" + +from __future__ import annotations + +from datetime import timedelta +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker._defaults import ( + DEFAULT_MAX_CACHED_WORKFLOWS, + DEFAULT_MAX_CONCURRENT_ACTIVITIES, + DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES, + DEFAULT_MAX_CONCURRENT_NEXUS_TASKS, + DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS, + apply_lambda_worker_defaults, + build_lambda_identity, + lambda_default_config_file_path, +) +from temporalio.contrib.aws.lambda_worker._run_worker import ( + _run_worker_internal, + _WorkerDeps, +) +from temporalio.envconfig import ClientConfigProfile +from temporalio.worker import WorkerConfig + +TEST_VERSION = WorkerDeploymentVersion( + deployment_name="test-deployment", + build_id="test-build", +) + + +# ---- LambdaWorkerConfig tests ---- + + +class TestLambdaWorkerConfig: + def test_worker_config_task_queue(self) -> None: + config = LambdaWorkerConfig() + assert config.worker_config.get("task_queue") is None + config.worker_config["task_queue"] = "my-queue" + assert config.worker_config["task_queue"] == "my-queue" + + def test_worker_config_workflows(self) -> None: + config = LambdaWorkerConfig() + + class FakeWorkflow: + pass + + config.worker_config["workflows"] = [FakeWorkflow] + assert FakeWorkflow in config.worker_config["workflows"] + + def test_worker_config_activities(self) -> None: + config = LambdaWorkerConfig() + + def fake_activity() -> None: + pass + + config.worker_config["activities"] = [fake_activity] + assert fake_activity in config.worker_config["activities"] + + def test_client_connect_config_directly_modifiable(self) -> None: + config = LambdaWorkerConfig() + config.client_connect_config["namespace"] = "custom-ns" + assert config.client_connect_config["namespace"] == "custom-ns" + + def test_worker_config_directly_modifiable(self) -> None: + config = LambdaWorkerConfig() + config.worker_config["max_concurrent_activities"] = 42 + assert config.worker_config["max_concurrent_activities"] == 42 + + def test_shutdown_deadline_buffer(self) -> None: + config = LambdaWorkerConfig() + config.shutdown_deadline_buffer = timedelta(seconds=5) + assert config.shutdown_deadline_buffer == timedelta(seconds=5) + + def test_shutdown_hooks_list(self) -> None: + config = LambdaWorkerConfig() + fn = MagicMock() + config.shutdown_hooks.append(fn) + assert fn in config.shutdown_hooks + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_in_order(self) -> None: + config = LambdaWorkerConfig() + order: list[str] = [] + config.shutdown_hooks.append(lambda: order.append("first")) + config.shutdown_hooks.append(lambda: order.append("second")) + await _run_shutdown_hooks(config) + assert order == ["first", "second"] + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_async(self) -> None: + config = LambdaWorkerConfig() + called = False + + async def async_hook() -> None: + nonlocal called + called = True + + config.shutdown_hooks.append(async_hook) + await _run_shutdown_hooks(config) + assert called + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_error_continues(self) -> None: + config = LambdaWorkerConfig() + second_called = False + + def failing_hook() -> None: + raise RuntimeError("flush failed") + + def second_hook() -> None: + nonlocal second_called + second_called = True + + config.shutdown_hooks.append(failing_hook) + config.shutdown_hooks.append(second_hook) + await _run_shutdown_hooks(config) + assert second_called + + def test_is_dataclass(self) -> None: + import dataclasses + + assert dataclasses.is_dataclass(LambdaWorkerConfig) + + def test_default_field_independence(self) -> None: + """Each instance gets its own mutable containers.""" + a = LambdaWorkerConfig() + b = LambdaWorkerConfig() + a.worker_config["max_concurrent_activities"] = 99 + assert "max_concurrent_activities" not in b.worker_config + + +# ---- Defaults tests ---- + + +class TestDefaults: + def test_apply_lambda_worker_defaults(self) -> None: + config: WorkerConfig = {} + apply_lambda_worker_defaults(config) + assert ( + config.get("max_concurrent_activities") == DEFAULT_MAX_CONCURRENT_ACTIVITIES + ) + assert ( + config.get("max_concurrent_workflow_tasks") + == DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + assert ( + config.get("max_concurrent_local_activities") + == DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES + ) + assert ( + config.get("max_concurrent_nexus_tasks") + == DEFAULT_MAX_CONCURRENT_NEXUS_TASKS + ) + assert config.get("max_cached_workflows") == DEFAULT_MAX_CACHED_WORKFLOWS + assert config.get("disable_eager_activity_execution") is True + + def test_apply_lambda_worker_defaults_preserves_existing(self) -> None: + config: WorkerConfig = { + "max_concurrent_activities": 50, + "graceful_shutdown_timeout": timedelta(seconds=10), + } + apply_lambda_worker_defaults(config) + assert config.get("max_concurrent_activities") == 50 + assert config.get("graceful_shutdown_timeout") == timedelta(seconds=10) + assert config.get("disable_eager_activity_execution") is True + + def test_build_lambda_identity(self) -> None: + assert ( + build_lambda_identity("req-123", "arn:aws:lambda:us-east-1:123:function:f") + == "req-123@arn:aws:lambda:us-east-1:123:function:f" + ) + + def test_build_lambda_identity_empty(self) -> None: + assert build_lambda_identity("", "") == "unknown@unknown" + + def test_lambda_default_config_file_path_env_var(self) -> None: + env = {"TEMPORAL_CONFIG_FILE": "/custom/path.toml"} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == "/custom/path.toml" + ) + + def test_lambda_default_config_file_path_lambda_root(self) -> None: + env = {"LAMBDA_TASK_ROOT": "/var/task"} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == "/var/task/temporal.toml" + ) + + def test_lambda_default_config_file_path_cwd(self) -> None: + env: dict[str, str] = {} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == "temporal.toml" + ) + + +# ---- RunWorker tests ---- + + +def _make_lambda_context( + *, + remaining_ms: int = 3_600_000, + request_id: str = "req-123", + function_arn: str = "arn:aws:lambda:us-east-1:123:function:my-func", +) -> Any: + """Create a mock Lambda context object.""" + ctx = MagicMock() + ctx.get_remaining_time_in_millis.return_value = remaining_ms + ctx.aws_request_id = request_id + ctx.invoked_function_arn = function_arn + return ctx + + +def _make_test_deps( + *, + connect_kwargs_capture: list[dict[str, Any]] | None = None, + worker_kwargs_capture: list[dict[str, Any]] | None = None, +) -> _WorkerDeps: + """Create test deps with mocked connect and worker.""" + mock_client = MagicMock() + mock_worker = MagicMock() + mock_worker.run = AsyncMock() + + async def fake_connect(**kwargs: Any) -> Any: + if connect_kwargs_capture is not None: + connect_kwargs_capture.append(kwargs) + return mock_client + + def fake_create_worker(_client: Any, **kwargs: Any) -> Any: + if worker_kwargs_capture is not None: + worker_kwargs_capture.append(kwargs) + return mock_worker + + return _WorkerDeps( + connect=fake_connect, + create_worker=fake_create_worker, + load_config=lambda: ClientConfigProfile(), + getenv={"TEMPORAL_TASK_QUEUE": "test-queue"}.get, # type: ignore[arg-type] + extract_lambda_ctx=lambda ctx: ( + ctx.aws_request_id, + ctx.invoked_function_arn, + ) + if hasattr(ctx, "aws_request_id") + else None, + ) + + +class TestRunWorkerInternal: + def test_returns_handler(self) -> None: + deps = _make_test_deps() + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + assert callable(handler) + + @pytest.mark.asyncio + async def test_success(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["workflows"] = [type("FakeWf", (), {})] + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + + def test_configure_callback_error(self) -> None: + deps = _make_test_deps() + + def bad_configure(_config: LambdaWorkerConfig) -> None: + raise RuntimeError("bad config") + + with pytest.raises(RuntimeError, match="bad config"): + _run_worker_internal(TEST_VERSION, bad_configure, deps) + + def test_missing_task_queue(self) -> None: + deps = _make_test_deps() + deps.getenv = lambda _: None # type: ignore[assignment] + with pytest.raises(ValueError, match="task queue not configured"): + _run_worker_internal(TEST_VERSION, lambda config: None, deps) + + def test_missing_version(self) -> None: + deps = _make_test_deps() + with pytest.raises(ValueError, match="version is required"): + _run_worker_internal( + WorkerDeploymentVersion(deployment_name="", build_id=""), + lambda config: None, + deps, + ) + + @pytest.mark.asyncio + async def test_user_overrides_applied(self) -> None: + connect_capture: list[dict[str, Any]] = [] + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps( + connect_kwargs_capture=connect_capture, + worker_kwargs_capture=worker_capture, + ) + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "user-queue" + config.client_connect_config["namespace"] = "custom-ns" + config.worker_config["max_concurrent_activities"] = 99 + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + + assert connect_capture[0]["namespace"] == "custom-ns" + assert worker_capture[0]["max_concurrent_activities"] == 99 + + @pytest.mark.asyncio + async def test_lambda_defaults_applied(self) -> None: + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(worker_kwargs_capture=worker_capture) + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + await handler({}, _make_lambda_context()) + + kwargs = worker_capture[0] + assert kwargs["max_concurrent_activities"] == DEFAULT_MAX_CONCURRENT_ACTIVITIES + assert ( + kwargs["max_concurrent_workflow_tasks"] + == DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + assert kwargs["disable_eager_activity_execution"] is True + dc = kwargs["deployment_config"] + assert dc.use_worker_versioning is True + assert dc.version == TEST_VERSION + + @pytest.mark.asyncio + async def test_identity_from_lambda_context(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + await handler( + {}, + _make_lambda_context( + request_id="req-abc-123", + function_arn="arn:aws:lambda:us-east-1:123456:function:my-func", + ), + ) + + assert ( + connect_capture[0]["identity"] + == "req-abc-123@arn:aws:lambda:us-east-1:123456:function:my-func" + ) + + @pytest.mark.asyncio + async def test_identity_user_override_wins(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + + def configure(config: LambdaWorkerConfig) -> None: + config.client_connect_config["identity"] = "my-custom-identity" + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + assert connect_capture[0]["identity"] == "my-custom-identity" + + @pytest.mark.asyncio + async def test_identity_no_lambda_context(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + deps.extract_lambda_ctx = lambda ctx: None + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + await handler({}, MagicMock(spec=[])) + assert "identity" not in connect_capture[0] + + @pytest.mark.asyncio + async def test_shutdown_hooks_called(self) -> None: + deps = _make_test_deps() + shutdown_called = False + + def configure(config: LambdaWorkerConfig) -> None: + def hook() -> None: + nonlocal shutdown_called + shutdown_called = True + + config.shutdown_hooks.append(hook) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + assert shutdown_called + + @pytest.mark.asyncio + async def test_shutdown_hooks_called_per_invocation(self) -> None: + deps = _make_test_deps() + shutdown_count = 0 + + def configure(config: LambdaWorkerConfig) -> None: + def hook() -> None: + nonlocal shutdown_count + shutdown_count += 1 + + config.shutdown_hooks.append(hook) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + await handler({}, _make_lambda_context()) + await handler({}, _make_lambda_context()) + assert shutdown_count == 3 + + @pytest.mark.asyncio + async def test_shutdown_hooks_multiple_funcs_order(self) -> None: + deps = _make_test_deps() + order: list[str] = [] + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_hooks.append(lambda: order.append("first")) + config.shutdown_hooks.append(lambda: order.append("second")) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + assert order == ["first", "second"] + + @pytest.mark.asyncio + async def test_shutdown_hooks_error_continues(self) -> None: + deps = _make_test_deps() + second_called = False + + def configure(config: LambdaWorkerConfig) -> None: + def failing() -> None: + raise RuntimeError("flush failed") + + def second() -> None: + nonlocal second_called + second_called = True + + config.shutdown_hooks.append(failing) + config.shutdown_hooks.append(second) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + assert second_called + + @pytest.mark.asyncio + async def test_tight_deadline_raises_error(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_deadline_buffer = timedelta(milliseconds=1500) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with pytest.raises(RuntimeError, match="almost no time for work"): + await handler({}, _make_lambda_context(remaining_ms=2000)) + + @pytest.mark.asyncio + async def test_tight_deadline_logs_warning(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_deadline_buffer = timedelta(milliseconds=500) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with patch( + "temporalio.contrib.aws.lambda_worker._run_worker.logger" + ) as mock_logger: + await handler({}, _make_lambda_context(remaining_ms=2000)) + mock_logger.warning.assert_called_once() + assert "less than 5s" in mock_logger.warning.call_args[0][0] + + @pytest.mark.asyncio + async def test_per_invocation_lifecycle(self) -> None: + """Each invocation creates its own client and worker.""" + connect_count = 0 + deps = _make_test_deps() + original_connect = deps.connect + + async def counting_connect(**kwargs: Any) -> Any: + nonlocal connect_count + connect_count += 1 + return await original_connect(**kwargs) + + deps.connect = counting_connect + + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + await handler({}, _make_lambda_context()) + await handler({}, _make_lambda_context()) + await handler({}, _make_lambda_context()) + assert connect_count == 3 + + @pytest.mark.asyncio + async def test_task_queue_from_config(self) -> None: + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(worker_kwargs_capture=worker_capture) + deps.getenv = lambda _: None # type: ignore[assignment] + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "explicit-queue" + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + await handler({}, _make_lambda_context()) + assert worker_capture[0]["task_queue"] == "explicit-queue" + + def test_task_queue_pre_populated_from_env(self) -> None: + """Task queue is pre-populated from TEMPORAL_TASK_QUEUE env var.""" + deps = _make_test_deps() + task_queues: list[str | None] = [] + + def configure(config: LambdaWorkerConfig) -> None: + task_queues.append(config.worker_config.get("task_queue")) + + _run_worker_internal(TEST_VERSION, configure, deps) + assert task_queues[0] == "test-queue" + + def test_config_pre_populated_with_defaults(self) -> None: + """Configure callback receives pre-populated LambdaWorkerConfig.""" + deps = _make_test_deps() + captured: list[LambdaWorkerConfig] = [] + + def configure(config: LambdaWorkerConfig) -> None: + captured.append(config) + + _run_worker_internal(TEST_VERSION, configure, deps) + wc = captured[0].worker_config + assert wc.get("max_concurrent_activities") == DEFAULT_MAX_CONCURRENT_ACTIVITIES + assert wc.get("disable_eager_activity_execution") is True + dc = wc.get("deployment_config") + assert dc is not None + assert dc.use_worker_versioning is True + assert dc.version == TEST_VERSION + + @pytest.mark.asyncio + async def test_no_deadline_runs_until_complete(self) -> None: + """When no deadline is available, worker runs until it completes.""" + deps = _make_test_deps() + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + ctx = MagicMock(spec=["aws_request_id", "invoked_function_arn"]) + ctx.aws_request_id = "req-123" + ctx.invoked_function_arn = "arn:aws:lambda:us-east-1:123:function:f" + await handler({}, ctx) diff --git a/tests/contrib/aws/lambda_worker/test_otel.py b/tests/contrib/aws/lambda_worker/test_otel.py new file mode 100644 index 000000000..69cc48a64 --- /dev/null +++ b/tests/contrib/aws/lambda_worker/test_otel.py @@ -0,0 +1,163 @@ +"""Tests for temporalio.contrib.aws.lambda_worker.otel.""" + +from __future__ import annotations + +from datetime import timedelta +from unittest.mock import patch + +import pytest +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker.otel import ( + OtelOptions, + apply_defaults, + apply_tracing, + build_metrics_telemetry_config, +) +from temporalio.runtime import OpenTelemetryConfig, TelemetryConfig + + +def _make_tracer_provider() -> tuple[TracerProvider, InMemorySpanExporter]: + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + return tracer_provider, span_exporter + + +class TestApplyTracing: + def test_adds_interceptor(self) -> None: + config = LambdaWorkerConfig() + tracer_provider, _ = _make_tracer_provider() + apply_tracing(config, tracer_provider) + interceptors = config.client_connect_config.get("interceptors", []) + assert len(interceptors) == 1 + + def test_appends_to_existing_interceptors(self) -> None: + config = LambdaWorkerConfig() + sentinel = object() + config.client_connect_config["interceptors"] = [sentinel] + tracer_provider, _ = _make_tracer_provider() + apply_tracing(config, tracer_provider) + interceptors = config.client_connect_config["interceptors"] + assert len(interceptors) == 2 + assert interceptors[0] is sentinel + + def test_registers_flush_shutdown_hook(self) -> None: + config = LambdaWorkerConfig() + tracer_provider, _ = _make_tracer_provider() + apply_tracing(config, tracer_provider) + assert len(config.shutdown_hooks) == 1 + + @pytest.mark.asyncio + async def test_shutdown_hook_flushes(self) -> None: + config = LambdaWorkerConfig() + tracer_provider, _ = _make_tracer_provider() + apply_tracing(config, tracer_provider) + await _run_shutdown_hooks(config) + + +class TestBuildMetricsTelemetryConfig: + def test_returns_telemetry_config(self) -> None: + tc = build_metrics_telemetry_config(endpoint="http://localhost:4317") + assert isinstance(tc, TelemetryConfig) + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.url == "http://localhost:4317" + + def test_default_endpoint(self) -> None: + tc = build_metrics_telemetry_config() + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.url == "http://localhost:4317" + + def test_service_name_as_global_tag(self) -> None: + tc = build_metrics_telemetry_config(service_name="my-svc") + assert tc.global_tags.get("service_name") == "my-svc" + + def test_no_service_name_no_tag(self) -> None: + tc = build_metrics_telemetry_config() + assert "service_name" not in tc.global_tags + + def test_metric_periodicity(self) -> None: + tc = build_metrics_telemetry_config(metric_periodicity=timedelta(seconds=30)) + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.metric_periodicity == timedelta(seconds=30) + + def test_composable_with_custom_runtime(self) -> None: + """User can compose the returned config into a custom Runtime.""" + import dataclasses + + tc = build_metrics_telemetry_config(endpoint="http://localhost:4317") + # Replace logging config to demonstrate composability. + custom_tc = dataclasses.replace(tc, logging=None) + assert custom_tc.logging is None + assert isinstance(custom_tc.metrics, OpenTelemetryConfig) + + +class TestApplyDefaults: + def test_configures_metrics_and_tracing(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config, OtelOptions(collector_endpoint="http://localhost:4317")) + + assert "runtime" in config.client_connect_config + interceptors = config.client_connect_config.get("interceptors", []) + assert len(interceptors) == 1 + assert len(config.shutdown_hooks) == 1 + + def test_service_name_from_options(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config, OtelOptions(service_name="my-service")) + assert "runtime" in config.client_connect_config + + def test_service_name_from_env(self) -> None: + config = LambdaWorkerConfig() + with patch.dict("os.environ", {"OTEL_SERVICE_NAME": "env-service"}): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_service_name_from_lambda_function_name(self) -> None: + config = LambdaWorkerConfig() + with patch.dict( + "os.environ", + {"AWS_LAMBDA_FUNCTION_NAME": "my-lambda"}, + clear=True, + ): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_endpoint_from_env(self) -> None: + config = LambdaWorkerConfig() + with patch.dict( + "os.environ", + {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://custom:4317"}, + ): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_default_options_used_when_none(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config) + assert "runtime" in config.client_connect_config + assert len(config.shutdown_hooks) == 1 + + +class TestOtelOptions: + def test_defaults(self) -> None: + opts = OtelOptions() + assert opts.service_name == "" + assert opts.collector_endpoint == "" + assert opts.metric_periodicity == timedelta(seconds=10) + + def test_custom_values(self) -> None: + opts = OtelOptions( + service_name="svc", + collector_endpoint="http://host:4317", + metric_periodicity=timedelta(seconds=30), + ) + assert opts.service_name == "svc" + assert opts.collector_endpoint == "http://host:4317" + assert opts.metric_periodicity == timedelta(seconds=30) diff --git a/uv.lock b/uv.lock index c63faefad..8e1bc6b60 100644 --- a/uv.lock +++ b/uv.lock @@ -1768,7 +1768,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/38/3f/9859f655d11901e7b2996c6e3d33e0caa9a1d4572c3bc61ed0faa64b2f4c/greenlet-3.3.2-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:9bc885b89709d901859cf95179ec9f6bb67a3d2bb1f0e88456461bd4b7f8fd0d", size = 277747, upload-time = "2026-02-20T20:16:21.325Z" }, { url = "https://files.pythonhosted.org/packages/fb/07/cb284a8b5c6498dbd7cba35d31380bb123d7dceaa7907f606c8ff5993cbf/greenlet-3.3.2-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b568183cf65b94919be4438dc28416b234b678c608cafac8874dfeeb2a9bbe13", size = 579202, upload-time = "2026-02-20T20:47:28.955Z" }, { url = "https://files.pythonhosted.org/packages/ed/45/67922992b3a152f726163b19f890a85129a992f39607a2a53155de3448b8/greenlet-3.3.2-cp310-cp310-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:527fec58dc9f90efd594b9b700662ed3fb2493c2122067ac9c740d98080a620e", size = 590620, upload-time = "2026-02-20T20:55:55.581Z" }, - { url = "https://files.pythonhosted.org/packages/03/5f/6e2a7d80c353587751ef3d44bb947f0565ec008a2e0927821c007e96d3a7/greenlet-3.3.2-cp310-cp310-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:508c7f01f1791fbc8e011bd508f6794cb95397fdb198a46cb6635eb5b78d85a7", size = 602132, upload-time = "2026-02-20T21:02:43.261Z" }, { url = "https://files.pythonhosted.org/packages/ad/55/9f1ebb5a825215fadcc0f7d5073f6e79e3007e3282b14b22d6aba7ca6cb8/greenlet-3.3.2-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ad0c8917dd42a819fe77e6bdfcb84e3379c0de956469301d9fd36427a1ca501f", size = 591729, upload-time = "2026-02-20T20:20:58.395Z" }, { url = "https://files.pythonhosted.org/packages/24/b4/21f5455773d37f94b866eb3cf5caed88d6cea6dd2c6e1f9c34f463cba3ec/greenlet-3.3.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:97245cc10e5515dbc8c3104b2928f7f02b6813002770cfaffaf9a6e0fc2b94ef", size = 1551946, upload-time = "2026-02-20T20:49:31.102Z" }, { url = "https://files.pythonhosted.org/packages/00/68/91f061a926abead128fe1a87f0b453ccf07368666bd59ffa46016627a930/greenlet-3.3.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:8c1fdd7d1b309ff0da81d60a9688a8bd044ac4e18b250320a96fc68d31c209ca", size = 1618494, upload-time = "2026-02-20T20:21:06.541Z" }, @@ -1776,7 +1775,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f3/47/16400cb42d18d7a6bb46f0626852c1718612e35dcb0dffa16bbaffdf5dd2/greenlet-3.3.2-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:c56692189a7d1c7606cb794be0a8381470d95c57ce5be03fb3d0ef57c7853b86", size = 278890, upload-time = "2026-02-20T20:19:39.263Z" }, { url = "https://files.pythonhosted.org/packages/a3/90/42762b77a5b6aa96cd8c0e80612663d39211e8ae8a6cd47c7f1249a66262/greenlet-3.3.2-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1ebd458fa8285960f382841da585e02201b53a5ec2bac6b156fc623b5ce4499f", size = 581120, upload-time = "2026-02-20T20:47:30.161Z" }, { url = "https://files.pythonhosted.org/packages/bf/6f/f3d64f4fa0a9c7b5c5b3c810ff1df614540d5aa7d519261b53fba55d4df9/greenlet-3.3.2-cp311-cp311-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:a443358b33c4ec7b05b79a7c8b466f5d275025e750298be7340f8fc63dff2a55", size = 594363, upload-time = "2026-02-20T20:55:56.965Z" }, - { url = "https://files.pythonhosted.org/packages/9c/8b/1430a04657735a3f23116c2e0d5eb10220928846e4537a938a41b350bed6/greenlet-3.3.2-cp311-cp311-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:4375a58e49522698d3e70cc0b801c19433021b5c37686f7ce9c65b0d5c8677d2", size = 605046, upload-time = "2026-02-20T21:02:45.234Z" }, { url = "https://files.pythonhosted.org/packages/72/83/3e06a52aca8128bdd4dcd67e932b809e76a96ab8c232a8b025b2850264c5/greenlet-3.3.2-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8e2cd90d413acbf5e77ae41e5d3c9b3ac1d011a756d7284d7f3f2b806bbd6358", size = 594156, upload-time = "2026-02-20T20:20:59.955Z" }, { url = "https://files.pythonhosted.org/packages/70/79/0de5e62b873e08fe3cef7dbe84e5c4bc0e8ed0c7ff131bccb8405cd107c8/greenlet-3.3.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:442b6057453c8cb29b4fb36a2ac689382fc71112273726e2423f7f17dc73bf99", size = 1554649, upload-time = "2026-02-20T20:49:32.293Z" }, { url = "https://files.pythonhosted.org/packages/5a/00/32d30dee8389dc36d42170a9c66217757289e2afb0de59a3565260f38373/greenlet-3.3.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:45abe8eb6339518180d5a7fa47fa01945414d7cca5ecb745346fc6a87d2750be", size = 1619472, upload-time = "2026-02-20T20:21:07.966Z" }, @@ -1785,7 +1783,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ea/ab/1608e5a7578e62113506740b88066bf09888322a311cff602105e619bd87/greenlet-3.3.2-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:ac8d61d4343b799d1e526db579833d72f23759c71e07181c2d2944e429eb09cd", size = 280358, upload-time = "2026-02-20T20:17:43.971Z" }, { url = "https://files.pythonhosted.org/packages/a5/23/0eae412a4ade4e6623ff7626e38998cb9b11e9ff1ebacaa021e4e108ec15/greenlet-3.3.2-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3ceec72030dae6ac0c8ed7591b96b70410a8be370b6a477b1dbc072856ad02bd", size = 601217, upload-time = "2026-02-20T20:47:31.462Z" }, { url = "https://files.pythonhosted.org/packages/f8/16/5b1678a9c07098ecb9ab2dd159fafaf12e963293e61ee8d10ecb55273e5e/greenlet-3.3.2-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:a2a5be83a45ce6188c045bcc44b0ee037d6a518978de9a5d97438548b953a1ac", size = 611792, upload-time = "2026-02-20T20:55:58.423Z" }, - { url = "https://files.pythonhosted.org/packages/5c/c5/cc09412a29e43406eba18d61c70baa936e299bc27e074e2be3806ed29098/greenlet-3.3.2-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:ae9e21c84035c490506c17002f5c8ab25f980205c3e61ddb3a2a2a2e6c411fcb", size = 626250, upload-time = "2026-02-20T21:02:46.596Z" }, { url = "https://files.pythonhosted.org/packages/50/1f/5155f55bd71cabd03765a4aac9ac446be129895271f73872c36ebd4b04b6/greenlet-3.3.2-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:43e99d1749147ac21dde49b99c9abffcbc1e2d55c67501465ef0930d6e78e070", size = 613875, upload-time = "2026-02-20T20:21:01.102Z" }, { url = "https://files.pythonhosted.org/packages/fc/dd/845f249c3fcd69e32df80cdab059b4be8b766ef5830a3d0aa9d6cad55beb/greenlet-3.3.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:4c956a19350e2c37f2c48b336a3afb4bff120b36076d9d7fb68cb44e05d95b79", size = 1571467, upload-time = "2026-02-20T20:49:33.495Z" }, { url = "https://files.pythonhosted.org/packages/2a/50/2649fe21fcc2b56659a452868e695634722a6655ba245d9f77f5656010bf/greenlet-3.3.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:6c6f8ba97d17a1e7d664151284cb3315fc5f8353e75221ed4324f84eb162b395", size = 1640001, upload-time = "2026-02-20T20:21:09.154Z" }, @@ -1794,7 +1791,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ac/48/f8b875fa7dea7dd9b33245e37f065af59df6a25af2f9561efa8d822fde51/greenlet-3.3.2-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:aa6ac98bdfd716a749b84d4034486863fd81c3abde9aa3cf8eff9127981a4ae4", size = 279120, upload-time = "2026-02-20T20:19:01.9Z" }, { url = "https://files.pythonhosted.org/packages/49/8d/9771d03e7a8b1ee456511961e1b97a6d77ae1dea4a34a5b98eee706689d3/greenlet-3.3.2-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ab0c7e7901a00bc0a7284907273dc165b32e0d109a6713babd04471327ff7986", size = 603238, upload-time = "2026-02-20T20:47:32.873Z" }, { url = "https://files.pythonhosted.org/packages/59/0e/4223c2bbb63cd5c97f28ffb2a8aee71bdfb30b323c35d409450f51b91e3e/greenlet-3.3.2-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:d248d8c23c67d2291ffd47af766e2a3aa9fa1c6703155c099feb11f526c63a92", size = 614219, upload-time = "2026-02-20T20:55:59.817Z" }, - { url = "https://files.pythonhosted.org/packages/94/2b/4d012a69759ac9d77210b8bfb128bc621125f5b20fc398bce3940d036b1c/greenlet-3.3.2-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:ccd21bb86944ca9be6d967cf7691e658e43417782bce90b5d2faeda0ff78a7dd", size = 628268, upload-time = "2026-02-20T21:02:48.024Z" }, { url = "https://files.pythonhosted.org/packages/7a/34/259b28ea7a2a0c904b11cd36c79b8cef8019b26ee5dbe24e73b469dea347/greenlet-3.3.2-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b6997d360a4e6a4e936c0f9625b1c20416b8a0ea18a8e19cabbefc712e7397ab", size = 616774, upload-time = "2026-02-20T20:21:02.454Z" }, { url = "https://files.pythonhosted.org/packages/0a/03/996c2d1689d486a6e199cb0f1cf9e4aa940c500e01bdf201299d7d61fa69/greenlet-3.3.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:64970c33a50551c7c50491671265d8954046cb6e8e2999aacdd60e439b70418a", size = 1571277, upload-time = "2026-02-20T20:49:34.795Z" }, { url = "https://files.pythonhosted.org/packages/d9/c4/2570fc07f34a39f2caf0bf9f24b0a1a0a47bc2e8e465b2c2424821389dfc/greenlet-3.3.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:1a9172f5bf6bd88e6ba5a84e0a68afeac9dc7b6b412b245dd64f52d83c81e55b", size = 1640455, upload-time = "2026-02-20T20:21:10.261Z" }, @@ -1803,7 +1799,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3f/ae/8bffcbd373b57a5992cd077cbe8858fff39110480a9d50697091faea6f39/greenlet-3.3.2-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:8d1658d7291f9859beed69a776c10822a0a799bc4bfe1bd4272bb60e62507dab", size = 279650, upload-time = "2026-02-20T20:18:00.783Z" }, { url = "https://files.pythonhosted.org/packages/d1/c0/45f93f348fa49abf32ac8439938726c480bd96b2a3c6f4d949ec0124b69f/greenlet-3.3.2-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:18cb1b7337bca281915b3c5d5ae19f4e76d35e1df80f4ad3c1a7be91fadf1082", size = 650295, upload-time = "2026-02-20T20:47:34.036Z" }, { url = "https://files.pythonhosted.org/packages/b3/de/dd7589b3f2b8372069ab3e4763ea5329940fc7ad9dcd3e272a37516d7c9b/greenlet-3.3.2-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:c2e47408e8ce1c6f1ceea0dffcdf6ebb85cc09e55c7af407c99f1112016e45e9", size = 662163, upload-time = "2026-02-20T20:56:01.295Z" }, - { url = "https://files.pythonhosted.org/packages/cd/ac/85804f74f1ccea31ba518dcc8ee6f14c79f73fe36fa1beba38930806df09/greenlet-3.3.2-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:e3cb43ce200f59483eb82949bf1835a99cf43d7571e900d7c8d5c62cdf25d2f9", size = 675371, upload-time = "2026-02-20T21:02:49.664Z" }, { url = "https://files.pythonhosted.org/packages/d2/d8/09bfa816572a4d83bccd6750df1926f79158b1c36c5f73786e26dbe4ee38/greenlet-3.3.2-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:63d10328839d1973e5ba35e98cccbca71b232b14051fd957b6f8b6e8e80d0506", size = 664160, upload-time = "2026-02-20T20:21:04.015Z" }, { url = "https://files.pythonhosted.org/packages/48/cf/56832f0c8255d27f6c35d41b5ec91168d74ec721d85f01a12131eec6b93c/greenlet-3.3.2-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:8e4ab3cfb02993c8cc248ea73d7dae6cec0253e9afa311c9b37e603ca9fad2ce", size = 1619181, upload-time = "2026-02-20T20:49:36.052Z" }, { url = "https://files.pythonhosted.org/packages/0a/23/b90b60a4aabb4cec0796e55f25ffbfb579a907c3898cd2905c8918acaa16/greenlet-3.3.2-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:94ad81f0fd3c0c0681a018a976e5c2bd2ca2d9d94895f23e7bb1af4e8af4e2d5", size = 1687713, upload-time = "2026-02-20T20:21:11.684Z" }, @@ -1812,7 +1807,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/98/6d/8f2ef704e614bcf58ed43cfb8d87afa1c285e98194ab2cfad351bf04f81e/greenlet-3.3.2-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:e26e72bec7ab387ac80caa7496e0f908ff954f31065b0ffc1f8ecb1338b11b54", size = 286617, upload-time = "2026-02-20T20:19:29.856Z" }, { url = "https://files.pythonhosted.org/packages/5e/0d/93894161d307c6ea237a43988f27eba0947b360b99ac5239ad3fe09f0b47/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8b466dff7a4ffda6ca975979bab80bdadde979e29fc947ac3be4451428d8b0e4", size = 655189, upload-time = "2026-02-20T20:47:35.742Z" }, { url = "https://files.pythonhosted.org/packages/f5/2c/d2d506ebd8abcb57386ec4f7ba20f4030cbe56eae541bc6fd6ef399c0b41/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:b8bddc5b73c9720bea487b3bffdb1840fe4e3656fba3bd40aa1489e9f37877ff", size = 658225, upload-time = "2026-02-20T20:56:02.527Z" }, - { url = "https://files.pythonhosted.org/packages/d1/67/8197b7e7e602150938049d8e7f30de1660cfb87e4c8ee349b42b67bdb2e1/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:59b3e2c40f6706b05a9cd299c836c6aa2378cabe25d021acd80f13abf81181cf", size = 666581, upload-time = "2026-02-20T21:02:51.526Z" }, { url = "https://files.pythonhosted.org/packages/8e/30/3a09155fbf728673a1dea713572d2d31159f824a37c22da82127056c44e4/greenlet-3.3.2-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b26b0f4428b871a751968285a1ac9648944cea09807177ac639b030bddebcea4", size = 657907, upload-time = "2026-02-20T20:21:05.259Z" }, { url = "https://files.pythonhosted.org/packages/f3/fd/d05a4b7acd0154ed758797f0a43b4c0962a843bedfe980115e842c5b2d08/greenlet-3.3.2-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:1fb39a11ee2e4d94be9a76671482be9398560955c9e568550de0224e41104727", size = 1618857, upload-time = "2026-02-20T20:49:37.309Z" }, { url = "https://files.pythonhosted.org/packages/6f/e1/50ee92a5db521de8f35075b5eff060dd43d39ebd46c2181a2042f7070385/greenlet-3.3.2-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:20154044d9085151bc309e7689d6f7ba10027f8f5a8c0676ad398b951913d89e", size = 1680010, upload-time = "2026-02-20T20:21:13.427Z" }, @@ -3379,6 +3373,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/13/b4ef09837409a777f3c0af2a5b4ba9b7af34872bc43609dda0c209e4060d/opentelemetry_exporter_otlp_proto_common-1.37.0-py3-none-any.whl", hash = "sha256:53038428449c559b0c564b8d718df3314da387109c4d36bd1b94c9a641b0292e", size = 18359, upload-time = "2025-09-11T10:28:44.939Z" }, ] +[[package]] +name = "opentelemetry-exporter-otlp-proto-grpc" +version = "1.37.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "googleapis-common-protos" }, + { name = "grpcio" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/11/4ad0979d0bb13ae5a845214e97c8d42da43980034c30d6f72d8e0ebe580e/opentelemetry_exporter_otlp_proto_grpc-1.37.0.tar.gz", hash = "sha256:f55bcb9fc848ce05ad3dd954058bc7b126624d22c4d9e958da24d8537763bec5", size = 24465, upload-time = "2025-09-11T10:29:04.172Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/39/17/46630b74751031a658706bef23ac99cdc2953cd3b2d28ec90590a0766b3e/opentelemetry_exporter_otlp_proto_grpc-1.37.0-py3-none-any.whl", hash = "sha256:aee5104835bf7993b7ddaaf380b6467472abaedb1f1dbfcc54a52a7d781a3890", size = 19305, upload-time = "2025-09-11T10:28:45.776Z" }, +] + [[package]] name = "opentelemetry-exporter-otlp-proto-http" version = "1.37.0" @@ -3453,6 +3465,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9f/62/9f4ad6a54126fb00f7ed4bb5034964c6e4f00fcd5a905e115bd22707e20d/opentelemetry_sdk-1.37.0-py3-none-any.whl", hash = "sha256:8f3c3c22063e52475c5dbced7209495c2c16723d016d39287dfc215d1771257c", size = 131941, upload-time = "2025-09-11T10:28:57.83Z" }, ] +[[package]] +name = "opentelemetry-sdk-extension-aws" +version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f5/b3/825c93fe4c238845f1356297abea33d03b2adaafb5ae98fc257b394de124/opentelemetry_sdk_extension_aws-2.1.0.tar.gz", hash = "sha256:ff68ddecc1910f62c019d22ec0f7461713ead7f662d6a2304d4089c1a0b20416", size = 16334, upload-time = "2024-12-24T15:01:57.387Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/02/61/47a6a43b7935d54b5734fbf3fb0357dd5a7d0dfaa9677b7318518fe8d507/opentelemetry_sdk_extension_aws-2.1.0-py3-none-any.whl", hash = "sha256:c7cf6efc275d2c24108a468d954287ce5aab9733bac816a080cfb3117374e63a", size = 18776, upload-time = "2024-12-24T15:01:56.053Z" }, +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.58b0" @@ -4801,6 +4825,13 @@ google-adk = [ grpc = [ { name = "grpcio" }, ] +lambda-worker-otel = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-sdk" }, + { name = "opentelemetry-sdk-extension-aws" }, + { name = "opentelemetry-semantic-conventions" }, +] openai-agents = [ { name = "mcp" }, { name = "openai-agents" }, @@ -4828,6 +4859,9 @@ dev = [ { name = "openai-agents", extra = ["litellm"], marker = "python_full_version < '3.14'" }, { name = "openinference-instrumentation-google-adk" }, { name = "openinference-instrumentation-openai-agents" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-sdk-extension-aws" }, + { name = "opentelemetry-semantic-conventions" }, { name = "psutil" }, { name = "pydocstyle" }, { name = "pydoctor" }, @@ -4851,8 +4885,13 @@ requires-dist = [ { name = "mcp", marker = "extra == 'openai-agents'", specifier = ">=1.9.4,<2" }, { name = "nexus-rpc", specifier = "==1.4.0" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.3,<0.7" }, + { name = "opentelemetry-api", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-exporter-otlp-proto-grpc", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk-extension-aws", marker = "extra == 'lambda-worker-otel'", specifier = ">=2.0.0,<3" }, + { name = "opentelemetry-semantic-conventions", marker = "extra == 'lambda-worker-otel'", specifier = ">=0.40b0,<1" }, { name = "protobuf", specifier = ">=3.20,<7.0.0" }, { name = "pydantic", marker = "extra == 'pydantic'", specifier = ">=2.0.0,<3" }, { name = "python-dateutil", marker = "python_full_version < '3.11'", specifier = ">=2.8.2,<3" }, @@ -4860,7 +4899,7 @@ requires-dist = [ { name = "types-protobuf", specifier = ">=3.20" }, { name = "typing-extensions", specifier = ">=4.2.0,<5" }, ] -provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "aioboto3"] +provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "lambda-worker-otel", "aioboto3"] [package.metadata.requires-dev] dev = [ @@ -4877,6 +4916,9 @@ dev = [ { name = "openai-agents", extras = ["litellm"], marker = "python_full_version < '3.14'", specifier = ">=0.3,<0.7" }, { name = "openinference-instrumentation-google-adk", specifier = ">=0.1.8" }, { name = "openinference-instrumentation-openai-agents", specifier = ">=0.1.0" }, + { name = "opentelemetry-exporter-otlp-proto-grpc", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk-extension-aws", specifier = ">=2.0.0,<3" }, + { name = "opentelemetry-semantic-conventions", specifier = ">=0.40b0,<1" }, { name = "psutil", specifier = ">=5.9.3,<6" }, { name = "pydocstyle", specifier = ">=6.3.0,<7" }, { name = "pydoctor", specifier = ">=25.10.1,<26" }, From f256fd88bf92321d0e4a2cafdae3d54d9b53749f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 2 Apr 2026 11:51:46 -0700 Subject: [PATCH 2/6] Don't use asyncio.timeout since it's 3.11+ --- temporalio/contrib/aws/lambda_worker/_run_worker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py index de431abee..8f8611692 100644 --- a/temporalio/contrib/aws/lambda_worker/_run_worker.py +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -251,9 +251,8 @@ async def _invocation_handler( ).total_seconds() if work_time_secs > 0: try: - async with asyncio.timeout(work_time_secs): - await worker.run() - except TimeoutError: + await asyncio.wait_for(worker.run(), timeout=work_time_secs) + except asyncio.TimeoutError: pass else: # No deadline - run until cancelled. From 34c0ba24490191184787f2c9cd05e75df04a4c72 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 2 Apr 2026 17:51:13 -0700 Subject: [PATCH 3/6] Various fixes to make run with sample & cleanup --- .../contrib/aws/lambda_worker/__init__.py | 22 ++--- .../contrib/aws/lambda_worker/_configure.py | 15 ++- .../contrib/aws/lambda_worker/_defaults.py | 8 +- .../contrib/aws/lambda_worker/_run_worker.py | 47 +++++----- temporalio/contrib/aws/lambda_worker/otel.py | 53 +++++------ .../aws/lambda_worker/test_lambda_worker.py | 92 ++++++++----------- 6 files changed, 107 insertions(+), 130 deletions(-) diff --git a/temporalio/contrib/aws/lambda_worker/__init__.py b/temporalio/contrib/aws/lambda_worker/__init__.py index 25df52b9c..7f786eacf 100644 --- a/temporalio/contrib/aws/lambda_worker/__init__.py +++ b/temporalio/contrib/aws/lambda_worker/__init__.py @@ -1,8 +1,8 @@ """A wrapper for running Temporal workers inside AWS Lambda. -A single :py:func:`run_worker` call handles the full per-invocation lifecycle: -connecting to the Temporal server, creating a worker with Lambda-tuned defaults, -polling for tasks, and gracefully shutting down before the invocation deadline. +A single :py:func:`run_worker` call handles the full per-invocation lifecycle: connecting to the +Temporal server, creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully +shutting down before the invocation deadline. Quick start:: @@ -14,7 +14,7 @@ def configure(config: LambdaWorkerConfig) -> None: config.worker_config["workflows"] = [MyWorkflow] config.worker_config["activities"] = [my_activity] - handler = run_worker( + lambda_handler = run_worker( WorkerDeploymentVersion( deployment_name="my-service", build_id="v1.0", @@ -24,9 +24,9 @@ def configure(config: LambdaWorkerConfig) -> None: Configuration ------------- -Client connection settings (address, namespace, TLS, API key) are loaded -automatically from a TOML config file and/or environment variables via -:py:mod:`temporalio.envconfig`. The config file is resolved in order: +Client connection settings (address, namespace, TLS, API key) are loaded automatically from a TOML +config file and/or environment variables via :py:mod:`temporalio.envconfig`. The config file is +resolved in order: 1. ``TEMPORAL_CONFIG_FILE`` env var, if set. 2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``). @@ -34,10 +34,10 @@ def configure(config: LambdaWorkerConfig) -> None: The file is optional -- if absent, only environment variables are used. -The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with -fields pre-populated with Lambda-appropriate defaults. Override any field -directly in the callback. The ``task_queue`` key in ``worker_config`` is -pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if set. +The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with fields pre-populated +with Lambda-appropriate defaults. Override any field directly in the callback. The ``task_queue`` +key in ``worker_config`` is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if +set. """ from temporalio.contrib.aws.lambda_worker._configure import ( diff --git a/temporalio/contrib/aws/lambda_worker/_configure.py b/temporalio/contrib/aws/lambda_worker/_configure.py index dc67c52fb..ed3051df2 100644 --- a/temporalio/contrib/aws/lambda_worker/_configure.py +++ b/temporalio/contrib/aws/lambda_worker/_configure.py @@ -17,8 +17,8 @@ class LambdaClientConnectConfig(ClientConnectConfig, total=False): """Keyword arguments for :py:meth:`temporalio.client.Client.connect`. - Extends :py:class:`~temporalio.envconfig.ClientConnectConfig` with - additional keys that may be set by the Lambda worker or its OTel helpers. + Extends :py:class:`~temporalio.envconfig.ClientConnectConfig` with additional keys that may be + set by the Lambda worker or its OTel helpers. """ identity: str @@ -31,13 +31,12 @@ class LambdaClientConnectConfig(ClientConnectConfig, total=False): class LambdaWorkerConfig: """Passed to the configure callback of :py:func:`run_worker`. - Fields are pre-populated with Lambda-appropriate defaults before the - configure callback is invoked; the callback may read and override any of - them. + Fields are pre-populated with Lambda-appropriate defaults before the configure callback is + invoked; the callback may read and override any of them. - Use ``worker_config`` to set task queue, register workflows/activities, and - tune worker options. The ``task_queue`` key is pre-populated from the - ``TEMPORAL_TASK_QUEUE`` environment variable if set. + Use ``worker_config`` to set task queue, register workflows/activities, and tune worker options. + The ``task_queue`` key is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if + set. Attributes: client_connect_config: Keyword arguments that will be passed to diff --git a/temporalio/contrib/aws/lambda_worker/_defaults.py b/temporalio/contrib/aws/lambda_worker/_defaults.py index 796f5b3d5..1b93e3407 100644 --- a/temporalio/contrib/aws/lambda_worker/_defaults.py +++ b/temporalio/contrib/aws/lambda_worker/_defaults.py @@ -18,7 +18,7 @@ DEFAULT_MAX_CONCURRENT_NEXUS_TASKS: int = 5 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: timedelta = timedelta(seconds=5) DEFAULT_SHUTDOWN_HOOK_BUFFER: timedelta = timedelta(seconds=2) -DEFAULT_MAX_CACHED_WORKFLOWS: int = 100 +DEFAULT_MAX_CACHED_WORKFLOWS: int = 30 DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=2) DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1) @@ -68,7 +68,7 @@ def build_lambda_identity(request_id: str, function_arn: str) -> str: def lambda_default_config_file_path( getenv: Callable[[str], str] = os.environ.get, # type: ignore[assignment] -) -> str: +) -> Path: """Return the config file path for a Lambda environment. Resolution order: @@ -79,6 +79,6 @@ def lambda_default_config_file_path( """ config_file = getenv(ENV_CONFIG_FILE) if config_file: - return config_file + return Path(config_file) root = getenv(ENV_LAMBDA_TASK_ROOT) or "." - return str(Path(root) / DEFAULT_CONFIG_FILE) + return Path(root) / DEFAULT_CONFIG_FILE diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py index 8f8611692..f07fb5b44 100644 --- a/temporalio/contrib/aws/lambda_worker/_run_worker.py +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -1,5 +1,3 @@ -"""Core run_worker implementation for Lambda.""" - from __future__ import annotations import asyncio @@ -67,23 +65,21 @@ def _default_extract_lambda_ctx( def run_worker( version: WorkerDeploymentVersion, configure: Callable[[LambdaWorkerConfig], None], -) -> Callable[[Any, Any], Awaitable[None]]: +) -> Callable[[Any, Any], None]: """Create a Temporal worker Lambda handler. - Calls the *configure* callback to collect workflow/activity registrations - and option overrides, then returns an async Lambda handler function. On - each invocation the handler connects to the Temporal server, starts a - worker with Lambda-tuned defaults, polls for tasks until the invocation + Calls the *configure* callback to collect workflow/activity registrations and option overrides, + then returns a Lambda handler function. On each invocation the handler connects to the Temporal + server, starts a worker with Lambda-tuned defaults, polls for tasks until the invocation deadline approaches, and then gracefully shuts down. - The *version* parameter identifies this worker's deployment version. - ``run_worker`` always enables Worker Deployment Versioning - (``use_worker_versioning=True``). To provide a default versioning behavior - for workflows that do not specify one at registration time, set + The *version* parameter identifies this worker's deployment version. ``run_worker`` always + enables Worker Deployment Versioning (``use_worker_versioning=True``). To provide a default + versioning behavior for workflows that do not specify one at registration time, set ``deployment_config`` in ``worker_config`` in the configure callback. - The returned handler has the signature ``async handler(event, context)`` - and should be set as your Lambda function's handler entry point. + The returned handler has the signature ``handler(event, context)`` and should be set as your + Lambda function's handler entry point. Args: version: The worker deployment version. Required. @@ -92,7 +88,7 @@ def run_worker( activities, and options on it. Returns: - An async Lambda handler function. + A Lambda handler function. Example:: @@ -127,7 +123,7 @@ def _run_worker_internal( version: WorkerDeploymentVersion, configure: Callable[[LambdaWorkerConfig], None], deps: _WorkerDeps, -) -> Callable[[Any, Any], Awaitable[None]]: +) -> Callable[[Any, Any], None]: """Core logic with injected dependencies for testability.""" if not version.deployment_name or not version.build_id: raise ValueError( @@ -180,12 +176,14 @@ def _run_worker_internal( extract_lambda_ctx = deps.extract_lambda_ctx or _default_extract_lambda_ctx - async def _handler(_event: Any, lambda_context: Any) -> None: - await _invocation_handler( - lambda_context=lambda_context, - config=config, - deps=deps, - extract_lambda_ctx=extract_lambda_ctx, + def _handler(_event: Any, lambda_context: Any) -> None: + asyncio.run( + _invocation_handler( + lambda_context=lambda_context, + config=config, + deps=deps, + extract_lambda_ctx=extract_lambda_ctx, + ) ) return _handler @@ -210,9 +208,10 @@ async def _invocation_handler( work_time = remaining - shutdown_buffer if work_time <= timedelta(seconds=1): raise RuntimeError( - f"Lambda timeout leaves almost no time for work " - f"(work_time={work_time}, shutdown_buffer={shutdown_buffer}); " - f"increase the function timeout or decrease the shutdown " + f"Lambda timeout is too short: {remaining.total_seconds():.1f}s " + f"remaining but {shutdown_buffer.total_seconds():.1f}s is " + f"reserved for shutdown, leaving no time for work. " + f"Increase the function timeout or decrease the shutdown " f"deadline buffer" ) elif work_time < timedelta(seconds=5): diff --git a/temporalio/contrib/aws/lambda_worker/otel.py b/temporalio/contrib/aws/lambda_worker/otel.py index 4cb33ef6f..7513f80ff 100644 --- a/temporalio/contrib/aws/lambda_worker/otel.py +++ b/temporalio/contrib/aws/lambda_worker/otel.py @@ -1,12 +1,11 @@ """OpenTelemetry helpers for Temporal workers running inside AWS Lambda. -Use :py:func:`apply_defaults` inside a :py:func:`~lambda_worker.run_worker` -configure callback for a batteries-included setup that creates an OTel -collector exporter and tracing interceptor, suitable for use with the AWS -Distro for OpenTelemetry (ADOT) Lambda layer. +Use :py:func:`apply_defaults` inside a :py:func:`~lambda_worker.run_worker` configure callback for a +batteries-included setup that creates an OTel collector exporter and tracing interceptor, suitable +for use with the AWS Distro for OpenTelemetry (ADOT) Lambda layer. -Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` -individually if you only need one. +Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` individually if you only +need one. """ from __future__ import annotations @@ -73,23 +72,19 @@ def apply_defaults( ) -> None: """Configure OTel metrics and tracing with AWS Lambda defaults. - Sets up Core SDK metrics export via a :py:class:`~temporalio.runtime.Runtime` - with an :py:class:`~temporalio.runtime.OpenTelemetryConfig` pointing at the - OTLP collector, and adds a - :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` for - distributed tracing. + Sets up Core SDK metrics export via a :py:class:`~temporalio.runtime.Runtime` with an + :py:class:`~temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds a + :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` for distributed tracing. - The collector endpoint defaults to ``http://localhost:4317``, which is the - endpoint expected by the ADOT collector Lambda layer. + The collector endpoint defaults to ``http://localhost:4317``, which is the endpoint expected by + the ADOT collector Lambda layer. - Registers a per-invocation ``ForceFlush`` shutdown hook for the - ``TracerProvider`` so pending traces are exported before each Lambda - invocation completes. + Registers a per-invocation ``ForceFlush`` shutdown hook for the ``TracerProvider`` so pending + traces are exported before each Lambda invocation completes. - Core SDK metrics are exported on the ``metric_periodicity`` interval by - the runtime's internal thread. There is no explicit flush API for Core - metrics; set ``metric_periodicity`` short enough to ensure at least one - export per invocation. + Metrics are exported on the ``metric_periodicity`` interval by the runtime's internal thread. + There is no explicit flush API for these metrics; set ``metric_periodicity`` short enough to + ensure at least one export per invocation. Args: config: The :py:class:`LambdaWorkerConfig` to configure. @@ -146,16 +141,14 @@ def build_metrics_telemetry_config( ) -> TelemetryConfig: """Build a :py:class:`~temporalio.runtime.TelemetryConfig` for OTel metrics. - Returns a ``TelemetryConfig`` with - :py:class:`~temporalio.runtime.OpenTelemetryConfig` metrics pointed at the - given OTLP collector endpoint. Use this when you need to compose metrics - config with other telemetry settings (e.g. custom logging) into your own + Returns a ``TelemetryConfig`` with :py:class:`~temporalio.runtime.OpenTelemetryConfig` metrics + pointed at the given OTLP collector endpoint. Use this when you need to compose metrics config + with other telemetry settings (e.g. custom logging) into your own :py:class:`~temporalio.runtime.Runtime`. - Core SDK metrics are exported on the ``metric_periodicity`` interval by - the runtime's internal thread. There is no explicit flush API; set - ``metric_periodicity`` short enough to ensure at least one export per - Lambda invocation. + Core SDK metrics are exported on the ``metric_periodicity`` interval by the runtime's internal + thread. There is no explicit flush API; set ``metric_periodicity`` short enough to ensure at + least one export per Lambda invocation. Example:: @@ -205,8 +198,8 @@ def apply_tracing( """Configure only OTel tracing (no metrics) on the Lambda worker config. Adds a :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` to - ``config.client_connect_config["interceptors"]`` and registers a - ``ForceFlush`` shutdown hook for the provider. + ``config.client_connect_config["interceptors"]`` and registers a ``ForceFlush`` shutdown hook + for the provider. Args: config: The :py:class:`LambdaWorkerConfig` to configure. diff --git a/tests/contrib/aws/lambda_worker/test_lambda_worker.py b/tests/contrib/aws/lambda_worker/test_lambda_worker.py index 1d0f3cb11..178e078ac 100644 --- a/tests/contrib/aws/lambda_worker/test_lambda_worker.py +++ b/tests/contrib/aws/lambda_worker/test_lambda_worker.py @@ -3,6 +3,7 @@ from __future__ import annotations from datetime import timedelta +from pathlib import Path from typing import Any from unittest.mock import AsyncMock, MagicMock, patch @@ -185,21 +186,21 @@ def test_lambda_default_config_file_path_env_var(self) -> None: env = {"TEMPORAL_CONFIG_FILE": "/custom/path.toml"} assert ( lambda_default_config_file_path(env.get) # type: ignore[arg-type] - == "/custom/path.toml" + == Path("/custom/path.toml") ) def test_lambda_default_config_file_path_lambda_root(self) -> None: env = {"LAMBDA_TASK_ROOT": "/var/task"} assert ( lambda_default_config_file_path(env.get) # type: ignore[arg-type] - == "/var/task/temporal.toml" + == Path("/var/task/temporal.toml") ) def test_lambda_default_config_file_path_cwd(self) -> None: env: dict[str, str] = {} assert ( lambda_default_config_file_path(env.get) # type: ignore[arg-type] - == "temporal.toml" + == Path("temporal.toml") ) @@ -260,15 +261,14 @@ def test_returns_handler(self) -> None: handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) assert callable(handler) - @pytest.mark.asyncio - async def test_success(self) -> None: + def test_success(self) -> None: deps = _make_test_deps() def configure(config: LambdaWorkerConfig) -> None: config.worker_config["workflows"] = [type("FakeWf", (), {})] handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) def test_configure_callback_error(self) -> None: deps = _make_test_deps() @@ -294,8 +294,7 @@ def test_missing_version(self) -> None: deps, ) - @pytest.mark.asyncio - async def test_user_overrides_applied(self) -> None: + def test_user_overrides_applied(self) -> None: connect_capture: list[dict[str, Any]] = [] worker_capture: list[dict[str, Any]] = [] deps = _make_test_deps( @@ -309,17 +308,16 @@ def configure(config: LambdaWorkerConfig) -> None: config.worker_config["max_concurrent_activities"] = 99 handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert connect_capture[0]["namespace"] == "custom-ns" assert worker_capture[0]["max_concurrent_activities"] == 99 - @pytest.mark.asyncio - async def test_lambda_defaults_applied(self) -> None: + def test_lambda_defaults_applied(self) -> None: worker_capture: list[dict[str, Any]] = [] deps = _make_test_deps(worker_kwargs_capture=worker_capture) handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) kwargs = worker_capture[0] assert kwargs["max_concurrent_activities"] == DEFAULT_MAX_CONCURRENT_ACTIVITIES @@ -332,12 +330,11 @@ async def test_lambda_defaults_applied(self) -> None: assert dc.use_worker_versioning is True assert dc.version == TEST_VERSION - @pytest.mark.asyncio - async def test_identity_from_lambda_context(self) -> None: + def test_identity_from_lambda_context(self) -> None: connect_capture: list[dict[str, Any]] = [] deps = _make_test_deps(connect_kwargs_capture=connect_capture) handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) - await handler( + handler( {}, _make_lambda_context( request_id="req-abc-123", @@ -350,8 +347,7 @@ async def test_identity_from_lambda_context(self) -> None: == "req-abc-123@arn:aws:lambda:us-east-1:123456:function:my-func" ) - @pytest.mark.asyncio - async def test_identity_user_override_wins(self) -> None: + def test_identity_user_override_wins(self) -> None: connect_capture: list[dict[str, Any]] = [] deps = _make_test_deps(connect_kwargs_capture=connect_capture) @@ -359,20 +355,18 @@ def configure(config: LambdaWorkerConfig) -> None: config.client_connect_config["identity"] = "my-custom-identity" handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert connect_capture[0]["identity"] == "my-custom-identity" - @pytest.mark.asyncio - async def test_identity_no_lambda_context(self) -> None: + def test_identity_no_lambda_context(self) -> None: connect_capture: list[dict[str, Any]] = [] deps = _make_test_deps(connect_kwargs_capture=connect_capture) deps.extract_lambda_ctx = lambda ctx: None handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) - await handler({}, MagicMock(spec=[])) + handler({}, MagicMock(spec=[])) assert "identity" not in connect_capture[0] - @pytest.mark.asyncio - async def test_shutdown_hooks_called(self) -> None: + def test_shutdown_hooks_called(self) -> None: deps = _make_test_deps() shutdown_called = False @@ -384,11 +378,10 @@ def hook() -> None: config.shutdown_hooks.append(hook) handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert shutdown_called - @pytest.mark.asyncio - async def test_shutdown_hooks_called_per_invocation(self) -> None: + def test_shutdown_hooks_called_per_invocation(self) -> None: deps = _make_test_deps() shutdown_count = 0 @@ -400,13 +393,12 @@ def hook() -> None: config.shutdown_hooks.append(hook) handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) - await handler({}, _make_lambda_context()) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert shutdown_count == 3 - @pytest.mark.asyncio - async def test_shutdown_hooks_multiple_funcs_order(self) -> None: + def test_shutdown_hooks_multiple_funcs_order(self) -> None: deps = _make_test_deps() order: list[str] = [] @@ -415,11 +407,10 @@ def configure(config: LambdaWorkerConfig) -> None: config.shutdown_hooks.append(lambda: order.append("second")) handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert order == ["first", "second"] - @pytest.mark.asyncio - async def test_shutdown_hooks_error_continues(self) -> None: + def test_shutdown_hooks_error_continues(self) -> None: deps = _make_test_deps() second_called = False @@ -435,22 +426,20 @@ def second() -> None: config.shutdown_hooks.append(second) handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert second_called - @pytest.mark.asyncio - async def test_tight_deadline_raises_error(self) -> None: + def test_tight_deadline_raises_error(self) -> None: deps = _make_test_deps() def configure(config: LambdaWorkerConfig) -> None: config.shutdown_deadline_buffer = timedelta(milliseconds=1500) handler = _run_worker_internal(TEST_VERSION, configure, deps) - with pytest.raises(RuntimeError, match="almost no time for work"): - await handler({}, _make_lambda_context(remaining_ms=2000)) + with pytest.raises(RuntimeError, match="Lambda timeout is too short"): + handler({}, _make_lambda_context(remaining_ms=2000)) - @pytest.mark.asyncio - async def test_tight_deadline_logs_warning(self) -> None: + def test_tight_deadline_logs_warning(self) -> None: deps = _make_test_deps() def configure(config: LambdaWorkerConfig) -> None: @@ -460,12 +449,11 @@ def configure(config: LambdaWorkerConfig) -> None: with patch( "temporalio.contrib.aws.lambda_worker._run_worker.logger" ) as mock_logger: - await handler({}, _make_lambda_context(remaining_ms=2000)) + handler({}, _make_lambda_context(remaining_ms=2000)) mock_logger.warning.assert_called_once() assert "less than 5s" in mock_logger.warning.call_args[0][0] - @pytest.mark.asyncio - async def test_per_invocation_lifecycle(self) -> None: + def test_per_invocation_lifecycle(self) -> None: """Each invocation creates its own client and worker.""" connect_count = 0 deps = _make_test_deps() @@ -479,13 +467,12 @@ async def counting_connect(**kwargs: Any) -> Any: deps.connect = counting_connect handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) - await handler({}, _make_lambda_context()) - await handler({}, _make_lambda_context()) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert connect_count == 3 - @pytest.mark.asyncio - async def test_task_queue_from_config(self) -> None: + def test_task_queue_from_config(self) -> None: worker_capture: list[dict[str, Any]] = [] deps = _make_test_deps(worker_kwargs_capture=worker_capture) deps.getenv = lambda _: None # type: ignore[assignment] @@ -494,7 +481,7 @@ def configure(config: LambdaWorkerConfig) -> None: config.worker_config["task_queue"] = "explicit-queue" handler = _run_worker_internal(TEST_VERSION, configure, deps) - await handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) assert worker_capture[0]["task_queue"] == "explicit-queue" def test_task_queue_pre_populated_from_env(self) -> None: @@ -525,12 +512,11 @@ def configure(config: LambdaWorkerConfig) -> None: assert dc.use_worker_versioning is True assert dc.version == TEST_VERSION - @pytest.mark.asyncio - async def test_no_deadline_runs_until_complete(self) -> None: + def test_no_deadline_runs_until_complete(self) -> None: """When no deadline is available, worker runs until it completes.""" deps = _make_test_deps() handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) ctx = MagicMock(spec=["aws_request_id", "invoked_function_arn"]) ctx.aws_request_id = "req-123" ctx.invoked_function_arn = "arn:aws:lambda:us-east-1:123:function:f" - await handler({}, ctx) + handler({}, ctx) From fed90fe25bffd098e433508b052c69dde260e149 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 3 Apr 2026 10:51:50 -0700 Subject: [PATCH 4/6] Add ClientConnectConfig --- temporalio/client.py | 23 +++++++++++++++++++ .../contrib/aws/lambda_worker/__init__.py | 6 +---- .../contrib/aws/lambda_worker/_configure.py | 23 ++++--------------- .../contrib/aws/lambda_worker/_run_worker.py | 8 +++---- tests/contrib/aws/lambda_worker/test_otel.py | 9 +++++--- tests/test_client.py | 15 ++++++++++++ tests/worker/test_worker.py | 15 ++++++++++++ 7 files changed, 67 insertions(+), 32 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index cc2750ec6..9e5e3494e 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -2802,6 +2802,29 @@ async def get_worker_task_reachability( ) +class ClientConnectConfig(TypedDict, total=False): + """TypedDict of keyword arguments for :py:meth:`Client.connect`.""" + + target_host: str + namespace: str + api_key: str | None + data_converter: temporalio.converter.DataConverter + plugins: Sequence[Plugin] + interceptors: Sequence[Interceptor] + default_workflow_query_reject_condition: ( + temporalio.common.QueryRejectCondition | None + ) + tls: bool | TLSConfig | None + retry_config: RetryConfig | None + keep_alive_config: KeepAliveConfig | None + rpc_metadata: Mapping[str, str | bytes] + identity: str | None + lazy: bool + runtime: temporalio.runtime.Runtime | None + http_connect_proxy_config: HttpConnectProxyConfig | None + header_codec_behavior: HeaderCodecBehavior + + class ClientConfig(TypedDict, total=False): """TypedDict of config originally passed to :py:meth:`Client`.""" diff --git a/temporalio/contrib/aws/lambda_worker/__init__.py b/temporalio/contrib/aws/lambda_worker/__init__.py index 7f786eacf..11f748c9b 100644 --- a/temporalio/contrib/aws/lambda_worker/__init__.py +++ b/temporalio/contrib/aws/lambda_worker/__init__.py @@ -40,14 +40,10 @@ def configure(config: LambdaWorkerConfig) -> None: set. """ -from temporalio.contrib.aws.lambda_worker._configure import ( - LambdaClientConnectConfig, - LambdaWorkerConfig, -) +from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig from temporalio.contrib.aws.lambda_worker._run_worker import run_worker __all__ = [ - "LambdaClientConnectConfig", "LambdaWorkerConfig", "run_worker", ] diff --git a/temporalio/contrib/aws/lambda_worker/_configure.py b/temporalio/contrib/aws/lambda_worker/_configure.py index ed3051df2..8ba048138 100644 --- a/temporalio/contrib/aws/lambda_worker/_configure.py +++ b/temporalio/contrib/aws/lambda_worker/_configure.py @@ -4,29 +4,14 @@ import asyncio import sys -from collections.abc import Awaitable, Callable, Sequence +from collections.abc import Awaitable, Callable from dataclasses import dataclass, field from datetime import timedelta -from typing import Any -import temporalio.runtime -from temporalio.envconfig import ClientConnectConfig +from temporalio.client import ClientConnectConfig from temporalio.worker import WorkerConfig -class LambdaClientConnectConfig(ClientConnectConfig, total=False): - """Keyword arguments for :py:meth:`temporalio.client.Client.connect`. - - Extends :py:class:`~temporalio.envconfig.ClientConnectConfig` with additional keys that may be - set by the Lambda worker or its OTel helpers. - """ - - identity: str - runtime: temporalio.runtime.Runtime - interceptors: Sequence[Any] - plugins: Sequence[Any] - - @dataclass class LambdaWorkerConfig: """Passed to the configure callback of :py:func:`run_worker`. @@ -60,8 +45,8 @@ class LambdaWorkerConfig: per-process resources. """ - client_connect_config: LambdaClientConnectConfig = field( - default_factory=LambdaClientConnectConfig + client_connect_config: ClientConnectConfig = field( + default_factory=ClientConnectConfig ) worker_config: WorkerConfig = field(default_factory=WorkerConfig) shutdown_deadline_buffer: timedelta = field( diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py index f07fb5b44..a2e877f95 100644 --- a/temporalio/contrib/aws/lambda_worker/_run_worker.py +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -11,9 +11,9 @@ import temporalio.client import temporalio.worker +from temporalio.client import ClientConnectConfig from temporalio.common import WorkerDeploymentVersion from temporalio.contrib.aws.lambda_worker._configure import ( - LambdaClientConnectConfig, LambdaWorkerConfig, _run_shutdown_hooks, ) @@ -133,7 +133,7 @@ def _run_worker_internal( # Load client config from envconfig / TOML. load_config = deps.load_config or (lambda: _default_load_config(deps.getenv)) profile = load_config() - connect_config: LambdaClientConnectConfig = {**profile.to_client_connect_config()} + connect_config: ClientConnectConfig = {**profile.to_client_connect_config()} # Build worker config with Lambda defaults. worker_config: WorkerConfig = {} @@ -225,9 +225,7 @@ async def _invocation_handler( ) # Build per-invocation connect kwargs with identity from Lambda context. - invocation_connect_kwargs: LambdaClientConnectConfig = { - **config.client_connect_config - } + invocation_connect_kwargs: ClientConnectConfig = {**config.client_connect_config} if "identity" not in invocation_connect_kwargs: ctx_info = extract_lambda_ctx(lambda_context) if ctx_info is not None: diff --git a/tests/contrib/aws/lambda_worker/test_otel.py b/tests/contrib/aws/lambda_worker/test_otel.py index 69cc48a64..e209052e9 100644 --- a/tests/contrib/aws/lambda_worker/test_otel.py +++ b/tests/contrib/aws/lambda_worker/test_otel.py @@ -20,6 +20,7 @@ apply_tracing, build_metrics_telemetry_config, ) +from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.runtime import OpenTelemetryConfig, TelemetryConfig @@ -40,13 +41,15 @@ def test_adds_interceptor(self) -> None: def test_appends_to_existing_interceptors(self) -> None: config = LambdaWorkerConfig() - sentinel = object() - config.client_connect_config["interceptors"] = [sentinel] + # Use a TracingInterceptor as the existing interceptor. + existing_provider, _ = _make_tracer_provider() + existing = TracingInterceptor(tracer=existing_provider.get_tracer("existing")) + config.client_connect_config["interceptors"] = [existing] tracer_provider, _ = _make_tracer_provider() apply_tracing(config, tracer_provider) interceptors = config.client_connect_config["interceptors"] assert len(interceptors) == 2 - assert interceptors[0] is sentinel + assert interceptors[0] is existing def test_registers_flush_shutdown_hook(self) -> None: config = LambdaWorkerConfig() diff --git a/tests/test_client.py b/tests/test_client.py index b8bebdaf7..530c166f0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1552,6 +1552,21 @@ def test_fork_create_client( self.run(mp_fork_ctx) +def test_client_connect_config_matches_connect_params(): + """ClientConnectConfig TypedDict keys must match Client.connect kwargs.""" + import inspect + + from temporalio.client import Client, ClientConnectConfig + + connect_params = set(inspect.signature(Client.connect).parameters.keys()) - {"cls"} + config_keys = set(ClientConnectConfig.__annotations__.keys()) + assert config_keys == connect_params, ( + f"ClientConnectConfig is out of sync with Client.connect. " + f"Missing from config: {connect_params - config_keys}. " + f"Extra in config: {config_keys - connect_params}." + ) + + class TestForkUseClient(_TestFork): async def coro(self): await self._client.start_workflow( diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 4c76a7ba9..4aa366735 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1543,3 +1543,18 @@ async def test_continue_as_new_with_version_upgrade( # Expect workflow to return "v2.0", indicating that it continued-as-new and completed on v2 result = await handle.result() assert result == "v2.0" + + +def test_worker_config_matches_init_params(): + """WorkerConfig TypedDict keys must match Worker.__init__ kwargs.""" + import inspect + + from temporalio.worker import Worker, WorkerConfig + + init_params = set(inspect.signature(Worker.__init__).parameters.keys()) - {"self"} + config_keys = set(WorkerConfig.__annotations__.keys()) + assert config_keys == init_params, ( + f"WorkerConfig is out of sync with Worker.__init__. " + f"Missing from config: {init_params - config_keys}. " + f"Extra in config: {config_keys - init_params}." + ) From efcffca5ea9327ebb2eb759808ff3b5a03124abc Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 3 Apr 2026 11:38:16 -0700 Subject: [PATCH 5/6] Fix broken docstring links --- .../contrib/aws/lambda_worker/_configure.py | 2 +- temporalio/contrib/aws/lambda_worker/otel.py | 18 +++++++++--------- temporalio/contrib/aws/s3driver/_driver.py | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/temporalio/contrib/aws/lambda_worker/_configure.py b/temporalio/contrib/aws/lambda_worker/_configure.py index 8ba048138..4a6a8bfdb 100644 --- a/temporalio/contrib/aws/lambda_worker/_configure.py +++ b/temporalio/contrib/aws/lambda_worker/_configure.py @@ -29,7 +29,7 @@ class LambdaWorkerConfig: config file / environment variables via envconfig, with Lambda defaults applied. worker_config: Keyword arguments that will be passed to the - :py:class:`~temporalio.worker.Worker` constructor (the ``client`` + :py:class:`temporalio.worker.Worker` constructor (the ``client`` key is managed internally). Pre-populated with Lambda-appropriate defaults (low concurrency, eager activities disabled) and ``task_queue`` from ``TEMPORAL_TASK_QUEUE`` if set. diff --git a/temporalio/contrib/aws/lambda_worker/otel.py b/temporalio/contrib/aws/lambda_worker/otel.py index 7513f80ff..2c63bde4b 100644 --- a/temporalio/contrib/aws/lambda_worker/otel.py +++ b/temporalio/contrib/aws/lambda_worker/otel.py @@ -1,6 +1,6 @@ """OpenTelemetry helpers for Temporal workers running inside AWS Lambda. -Use :py:func:`apply_defaults` inside a :py:func:`~lambda_worker.run_worker` configure callback for a +Use :py:func:`apply_defaults` inside a :py:func:`run_worker` configure callback for a batteries-included setup that creates an OTel collector exporter and tracing interceptor, suitable for use with the AWS Distro for OpenTelemetry (ADOT) Lambda layer. @@ -72,9 +72,9 @@ def apply_defaults( ) -> None: """Configure OTel metrics and tracing with AWS Lambda defaults. - Sets up Core SDK metrics export via a :py:class:`~temporalio.runtime.Runtime` with an - :py:class:`~temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds a - :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` for distributed tracing. + Sets up Core SDK metrics export via a :py:class:`temporalio.runtime.Runtime` with an + :py:class:`temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds a + :py:class:`temporalio.contrib.opentelemetry.TracingInterceptor` for distributed tracing. The collector endpoint defaults to ``http://localhost:4317``, which is the endpoint expected by the ADOT collector Lambda layer. @@ -139,12 +139,12 @@ def build_metrics_telemetry_config( service_name: str = "", metric_periodicity: timedelta | None = None, ) -> TelemetryConfig: - """Build a :py:class:`~temporalio.runtime.TelemetryConfig` for OTel metrics. + """Build a :py:class:`temporalio.runtime.TelemetryConfig` for OTel metrics. - Returns a ``TelemetryConfig`` with :py:class:`~temporalio.runtime.OpenTelemetryConfig` metrics + Returns a ``TelemetryConfig`` with :py:class:`temporalio.runtime.OpenTelemetryConfig` metrics pointed at the given OTLP collector endpoint. Use this when you need to compose metrics config with other telemetry settings (e.g. custom logging) into your own - :py:class:`~temporalio.runtime.Runtime`. + :py:class:`temporalio.runtime.Runtime`. Core SDK metrics are exported on the ``metric_periodicity`` interval by the runtime's internal thread. There is no explicit flush API; set ``metric_periodicity`` short enough to ensure at @@ -171,7 +171,7 @@ def build_metrics_telemetry_config( Returns: A ``TelemetryConfig`` ready to pass to - :py:class:`~temporalio.runtime.Runtime`. + :py:class:`temporalio.runtime.Runtime`. """ if not endpoint: endpoint = "http://localhost:4317" @@ -197,7 +197,7 @@ def apply_tracing( ) -> None: """Configure only OTel tracing (no metrics) on the Lambda worker config. - Adds a :py:class:`~temporalio.contrib.opentelemetry.TracingInterceptor` to + Adds a :py:class:`temporalio.contrib.opentelemetry.TracingInterceptor` to ``config.client_connect_config["interceptors"]`` and registers a ``ForceFlush`` shutdown hook for the provider. diff --git a/temporalio/contrib/aws/s3driver/_driver.py b/temporalio/contrib/aws/s3driver/_driver.py index 481e3a9d4..3b9858813 100644 --- a/temporalio/contrib/aws/s3driver/_driver.py +++ b/temporalio/contrib/aws/s3driver/_driver.py @@ -64,7 +64,7 @@ def __init__( Args: client: An :class:`S3StorageDriverClient` implementation. Use - :func:`~temporalio.contrib.aws.s3driver.aioboto3.new_aioboto3_client` to + :func:`temporalio.contrib.aws.s3driver.aioboto3.new_aioboto3_client` to wrap an aioboto3 S3 client. bucket: S3 bucket name, access point ARN, or a callable that accepts ``(StorageDriverStoreContext, Payload)`` and returns @@ -73,7 +73,7 @@ def __init__( driver_name: Name of this driver instance. Defaults to ``"aws.s3driver"``. Override when registering multiple S3StorageDriver instances with distinct configurations - under the same :attr:`~temporalio.extstore.Options.drivers` list. + under the same ``temporalio.extstore.Options.drivers`` list. max_payload_size: Maximum serialized payload size in bytes that the driver will accept. Defaults to 52428800 (50 MiB). Raise this value if your workload requires larger payloads; lower it to @@ -105,7 +105,7 @@ async def store( context: StorageDriverStoreContext, payloads: Sequence[Payload], ) -> list[StorageDriverClaim]: - """Stores payloads in S3 and returns a :class:`~temporalio.extstore.DriverClaim` for each one. + """Stores payloads in S3 and returns a ``temporalio.extstore.DriverClaim`` for each one. Payloads are keyed by their SHA-256 hash, so identical serialized bytes share the same S3 object. Deduplication is best-effort because the same @@ -190,7 +190,7 @@ async def retrieve( context: StorageDriverRetrieveContext, # noqa: ARG002 claims: Sequence[StorageDriverClaim], ) -> list[Payload]: - """Retrieves payloads from S3 for the given :class:`~temporalio.extstore.DriverClaim` list.""" + """Retrieves payloads from S3 for the given ``temporalio.extstore.DriverClaim`` list.""" async def _download(claim: StorageDriverClaim) -> Payload: bucket = claim.claim_data["bucket"] From 696fd88cbeb7e03feff4f4ab98f6e23a38c3370d Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 3 Apr 2026 15:12:06 -0700 Subject: [PATCH 6/6] Review feedback --- .../contrib/aws/lambda_worker/_configure.py | 9 +-- .../contrib/aws/lambda_worker/_run_worker.py | 2 +- temporalio/contrib/aws/lambda_worker/otel.py | 73 ++++++++++++------- tests/contrib/aws/lambda_worker/test_otel.py | 67 ++++++++--------- 4 files changed, 87 insertions(+), 64 deletions(-) diff --git a/temporalio/contrib/aws/lambda_worker/_configure.py b/temporalio/contrib/aws/lambda_worker/_configure.py index 4a6a8bfdb..dd1657f6d 100644 --- a/temporalio/contrib/aws/lambda_worker/_configure.py +++ b/temporalio/contrib/aws/lambda_worker/_configure.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio -import sys +import logging from collections.abc import Awaitable, Callable from dataclasses import dataclass, field from datetime import timedelta @@ -11,6 +11,8 @@ from temporalio.client import ClientConnectConfig from temporalio.worker import WorkerConfig +logger = logging.getLogger(__name__) + @dataclass class LambdaWorkerConfig: @@ -67,7 +69,4 @@ async def _run_shutdown_hooks( # type:ignore[reportUnusedFunction] if asyncio.iscoroutine(result): await result except Exception as e: - print( - f"lambda_worker: shutdown hook error: {e}", - file=sys.stderr, - ) + logger.error(f"shutdown hook error: {e}") diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py index a2e877f95..6a2cc75a3 100644 --- a/temporalio/contrib/aws/lambda_worker/_run_worker.py +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -115,7 +115,7 @@ def configure(config: LambdaWorkerConfig) -> None: try: return _run_worker_internal(version, configure, deps) except Exception as e: - print(f"lambda_worker: fatal: {e}", file=sys.stderr) + logger.error(f"fatal error running lambda worker: {e}") sys.exit(1) diff --git a/temporalio/contrib/aws/lambda_worker/otel.py b/temporalio/contrib/aws/lambda_worker/otel.py index 2c63bde4b..216e80f48 100644 --- a/temporalio/contrib/aws/lambda_worker/otel.py +++ b/temporalio/contrib/aws/lambda_worker/otel.py @@ -1,7 +1,7 @@ """OpenTelemetry helpers for Temporal workers running inside AWS Lambda. Use :py:func:`apply_defaults` inside a :py:func:`run_worker` configure callback for a -batteries-included setup that creates an OTel collector exporter and tracing interceptor, suitable +batteries-included setup that creates an OTel collector exporter and tracing plugin, suitable for use with the AWS Distro for OpenTelemetry (ADOT) Lambda layer. Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` individually if you only @@ -10,18 +10,22 @@ from __future__ import annotations +import logging import os from dataclasses import dataclass, field from datetime import timedelta from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.semconv.attributes.service_attributes import SERVICE_NAME +from opentelemetry.trace import get_tracer_provider, set_tracer_provider from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider from temporalio.runtime import OpenTelemetryConfig, Runtime, TelemetryConfig +logger = logging.getLogger(__name__) + @dataclass class OtelOptions: @@ -73,14 +77,20 @@ def apply_defaults( """Configure OTel metrics and tracing with AWS Lambda defaults. Sets up Core SDK metrics export via a :py:class:`temporalio.runtime.Runtime` with an - :py:class:`temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds a - :py:class:`temporalio.contrib.opentelemetry.TracingInterceptor` for distributed tracing. + :py:class:`temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds the + :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` for distributed tracing with + workflow sandbox passthrough. + + Creates a replay-safe ``TracerProvider`` (with X-Ray ID generator and OTLP gRPC exporter if + available) and sets it as the global OpenTelemetry tracer provider. The + :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` uses the global provider, so + it must be set before the worker starts. The collector endpoint defaults to ``http://localhost:4317``, which is the endpoint expected by the ADOT collector Lambda layer. - Registers a per-invocation ``ForceFlush`` shutdown hook for the ``TracerProvider`` so pending - traces are exported before each Lambda invocation completes. + Registers a per-invocation ``ForceFlush`` shutdown hook for the global ``TracerProvider`` so + pending traces are exported before each Lambda invocation completes. Metrics are exported on the ``metric_periodicity`` interval by the runtime's internal thread. There is no explicit flush API for these metrics; set ``metric_periodicity`` short enough to @@ -112,11 +122,16 @@ def apply_defaults( AwsXRayIdGenerator, ) - tracer_provider = TracerProvider( + tracer_provider = create_tracer_provider( resource=resource, id_generator=AwsXRayIdGenerator() ) except ImportError: - tracer_provider = TracerProvider(resource=resource) + logger.warning( + "opentelemetry-sdk-extension-aws is not installed; " + "X-Ray trace ID generation is disabled. " + "Install the 'lambda-worker-otel' extra for full ADOT support." + ) + tracer_provider = create_tracer_provider(resource=resource) # Use OTLP gRPC exporter if available, otherwise skip trace export. try: @@ -128,9 +143,16 @@ def apply_defaults( BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True)) ) except ImportError: - pass + logger.warning( + "opentelemetry-exporter-otlp-proto-grpc is not installed; " + "traces will not be exported to the OTLP collector. " + "Install the 'lambda-worker-otel' extra for full ADOT support." + ) + + # Set as global so the OpenTelemetryPlugin picks it up. + set_tracer_provider(tracer_provider) - apply_tracing(config, tracer_provider) + apply_tracing(config) def build_metrics_telemetry_config( @@ -191,28 +213,29 @@ def build_metrics_telemetry_config( ) -def apply_tracing( - config: LambdaWorkerConfig, - tracer_provider: TracerProvider, -) -> None: +def apply_tracing(config: LambdaWorkerConfig) -> None: """Configure only OTel tracing (no metrics) on the Lambda worker config. - Adds a :py:class:`temporalio.contrib.opentelemetry.TracingInterceptor` to - ``config.client_connect_config["interceptors"]`` and registers a ``ForceFlush`` shutdown hook - for the provider. + Adds an :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` to + ``config.worker_config["plugins"]``. The plugin uses the global + ``TracerProvider`` set via ``opentelemetry.trace.set_tracer_provider``. + Ensure your provider is set globally before the worker starts. + + Also registers a ``ForceFlush`` shutdown hook that flushes the global + ``TracerProvider`` (if it supports ``force_flush``). Args: config: The :py:class:`LambdaWorkerConfig` to configure. - tracer_provider: The ``TracerProvider`` to use for tracing. """ - from temporalio.contrib.opentelemetry import TracingInterceptor - - interceptor = TracingInterceptor(tracer=tracer_provider.get_tracer("temporal-sdk")) - interceptors = list(config.client_connect_config.get("interceptors", [])) - interceptors.append(interceptor) - config.client_connect_config["interceptors"] = interceptors + plugin = OpenTelemetryPlugin() + plugins = list(config.worker_config.get("plugins", [])) + plugins.append(plugin) + config.worker_config["plugins"] = plugins async def _flush() -> None: - tracer_provider.force_flush() + provider = get_tracer_provider() + flush = getattr(provider, "force_flush", None) + if flush is not None: + flush() config.shutdown_hooks.append(_flush) diff --git a/tests/contrib/aws/lambda_worker/test_otel.py b/tests/contrib/aws/lambda_worker/test_otel.py index e209052e9..97f3d9647 100644 --- a/tests/contrib/aws/lambda_worker/test_otel.py +++ b/tests/contrib/aws/lambda_worker/test_otel.py @@ -6,9 +6,6 @@ from unittest.mock import patch import pytest -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from temporalio.contrib.aws.lambda_worker._configure import ( LambdaWorkerConfig, @@ -20,48 +17,37 @@ apply_tracing, build_metrics_telemetry_config, ) -from temporalio.contrib.opentelemetry import TracingInterceptor +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin from temporalio.runtime import OpenTelemetryConfig, TelemetryConfig -def _make_tracer_provider() -> tuple[TracerProvider, InMemorySpanExporter]: - span_exporter = InMemorySpanExporter() - tracer_provider = TracerProvider() - tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) - return tracer_provider, span_exporter - - class TestApplyTracing: - def test_adds_interceptor(self) -> None: + def test_adds_plugin(self) -> None: config = LambdaWorkerConfig() - tracer_provider, _ = _make_tracer_provider() - apply_tracing(config, tracer_provider) - interceptors = config.client_connect_config.get("interceptors", []) - assert len(interceptors) == 1 + apply_tracing(config) + plugins = config.worker_config.get("plugins", []) + assert len(plugins) == 1 + assert isinstance(plugins[0], OpenTelemetryPlugin) - def test_appends_to_existing_interceptors(self) -> None: + def test_appends_to_existing_plugins(self) -> None: config = LambdaWorkerConfig() - # Use a TracingInterceptor as the existing interceptor. - existing_provider, _ = _make_tracer_provider() - existing = TracingInterceptor(tracer=existing_provider.get_tracer("existing")) - config.client_connect_config["interceptors"] = [existing] - tracer_provider, _ = _make_tracer_provider() - apply_tracing(config, tracer_provider) - interceptors = config.client_connect_config["interceptors"] - assert len(interceptors) == 2 - assert interceptors[0] is existing + existing = OpenTelemetryPlugin() + config.worker_config["plugins"] = [existing] + apply_tracing(config) + plugins = config.worker_config["plugins"] + assert len(plugins) == 2 + assert plugins[0] is existing def test_registers_flush_shutdown_hook(self) -> None: config = LambdaWorkerConfig() - tracer_provider, _ = _make_tracer_provider() - apply_tracing(config, tracer_provider) + apply_tracing(config) assert len(config.shutdown_hooks) == 1 @pytest.mark.asyncio async def test_shutdown_hook_flushes(self) -> None: config = LambdaWorkerConfig() - tracer_provider, _ = _make_tracer_provider() - apply_tracing(config, tracer_provider) + apply_tracing(config) + # Should not raise even with the default noop global provider. await _run_shutdown_hooks(config) @@ -95,7 +81,6 @@ def test_composable_with_custom_runtime(self) -> None: import dataclasses tc = build_metrics_telemetry_config(endpoint="http://localhost:4317") - # Replace logging config to demonstrate composability. custom_tc = dataclasses.replace(tc, logging=None) assert custom_tc.logging is None assert isinstance(custom_tc.metrics, OpenTelemetryConfig) @@ -106,11 +91,27 @@ def test_configures_metrics_and_tracing(self) -> None: config = LambdaWorkerConfig() apply_defaults(config, OtelOptions(collector_endpoint="http://localhost:4317")) + # Metrics: runtime should be set. assert "runtime" in config.client_connect_config - interceptors = config.client_connect_config.get("interceptors", []) - assert len(interceptors) == 1 + # Tracing: plugin should be added. + plugins = config.worker_config.get("plugins", []) + assert len(plugins) == 1 + assert isinstance(plugins[0], OpenTelemetryPlugin) + # Shutdown hook for tracer flush. assert len(config.shutdown_hooks) == 1 + def test_sets_global_tracer_provider(self) -> None: + from opentelemetry.trace import get_tracer_provider + + from temporalio.contrib.opentelemetry._tracer_provider import ( + ReplaySafeTracerProvider, + ) + + config = LambdaWorkerConfig() + apply_defaults(config) + provider = get_tracer_provider() + assert isinstance(provider, ReplaySafeTracerProvider) + def test_service_name_from_options(self) -> None: config = LambdaWorkerConfig() apply_defaults(config, OtelOptions(service_name="my-service"))