diff --git a/README.md b/README.md index ca8a000f6..6fcd6fecb 100644 --- a/README.md +++ b/README.md @@ -503,7 +503,7 @@ Some things to note about external storage: * Only payloads that meet or exceed `ExternalStorage.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal. * External storage applies transparently to all payloads, whether they are workflow inputs/outputs, activity inputs/outputs, signal inputs, query outputs, update inputs/outputs, or failure details. * The `DataConverter`'s `payload_codec` (if configured) is applied to the payload *before* it is handed to the storage driver, so the driver always stores encoded bytes. The reference payload written to workflow history is not encoded by the `DataConverter` codec. -* Setting `ExternalStorage.payload_size_threshold` to `None` causes every payload to be considered for external storage regardless of size. +* Setting `ExternalStorage.payload_size_threshold` to `0` causes every payload to be considered for external storage regardless of size. ###### Driver Selection diff --git a/temporalio/contrib/aws/s3driver/README.md b/temporalio/contrib/aws/s3driver/README.md index 8e6a3e365..c9520a688 100644 --- a/temporalio/contrib/aws/s3driver/README.md +++ b/temporalio/contrib/aws/s3driver/README.md @@ -66,7 +66,7 @@ Payloads are stored under content-addressable keys derived from a SHA-256 hash o * Any driver used to store payloads must also be configured on the component that retrieves them. If the client stores workflow inputs using this driver, the worker must include it in its `ExternalStorage.drivers` list to retrieve them. * The target S3 bucket must already exist; the driver will not create it. * Identical serialized bytes within the same namespace and workflow (or activity) share the same S3 object — the key is content-addressable within that scope. The same bytes used across different workflows or namespaces produce distinct S3 objects because the key includes the namespace and workflow/activity identifiers. -* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `None` to offload every payload regardless of size. +* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `0` to offload every payload regardless of size. * `S3StorageDriver.max_payload_size` (default: 50 MiB) sets a hard upper limit on the serialized size of any single payload. A `ValueError` is raised at store time if a payload exceeds this limit. Increase it if your workflows produce payloads larger than 50 MiB. * Override `S3StorageDriver.driver_name` only when registering multiple `S3StorageDriver` instances with distinct configurations under the same `ExternalStorage.drivers` list. diff --git a/temporalio/converter/_extstore.py b/temporalio/converter/_extstore.py index 078d36d98..28fad00a4 100644 --- a/temporalio/converter/_extstore.py +++ b/temporalio/converter/_extstore.py @@ -210,10 +210,9 @@ class ExternalStorage(WithSerializationContext): one driver is registered, that driver is used for all store operations. """ - payload_size_threshold: int | None = 256 * 1024 + payload_size_threshold: int = 256 * 1024 """Minimum payload size in bytes before external storage is considered. - Defaults to 256 KiB. Set to ``None`` to consider every payload for - external storage regardless of size. + Defaults to 256 KiB. Must be greater than or equal to zero. """ _driver_map: dict[str, StorageDriver] = dataclasses.field( @@ -234,7 +233,8 @@ class ExternalStorage(WithSerializationContext): def __post_init__(self) -> None: """Validate drivers and build the internal name-keyed driver map. - Raises :exc:`ValueError` if no drivers are provided, if more than one + Raises :exc:`ValueError` if no drivers are provided, if + :attr:`payload_size_threshold` is less than zero, if more than one driver is registered without a :attr:`driver_selector`, or if any two drivers share the same name. """ @@ -242,6 +242,10 @@ def __post_init__(self) -> None: raise ValueError( "ExternalStorage.drivers must contain at least one driver." ) + if self.payload_size_threshold < 0: + raise ValueError( + "ExternalStorage.payload_size_threshold must be greater than or equal to zero." + ) if len(self.drivers) > 1 and self.driver_selector is None: raise ValueError( "ExternalStorage.driver_selector must be specified if multiple drivers are registered." @@ -267,10 +271,7 @@ def _select_driver( self, context: StorageDriverStoreContext, payload: Payload ) -> StorageDriver | None: """Returns the driver to use for this payload, or None to pass through.""" - if ( - self.payload_size_threshold is not None - and payload.ByteSize() < self.payload_size_threshold - ): + if payload.ByteSize() < self.payload_size_threshold: return None selector = self.driver_selector if selector is None: diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 9057e1449..332e2ead7 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -321,9 +321,12 @@ def __init__( See https://docs.temporal.io/troubleshooting/blob-size-limit-error for more details. max_workflow_task_external_storage_concurrency: Maximum number of - external storage I/O operations (store/retrieve) that may run + external storage payload operations (store/retrieve) that may run concurrently within a single workflow task activation. - Defaults to 10. WARNING: This setting is experimental. + Defaults to 3. Adjust this value based on your workload's needs. + Please report any issues you encounter with this setting or if you + feel the default should be changed. + WARNING: This setting is experimental. """ config = WorkerConfig( diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 914b14370..fb104b414 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -47,7 +47,13 @@ # Set to true to log all activations and completions LOG_PROTOS = False -_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 10 +# Value was chosen abitrarily as a small number that allows some concurrency and prevents +# large numbers of concurrent external storage operations causing resource contention. +# This default limit is per workflow task activation and does not limit the total number +# of concurrent external storage operations across all workflow task activations. +# Advise customers to adjust based on their workload needs and to report issues with the +# value if problems are encountered. This setting is experimental. +_DEFAULT_WORKFLOW_TASK_EXTERNAL_STORAGE_CONCURRENCY: int = 3 class _WorkflowWorker: # type:ignore[reportUnusedClass] diff --git a/tests/test_extstore.py b/tests/test_extstore.py index 7cff620ba..1771778a7 100644 --- a/tests/test_extstore.py +++ b/tests/test_extstore.py @@ -283,7 +283,7 @@ async def store( external_storage=ExternalStorage( drivers=drivers, driver_selector=lambda ctx, p: next(drivers_iter), - payload_size_threshold=None, + payload_size_threshold=0, ) ) @@ -337,7 +337,7 @@ async def retrieve( external_storage=ExternalStorage( drivers=drivers, driver_selector=lambda ctx, p: next(drivers_iter), - payload_size_threshold=None, + payload_size_threshold=0, ) ) encoded = await converter.encode(["payload_a", "payload_b"]) @@ -631,7 +631,7 @@ def selector(_ctx: object, payload: Payload) -> StorageDriver: external_storage=ExternalStorage( drivers=[driver_a, driver_b], driver_selector=selector, - payload_size_threshold=None, + payload_size_threshold=0, ) ) @@ -678,6 +678,21 @@ def test_duplicate_driver_names_raises(self): payload_size_threshold=50, ) + @pytest.mark.parametrize("threshold", [-1, -1000]) + def test_negative_payload_size_threshold_raises(self, threshold: int): + """A negative payload_size_threshold raises ValueError immediately + when constructing ExternalStorage.""" + driver = InMemoryTestDriver() + + with pytest.raises( + ValueError, + match=r"^ExternalStorage\.payload_size_threshold must be greater than or equal to zero\.$", + ): + ExternalStorage( + drivers=[driver], + payload_size_threshold=threshold, + ) + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tests/test_serialization_context.py b/tests/test_serialization_context.py index ad3768ec8..580926b4b 100644 --- a/tests/test_serialization_context.py +++ b/tests/test_serialization_context.py @@ -1972,7 +1972,7 @@ async def test_child_workflow_external_storage_with_context(client: Client): DataConverter.default, external_storage=ExternalStorage( drivers=[driver], - payload_size_threshold=None, + payload_size_threshold=0, ), ) client = Client(**config) diff --git a/tests/worker/test_extstore.py b/tests/worker/test_extstore.py index eb4270d08..8e2ee763a 100644 --- a/tests/worker/test_extstore.py +++ b/tests/worker/test_extstore.py @@ -581,7 +581,7 @@ def __init__(self, driver_name: str): external_storage=ExternalStorage( drivers=[driver1, driver2, driver3], driver_selector=lambda _context, _payload: driver1, - payload_size_threshold=None, + payload_size_threshold=0, ), ), )