Skip to content

[FLINK-38233] Adds support to StreamNonDeterministicUpdatePlanVisitor for PTF changelogs#28061

Open
AlanConfluent wants to merge 2 commits intoapache:masterfrom
AlanConfluent:FLINK-38233
Open

[FLINK-38233] Adds support to StreamNonDeterministicUpdatePlanVisitor for PTF changelogs#28061
AlanConfluent wants to merge 2 commits intoapache:masterfrom
AlanConfluent:FLINK-38233

Conversation

@AlanConfluent
Copy link
Copy Markdown
Contributor

What is the purpose of the change

Adds support for PTFs to StreamNonDeterministicUpdatePlanVisitor. Essentially finds uses of indeterminism that are considered errors for PTFs.

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

  • By running the added unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code 2.1.121

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 28, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

* that Concern 1 (PTF own non-determinism) is caught by the NDU visitor when downstream
* requires deterministic output columns.
*/
public static class NonDeterministicUpdatingRetractFunction extends UpdatingRetractFunction {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove all the disabling of the NDU analyzer in semantic and other PTFs tests. If I remember correctly, there are a couple of TODOs in various PTF tests for this PR.

Copy link
Copy Markdown
Contributor

@gustavodemorais gustavodemorais left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @AlanConfluent, thanks for the clean PR. I added some comments in the first pass and I'm thinking of how we can make this more complete. We're only using the traits but not using some internal planner medatada. Some things to check:

1 - requireDeterminismExcludeUpsertKey is being skipped. Every other recursive visitor pushes input requirements through visitInputs, which calls requireDeterminismExcludeUpsertKey(input, requireDeterminism). Take a look if that makes sense here
2 - getNonDeterministicCallName doesn't catch dynamic functions. FlinkRexUtil.getNonDeterministicCallName recurses into operands but only checks !operator.isDeterministic. It does not check isDynamicFunction() which is done in other parts here. Check if that is relevant

And one observation to the logic that you might want to adjust in the comments. Per SUPPORT_UPDATES doc (lines 167-172):

"The function receives {+I,+U,-D} if the input table is upserting using the same upsert key as the partition key. Otherwise, retractions {+I,-U,+U,-D} (i.e. including UPDATE_BEFORE) enter the function."

So even without REQUIRE_UPDATE_BEFORE declared, the function physically receives UB when partition_key ⊄ upsert_key. The visitor only consults the static trait. Practically I believe this doesn't affect correctness (the function isn't required to consume UB just because it arrives). So we can assume the UB is not relevant but there might be UBs.

Comment on lines +974 to +979
// Concern 1: PTF function itself is non-deterministic.
// requireDeterminism is the set of PTF output columns downstream requires to be
// deterministic (for correct retract matching). All PTF output columns are assumed to
// come from PTF's computation — there are pass-through input columns potentially, but
// they are not considered specially for now. If the PTF is non-deterministic and any of
// those columns are required, we cannot satisfy the requirement.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: the code is self-explanatory, we can simplify the comment

Suggested change
// Concern 1: PTF function itself is non-deterministic.
// requireDeterminism is the set of PTF output columns downstream requires to be
// deterministic (for correct retract matching). All PTF output columns are assumed to
// come from PTF's computation — there are pass-through input columns potentially, but
// they are not considered specially for now. If the PTF is non-deterministic and any of
// those columns are required, we cannot satisfy the requirement.
// Concern 1: PTF function itself is non-deterministic and downstream nodes
// require determinism. PTFs can have pass-through input columns, but they
// are not considered specially for now.

// they are not considered specially for now. If the PTF is non-deterministic and any of
// those columns are required, we cannot satisfy the requirement.
if (!requireDeterminism.isEmpty()) {
final Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallName(call);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking if the function output is deterministic fits the current code well since everything is there. From a correctness and completeness perspective, we would have optimally an option so users tell exactly which columns are deterministic and which aren't so we check that against the ImmutableBitSet that requireDeterminism.

I don't see user's taking the time to configure that properly but I see builtin PTFs requiring this to make sure the planner can properly check determinism per column

Comment on lines +989 to +990
// Insert-only input: the PTF manages retract correctness internally via its own
// state, so no requirement is pushed to inputs.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PTF doesn't even see retracts so it doesn't have any retract correctness to manage internally

Suggested change
// Insert-only input: the PTF manages retract correctness internally via its own
// state, so no requirement is pushed to inputs.
// No retracts arrive at the PTF input, so input-column determinism does
// not affect retract correctness

Comment on lines +1021 to +1022
// Retracts are routed by partition key only — only partition key columns must
// be deterministic.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It thing we are worried about in this case aren't the retractions - but rather a I, U and D events should land in the same node

Suggested change
// Retracts are routed by partition key only — only partition key columns must
// be deterministic.
// Inserts, update and deletions are routed by partition key only — thus the partition key columns must be deterministic.

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Apr 29, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants