Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
## Breaking Changes

* (Python) Made Beartype the default fallback type checking tool. This can be disabled with the `--disable_beartype` pipeline option. ([#38275](https://github.com/apache/beam/issues/38275))
* `DoFn.process` returning a `str`, `bytes`, or `dict` (instead of an iterable wrapping one) now raises a `TypeError` rather than silently iterating per-character/byte/key (Python) ([#18712](https://github.com/apache/beam/issues/18712)).
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ cdef class _OutputHandler(OutputHandler):
cdef object output_batch_converter
cdef bint _process_batch_yields_elements
cdef bint _process_yields_batches
cdef bint _check_user_dofn_output

@cython.locals(windowed_value=WindowedValue,
windowed_batch=WindowedBatch,
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ def __init__(
do_fn_signature.process_batch_method.method_value,
'_beam_yields_elements',
False),
check_user_dofn_output=not isinstance(fn, core.CallableWrapperDoFn),
)

if do_fn_signature.is_stateful_dofn() and not user_state_context:
Expand Down Expand Up @@ -1633,6 +1634,7 @@ def __init__(
output_batch_converter, # type: Optional[BatchConverter]
process_yields_batches, # type: bool
process_batch_yields_elements, # type: bool
check_user_dofn_output=False, # type: bool
):
"""Initializes ``_OutputHandler``.

Expand All @@ -1642,6 +1644,12 @@ def __init__(
tagged_receivers: main receiver object.
per_element_output_counter: per_element_output_counter of one work_item.
could be none if experimental flag turn off
check_user_dofn_output: if True, validate that a user-class DoFn does not
return a str/bytes/dict (a common bug — see
https://github.com/apache/beam/issues/18712).
Skipped for callable-wrapped DoFns (Map/FlatMap)
where iterating a returned str/bytes/dict is a
legitimate flatten use case.
"""
self.window_fn = window_fn
self.main_receivers = main_receivers
Expand All @@ -1654,6 +1662,7 @@ def __init__(
self.output_batch_converter = output_batch_converter
self._process_yields_batches = process_yields_batches
self._process_batch_yields_elements = process_batch_yields_elements
self._check_user_dofn_output = check_user_dofn_output

def handle_process_outputs(
self, windowed_input_element, results, watermark_estimator=None):
Expand All @@ -1667,6 +1676,13 @@ def handle_process_outputs(
if results is None:
results = []

if self._check_user_dofn_output and isinstance(results, (str, bytes, dict)):
object_type = type(results).__name__
raise TypeError(
'Returning a %s from a ParDo or FlatMap is not allowed. '
'Please use list(%r) if you really want this behavior.' %
(object_type, results))
Comment thread
chrisqiqiu marked this conversation as resolved.

# TODO(https://github.com/apache/beam/issues/20404): Verify that the
# results object is a valid iterable type if
# performance_runtime_type_check is active, without harming performance
Expand Down
41 changes: 41 additions & 0 deletions sdks/python/apache_beam/runners/common_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,47 @@ def process(self, element, mykey=DoFn.KeyParam):
test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
(p | test_stream | beam.ParDo(DoFnProcessWithKeyparam()))

def test_dofn_returning_str_raises_clear_error(self):
"""Regression test for https://github.com/apache/beam/issues/18712.

A DoFn returning a str instead of an iterable wrapping one used to
silently iterate per-character. It should now raise a clear TypeError.
"""
class BadDoFn(DoFn):
def process(self, element):
return 'hello'

# Use base Exception (matching existing convention in
# typecheck_test.py::test_do_fn_returning_non_iterable_throws_error)
# because the runner's _reraise_augmented wraps the TypeError before
# it surfaces to the test framework.
with self.assertRaisesRegex(
Exception, 'Returning a str from a ParDo or FlatMap is not allowed'):
with TestPipeline() as p:
_ = p | beam.Create([0]) | beam.ParDo(BadDoFn())

def test_dofn_returning_bytes_raises_clear_error(self):
"""Regression test for https://github.com/apache/beam/issues/18712."""
class BadDoFn(DoFn):
def process(self, element):
return b'hello'

with self.assertRaisesRegex(
Exception, 'Returning a bytes from a ParDo or FlatMap is not allowed'):
with TestPipeline() as p:
_ = p | beam.Create([0]) | beam.ParDo(BadDoFn())

def test_dofn_returning_dict_raises_clear_error(self):
"""Regression test for https://github.com/apache/beam/issues/18712."""
class BadDoFn(DoFn):
def process(self, element):
return {'k': 'v'}

with self.assertRaisesRegex(
Exception, 'Returning a dict from a ParDo or FlatMap is not allowed'):
with TestPipeline() as p:
_ = p | beam.Create([0]) | beam.ParDo(BadDoFn())

def test_pardo_with_unbounded_per_element_dofn(self):
class UnboundedDoFn(beam.DoFn):
@beam.DoFn.unbounded_per_element()
Expand Down
Loading