Skip to content

Flush Immediately After Publish Fix#1550

Merged
scottf merged 2 commits intomainfrom
flush-immediately
Apr 2, 2026
Merged

Flush Immediately After Publish Fix#1550
scottf merged 2 commits intomainfrom
flush-immediately

Conversation

@scottf
Copy link
Copy Markdown
Contributor

@scottf scottf commented Apr 2, 2026

Make sure bytes are written to data port when flushImmediatelyAfterPublish

Make sure bytes are written to data port when flushImmediatelyAfterPublish
@scottf
Copy link
Copy Markdown
Contributor Author

scottf commented Apr 2, 2026

@claude please review

@synadia-claude-reviewer
Copy link
Copy Markdown

synadia-claude-reviewer bot commented Apr 2, 2026

Claude finished @scottf's task in 1m 23s —— View job


Code Review

  • Gather context
  • Read changed file and diff
  • Analyze the fix
  • Post review feedback

The fix is correct and well-targeted. This is a clear bug fix with minimal scope.

Bug Summary

In sendMessageBatch (NatsConnectionWriter.java), when msg.flushImmediatelyAfterPublish was true, the original code called dataPort.flush() without first calling dataPort.write(). The message bytes had been accumulated into sendBuffer but were never actually pushed to the dataPort's output stream — so the flush had no effect on the message just processed.

Fix Analysis

Before:

stats.incrementOut(size);
if (msg.flushImmediatelyAfterPublish) {
    dataPort.flush();  // ← nothing was written to dataPort yet!
}

After:

if (msg.flushImmediatelyAfterPublish) {
    dataPort.write(sendBuffer, sendPosition);  // ← write accumulated bytes first
    dataPort.flush();
    stats.registerWrite(sendPosition);
    sendPosition = 0;  // ← reset so end-of-loop write doesn't double-send
}
stats.incrementOut(size);

All four changes are correct and necessary:

  1. dataPort.write(sendBuffer, sendPosition) — The missing write; this is the core of the fix.
  2. dataPort.flush() — Now meaningful since bytes were just written.
  3. stats.registerWrite(sendPosition) — Records the bytes written in this flush (may include bytes from earlier messages in the batch that preceded this flush-immediate message, which is correct behavior).
  4. sendPosition = 0 — Prevents the final if (sendPosition > 0) write at the end of the method from double-writing the same bytes.

Notes

  • The WriterMessageQueue.accumulate() loop (line 142) stops accumulating once it encounters a flushImmediatelyAfterPublish message as the cursor, but it can still include one earlier non-flush message in the batch. The fix correctly handles this by flushing all pending bytes at that point.
  • The added comment (// if last message was flushImmediatelyAfterPublish, send position will always be 0) is accurate and helpful.

Verdict

LGTM. The fix is minimal, correct, and doesn't touch anything beyond the bug. No concerns.

Copy link
Copy Markdown
Member

@mtmk mtmk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@scottf scottf merged commit c67c7fc into main Apr 2, 2026
5 checks passed
@scottf scottf deleted the flush-immediately branch April 2, 2026 12:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants