Skip to content

Refactor AbstractMessageStream and implementations#4396

Open
hjohn wants to merge 3 commits intoaxon-5.1.xfrom
feature/abstract-deluxe-message-stream
Open

Refactor AbstractMessageStream and implementations#4396
hjohn wants to merge 3 commits intoaxon-5.1.xfrom
feature/abstract-deluxe-message-stream

Conversation

@hjohn
Copy link
Copy Markdown
Contributor

@hjohn hjohn commented Apr 8, 2026

  • Move most of the complicated logic to AbstractMessageStream for streams deriving from it
  • Made AbstractMessageStream safe to extend without breaking its invariants
  • Based FluxMessageStream on AbstractMessageStream
  • Based ConcatenatingMessageStream on AbstractMessageStream
  • Simplified ConcatenatingMessageStream using active stream logic
  • Removed unused code from QueueMessageStream and renamed methods for clarity

By doing the above, this PR resolves #4356

This also fixes a bug in DefaultEventStoreTransaction that I ran into while fixing streams:

Resolves #4200

@hjohn hjohn requested a review from a team as a code owner April 8, 2026 15:59
@hjohn hjohn requested review from MateuszNaKodach, abuijze and corradom and removed request for a team April 8, 2026 15:59
@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch from 4415738 to 4809dc0 Compare April 8, 2026 16:01
@hjohn hjohn self-assigned this Apr 8, 2026
@hjohn hjohn added the Priority 2: Should High priority. Ideally, these issues are part of the release they’re assigned to. label Apr 8, 2026
@smcvb smcvb added this to the Release 5.2.0 milestone Apr 9, 2026
@smcvb smcvb added Type: Bug Use to signal issues that describe a bug within the system. Type: Enhancement Use to signal an issue enhances an already existing feature of the project. labels Apr 9, 2026
Copy link
Copy Markdown
Contributor

@smcvb smcvb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First and foremost: tremendous simplification of the concrete stream implementations! On the other end, the AbstractMessageStream became a tad more complex, let alone concretely validating the shift in a PR. I do have a bunch of concerns and some questions. Honestly not 100% sure if they all merit changes, but I am going to be protective here given the role of the MessageStream.

@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch from 4809dc0 to 467d49c Compare April 9, 2026 14:34
@hjohn hjohn changed the base branch from main to axon-5.1.x April 9, 2026 14:34
@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch 3 times, most recently from 7b9184f to a9faf4e Compare April 13, 2026 08:58
Copy link
Copy Markdown
Contributor

@smcvb smcvb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the use of synchronized could be documented, as it's a well thought off decision. Furthermore, the before-each assertion seems off to me. Lastly, the flux-pre-subscribe is AFAIK not the typical approach of Project Reactor-like components. Hence, I think it's best if we stick to what users expect when using this.

@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch from a9faf4e to 307cb20 Compare April 13, 2026 12:43
Copy link
Copy Markdown
Contributor

@smcvb smcvb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concerns have been addressed, hence I'm approving this pull request.

@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch 6 times, most recently from 68fa755 to 92ccac0 Compare April 15, 2026 11:13
@hjohn hjohn requested a review from smcvb April 15, 2026 11:17
@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch 2 times, most recently from a4b160f to f405fd2 Compare April 15, 2026 13:05
@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch 4 times, most recently from a229e81 to 89199bf Compare April 16, 2026 02:34
@smcvb smcvb modified the milestones: Release 5.2.0, Release 5.1.0 Apr 16, 2026
@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch 2 times, most recently from 8f1d6f6 to 4923a30 Compare April 16, 2026 13:14
Comment on lines +114 to +116
if (!Boolean.TRUE.equals(processingContext.getResource(prepareCommitExecuted))) {
updateAppendPosition(marker);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct description of this change?

The Bug Fix in DefaultEventStoreTransaction

The source() method attaches a handler that updates the append position when a stream completes. But if source() was called multiple times (for multiple sourcing conditions), the onComplete callback fired multiple times, updating
the position each time — causing incorrect behavior.

Fix: a prepareCommitExecuted flag on the ProcessingContext:

Before:
source() call 1 ──► onComplete ──► updateAppendPosition() ✓
source() call 2 ──► onComplete ──► updateAppendPosition() ✗ (ran again!)

After:
source() call 1 ──► onComplete ──► prepareCommitExecuted? NO → updateAppendPosition()
set prepareCommitExecuted = true
source() call 2 ──► onComplete ──► prepareCommitExecuted? YES → skip ✓

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is not quite what happened. There were two issues:

  1. onComplete fires earlier now when it detects an empty stream, meaning that the append position is already updated (!= ConsistencyMarker.ORIGIN)
  2. When the 2nd stream is requested (in the afterCommit) it would return a stream without filtering the terminal message (because it entered to broken path which lacks a .filter), on which the test fails.

I'm unsure what the test tried to accomplish, but I will make a ticket to look into it further.

hjohn added 2 commits April 17, 2026 15:03
- Move most of the complicated logic to AbstractMessageStream for
streams deriving from it
- Made AbstractMessageStream safe to extend without breaking its
invariants
- Based FluxMessageStream on AbstractMessageStream
- Based ConcatenatingMessageStream on AbstractMessageStream
- Simplified ConcatenatingMessageStream using active stream logic
- Removed unused code from QueueMessageStream and renamed methods for
clarity
@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch from 9d29ac6 to d0d58b2 Compare April 17, 2026 13:04
Copy link
Copy Markdown
Contributor

@smcvb smcvb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question in the DefaultEventStoreTransaction and MessageStream are most important to me. The other comments are mostly clean-up.

However, I think that regardless of your reply on the first two comments, we're good to go with this (rather massive) undertaking. Let's see this in action! 💪

return MessageStreamUtils.asCompletableFuture(this);

/*
* NOTE: We intentionally use full reduce-based consumption instead of a single next()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we're dealing with something unbounded, as could be the case with Flux? Wouldn't that mean this operation potentially doesn't complete?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you can't do a reduce on unbounded streams; I created a ticket for this that if we detect something is unbounded (not easy with delayed streams) it will do throw an exception as soon as it discovers this instead of hanging.

Note that first is unaffected, that still will work fine.

What FirstResult did was really weird, completing potentially an unbounded stream...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have that issue number for me perhaps? 🙏

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ticket for this is #4411

* @return A {@code CompletableFuture} that completes with the first {@link MessageStream.Entry entry} from the
* stream.
*/
public static <M extends Message> CompletableFuture<MessageStream.Entry<M>> asCompletableFuture(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class is a perfect candidate to mark @Internal. Honestly, this is actually a breaking change that you're doing already. Granted, I dare to guess nobody is using this other than us. Hence, please mark it internal; that gives us the required freedom to adjust this further in the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

@hjohn hjohn force-pushed the feature/abstract-deluxe-message-stream branch from d0d58b2 to d82b326 Compare April 17, 2026 14:22
@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

Catch issues before they fail your Quality Gate with our IDE extension SonarQube for IDE

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Priority 2: Should High priority. Ideally, these issues are part of the release they’re assigned to. Type: Bug Use to signal issues that describe a bug within the system. Type: Enhancement Use to signal an issue enhances an already existing feature of the project.

Projects

None yet

3 participants