[SPARK-57275][CONNECT] Validate row count after consuming all arrow batches#56343
Open
biruktesf-db wants to merge 1 commit into
Open
[SPARK-57275][CONNECT] Validate row count after consuming all arrow batches#56343biruktesf-db wants to merge 1 commit into
biruktesf-db wants to merge 1 commit into
Conversation
73dfc7e to
568fe92
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
When consuming query results on the spark connect client, count each RecordBatch once (num_records += batch.num_rows) and validate row_count only after the IPC stream is fully consumed. The Scala client in SparkResult.scala already validates after the loop and is unaffected.
Why are the changes needed?
The Arrow IPC streaming format wraps a result as
[Schema][RecordBatch]*[EOS]a single message can carry multiple RecordBatches, andpa.ipc.open_stream(...)parses all of them. The server's arrow_batch.row_count is the total rows across every RecordBatch in the message and the spark connect client validates the row count inside the per-batch loop:When a message contains more than one RecordBatch, the check fires after the first batch, before the stream is fully consumed, and throws:
SparkConnectException: Expected N rows in arrow batch but got M. (M < N)Impact: Any code path that produces multi-RecordBatch IPC streams fails to fetch results, even though the payload is well-formed and parseable by PyArrow.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests for the client,
Was this patch authored or co-authored using generative AI tooling?
Generated-by:Claude Opus 4.8