From b7669e9716ae762fd60e2ca899636108f719cc41 Mon Sep 17 00:00:00 2001 From: Chris Qiu Date: Sun, 10 May 2026 16:46:14 +0800 Subject: [PATCH 1/3] Fix silent per-char iteration when DoFn returns str/bytes/dict --- CHANGES.md | 1 + sdks/python/apache_beam/runners/common.pxd | 1 + sdks/python/apache_beam/runners/common.py | 16 ++++++++ .../python/apache_beam/runners/common_test.py | 37 +++++++++++++++++++ 4 files changed, 55 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 0db7fddba4f1..c96fa3a11c93 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -95,6 +95,7 @@ * Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)). * Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)). +* `DoFn.process` returning a `str`, `bytes`, or `dict` (instead of an iterable wrapping one) now raises a clear `TypeError` rather than silently iterating per-character/byte/key (Python) ([#18712](https://github.com/apache/beam/issues/18712)). ## Security Fixes diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index a9c1b91d9d54..93c9e83875ec 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -150,6 +150,7 @@ cdef class _OutputHandler(OutputHandler): cdef object output_batch_converter cdef bint _process_batch_yields_elements cdef bint _process_yields_batches + cdef bint _check_user_dofn_output @cython.locals(windowed_value=WindowedValue, windowed_batch=WindowedBatch, diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index c22072dbf8b9..37c1ef964294 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -1475,6 +1475,7 @@ def __init__( do_fn_signature.process_batch_method.method_value, '_beam_yields_elements', False), + check_user_dofn_output=not isinstance(fn, core.CallableWrapperDoFn), ) if do_fn_signature.is_stateful_dofn() and not user_state_context: @@ -1633,6 +1634,7 @@ def __init__( output_batch_converter, # type: Optional[BatchConverter] process_yields_batches, # type: bool process_batch_yields_elements, # type: bool + check_user_dofn_output=False, # type: bool ): """Initializes ``_OutputHandler``. @@ -1642,6 +1644,12 @@ def __init__( tagged_receivers: main receiver object. per_element_output_counter: per_element_output_counter of one work_item. could be none if experimental flag turn off + check_user_dofn_output: if True, validate that a user-class DoFn does not + return a str/bytes/dict (a common bug — see + https://github.com/apache/beam/issues/18712). + Skipped for callable-wrapped DoFns (Map/FlatMap) + where iterating a returned str/bytes/dict is a + legitimate flatten use case. """ self.window_fn = window_fn self.main_receivers = main_receivers @@ -1654,6 +1662,7 @@ def __init__( self.output_batch_converter = output_batch_converter self._process_yields_batches = process_yields_batches self._process_batch_yields_elements = process_batch_yields_elements + self._check_user_dofn_output = check_user_dofn_output def handle_process_outputs( self, windowed_input_element, results, watermark_estimator=None): @@ -1667,6 +1676,13 @@ def handle_process_outputs( if results is None: results = [] + if self._check_user_dofn_output and isinstance(results, (str, bytes, dict)): + object_type = type(results).__name__ + raise TypeError( + 'Returning a %s from a ParDo or FlatMap is discouraged. ' + 'Please use list("%s") if you really want this behavior.' % + (object_type, results)) + # TODO(https://github.com/apache/beam/issues/20404): Verify that the # results object is a valid iterable type if # performance_runtime_type_check is active, without harming performance diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py index cc4e8218e8af..9d03192783a0 100644 --- a/sdks/python/apache_beam/runners/common_test.py +++ b/sdks/python/apache_beam/runners/common_test.py @@ -154,6 +154,43 @@ def process(self, element, mykey=DoFn.KeyParam): test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2])) (p | test_stream | beam.ParDo(DoFnProcessWithKeyparam())) + def test_dofn_returning_str_raises_clear_error(self): + """Regression test for https://github.com/apache/beam/issues/18712. + + A DoFn returning a str instead of an iterable wrapping one used to + silently iterate per-character. It should now raise a clear TypeError. + """ + class BadDoFn(DoFn): + def process(self, element): + return 'hello' + + with self.assertRaisesRegex( + Exception, 'Returning a str from a ParDo or FlatMap is discouraged'): + with TestPipeline() as p: + _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) + + def test_dofn_returning_bytes_raises_clear_error(self): + """Regression test for https://github.com/apache/beam/issues/18712.""" + class BadDoFn(DoFn): + def process(self, element): + return b'hello' + + with self.assertRaisesRegex( + Exception, 'Returning a bytes from a ParDo or FlatMap is discouraged'): + with TestPipeline() as p: + _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) + + def test_dofn_returning_dict_raises_clear_error(self): + """Regression test for https://github.com/apache/beam/issues/18712.""" + class BadDoFn(DoFn): + def process(self, element): + return {'k': 'v'} + + with self.assertRaisesRegex( + Exception, 'Returning a dict from a ParDo or FlatMap is discouraged'): + with TestPipeline() as p: + _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) + def test_pardo_with_unbounded_per_element_dofn(self): class UnboundedDoFn(beam.DoFn): @beam.DoFn.unbounded_per_element() From bab7af76669cb853ed4e98c5410d0255a47861ce Mon Sep 17 00:00:00 2001 From: Chris Qiu Date: Sun, 10 May 2026 23:55:38 +0800 Subject: [PATCH 2/3] Use %r in error message for correct repr of dict/bytes --- sdks/python/apache_beam/runners/common.py | 2 +- sdks/python/apache_beam/runners/common_test.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 37c1ef964294..d1688e9182dc 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -1680,7 +1680,7 @@ def handle_process_outputs( object_type = type(results).__name__ raise TypeError( 'Returning a %s from a ParDo or FlatMap is discouraged. ' - 'Please use list("%s") if you really want this behavior.' % + 'Please use list(%r) if you really want this behavior.' % (object_type, results)) # TODO(https://github.com/apache/beam/issues/20404): Verify that the diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py index 9d03192783a0..6470c3a52b32 100644 --- a/sdks/python/apache_beam/runners/common_test.py +++ b/sdks/python/apache_beam/runners/common_test.py @@ -164,6 +164,10 @@ class BadDoFn(DoFn): def process(self, element): return 'hello' + # Use base Exception (matching existing convention in + # typecheck_test.py::test_do_fn_returning_non_iterable_throws_error) + # because the runner's _reraise_augmented wraps the TypeError before + # it surfaces to the test framework. with self.assertRaisesRegex( Exception, 'Returning a str from a ParDo or FlatMap is discouraged'): with TestPipeline() as p: From c1dea34df35bdaff2dc6ce8b18afd6bdbff389b8 Mon Sep 17 00:00:00 2001 From: Chris Qiu Date: Sun, 24 May 2026 15:52:30 +0800 Subject: [PATCH 3/3] Address review: 'is not allowed' wording, move entry to Breaking Changes --- CHANGES.md | 2 +- sdks/python/apache_beam/runners/common.py | 2 +- sdks/python/apache_beam/runners/common_test.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2af67fb5adaa..98364497411d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -87,6 +87,7 @@ ## Breaking Changes * (Python) Made Beartype the default fallback type checking tool. This can be disabled with the `--disable_beartype` pipeline option. ([#38275](https://github.com/apache/beam/issues/38275)) +* `DoFn.process` returning a `str`, `bytes`, or `dict` (instead of an iterable wrapping one) now raises a `TypeError` rather than silently iterating per-character/byte/key (Python) ([#18712](https://github.com/apache/beam/issues/18712)). * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). ## Deprecations @@ -99,7 +100,6 @@ * Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)). * Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)). -* `DoFn.process` returning a `str`, `bytes`, or `dict` (instead of an iterable wrapping one) now raises a clear `TypeError` rather than silently iterating per-character/byte/key (Python) ([#18712](https://github.com/apache/beam/issues/18712)). * Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)). ## Security Fixes diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index d1688e9182dc..96687c90cb8e 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -1679,7 +1679,7 @@ def handle_process_outputs( if self._check_user_dofn_output and isinstance(results, (str, bytes, dict)): object_type = type(results).__name__ raise TypeError( - 'Returning a %s from a ParDo or FlatMap is discouraged. ' + 'Returning a %s from a ParDo or FlatMap is not allowed. ' 'Please use list(%r) if you really want this behavior.' % (object_type, results)) diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py index 6470c3a52b32..ce043611ec85 100644 --- a/sdks/python/apache_beam/runners/common_test.py +++ b/sdks/python/apache_beam/runners/common_test.py @@ -169,7 +169,7 @@ def process(self, element): # because the runner's _reraise_augmented wraps the TypeError before # it surfaces to the test framework. with self.assertRaisesRegex( - Exception, 'Returning a str from a ParDo or FlatMap is discouraged'): + Exception, 'Returning a str from a ParDo or FlatMap is not allowed'): with TestPipeline() as p: _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) @@ -180,7 +180,7 @@ def process(self, element): return b'hello' with self.assertRaisesRegex( - Exception, 'Returning a bytes from a ParDo or FlatMap is discouraged'): + Exception, 'Returning a bytes from a ParDo or FlatMap is not allowed'): with TestPipeline() as p: _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) @@ -191,7 +191,7 @@ def process(self, element): return {'k': 'v'} with self.assertRaisesRegex( - Exception, 'Returning a dict from a ParDo or FlatMap is discouraged'): + Exception, 'Returning a dict from a ParDo or FlatMap is not allowed'): with TestPipeline() as p: _ = p | beam.Create([0]) | beam.ParDo(BadDoFn())