Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ Some things to note about external storage:
* Only payloads that meet or exceed `ExternalStorage.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
* External storage applies transparently to all payloads, whether they are workflow inputs/outputs, activity inputs/outputs, signal inputs, query outputs, update inputs/outputs, or failure details.
* The `DataConverter`'s `payload_codec` (if configured) is applied to the payload *before* it is handed to the storage driver, so the driver always stores encoded bytes. The reference payload written to workflow history is not encoded by the `DataConverter` codec.
* Setting `ExternalStorage.payload_size_threshold` to `None` causes every payload to be considered for external storage regardless of size.
* Setting `ExternalStorage.payload_size_threshold` to `0` causes every payload to be considered for external storage regardless of size.

###### Driver Selection

Expand Down
2 changes: 1 addition & 1 deletion temporalio/contrib/aws/s3driver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Payloads are stored under content-addressable keys derived from a SHA-256 hash o
* Any driver used to store payloads must also be configured on the component that retrieves them. If the client stores workflow inputs using this driver, the worker must include it in its `ExternalStorage.drivers` list to retrieve them.
* The target S3 bucket must already exist; the driver will not create it.
* Identical serialized bytes within the same namespace and workflow (or activity) share the same S3 object — the key is content-addressable within that scope. The same bytes used across different workflows or namespaces produce distinct S3 objects because the key includes the namespace and workflow/activity identifiers.
* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `None` to offload every payload regardless of size.
* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `0` to offload every payload regardless of size.
* `S3StorageDriver.max_payload_size` (default: 50 MiB) sets a hard upper limit on the serialized size of any single payload. A `ValueError` is raised at store time if a payload exceeds this limit. Increase it if your workflows produce payloads larger than 50 MiB.
* Override `S3StorageDriver.driver_name` only when registering multiple `S3StorageDriver` instances with distinct configurations under the same `ExternalStorage.drivers` list.

Expand Down
17 changes: 9 additions & 8 deletions temporalio/converter/_extstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,9 @@ class ExternalStorage(WithSerializationContext):
one driver is registered, that driver is used for all store operations.
"""

payload_size_threshold: int | None = 256 * 1024
payload_size_threshold: int = 256 * 1024
"""Minimum payload size in bytes before external storage is considered.
Defaults to 256 KiB. Set to ``None`` to consider every payload for
external storage regardless of size.
Defaults to 256 KiB. Must be greater than or equal to zero.
"""

_driver_map: dict[str, StorageDriver] = dataclasses.field(
Expand All @@ -234,14 +233,19 @@ class ExternalStorage(WithSerializationContext):
def __post_init__(self) -> None:
"""Validate drivers and build the internal name-keyed driver map.

Raises :exc:`ValueError` if no drivers are provided, if more than one
Raises :exc:`ValueError` if no drivers are provided, if
:attr:`payload_size_threshold` is less than zero, if more than one
driver is registered without a :attr:`driver_selector`, or if any two
drivers share the same name.
"""
if not self.drivers:
raise ValueError(
"ExternalStorage.drivers must contain at least one driver."
)
if self.payload_size_threshold < 0:
raise ValueError(
"ExternalStorage.payload_size_threshold must be greater than or equal to zero."
)
if len(self.drivers) > 1 and self.driver_selector is None:
raise ValueError(
"ExternalStorage.driver_selector must be specified if multiple drivers are registered."
Expand All @@ -267,10 +271,7 @@ def _select_driver(
self, context: StorageDriverStoreContext, payload: Payload
) -> StorageDriver | None:
"""Returns the driver to use for this payload, or None to pass through."""
if (
self.payload_size_threshold is not None
and payload.ByteSize() < self.payload_size_threshold
):
if payload.ByteSize() < self.payload_size_threshold:
return None
selector = self.driver_selector
if selector is None:
Expand Down
7 changes: 5 additions & 2 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,12 @@ def __init__(
See https://docs.temporal.io/troubleshooting/blob-size-limit-error for more
details.
max_workflow_task_external_storage_concurrency: Maximum number of
external storage I/O operations (store/retrieve) that may run
external storage payload operations (store/retrieve) that may run
concurrently within a single workflow task activation.
Defaults to 10. WARNING: This setting is experimental.
Defaults to 3. Adjust this value based on your workload's needs.
Please report any issues you encounter with this setting or if you
feel the default should be changed.
WARNING: This setting is experimental.

"""
config = WorkerConfig(
Expand Down
8 changes: 7 additions & 1 deletion temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@
# Set to true to log all activations and completions
LOG_PROTOS = False

_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 10
# Value was chosen abitrarily as a small number that allows some concurrency and prevents
# large numbers of concurrent external storage operations causing resource contention.
# This default limit is per workflow task activation and does not limit the total number
# of concurrent external storage operations across all workflow task activations.
# Advise customers to adjust based on their workload needs and to report issues with the
# value if problems are encountered. This setting is experimental.
_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 3


class _WorkflowWorker: # type:ignore[reportUnusedClass]
Expand Down
21 changes: 18 additions & 3 deletions tests/test_extstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ async def store(
external_storage=ExternalStorage(
drivers=drivers,
driver_selector=lambda ctx, p: next(drivers_iter),
payload_size_threshold=None,
payload_size_threshold=0,
)
)

Expand Down Expand Up @@ -337,7 +337,7 @@ async def retrieve(
external_storage=ExternalStorage(
drivers=drivers,
driver_selector=lambda ctx, p: next(drivers_iter),
payload_size_threshold=None,
payload_size_threshold=0,
)
)
encoded = await converter.encode(["payload_a", "payload_b"])
Expand Down Expand Up @@ -631,7 +631,7 @@ def selector(_ctx: object, payload: Payload) -> StorageDriver:
external_storage=ExternalStorage(
drivers=[driver_a, driver_b],
driver_selector=selector,
payload_size_threshold=None,
payload_size_threshold=0,
)
)

Expand Down Expand Up @@ -678,6 +678,21 @@ def test_duplicate_driver_names_raises(self):
payload_size_threshold=50,
)

@pytest.mark.parametrize("threshold", [-1, -1000])
def test_negative_payload_size_threshold_raises(self, threshold: int):
"""A negative payload_size_threshold raises ValueError immediately
when constructing ExternalStorage."""
driver = InMemoryTestDriver()

with pytest.raises(
ValueError,
match=r"^ExternalStorage\.payload_size_threshold must be greater than or equal to zero\.$",
):
ExternalStorage(
drivers=[driver],
payload_size_threshold=threshold,
)


if __name__ == "__main__":
pytest.main([__file__, "-v"])
2 changes: 1 addition & 1 deletion tests/test_serialization_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1972,7 +1972,7 @@ async def test_child_workflow_external_storage_with_context(client: Client):
DataConverter.default,
external_storage=ExternalStorage(
drivers=[driver],
payload_size_threshold=None,
payload_size_threshold=0,
),
)
client = Client(**config)
Expand Down
2 changes: 1 addition & 1 deletion tests/worker/test_extstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ def __init__(self, driver_name: str):
external_storage=ExternalStorage(
drivers=[driver1, driver2, driver3],
driver_selector=lambda _context, _payload: driver1,
payload_size_threshold=None,
payload_size_threshold=0,
),
),
)
Expand Down
Loading