Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
pydantic = ["pydantic>=2.0.0,<3"]
openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"]
google-adk = ["google-adk>=1.27.0,<2"]
lambda-worker-otel = [
"opentelemetry-api>=1.11.1,<2",
"opentelemetry-sdk>=1.11.1,<2",
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
"opentelemetry-semantic-conventions>=0.40b0,<1",
"opentelemetry-sdk-extension-aws>=2.0.0,<3",
]
aioboto3 = [
"aioboto3>=10.4.0",
"types-aioboto3[s3]>=10.4.0",
Expand Down Expand Up @@ -69,6 +76,9 @@ dev = [
"googleapis-common-protos==1.70.0",
"pytest-rerunfailures>=16.1",
"moto[s3,server]>=5",
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
"opentelemetry-semantic-conventions>=0.40b0,<1",
"opentelemetry-sdk-extension-aws>=2.0.0,<3",
]

[tool.poe.tasks]
Expand Down
1 change: 1 addition & 0 deletions temporalio/contrib/aws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""AWS integrations for Temporal SDK."""
104 changes: 104 additions & 0 deletions temporalio/contrib/aws/lambda_worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# lambda_worker

A wrapper for running [Temporal](https://temporal.io) workers inside AWS Lambda. A single
`run_worker` call handles the full per-invocation lifecycle: connecting to the Temporal server,
creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully shutting down before
the invocation deadline.

## Quick start

```python
# handler.py
from temporalio.common import WorkerDeploymentVersion
from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker

from my_workflows import MyWorkflow
from my_activities import my_activity


def configure(config: LambdaWorkerConfig) -> None:
config.worker_config["task_queue"] = "my-task-queue"
config.worker_config["workflows"] = [MyWorkflow]
config.worker_config["activities"] = [my_activity]


lambda_handler = run_worker(
WorkerDeploymentVersion(
deployment_name="my-service",
build_id="v1.0",
),
configure,
)
```

## Configuration

Client connection settings (address, namespace, TLS, API key) are loaded
automatically from a TOML config file and/or environment variables via
`temporalio.envconfig`. The config file is resolved in order:

1. `TEMPORAL_CONFIG_FILE` env var, if set.
2. `temporal.toml` in `$LAMBDA_TASK_ROOT` (typically `/var/task`).
3. `temporal.toml` in the current working directory.

The file is optional -- if absent, only environment variables are used.

The configure callback receives a `LambdaWorkerConfig` dataclass with fields
pre-populated with Lambda-appropriate defaults. Override any field directly in
the callback. The `task_queue` key in `worker_config` is pre-populated from the
`TEMPORAL_TASK_QUEUE` environment variable if set.

## Lambda-tuned worker defaults

The package applies conservative concurrency limits suited to Lambda's resource
constraints:

| Setting | Default |
| --- | --- |
| `max_concurrent_activities` | 2 |
| `max_concurrent_workflow_tasks` | 10 |
| `max_concurrent_local_activities` | 2 |
| `max_concurrent_nexus_tasks` | 5 |
| `workflow_task_poller_behavior` | `SimpleMaximum(2)` |
| `activity_task_poller_behavior` | `SimpleMaximum(1)` |
| `nexus_task_poller_behavior` | `SimpleMaximum(1)` |
| `graceful_shutdown_timeout` | 5 seconds |
| `max_cached_workflows` | 100 |
| `disable_eager_activity_execution` | Always `True` |

Worker Deployment Versioning is always enabled.

## Observability

Metrics and tracing are opt-in. The `otel` module provides convenience helpers
for AWS Distro for OpenTelemetry (ADOT):

```python
from temporalio.common import WorkerDeploymentVersion
from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker
from temporalio.contrib.aws.lambda_worker.otel import apply_defaults, OtelOptions


def configure(config: LambdaWorkerConfig) -> None:
config.worker_config["task_queue"] = "my-task-queue"
config.worker_config["workflows"] = [MyWorkflow]
config.worker_config["activities"] = [my_activity]
apply_defaults(config, OtelOptions())


lambda_handler = run_worker(
WorkerDeploymentVersion(
deployment_name="my-service",
build_id="v1.0",
),
configure,
)
```

You can also use `apply_metrics` or `apply_tracing` individually.

If you use OTEL, you can use
[ADOT](https://aws-otel.github.io/docs/getting-started/lambda/lambda-python)
(the AWS Distro For OpenTelemetry) to automatically integrate with AWS
observability functionality. Namely, you will want to add the Lambda layer in
the aforementioned link. We'll handle setting up the SDK for you.
53 changes: 53 additions & 0 deletions temporalio/contrib/aws/lambda_worker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""A wrapper for running Temporal workers inside AWS Lambda.

A single :py:func:`run_worker` call handles the full per-invocation lifecycle: connecting to the
Temporal server, creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully
shutting down before the invocation deadline.

Quick start::

from temporalio.common import WorkerDeploymentVersion
from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker

def configure(config: LambdaWorkerConfig) -> None:
config.worker_config["task_queue"] = "my-task-queue"
config.worker_config["workflows"] = [MyWorkflow]
config.worker_config["activities"] = [my_activity]

lambda_handler = run_worker(
WorkerDeploymentVersion(
deployment_name="my-service",
build_id="v1.0",
),
configure,
)

Configuration
-------------
Client connection settings (address, namespace, TLS, API key) are loaded automatically from a TOML
config file and/or environment variables via :py:mod:`temporalio.envconfig`. The config file is
resolved in order:

1. ``TEMPORAL_CONFIG_FILE`` env var, if set.
2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``).
3. ``temporal.toml`` in the current working directory.

The file is optional -- if absent, only environment variables are used.

The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with fields pre-populated
with Lambda-appropriate defaults. Override any field directly in the callback. The ``task_queue``
key in ``worker_config`` is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if
set.
"""

from temporalio.contrib.aws.lambda_worker._configure import (
LambdaClientConnectConfig,
LambdaWorkerConfig,
)
from temporalio.contrib.aws.lambda_worker._run_worker import run_worker

__all__ = [
"LambdaClientConnectConfig",
"LambdaWorkerConfig",
"run_worker",
]
88 changes: 88 additions & 0 deletions temporalio/contrib/aws/lambda_worker/_configure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Configuration for the Lambda worker."""

from __future__ import annotations

import asyncio
import sys
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any

import temporalio.runtime
from temporalio.envconfig import ClientConnectConfig
from temporalio.worker import WorkerConfig


class LambdaClientConnectConfig(ClientConnectConfig, total=False):
"""Keyword arguments for :py:meth:`temporalio.client.Client.connect`.

Extends :py:class:`~temporalio.envconfig.ClientConnectConfig` with additional keys that may be
set by the Lambda worker or its OTel helpers.
"""

identity: str
runtime: temporalio.runtime.Runtime
interceptors: Sequence[Any]
plugins: Sequence[Any]


@dataclass
class LambdaWorkerConfig:
"""Passed to the configure callback of :py:func:`run_worker`.

Fields are pre-populated with Lambda-appropriate defaults before the configure callback is
invoked; the callback may read and override any of them.

Use ``worker_config`` to set task queue, register workflows/activities, and tune worker options.
The ``task_queue`` key is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if
set.

Attributes:
client_connect_config: Keyword arguments that will be passed to
:py:meth:`temporalio.client.Client.connect`. Pre-populated from the
config file / environment variables via envconfig, with Lambda
defaults applied.
worker_config: Keyword arguments that will be passed to the
:py:class:`~temporalio.worker.Worker` constructor (the ``client``
key is managed internally). Pre-populated with Lambda-appropriate
defaults (low concurrency, eager activities disabled) and
``task_queue`` from ``TEMPORAL_TASK_QUEUE`` if set.
shutdown_deadline_buffer: How long before the Lambda invocation
deadline the worker begins its shutdown sequence (worker drain +
shutdown hooks). Pre-populated to
``graceful_shutdown_timeout + 2s``. If you change
``graceful_shutdown_timeout`` in ``worker_config``, adjust this
accordingly.
shutdown_hooks: Functions called at the end of each Lambda invocation,
after the worker has stopped. Run in list order. Each may be sync
or async. Use this to flush telemetry providers or release other
per-process resources.
"""

client_connect_config: LambdaClientConnectConfig = field(
default_factory=LambdaClientConnectConfig
)
worker_config: WorkerConfig = field(default_factory=WorkerConfig)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main design question is about this. With this decision, we auto-reuse all the worker init parameters, which is good, but the syntax looks like:

config.worker_config["task_queue"] = "my_task_queue"

Which, the key literal does autocomplete, but it's maybe not as nice as duplicating some of the more common fields so we can have

config.task_queue = "my_task_queue"

The downside there is, of course, that they're duplicative of the same fields as keys on worker_config.

I think I prefer how it is now, since the keys still autocomplete, but would like to hear input.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a type like WorkerConfig for client connect. We probably should - I could add it in this PR but it would mean making changes to the main part of the SDK (albeit easy ones)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback from SDK team: Let's stick with dict for now & add Client connection config class

shutdown_deadline_buffer: timedelta = field(
default_factory=lambda: timedelta(seconds=7)
)
shutdown_hooks: list[Callable[[], Awaitable[None] | None]] = field(
default_factory=list
)


async def _run_shutdown_hooks( # type:ignore[reportUnusedFunction]
config: LambdaWorkerConfig,
) -> None:
"""Run all registered shutdown hooks in order, logging errors."""
for fn in config.shutdown_hooks:
try:
result = fn()
if asyncio.iscoroutine(result):
await result
except Exception as e:
print(
f"lambda_worker: shutdown hook error: {e}",
file=sys.stderr,
)
84 changes: 84 additions & 0 deletions temporalio/contrib/aws/lambda_worker/_defaults.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Lambda-tuned defaults for Temporal worker and client configuration."""

from __future__ import annotations

import os
from collections.abc import Callable
from datetime import timedelta
from pathlib import Path

from temporalio.worker import PollerBehaviorSimpleMaximum, WorkerConfig

# ---- Lambda-tuned worker defaults ----
# Conservative concurrency limits suited to Lambda's resource constraints.

DEFAULT_MAX_CONCURRENT_ACTIVITIES: int = 2
DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS: int = 10
DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES: int = 2
DEFAULT_MAX_CONCURRENT_NEXUS_TASKS: int = 5
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: timedelta = timedelta(seconds=5)
DEFAULT_SHUTDOWN_HOOK_BUFFER: timedelta = timedelta(seconds=2)
DEFAULT_MAX_CACHED_WORKFLOWS: int = 30

DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=2)
DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1)
DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1)

# ---- Environment variable names ----
ENV_TASK_QUEUE = "TEMPORAL_TASK_QUEUE"
ENV_LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT"
ENV_CONFIG_FILE = "TEMPORAL_CONFIG_FILE"
DEFAULT_CONFIG_FILE = "temporal.toml"


def apply_lambda_worker_defaults(config: WorkerConfig) -> None:
"""Apply Lambda-appropriate defaults to worker config.

Only sets values that have not already been set (i.e. are absent from *config*).
``disable_eager_activity_execution`` is always set to ``True``.
"""
config.setdefault("max_concurrent_activities", DEFAULT_MAX_CONCURRENT_ACTIVITIES)
config.setdefault(
"max_concurrent_workflow_tasks", DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS
)
config.setdefault(
"max_concurrent_local_activities", DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES
)
config.setdefault("max_concurrent_nexus_tasks", DEFAULT_MAX_CONCURRENT_NEXUS_TASKS)
config.setdefault("graceful_shutdown_timeout", DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT)
config.setdefault("max_cached_workflows", DEFAULT_MAX_CACHED_WORKFLOWS)
config.setdefault(
"workflow_task_poller_behavior", DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR
)
config.setdefault(
"activity_task_poller_behavior", DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR
)
config.setdefault("nexus_task_poller_behavior", DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR)
# Always disable eager activities in Lambda.
config["disable_eager_activity_execution"] = True


def build_lambda_identity(request_id: str, function_arn: str) -> str:
"""Build a worker identity string from the Lambda invocation context.

Format: ``<request_id>@<function_arn>``.
"""
return f"{request_id or 'unknown'}@{function_arn or 'unknown'}"


def lambda_default_config_file_path(
getenv: Callable[[str], str] = os.environ.get, # type: ignore[assignment]
) -> Path:
"""Return the config file path for a Lambda environment.

Resolution order:

1. ``TEMPORAL_CONFIG_FILE`` env var, if set.
2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``).
3. ``temporal.toml`` in the current working directory.
"""
config_file = getenv(ENV_CONFIG_FILE)
if config_file:
return Path(config_file)
root = getenv(ENV_LAMBDA_TASK_ROOT) or "."
return Path(root) / DEFAULT_CONFIG_FILE
Loading
Loading