From 4ef1b5bbf3e659ac7524b28e7f77b9c6de5b3307 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 7 May 2026 10:59:42 +0100 Subject: [PATCH 1/2] Avoid serialization and deserialization for coordinator-local single partition data read Currently, when we execute a local read we fetch data from SSTables and Memtables using a merging iterator and write it to a byte buffer. Later, when we combine a CQL response, we deserialize the data back to iterate over it as a part of coordinator logic. So, we allocate rows and cells twice here: during the read from SSTables/Memtables and during the deserialization by coordinator logic, if we read data locally (this is a typical scenario because drivers usually send requests to replicas). The idea of the optimization: if we do a single partition read of a small number of rows, we can keep the data in memory and avoid double allocation of row objects. A system property is used to limit the number of rows we keep in memory in this scenario (to avoid too much pressure on GC due to the extended lifetime of these objects and promoting them to an old generation). The property also allows disabling the logic in case of any issues. 
We cannot get a number of rows in advance, so we read first N rows to memory and if we still have something then we serialize the remaining to a buffer and then concatenate iterators for the in-memory rows + deserialized one when we need to iterate over the result patch by Dmitry Konstantinov; reviewed by TBD for CASSANDRA-21354 --- .../config/CassandraRelevantProperties.java | 1 + .../org/apache/cassandra/db/ReadCommand.java | 18 +- .../org/apache/cassandra/db/ReadResponse.java | 195 +++++++++++++++- .../cassandra/service/StorageProxy.java | 5 +- .../apache/cassandra/db/ReadResponseTest.java | 215 +++++++++++++++++- 5 files changed, 421 insertions(+), 13 deletions(-) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index c72577e7ec1c..ad475a142777 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -207,6 +207,7 @@ public enum CassandraRelevantProperties DATA_OUTPUT_STREAM_PLUS_TEMP_BUFFER_SIZE("cassandra.data_output_stream_plus_temp_buffer_size", "8192"), DATA_RESPONSE_BUFFER_INITIAL_SIZE_MAX("cassandra.data_response_buffer_initial_size_max", "4096"), DATA_RESPONSE_BUFFER_INITIAL_SIZE_MIN("cassandra.data_response_buffer_initial_size_min", "128"), + DATA_RESPONSE_IN_MEMORY_MAX_ROWS("cassandra.data_response_in_memory_max_rows", "128"), DECAYING_ESTIMATED_HISTOGRAM_RESERVOIR_STRIPE_COUNT("cassandra.dehr_stripe_count", "2"), DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES("default.provide.overlapping.tombstones"), /** determinism properties for testing */ diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index d161fb315d2c..5bc4171187d2 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -436,14 +436,26 @@ public ReadCommand 
copyAsDigestQuery(Iterable replicas) public abstract boolean isReversed(); public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi) + { + return createResponse(iterator, rdi, false); + } + public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi, boolean localRead) { // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both // ends equal, and there are no dangling RT bound in any partition. iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true); + if (isDigestQuery()) + return ReadResponse.createDigestResponse(iterator, this); - return isDigestQuery() - ? ReadResponse.createDigestResponse(iterator, this) - : ReadResponse.createDataResponse(iterator, this, rdi); + if (localRead && ReadResponse.IN_MEMORY_MAX_ROWS > 0) + return ReadResponse.createInMemoryDataResponse(iterator, this, rdi); + + return ReadResponse.createDataResponse(iterator, this, rdi); + } + + public ReadResponse createLocalObjectResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi) + { + return createResponse(iterator, rdi, true); } public ReadResponse createEmptyResponse() diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 7edd743ad416..4b60932fb1cb 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -21,16 +21,23 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.NoSuchElementException; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; @@ -49,6 +56,8 @@ public abstract class ReadResponse // Serializer for single partition read response public static final IVersionedSerializer serializer = new Serializer(); + static final int IN_MEMORY_MAX_ROWS = CassandraRelevantProperties.DATA_RESPONSE_IN_MEMORY_MAX_ROWS.getInt(); + protected ReadResponse() { } @@ -63,9 +72,15 @@ public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, return new LocalDataResponse(data, command, NO_OP_REPAIRED_DATA_INFO); } - public static ReadResponse createSimpleDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection) + static ReadResponse createInMemoryDataResponse(UnfilteredPartitionIterator data, ReadCommand command, RepairedDataInfo rdi) + { + return createInMemoryDataResponse(data, command, rdi, IN_MEMORY_MAX_ROWS); + } + + @VisibleForTesting + static ReadResponse createInMemoryDataResponse(UnfilteredPartitionIterator data, ReadCommand command, RepairedDataInfo rdi, int maxRows) { - return new LocalDataResponse(data, selection); + return new InMemoryDataResponse(data, command, rdi, maxRows); } @VisibleForTesting @@ -229,6 +244,9 @@ private static class LocalDataResponse extends DataResponse { // Exponential moving average of response sizes, used to set initial size of output buffer. 
private static final MovingAverage estimatedResponseBytes = ExpMovingAverage.decayBy1000(); + + // Exponential moving average of overflow(!) sizes, used to set initial size of output buffer. + private static final MovingAverage estimatedOverflowBytes = ExpMovingAverage.decayBy1000(); private static final int bufferInitialSizeMin = CassandraRelevantProperties.DATA_RESPONSE_BUFFER_INITIAL_SIZE_MIN.getInt(); private static final int bufferInitialSizeMax = CassandraRelevantProperties.DATA_RESPONSE_BUFFER_INITIAL_SIZE_MAX.getInt(); @@ -240,9 +258,28 @@ private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, DeserializationHelper.Flag.LOCAL); } - private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter selection) + private static ByteBuffer buildOverflow(UnfilteredRowIterator rowIter, ColumnFilter selection) { - super(build(iter, selection), null, false, MessagingService.current_version, DeserializationHelper.Flag.LOCAL); + int initialBufferSize = bufferInitialSizeMin; + + if (bufferInitialSizeMax > bufferInitialSizeMin) + { + double estimatedOverflowSize = estimatedOverflowBytes.get(); + double bufferSizeEstimate = Double.isNaN(estimatedOverflowSize) ? 
bufferInitialSizeMin : estimatedOverflowSize * 1.1; + initialBufferSize = Math.min((int) bufferSizeEstimate, bufferInitialSizeMax); + } + + try (DataOutputBuffer buffer = new DataOutputBuffer(initialBufferSize)) + { + // NOTE: we use UnfilteredRowIteratorSerializer here, not UnfilteredPartitionIterators + UnfilteredRowIteratorSerializer.serializer.serialize(rowIter, selection, buffer, MessagingService.current_version); + estimatedOverflowBytes.update(buffer.position()); + return buffer.buffer(false); + } + catch (IOException e) + { + throw new RuntimeException(e); + } } private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) @@ -354,6 +391,152 @@ public boolean isDigestResponse() { return false; } + + protected ByteBuffer getSerializedData() + { + return data; + } + } + + // built on the coordinator for local single-partition reads; never sent over the network + private static class InMemoryDataResponse extends DataResponse + { + // rows up to the max-rows limit stored as an in-memory partition; null if the response was empty + private final ImmutableBTreePartition partition; + // rows beyond the max-rows limit serialized to a buffer; null if all rows fit in memory + private final ByteBuffer overflow; + + private InMemoryDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, RepairedDataInfo rdi, int maxRows) + { + super(null, rdi.getDigest(), rdi.isConclusive(), MessagingService.current_version, DeserializationHelper.Flag.LOCAL); + + if (!iter.hasNext()) + { + this.partition = null; + this.overflow = null; + return; + } + + try (UnfilteredRowIterator rowIter = iter.next()) + { + LimitedUnfilteredRowIterator limitedIter = new LimitedUnfilteredRowIterator(rowIter, maxRows); + this.partition = ImmutableBTreePartition.create(limitedIter); + // Uses buildOverflow (row-iterator level) to match the UnfilteredRowIteratorSerializer + // deserializer used in makeIterator. + this.overflow = limitedIter.hasOverflow() + ? 
LocalDataResponse.buildOverflow(rowIter, command.columnFilter()) + : null; + } + } + + // Decorating iterator that stops after maxRows Unfiltered objects and records whether overflow occurred. + // Does NOT close the wrapped iterator on close() so the caller can continue reading overflow rows. + private static class LimitedUnfilteredRowIterator extends AbstractUnfilteredRowIterator + { + private final UnfilteredRowIterator wrapped; + private final int maxRows; + private int rowCount = 0; + private boolean hasOverflow = false; + + private LimitedUnfilteredRowIterator(UnfilteredRowIterator wrapped, int maxRows) + { + super(wrapped.metadata(), + wrapped.partitionKey(), + wrapped.partitionLevelDeletion(), + wrapped.columns(), + wrapped.staticRow(), + wrapped.isReverseOrder(), + wrapped.stats()); + this.wrapped = wrapped; + this.maxRows = maxRows; + } + + boolean hasOverflow() + { + return hasOverflow; + } + + @Override + protected Unfiltered computeNext() + { + if (rowCount >= maxRows) + { + hasOverflow = wrapped.hasNext(); + return endOfData(); + } + if (!wrapped.hasNext()) + return endOfData(); + rowCount++; + return wrapped.next(); + } + } + + @Override + public UnfilteredPartitionIterator makeIterator(ReadCommand command) + { + if (partition == null) + return EmptyIterators.unfilteredPartition(command.metadata()); + + UnfilteredRowIterator inMemoryIter = partition.unfilteredIterator(command.columnFilter(), Slices.ALL, command.isReversed()); + + if (overflow == null) + return new SingletonUnfilteredPartitionIterator(command.metadata(), inMemoryIter); + + // Deserialize the overflow and concatenate with the in-memory part. 
+ // Buffer closing is not needed and actually is incorrect + // because the result iterator uses the buffer context + DataInputBuffer in = new DataInputBuffer(overflow, true); + try + { + UnfilteredRowIterator overflowIter = UnfilteredRowIteratorSerializer.serializer.deserialize( + in, MessagingService.current_version, command.metadata(), command.columnFilter(), DeserializationHelper.Flag.LOCAL); + UnfilteredRowIterator combined = UnfilteredRowIterators.concat(inMemoryIter, overflowIter); + return new SingletonUnfilteredPartitionIterator(command.metadata(), combined); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + protected ByteBuffer getSerializedData() + { + throw new UnsupportedOperationException("InMemoryDataResponse cannot be serialized over the network"); + } + } + + private static class SingletonUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator + { + private final TableMetadata metadata; + private final UnfilteredRowIterator partition; + private boolean returned = false; + + private SingletonUnfilteredPartitionIterator(TableMetadata metadata, UnfilteredRowIterator partition) + { + this.metadata = metadata; + this.partition = partition; + } + + public TableMetadata metadata() { return metadata; } + + public boolean hasNext() + { + return !returned; + } + + public UnfilteredRowIterator next() + { + if (returned) throw new NoSuchElementException(); + returned = true; + return partition; + } + + @Override + public void close() + { + partition.close(); + } } private static class Serializer implements IVersionedSerializer @@ -378,7 +561,7 @@ public void serialize(ReadResponse response, DataOutputPlus out, int version) th ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out); out.writeBoolean(response.isRepairedDigestConclusive()); - ByteBuffer data = ((DataResponse)response).data; + ByteBuffer data = ((DataResponse)response).getSerializedData(); 
ByteBufferUtil.writeWithVIntLength(data, out); } } @@ -418,7 +601,7 @@ public long serializedSize(ReadResponse response, int version) // In theory, we should deserialize/re-serialize if the version asked is different from the current // version as the content could have a different serialization format. So far though, we haven't made // change to partition iterators serialization since 3.0 so we skip this. - ByteBuffer data = ((DataResponse)response).data; + ByteBuffer data = ((DataResponse)response).getSerializedData(); size += ByteBufferUtil.serializedSizeWithVIntLength(data); } return size; diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 5c1ebc9ed5f3..c267123b95df 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -2748,7 +2748,10 @@ protected void runMayThrow() try (ReadExecutionController controller = command.executionController(trackRepairedStatus); UnfilteredPartitionIterator iterator = command.executeLocally(controller)) { - response = command.createResponse(iterator, controller.getRepairedDataInfo()); + if (command.isLimitedToOnePartition() && !command.isDigestQuery()) + response = command.createLocalObjectResponse(iterator, controller.getRepairedDataInfo()); + else + response = command.createResponse(iterator, controller.getRepairedDataInfo()); } catch (RejectException e) { diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java index ebe1cf0322b4..03956bfc5fd3 100644 --- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java +++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import org.junit.Before; @@ -27,16 +29,24 @@ import org.junit.Test; import 
org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -50,6 +60,7 @@ public class ReadResponseTest { private final Random random = new Random(); private TableMetadata metadata; + private TableMetadata metadataWithClustering; @BeforeClass public static void beforeClass() @@ -60,13 +71,20 @@ public static void beforeClass() @Before public void setup() { - metadata = TableMetadata.builder("ks", "t1") .offline() .addPartitionKeyColumn("p", Int32Type.instance) .addRegularColumn("v", Int32Type.instance) .partitioner(Murmur3Partitioner.instance) .build(); + + metadataWithClustering = TableMetadata.builder("ks", "t2") + .offline() + .addPartitionKeyColumn("p", Int32Type.instance) + .addClusteringColumn("c", Int32Type.instance) + .addRegularColumn("v", Int32Type.instance) + .partitioner(Murmur3Partitioner.instance) + .build(); } @Test @@ 
-172,6 +190,192 @@ public void makeDigestDoesntConsiderRepairedDataInfo() assertEquals(response1.digest(command1), response2.digest(command2)); } + @Test + public void inMemoryResponseEmptyIteratorMatchesLocalDataResponse() + { + ReadCommand command = command(key(), metadata); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + ReadResponse localResponse = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi); + ReadResponse inMemoryResponse = command.createLocalObjectResponse(EmptyIterators.unfilteredPartition(metadata), rdi); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + @Test + public void inMemoryResponseWithRowsMatchesLocalDataResponse() + { + int key = key(); + ReadCommand command = command(key, metadata); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + DecoratedKey dk = metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)); + Row row = buildRow(metadata, dk); + PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, row); + + ReadResponse localResponse = command.createResponse(singlePartitionIterator(update), rdi); + ReadResponse inMemoryResponse = command.createLocalObjectResponse(singlePartitionIterator(update), rdi); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + @Test(expected = UnsupportedOperationException.class) + public void inMemoryResponseCannotBeSerialized() + { + ReadCommand command = command(key(), metadata); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + ReadResponse response = command.createLocalObjectResponse(EmptyIterators.unfilteredPartition(metadata), rdi); + + try (DataOutputBuffer out = new DataOutputBuffer()) + { + ReadResponse.serializer.serialize(response, out, MessagingService.current_version); + } + catch (IOException e) + { + fail("Unexpected IOException: " + e.getMessage()); + } + } + + 
@Test + public void inMemoryResponseWithOverflowMatchesLocalDataResponse() + { + // 5 rows, only 2 fit in memory — 3 overflow into the serialized buffer + testMultipleRows(2, 5); + } + + @Test + public void inMemoryResponseAllRowsInMemoryWhenUnderLimit() + { + // 3 rows, limit is 10 — all fit in memory with no overflow + testMultipleRows(10, 3); + } + + @Test + public void inMemoryResponseWithZeroMaxRowsUsesOnlyOverflow() + { + // all rows go directly into the overflow buffer + testMultipleRows(0, 3); + } + + private void testMultipleRows(int maxRows, int rows) + { + int key = key(); + ReadCommand command = command(key, metadataWithClustering); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + PartitionUpdate update = buildMultiRowUpdate(metadataWithClustering, key, rows); + + ReadResponse localResponse = command.createResponse(singlePartitionIterator(update), rdi); + ReadResponse inMemoryResponse = ReadResponse.createInMemoryDataResponse(singlePartitionIterator(update), command, rdi, maxRows); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + @Test + public void inMemoryResponseWithRangeTombstoneOnlyAllInMemory() + { + // Partition contains only a range tombstone (no rows); limit is large enough that nothing overflows + testWithTombstones(10, 0, 0, 5); + } + + @Test + public void inMemoryResponseWithRangeTombstoneOnlyOverflow() + { + // 5 rows at clusterings 0-4 plus a range tombstone covering [6, 9]; + // rows 0-1 go in-memory, rows 2-4 and the RT go to overflow + testWithTombstones(2, 5, 6, 9); + } + + @Test + public void inMemoryResponseWithRangeTombstoneBetweenInMemoryAndOverflow() + { + // RT at [1, 2] sits between the in-memory rows (0) and the overflow rows (3-4) + testWithTombstones(1, 5, 1, 2); + } + + private void testWithTombstones(int maxRows, int rows, int rtStart, int rtEnd) + { + int key = key(); + ReadCommand command = command(key, metadataWithClustering); + 
StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + PartitionUpdate update = buildUpdateWithRowsAndRangeTombstone(metadataWithClustering, key, rows, rtStart, rtEnd); + + ReadResponse localResponse = command.createResponse(singlePartitionIterator(update), rdi); + ReadResponse inMemoryResponse = ReadResponse.createInMemoryDataResponse(singlePartitionIterator(update), command, rdi, maxRows); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + + private PartitionUpdate buildUpdateWithRowsAndRangeTombstone(TableMetadata metadata, int partitionKey, + int rowCount, int rtStart, int rtEnd) + { + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(metadata, ByteBufferUtil.bytes(partitionKey)).timestamp(0); + for (int i = 0; i < rowCount; i++) + builder.row(i).add("v", i); + builder.addRangeTombstone().start(rtStart).end(rtEnd); + return builder.build(); + } + + private void assertIteratorsEqual(ReadCommand command, ReadResponse expected, ReadResponse actual) + { + List expectedUnfiltered = collectUnfiltered(command, expected); + List actualUnfiltered = collectUnfiltered(command, actual); + assertEquals(expectedUnfiltered, actualUnfiltered); + } + + private List collectUnfiltered(ReadCommand command, ReadResponse response) + { + List result = new ArrayList<>(); + try (UnfilteredPartitionIterator iter = response.makeIterator(command)) + { + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + while (partition.hasNext()) + result.add(partition.next().toString(partition.metadata(), true)); + } + } + } + return result; + } + + private Row buildRow(TableMetadata metadata, DecoratedKey key) + { + ColumnMetadata col = metadata.getColumn(ByteBufferUtil.bytes("v")); + Clustering clustering = Clustering.EMPTY; + return BTreeRow.singleCellRow(clustering, BufferCell.live(col, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(42))); + } + + private PartitionUpdate 
buildMultiRowUpdate(TableMetadata metadata, int partitionKey, int rowCount) + { + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(metadata, ByteBufferUtil.bytes(partitionKey)).timestamp(0); + for (int i = 0; i < rowCount; i++) + builder.row(i).add("v", i); + return builder.build(); + } + + private UnfilteredPartitionIterator singlePartitionIterator(PartitionUpdate update) + { + UnfilteredRowIterator rowIter = update.unfilteredIterator(); + return new AbstractUnfilteredPartitionIterator() + { + private boolean returned = false; + + public TableMetadata metadata() { return update.metadata(); } + + public boolean hasNext() { return !returned; } + + public UnfilteredRowIterator next() + { + returned = true; + return rowIter; + } + }; + } + private void verifySerDe(ReadResponse response) { // check that roundtripping through ReadResponse.serializer behaves as expected for (MessagingService.Version version : MessagingService.Version.supportedVersions()) @@ -222,6 +426,11 @@ private ReadCommand command(int key, TableMetadata metadata) return new StubReadCommand(key, metadata, false); } + private ReadCommand command(int key) + { + return command(key, metadata); + } + private static class StubRepairedDataInfo extends RepairedDataInfo { private final ByteBuffer repairedDigest; @@ -262,11 +471,11 @@ private static class StubReadCommand extends SinglePartitionReadCommand RowFilter.none(), DataLimits.NONE, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), - null, + new ClusteringIndexSliceFilter(Slices.ALL, false), null, false, null); - + } @Override From e32230a34dc07ecd8f20ac8f2c8b1460c71ef796 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 7 May 2026 21:14:54 +0100 Subject: [PATCH 2/2] fix RepairedDataInfo propagation --- .../org/apache/cassandra/db/ReadResponse.java | 33 ++++--- .../apache/cassandra/db/ReadResponseTest.java | 96 +++++++++++++++++++ 2 files changed, 116 insertions(+), 13 deletions(-) diff --git 
a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 4b60932fb1cb..5d1cdeb3447e 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -325,8 +325,8 @@ static abstract class DataResponse extends ReadResponse // TODO: can the digest be calculated over the raw bytes now? // The response, serialized in the current messaging version private final ByteBuffer data; - private final ByteBuffer repairedDataDigest; - private final boolean isRepairedDigestConclusive; + protected ByteBuffer repairedDataDigest; + protected boolean isRepairedDigestConclusive; private final int dataSerializationVersion; private final DeserializationHelper.Flag flag; @@ -408,25 +408,32 @@ private static class InMemoryDataResponse extends DataResponse private InMemoryDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, RepairedDataInfo rdi, int maxRows) { - super(null, rdi.getDigest(), rdi.isConclusive(), MessagingService.current_version, DeserializationHelper.Flag.LOCAL); + // pass fake values for rdi.getDigest(), rdi.isConclusive(), they will be calculated and set later + super(null, null, false, MessagingService.current_version, DeserializationHelper.Flag.LOCAL); if (!iter.hasNext()) { this.partition = null; this.overflow = null; - return; } - - try (UnfilteredRowIterator rowIter = iter.next()) + else { - LimitedUnfilteredRowIterator limitedIter = new LimitedUnfilteredRowIterator(rowIter, maxRows); - this.partition = ImmutableBTreePartition.create(limitedIter); - // Uses buildOverflow (row-iterator level) to match the UnfilteredRowIteratorSerializer - // deserializer used in makeIterator. - this.overflow = limitedIter.hasOverflow() - ? 
LocalDataResponse.buildOverflow(rowIter, command.columnFilter()) - : null; + try (UnfilteredRowIterator rowIter = iter.next()) + { + LimitedUnfilteredRowIterator limitedIter = new LimitedUnfilteredRowIterator(rowIter, maxRows); + this.partition = ImmutableBTreePartition.create(limitedIter); + // Uses buildOverflow (row-iterator level) to match the UnfilteredRowIteratorSerializer + // deserializer used in makeIterator. + this.overflow = limitedIter.hasOverflow() + ? LocalDataResponse.buildOverflow(rowIter, command.columnFilter()) + : null; + } } + + // Capture digest after consuming the iterator so that any updates made by + // RepairedDataInfo transformations are reflected in the digest. + this.repairedDataDigest = rdi.getDigest(); + this.isRepairedDigestConclusive = rdi.isConclusive(); } // Decorating iterator that stops after maxRows Unfiltered objects and records whether overflow occurred. diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java index 03956bfc5fd3..99705312abb5 100644 --- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java +++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.WrappingUnfilteredRowIterator; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.io.util.DataInputBuffer; @@ -271,6 +272,26 @@ private void testMultipleRows(int maxRows, int rows) assertIteratorsEqual(command, localResponse, inMemoryResponse); } + @Test + public void inMemoryResponseCapturesRepairedDigestAfterIteratorIsConsumed() + { + // RepairedDataInfo updates its digest lazily as rows are consumed via withRepairedDataInfo + // transformations; this test uses a stub that simulates the 
same timing. + int key = key(); + ReadCommand command = command(key, metadataWithClustering); + PartitionUpdate update = buildMultiRowUpdate(metadataWithClustering, key, 3); + + ByteBuffer expectedDigest = digest(); + // Returns EMPTY before any iteration; returns expectedDigest once the iterator has been consumed. + LazyRepairedDataInfo rdi = new LazyRepairedDataInfo(expectedDigest); + + ReadResponse response = ReadResponse.createInMemoryDataResponse(lazyRdiWrappedIterator(update, rdi), command, rdi, 10); + + assertTrue("digest should be non-empty after iterator was consumed", rdi.wasConsumedBeforeDigestCaptured()); + assertEquals(expectedDigest, response.repairedDataDigest()); + assertTrue(response.isRepairedDigestConclusive()); + } + @Test public void inMemoryResponseWithRangeTombstoneOnlyAllInMemory() { @@ -456,6 +477,81 @@ public boolean isConclusive() } } + /** + * Simulates a RepairedDataInfo that updates its digest lazily as rows are consumed. + * Returns EMPTY_BYTE_BUFFER until the partition iterator wrapping it is fully consumed, + * at which point getDigest() returns the provided expected digest. + * This lets us verify that InMemoryDataResponse captures the digest *after* consumption. + */ + private static class LazyRepairedDataInfo extends RepairedDataInfo + { + private final ByteBuffer finalDigest; + private boolean iteratorConsumed = false; + private boolean digestCapturedAfterConsumption = false; + + LazyRepairedDataInfo(ByteBuffer finalDigest) + { + super(null); + this.finalDigest = finalDigest; + } + + void markConsumed() + { + iteratorConsumed = true; + } + + boolean wasConsumedBeforeDigestCaptured() + { + return digestCapturedAfterConsumption; + } + + @Override + public ByteBuffer getDigest() + { + if (iteratorConsumed) + digestCapturedAfterConsumption = true; + return iteratorConsumed ? 
finalDigest : ByteBufferUtil.EMPTY_BYTE_BUFFER; + } + + @Override + public boolean isConclusive() + { + return true; + } + } + + private UnfilteredPartitionIterator lazyRdiWrappedIterator(PartitionUpdate update, LazyRepairedDataInfo rdi) + { + // Wraps the row iterator so that close() marks rdi as consumed, simulating RepairedDataInfo + // updating its digest lazily after the partition has been fully read. + UnfilteredRowIterator baseRowIter = update.unfilteredIterator(); + UnfilteredRowIterator wrappedRowIter = new WrappingUnfilteredRowIterator() + { + public UnfilteredRowIterator wrapped() { return baseRowIter; } + + @Override + public void close() + { + rdi.markConsumed(); + baseRowIter.close(); + } + }; + return new AbstractUnfilteredPartitionIterator() + { + private boolean returned = false; + + public TableMetadata metadata() { return update.metadata(); } + + public boolean hasNext() { return !returned; } + + public UnfilteredRowIterator next() + { + returned = true; + return wrappedRowIter; + } + }; + } + private static class StubReadCommand extends SinglePartitionReadCommand { StubReadCommand(int key, TableMetadata metadata, boolean isDigest)