Refactor AbstractMessageStream and implementations#4396
Refactor AbstractMessageStream and implementations#4396hjohn wants to merge 3 commits intoaxon-5.1.xfrom
Conversation
4415738 to
4809dc0
Compare
smcvb
left a comment
There was a problem hiding this comment.
First and foremost: tremendous simplification of the concrete stream implementations! On the other end, the AbstractMessageStream became a tad more complex, let alone concretely validating the shift in a PR. I do have a bunch of concerns and some questions. Honestly not 100% sure if they all merit changes, but I am going to be protective here given the role of the MessageStream.
4809dc0 to
467d49c
Compare
7b9184f to
a9faf4e
Compare
smcvb
left a comment
There was a problem hiding this comment.
I think the use of synchronized could be documented, as it's a well thought off decision. Furthermore, the before-each assertion seems off to me. Lastly, the flux-pre-subscribe is AFAIK not the typical approach of Project Reactor-like components. Hence, I think it's best if we stick to what users expect when using this.
a9faf4e to
307cb20
Compare
smcvb
left a comment
There was a problem hiding this comment.
My concerns have been addressed, hence I'm approving this pull request.
68fa755 to
92ccac0
Compare
a4b160f to
f405fd2
Compare
a229e81 to
89199bf
Compare
8f1d6f6 to
4923a30
Compare
| if (!Boolean.TRUE.equals(processingContext.getResource(prepareCommitExecuted))) { | ||
| updateAppendPosition(marker); | ||
| } |
There was a problem hiding this comment.
Is it correct description of this change?
The Bug Fix in DefaultEventStoreTransaction
The source() method attaches a handler that updates the append position when a stream completes. But if source() was called multiple times (for multiple sourcing conditions), the onComplete callback fired multiple times, updating
the position each time — causing incorrect behavior.
Fix: a prepareCommitExecuted flag on the ProcessingContext:
Before:
source() call 1 ──► onComplete ──► updateAppendPosition() ✓
source() call 2 ──► onComplete ──► updateAppendPosition() ✗ (ran again!)
After:
source() call 1 ──► onComplete ──► prepareCommitExecuted? NO → updateAppendPosition()
set prepareCommitExecuted = true
source() call 2 ──► onComplete ──► prepareCommitExecuted? YES → skip ✓
There was a problem hiding this comment.
No, this is not quite what happened. There were two issues:
- onComplete fires earlier now when it detects an empty stream, meaning that the append position is already updated (!= ConsistencyMarker.ORIGIN)
- When the 2nd stream is requested (in the afterCommit) it would return a stream without filtering the terminal message (because it entered to broken path which lacks a
.filter), on which the test fails.
I'm unsure what the test tried to accomplish, but I will make a ticket to look into it further.
- Move most of the complicated logic to AbstractMessageStream for streams deriving from it - Made AbstractMessageStream safe to extend without breaking its invariants - Based FluxMessageStream on AbstractMessageStream - Based ConcatenatingMessageStream on AbstractMessageStream - Simplified ConcatenatingMessageStream using active stream logic - Removed unused code from QueueMessageStream and renamed methods for clarity
9d29ac6 to
d0d58b2
Compare
smcvb
left a comment
There was a problem hiding this comment.
The question in the DefaultEventStoreTransaction and MessageStream are most important to me. The other comments are mostly clean-up.
However, I think that regardless of your reply on the first two comments, we're good to go with this (rather massive) undertaking. Let's see this in action! 💪
| return MessageStreamUtils.asCompletableFuture(this); | ||
|
|
||
| /* | ||
| * NOTE: We intentionally use full reduce-based consumption instead of a single next() |
There was a problem hiding this comment.
What if we're dealing with something unbounded, as could be the case with Flux? Wouldn't that mean this operation potentially doesn't complete?
There was a problem hiding this comment.
Yeah, you can't do a reduce on unbounded streams; I created a ticket for this that if we detect something is unbounded (not easy with delayed streams) it will do throw an exception as soon as it discovers this instead of hanging.
Note that first is unaffected, that still will work fine.
What FirstResult did was really weird, completing potentially an unbounded stream...
There was a problem hiding this comment.
Do you have that issue number for me perhaps? 🙏
| * @return A {@code CompletableFuture} that completes with the first {@link MessageStream.Entry entry} from the | ||
| * stream. | ||
| */ | ||
| public static <M extends Message> CompletableFuture<MessageStream.Entry<M>> asCompletableFuture( |
There was a problem hiding this comment.
I think this class is a perfect candidate to mark @Internal. Honestly, this is actually a breaking change that you're doing already. Granted, I dare to guess nobody is using this other than us. Hence, please mark it internal; that gives us the required freedom to adjust this further in the future.
d0d58b2 to
d82b326
Compare
|




By doing the above, this PR resolves #4356
This also fixes a bug in
DefaultEventStoreTransactionthat I ran into while fixing streams:Resolves #4200