[FLINK-38233] Adds support to StreamNonDeterministicUpdatePlanVisitor for PTF changelogs#28061
AlanConfluent wants to merge 2 commits into apache:master
| * that Concern 1 (PTF own non-determinism) is caught by the NDU visitor when downstream | ||
| * requires deterministic output columns. | ||
| */ | ||
| public static class NonDeterministicUpdatingRetractFunction extends UpdatingRetractFunction { |
Remove all the places where the NDU analyzer is disabled in the semantic and other PTF tests. If I remember correctly, there are a couple of TODOs in various PTF tests for this PR.
gustavodemorais left a comment
Hey @AlanConfluent, thanks for the clean PR. I added some comments in the first pass and I'm thinking about how we can make this more complete. We're only using the traits and not using some of the internal planner metadata. Some things to check:
1 - requireDeterminismExcludeUpsertKey is being skipped. Every other recursive visitor pushes input requirements through visitInputs, which calls requireDeterminismExcludeUpsertKey(input, requireDeterminism). Take a look at whether that makes sense here.
2 - getNonDeterministicCallName doesn't catch dynamic functions. FlinkRexUtil.getNonDeterministicCallName recurses into operands but only checks !operator.isDeterministic. It does not check isDynamicFunction(), which is checked elsewhere here. Check whether that is relevant.
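To make point 2 concrete, here is a minimal sketch of the difference between checking only the deterministic flag and also checking the dynamic-function flag. The `Call` class and both method names are hypothetical stand-ins, not Flink's or Calcite's real types, and the flag values on the example calls are illustrative only.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical model of a call tree; not the planner's real RexCall/SqlOperator types. */
final class DynamicCallSketch {

    /** Simplified call node: an operator name, two flags, and nested operand calls. */
    static final class Call {
        final String name;
        final boolean deterministic;
        final boolean dynamicFunction;
        final List<Call> operands;

        Call(String name, boolean deterministic, boolean dynamicFunction, Call... operands) {
            this.name = name;
            this.deterministic = deterministic;
            this.dynamicFunction = dynamicFunction;
            this.operands = List.of(operands);
        }
    }

    /** Depth-first search that only looks at the deterministic flag. */
    static Optional<String> deterministicFlagOnly(Call call) {
        if (!call.deterministic) {
            return Optional.of(call.name);
        }
        for (Call operand : call.operands) {
            Optional<String> found = deterministicFlagOnly(operand);
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }

    /** Same search, but a call flagged as a dynamic function is reported as well. */
    static Optional<String> withDynamicFunctions(Call call) {
        if (!call.deterministic || call.dynamicFunction) {
            return Optional.of(call.name);
        }
        for (Call operand : call.operands) {
            Optional<String> found = withDynamicFunctions(operand);
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }
}
```

With an operand that is flagged deterministic but dynamic, the first search finds nothing while the second reports the operand's name.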
And one observation on the logic that you might want to adjust in the comments. Per the SUPPORT_UPDATES doc (lines 167-172):
"The function receives {+I,+U,-D} if the input table is upserting using the same upsert key as the partition key. Otherwise, retractions {+I,-U,+U,-D} (i.e. including UPDATE_BEFORE) enter the function."
So even without REQUIRE_UPDATE_BEFORE declared, the function physically receives UB when partition_key ⊄ upsert_key. The visitor only consults the static trait. Practically, I believe this doesn't affect correctness (the function isn't required to consume UB just because it arrives). So we can treat UB as not relevant, but UBs might still arrive.
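The rule quoted from the SUPPORT_UPDATES doc could be sketched roughly as below. This is only an illustration under stated assumptions: the `RowKind` enum is a local copy of the four change kinds, keys are modelled as sets of column indexes, "same upsert key as the partition key" is read as set equality, and `kindsEnteringFunction` is a hypothetical name, not a planner method.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of which change kinds reach a PTF that supports updates. */
final class PtfInputChangelogSketch {

    /** Local stand-in for the four row kinds (+I, -U, +U, -D). */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Assumption: an empty upsertKey means the input has no upsert key, and
     * "same key" means the two column-index sets are equal.
     */
    static EnumSet<RowKind> kindsEnteringFunction(Set<Integer> upsertKey, Set<Integer> partitionKey) {
        boolean upsertingOnPartitionKey = !upsertKey.isEmpty() && upsertKey.equals(partitionKey);
        if (upsertingOnPartitionKey) {
            // Upsert stream keyed like the partition: {+I, +U, -D}, no UPDATE_BEFORE.
            return EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE);
        }
        // Otherwise full retractions, including UPDATE_BEFORE: {+I, -U, +U, -D}.
        return EnumSet.allOf(RowKind.class);
    }
}
```

The point of the sketch is that UPDATE_BEFORE can arrive depending on the keys alone, independent of whether the function declares a trait that requires it.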
| // Concern 1: PTF function itself is non-deterministic. | ||
| // requireDeterminism is the set of PTF output columns downstream requires to be | ||
| // deterministic (for correct retract matching). All PTF output columns are assumed to | ||
| // come from PTF's computation — there are pass-through input columns potentially, but | ||
| // they are not considered specially for now. If the PTF is non-deterministic and any of | ||
| // those columns are required, we cannot satisfy the requirement. |
Suggestion: the code is self-explanatory, we can simplify the comment
| // Concern 1: PTF function itself is non-deterministic. | |
| // requireDeterminism is the set of PTF output columns downstream requires to be | |
| // deterministic (for correct retract matching). All PTF output columns are assumed to | |
| // come from PTF's computation — there are pass-through input columns potentially, but | |
| // they are not considered specially for now. If the PTF is non-deterministic and any of | |
| // those columns are required, we cannot satisfy the requirement. | |
| // Concern 1: PTF function itself is non-deterministic and downstream nodes | |
| // require determinism. PTFs can have pass-through input columns, but they | |
| // are not considered specially for now. |
| // they are not considered specially for now. If the PTF is non-deterministic and any of | ||
| // those columns are required, we cannot satisfy the requirement. | ||
| if (!requireDeterminism.isEmpty()) { | ||
| final Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallName(call); |
Checking whether the function output is deterministic fits the current code well, since everything is there. From a correctness and completeness perspective, we would ideally have an option that lets users declare exactly which columns are deterministic and which aren't, so we can check that against the requireDeterminism ImmutableBitSet.
I don't see users taking the time to configure that properly, but I do see built-in PTFs needing this so that the planner can properly check determinism per column.
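A rough sketch of what such a per-column check might look like, assuming a PTF could declare its deterministic output columns. No such option exists in the PR; `violatingColumns` and `bits` are hypothetical helpers, and `java.util.BitSet` is swapped in for the planner's ImmutableBitSet to keep the example self-contained.

```java
import java.util.BitSet;

/** Hypothetical per-column determinism check; not part of the PR. */
final class PerColumnDeterminismSketch {

    /** Builds a BitSet from column indexes. */
    static BitSet bits(int... columns) {
        BitSet set = new BitSet();
        for (int column : columns) {
            set.set(column);
        }
        return set;
    }

    /**
     * Returns the output columns that downstream requires to be deterministic
     * but that the function has not declared deterministic.
     */
    static BitSet violatingColumns(BitSet requireDeterminism, BitSet declaredDeterministic) {
        BitSet violating = (BitSet) requireDeterminism.clone();
        violating.andNot(declaredDeterministic);
        return violating;
    }
}
```

An empty result would mean the requirement is satisfied even though the function as a whole is non-deterministic; a non-empty result names the columns to report.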
| // Insert-only input: the PTF manages retract correctness internally via its own | ||
| // state, so no requirement is pushed to inputs. |
The PTF doesn't even see retracts so it doesn't have any retract correctness to manage internally
| // Insert-only input: the PTF manages retract correctness internally via its own | |
| // state, so no requirement is pushed to inputs. | |
| // No retracts arrive at the PTF input, so input-column determinism does | |
| // not affect retract correctness |
| // Retracts are routed by partition key only — only partition key columns must | ||
| // be deterministic. |
The thing we are worried about in this case isn't the retractions, but rather that the I, U and D events for a key should all land on the same node
| // Retracts are routed by partition key only — only partition key columns must | |
| // be deterministic. | |
| // Inserts, updates and deletions are routed by partition key only, so the partition key columns must be deterministic.
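To illustrate why the partition key columns have to be deterministic, here is a simplified routing sketch. It assumes a plain hash-modulo assignment, which is not how Flink actually assigns keys to subtasks, and `subtaskFor` is a hypothetical helper.

```java
/** Simplified, hypothetical model of key-based routing; not Flink's real key-group logic. */
final class PartitionRoutingSketch {

    /** Picks a subtask index from the partition key's hash. */
    static int subtaskFor(Object partitionKey, int parallelism) {
        return Math.floorMod(partitionKey.hashCode(), parallelism);
    }
}
```

If the key of a row is recomputed to a different value between its insert and its later delete, for example 1 for the +I and 2 for the -D, the two events go to different subtasks under this model and the delete never meets the state created by the insert.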
What is the purpose of the change
Adds support for PTFs to StreamNonDeterministicUpdatePlanVisitor. Essentially, it finds uses of non-determinism that are considered errors for PTFs.
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code 2.1.121