From 26a4337f0759b443fb3d5b1e5dcaec01a0c3e702 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Wed, 1 Apr 2026 09:10:29 -0700 Subject: [PATCH 1/5] Validate that payload_size_threshold is greater than zero --- temporalio/converter/_extstore.py | 11 ++++++++--- tests/test_extstore.py | 21 +++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/temporalio/converter/_extstore.py b/temporalio/converter/_extstore.py index 078d36d98..a35b6419f 100644 --- a/temporalio/converter/_extstore.py +++ b/temporalio/converter/_extstore.py @@ -234,14 +234,19 @@ class ExternalStorage(WithSerializationContext): def __post_init__(self) -> None: """Validate drivers and build the internal name-keyed driver map. - Raises :exc:`ValueError` if no drivers are provided, if more than one - driver is registered without a :attr:`driver_selector`, or if any two - drivers share the same name. + Raises :exc:`ValueError` if no drivers are provided, if + :attr:`payload_size_threshold` is zero or less than zero, if more + than one driver is registered without a :attr:`driver_selector`, or if + any two drivers share the same name. """ if not self.drivers: raise ValueError( "ExternalStorage.drivers must contain at least one driver." ) + if self.payload_size_threshold is not None and self.payload_size_threshold <= 0: + raise ValueError( + "ExternalStorage.payload_size_threshold must be greater than zero." + ) if len(self.drivers) > 1 and self.driver_selector is None: raise ValueError( "ExternalStorage.driver_selector must be specified if multiple drivers are registered." 
diff --git a/tests/test_extstore.py b/tests/test_extstore.py index 7cff620ba..017cd3361 100644 --- a/tests/test_extstore.py +++ b/tests/test_extstore.py @@ -678,6 +678,27 @@ def test_duplicate_driver_names_raises(self): payload_size_threshold=50, ) + @pytest.mark.parametrize("threshold", [0, -1, -1000]) + def test_non_positive_payload_size_threshold_raises(self, threshold: int): + """A payload_size_threshold of zero or negative raises ValueError + immediately when constructing ExternalStorage.""" + driver = InMemoryTestDriver() + + with pytest.raises( + ValueError, + match=r"^ExternalStorage\.payload_size_threshold must be greater than zero\.$", + ): + ExternalStorage( + drivers=[driver], + payload_size_threshold=threshold, + ) + + def test_none_payload_size_threshold_is_allowed(self): + """payload_size_threshold=None is valid and should not raise.""" + driver = InMemoryTestDriver() + ext = ExternalStorage(drivers=[driver], payload_size_threshold=None) + assert ext.payload_size_threshold is None + if __name__ == "__main__": pytest.main([__file__, "-v"]) From 45066cefd86d9bee3d306d53d3da9c0281289a01 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Wed, 1 Apr 2026 12:36:26 -0700 Subject: [PATCH 2/5] Change setting payload_size_threshold to None means default value --- temporalio/converter/_extstore.py | 17 ++++++++++------- tests/test_extstore.py | 6 +++--- tests/test_serialization_context.py | 2 +- tests/worker/test_extstore.py | 2 +- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/temporalio/converter/_extstore.py b/temporalio/converter/_extstore.py index a35b6419f..fe9ea1b5b 100644 --- a/temporalio/converter/_extstore.py +++ b/temporalio/converter/_extstore.py @@ -27,6 +27,7 @@ _T = TypeVar("_T") _REFERENCE_ENCODING = b"json/external-storage-reference" +_DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024 @dataclass @@ -210,10 +211,10 @@ class ExternalStorage(WithSerializationContext): one driver is registered, 
that driver is used for all store operations. """ - payload_size_threshold: int | None = 256 * 1024 + payload_size_threshold: int | None = _DEFAULT_PAYLOAD_SIZE_THRESHOLD """Minimum payload size in bytes before external storage is considered. - Defaults to 256 KiB. Set to ``None`` to consider every payload for - external storage regardless of size. + Defaults to 256 KiB. Must be greater than zero when set. Set to ``1`` to consider + every payload for external storage regardless of size. """ _driver_map: dict[str, StorageDriver] = dataclasses.field( @@ -272,10 +273,12 @@ def _select_driver( self, context: StorageDriverStoreContext, payload: Payload ) -> StorageDriver | None: """Returns the driver to use for this payload, or None to pass through.""" - if ( - self.payload_size_threshold is not None - and payload.ByteSize() < self.payload_size_threshold - ): + threshold = ( + self.payload_size_threshold + if self.payload_size_threshold is not None + else _DEFAULT_PAYLOAD_SIZE_THRESHOLD + ) + if payload.ByteSize() < threshold: return None selector = self.driver_selector if selector is None: diff --git a/tests/test_extstore.py b/tests/test_extstore.py index 017cd3361..ce8431436 100644 --- a/tests/test_extstore.py +++ b/tests/test_extstore.py @@ -283,7 +283,7 @@ async def store( external_storage=ExternalStorage( drivers=drivers, driver_selector=lambda ctx, p: next(drivers_iter), - payload_size_threshold=None, + payload_size_threshold=1, ) ) @@ -337,7 +337,7 @@ async def retrieve( external_storage=ExternalStorage( drivers=drivers, driver_selector=lambda ctx, p: next(drivers_iter), - payload_size_threshold=None, + payload_size_threshold=1, ) ) encoded = await converter.encode(["payload_a", "payload_b"]) @@ -631,7 +631,7 @@ def selector(_ctx: object, payload: Payload) -> StorageDriver: external_storage=ExternalStorage( drivers=[driver_a, driver_b], driver_selector=selector, - payload_size_threshold=None, + payload_size_threshold=1, ) ) diff --git 
a/tests/test_serialization_context.py b/tests/test_serialization_context.py index ad3768ec8..bb6702074 100644 --- a/tests/test_serialization_context.py +++ b/tests/test_serialization_context.py @@ -1972,7 +1972,7 @@ async def test_child_workflow_external_storage_with_context(client: Client): DataConverter.default, external_storage=ExternalStorage( drivers=[driver], - payload_size_threshold=None, + payload_size_threshold=1, ), ) client = Client(**config) diff --git a/tests/worker/test_extstore.py b/tests/worker/test_extstore.py index eb4270d08..9b08039c0 100644 --- a/tests/worker/test_extstore.py +++ b/tests/worker/test_extstore.py @@ -581,7 +581,7 @@ def __init__(self, driver_name: str): external_storage=ExternalStorage( drivers=[driver1, driver2, driver3], driver_selector=lambda _context, _payload: driver1, - payload_size_threshold=None, + payload_size_threshold=1, ), ), ) From 9157a8f88f722ca5993e6110f80d65645a4ceeb6 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Wed, 1 Apr 2026 12:47:49 -0700 Subject: [PATCH 3/5] Readjust storage concurrency with lower default --- temporalio/worker/_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 914b14370..818950445 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -47,7 +47,7 @@ # Set to true to log all activations and completions LOG_PROTOS = False -_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 10 +_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 3 class _WorkflowWorker: # type:ignore[reportUnusedClass] From b23ea1a7fe3eb7dee9e337c372aafefc7e1270f2 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Wed, 1 Apr 2026 15:29:20 -0700 Subject: [PATCH 4/5] Allow 0, remove None, more comments on max concurrency --- README.md | 2 +- temporalio/contrib/aws/s3driver/README.md | 2 +- 
temporalio/converter/_extstore.py | 23 ++++++++--------------- temporalio/worker/_worker.py | 3 ++- tests/test_extstore.py | 22 ++++++++-------------- tests/test_serialization_context.py | 2 +- tests/worker/test_extstore.py | 2 +- 7 files changed, 22 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index ca8a000f6..6fcd6fecb 100644 --- a/README.md +++ b/README.md @@ -503,7 +503,7 @@ Some things to note about external storage: * Only payloads that meet or exceed `ExternalStorage.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal. * External storage applies transparently to all payloads, whether they are workflow inputs/outputs, activity inputs/outputs, signal inputs, query outputs, update inputs/outputs, or failure details. * The `DataConverter`'s `payload_codec` (if configured) is applied to the payload *before* it is handed to the storage driver, so the driver always stores encoded bytes. The reference payload written to workflow history is not encoded by the `DataConverter` codec. -* Setting `ExternalStorage.payload_size_threshold` to `None` causes every payload to be considered for external storage regardless of size. +* Setting `ExternalStorage.payload_size_threshold` to `0` causes every payload to be considered for external storage regardless of size. ###### Driver Selection diff --git a/temporalio/contrib/aws/s3driver/README.md b/temporalio/contrib/aws/s3driver/README.md index 8e6a3e365..c9520a688 100644 --- a/temporalio/contrib/aws/s3driver/README.md +++ b/temporalio/contrib/aws/s3driver/README.md @@ -66,7 +66,7 @@ Payloads are stored under content-addressable keys derived from a SHA-256 hash o * Any driver used to store payloads must also be configured on the component that retrieves them. If the client stores workflow inputs using this driver, the worker must include it in its `ExternalStorage.drivers` list to retrieve them. 
* The target S3 bucket must already exist; the driver will not create it. * Identical serialized bytes within the same namespace and workflow (or activity) share the same S3 object — the key is content-addressable within that scope. The same bytes used across different workflows or namespaces produce distinct S3 objects because the key includes the namespace and workflow/activity identifiers. -* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `None` to offload every payload regardless of size. +* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `0` to offload every payload regardless of size. * `S3StorageDriver.max_payload_size` (default: 50 MiB) sets a hard upper limit on the serialized size of any single payload. A `ValueError` is raised at store time if a payload exceeds this limit. Increase it if your workflows produce payloads larger than 50 MiB. * Override `S3StorageDriver.driver_name` only when registering multiple `S3StorageDriver` instances with distinct configurations under the same `ExternalStorage.drivers` list. diff --git a/temporalio/converter/_extstore.py b/temporalio/converter/_extstore.py index fe9ea1b5b..28fad00a4 100644 --- a/temporalio/converter/_extstore.py +++ b/temporalio/converter/_extstore.py @@ -27,7 +27,6 @@ _T = TypeVar("_T") _REFERENCE_ENCODING = b"json/external-storage-reference" -_DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024 @dataclass @@ -211,10 +210,9 @@ class ExternalStorage(WithSerializationContext): one driver is registered, that driver is used for all store operations. """ - payload_size_threshold: int | None = _DEFAULT_PAYLOAD_SIZE_THRESHOLD + payload_size_threshold: int = 256 * 1024 """Minimum payload size in bytes before external storage is considered. 
- Defaults to 256 KiB. Must be greater than zero when set. Set to ``1`` to consider - every payload for external storage regardless of size. + Defaults to 256 KiB. Must be greater than or equal to zero. """ _driver_map: dict[str, StorageDriver] = dataclasses.field( @@ -236,17 +234,17 @@ def __post_init__(self) -> None: """Validate drivers and build the internal name-keyed driver map. Raises :exc:`ValueError` if no drivers are provided, if - :attr:`payload_size_threshold` is zero or less than zero, if more - than one driver is registered without a :attr:`driver_selector`, or if - any two drivers share the same name. + :attr:`payload_size_threshold` is less than zero, if more than one + driver is registered without a :attr:`driver_selector`, or if any two + drivers share the same name. """ if not self.drivers: raise ValueError( "ExternalStorage.drivers must contain at least one driver." ) - if self.payload_size_threshold is not None and self.payload_size_threshold <= 0: + if self.payload_size_threshold < 0: raise ValueError( - "ExternalStorage.payload_size_threshold must be greater than zero." + "ExternalStorage.payload_size_threshold must be greater than or equal to zero." 
) if len(self.drivers) > 1 and self.driver_selector is None: raise ValueError( @@ -273,12 +271,7 @@ def _select_driver( self, context: StorageDriverStoreContext, payload: Payload ) -> StorageDriver | None: """Returns the driver to use for this payload, or None to pass through.""" - threshold = ( - self.payload_size_threshold - if self.payload_size_threshold is not None - else _DEFAULT_PAYLOAD_SIZE_THRESHOLD - ) - if payload.ByteSize() < threshold: + if payload.ByteSize() < self.payload_size_threshold: return None selector = self.driver_selector if selector is None: diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 9057e1449..eb27ce429 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -323,7 +323,8 @@ def __init__( max_workflow_task_external_storage_concurrency: Maximum number of external storage I/O operations (store/retrieve) that may run concurrently within a single workflow task activation. - Defaults to 10. WARNING: This setting is experimental. + Defaults to 3. Adjust this value based on your workload's needs. + WARNING: This setting is experimental. 
""" config = WorkerConfig( diff --git a/tests/test_extstore.py b/tests/test_extstore.py index ce8431436..1771778a7 100644 --- a/tests/test_extstore.py +++ b/tests/test_extstore.py @@ -283,7 +283,7 @@ async def store( external_storage=ExternalStorage( drivers=drivers, driver_selector=lambda ctx, p: next(drivers_iter), - payload_size_threshold=1, + payload_size_threshold=0, ) ) @@ -337,7 +337,7 @@ async def retrieve( external_storage=ExternalStorage( drivers=drivers, driver_selector=lambda ctx, p: next(drivers_iter), - payload_size_threshold=1, + payload_size_threshold=0, ) ) encoded = await converter.encode(["payload_a", "payload_b"]) @@ -631,7 +631,7 @@ def selector(_ctx: object, payload: Payload) -> StorageDriver: external_storage=ExternalStorage( drivers=[driver_a, driver_b], driver_selector=selector, - payload_size_threshold=1, + payload_size_threshold=0, ) ) @@ -678,27 +678,21 @@ def test_duplicate_driver_names_raises(self): payload_size_threshold=50, ) - @pytest.mark.parametrize("threshold", [0, -1, -1000]) - def test_non_positive_payload_size_threshold_raises(self, threshold: int): - """A payload_size_threshold of zero or negative raises ValueError - immediately when constructing ExternalStorage.""" + @pytest.mark.parametrize("threshold", [-1, -1000]) + def test_negative_payload_size_threshold_raises(self, threshold: int): + """A negative payload_size_threshold raises ValueError immediately + when constructing ExternalStorage.""" driver = InMemoryTestDriver() with pytest.raises( ValueError, - match=r"^ExternalStorage\.payload_size_threshold must be greater than zero\.$", + match=r"^ExternalStorage\.payload_size_threshold must be greater than or equal to zero\.$", ): ExternalStorage( drivers=[driver], payload_size_threshold=threshold, ) - def test_none_payload_size_threshold_is_allowed(self): - """payload_size_threshold=None is valid and should not raise.""" - driver = InMemoryTestDriver() - ext = ExternalStorage(drivers=[driver], payload_size_threshold=None) 
- assert ext.payload_size_threshold is None - if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tests/test_serialization_context.py b/tests/test_serialization_context.py index bb6702074..580926b4b 100644 --- a/tests/test_serialization_context.py +++ b/tests/test_serialization_context.py @@ -1972,7 +1972,7 @@ async def test_child_workflow_external_storage_with_context(client: Client): DataConverter.default, external_storage=ExternalStorage( drivers=[driver], - payload_size_threshold=1, + payload_size_threshold=0, ), ) client = Client(**config) diff --git a/tests/worker/test_extstore.py b/tests/worker/test_extstore.py index 9b08039c0..8e2ee763a 100644 --- a/tests/worker/test_extstore.py +++ b/tests/worker/test_extstore.py @@ -581,7 +581,7 @@ def __init__(self, driver_name: str): external_storage=ExternalStorage( drivers=[driver1, driver2, driver3], driver_selector=lambda _context, _payload: driver1, - payload_size_threshold=1, + payload_size_threshold=0, ), ), ) From 6c62087df833e0338d7f9e43c87170284885052a Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Wed, 1 Apr 2026 19:20:55 -0700 Subject: [PATCH 5/5] Elaborate more on the concurrency setting --- temporalio/worker/_worker.py | 4 +++- temporalio/worker/_workflow.py | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index eb27ce429..332e2ead7 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -321,9 +321,11 @@ def __init__( See https://docs.temporal.io/troubleshooting/blob-size-limit-error for more details. max_workflow_task_external_storage_concurrency: Maximum number of - external storage I/O operations (store/retrieve) that may run + external storage payload operations (store/retrieve) that may run concurrently within a single workflow task activation. Defaults to 3. Adjust this value based on your workload's needs. 
+ Please report any issues you encounter with this setting or if you + feel the default should be changed. WARNING: This setting is experimental. """ diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 818950445..fb104b414 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -47,6 +47,12 @@ # Set to true to log all activations and completions LOG_PROTOS = False +# Value was chosen arbitrarily as a small number that allows some concurrency and prevents +# large numbers of concurrent external storage operations from causing resource contention. +# This default limit is per workflow task activation and does not limit the total number +# of concurrent external storage operations across all workflow task activations. +# Advise customers to adjust based on their workload needs and to report issues with the +# value if problems are encountered. This setting is experimental. _DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 3