diff --git a/flink-python/pyflink/fn_execution/table/async_function/operations.py b/flink-python/pyflink/fn_execution/table/async_function/operations.py index 5d536c4749663..da325ab9aa3bd 100644 --- a/flink-python/pyflink/fn_execution/table/async_function/operations.py +++ b/flink-python/pyflink/fn_execution/table/async_function/operations.py @@ -60,6 +60,11 @@ def __init__(self, serialized_fn): operation_utils.extract_user_defined_function( serialized_fn.udfs[0], one_arg_optimization=False) + # Mirror PythonScalarFunctionOperator.createInputCoderInfoDescriptor: + # Java picks FlattenRowCoder unless some UDF takes a row as input. + self._input_is_flatten_row = not any( + udf.takes_row_as_input for udf in serialized_fn.udfs) + # Create the eval function self._eval_func = eval('lambda value: %s' % scalar_function, variable_dict) @@ -140,6 +145,12 @@ def process_element(self, value): """ self._raise_exception_if_exists() + # The Cython FlattenRowCoderImpl returns a reused list whose slots are + # overwritten by the next decode, so we must snapshot the row before + # the async closure can capture it. + if self._input_is_flatten_row: + value = list(value) + entry = self._queue.put(None, 0, 0, value) async def execute_async(rh):