diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index c72577e7ec1..ad475a14277 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -207,6 +207,7 @@ public enum CassandraRelevantProperties DATA_OUTPUT_STREAM_PLUS_TEMP_BUFFER_SIZE("cassandra.data_output_stream_plus_temp_buffer_size", "8192"), DATA_RESPONSE_BUFFER_INITIAL_SIZE_MAX("cassandra.data_response_buffer_initial_size_max", "4096"), DATA_RESPONSE_BUFFER_INITIAL_SIZE_MIN("cassandra.data_response_buffer_initial_size_min", "128"), + DATA_RESPONSE_IN_MEMORY_MAX_ROWS("cassandra.data_response_in_memory_max_rows", "128"), DECAYING_ESTIMATED_HISTOGRAM_RESERVOIR_STRIPE_COUNT("cassandra.dehr_stripe_count", "2"), DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES("default.provide.overlapping.tombstones"), /** determinism properties for testing */ diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index d161fb315d2..5bc4171187d 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -436,14 +436,26 @@ public ReadCommand copyAsDigestQuery(Iterable replicas) public abstract boolean isReversed(); public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi) + { + return createResponse(iterator, rdi, false); + } + public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi, boolean localRead) { // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both // ends equal, and there are no dangling RT bound in any partition. iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true); + if (isDigestQuery()) + return ReadResponse.createDigestResponse(iterator, this); - return isDigestQuery() - ? 
ReadResponse.createDigestResponse(iterator, this) - : ReadResponse.createDataResponse(iterator, this, rdi); + if (localRead && ReadResponse.IN_MEMORY_MAX_ROWS > 0) + return ReadResponse.createInMemoryDataResponse(iterator, this, rdi); + + return ReadResponse.createDataResponse(iterator, this, rdi); + } + + public ReadResponse createLocalObjectResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi) + { + return createResponse(iterator, rdi, true); } public ReadResponse createEmptyResponse() diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 7edd743ad41..5d1cdeb3447 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -21,16 +21,23 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.NoSuchElementException; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; @@ -49,6 +56,8 @@ public abstract class ReadResponse // Serializer for single partition read response public static final IVersionedSerializer serializer = new Serializer(); + static final int IN_MEMORY_MAX_ROWS = CassandraRelevantProperties.DATA_RESPONSE_IN_MEMORY_MAX_ROWS.getInt(); + protected ReadResponse() { } @@ -63,9 +72,15 @@ public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, return new LocalDataResponse(data, command, NO_OP_REPAIRED_DATA_INFO); } - public static ReadResponse createSimpleDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection) + static ReadResponse createInMemoryDataResponse(UnfilteredPartitionIterator data, ReadCommand command, RepairedDataInfo rdi) + { + return createInMemoryDataResponse(data, command, rdi, IN_MEMORY_MAX_ROWS); + } + + @VisibleForTesting + static ReadResponse createInMemoryDataResponse(UnfilteredPartitionIterator data, ReadCommand command, RepairedDataInfo rdi, int maxRows) { - return new LocalDataResponse(data, selection); + return new InMemoryDataResponse(data, command, rdi, maxRows); } @VisibleForTesting @@ -229,6 +244,9 @@ private static class LocalDataResponse extends DataResponse { // Exponential moving average of response sizes, used to set initial size of output buffer. private static final MovingAverage estimatedResponseBytes = ExpMovingAverage.decayBy1000(); + + // Exponential moving average of serialized overflow sizes, used to set the initial size of the overflow output buffer.
+ private static final MovingAverage estimatedOverflowBytes = ExpMovingAverage.decayBy1000(); private static final int bufferInitialSizeMin = CassandraRelevantProperties.DATA_RESPONSE_BUFFER_INITIAL_SIZE_MIN.getInt(); private static final int bufferInitialSizeMax = CassandraRelevantProperties.DATA_RESPONSE_BUFFER_INITIAL_SIZE_MAX.getInt(); @@ -240,9 +258,28 @@ private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, DeserializationHelper.Flag.LOCAL); } - private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter selection) + private static ByteBuffer buildOverflow(UnfilteredRowIterator rowIter, ColumnFilter selection) { - super(build(iter, selection), null, false, MessagingService.current_version, DeserializationHelper.Flag.LOCAL); + int initialBufferSize = bufferInitialSizeMin; + + if (bufferInitialSizeMax > bufferInitialSizeMin) + { + double estimatedOverflowSize = estimatedOverflowBytes.get(); + double bufferSizeEstimate = Double.isNaN(estimatedOverflowSize) ? bufferInitialSizeMin : estimatedOverflowSize * 1.1; + initialBufferSize = Math.min((int) bufferSizeEstimate, bufferInitialSizeMax); + } + + try (DataOutputBuffer buffer = new DataOutputBuffer(initialBufferSize)) + { + // NOTE: we use UnfilteredRowIteratorSerializer here, not UnfilteredPartitionIterators + UnfilteredRowIteratorSerializer.serializer.serialize(rowIter, selection, buffer, MessagingService.current_version); + estimatedOverflowBytes.update(buffer.position()); + return buffer.buffer(false); + } + catch (IOException e) + { + throw new RuntimeException(e); + } } private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) @@ -288,8 +325,8 @@ static abstract class DataResponse extends ReadResponse // TODO: can the digest be calculated over the raw bytes now? // The response, serialized in the current messaging version private final ByteBuffer data; - private final ByteBuffer repairedDataDigest; - private final boolean isRepairedDigestConclusive; + protected ByteBuffer repairedDataDigest; + protected boolean isRepairedDigestConclusive; private final int dataSerializationVersion; private final DeserializationHelper.Flag flag; @@ -354,6 +391,159 @@ public boolean isDigestResponse() { return false; } + + protected ByteBuffer getSerializedData() + { + return data; + } + } + + // built on the coordinator for local single-partition reads; never sent over the network + private static class InMemoryDataResponse extends DataResponse + { + // rows up to the max-rows limit stored as an in-memory partition; null if the response was empty + private final ImmutableBTreePartition partition; + // rows beyond the max-rows limit serialized to a buffer; null if all rows fit in memory + private final ByteBuffer overflow; + + private InMemoryDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, RepairedDataInfo rdi, int maxRows) + { + // pass fake values for rdi.getDigest(), rdi.isConclusive(), they will be calculated and set later + super(null, null, false, MessagingService.current_version, DeserializationHelper.Flag.LOCAL); + + if (!iter.hasNext()) + { + this.partition = null; + this.overflow = null; + } + else + { + try (UnfilteredRowIterator rowIter = iter.next()) + { + LimitedUnfilteredRowIterator limitedIter = new LimitedUnfilteredRowIterator(rowIter, maxRows); + this.partition = ImmutableBTreePartition.create(limitedIter); + // Uses buildOverflow (row-iterator level) to match the UnfilteredRowIteratorSerializer + // deserializer used in makeIterator. 
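+ // rowIter is neither closed nor reset by limitedIter, so it resumes at the first unfiltered past the limit and buildOverflow serializes only the remaining (overflow) rows.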
+ this.overflow = limitedIter.hasOverflow() + ? LocalDataResponse.buildOverflow(rowIter, command.columnFilter()) + : null; + } + } + + // Capture digest after consuming the iterator so that any updates made by + // RepairedDataInfo transformations are reflected in the digest. + this.repairedDataDigest = rdi.getDigest(); + this.isRepairedDigestConclusive = rdi.isConclusive(); + } + + // Decorating iterator that stops after maxRows Unfiltered objects and records whether overflow occurred. + // Does NOT close the wrapped iterator on close() so the caller can continue reading overflow rows. + private static class LimitedUnfilteredRowIterator extends AbstractUnfilteredRowIterator + { + private final UnfilteredRowIterator wrapped; + private final int maxRows; + private int rowCount = 0; + private boolean hasOverflow = false; + + private LimitedUnfilteredRowIterator(UnfilteredRowIterator wrapped, int maxRows) + { + super(wrapped.metadata(), + wrapped.partitionKey(), + wrapped.partitionLevelDeletion(), + wrapped.columns(), + wrapped.staticRow(), + wrapped.isReverseOrder(), + wrapped.stats()); + this.wrapped = wrapped; + this.maxRows = maxRows; + } + + boolean hasOverflow() + { + return hasOverflow; + } + + @Override + protected Unfiltered computeNext() + { + if (rowCount >= maxRows) + { + hasOverflow = wrapped.hasNext(); + return endOfData(); + } + if (!wrapped.hasNext()) + return endOfData(); + rowCount++; + return wrapped.next(); + } + } + + @Override + public UnfilteredPartitionIterator makeIterator(ReadCommand command) + { + if (partition == null) + return EmptyIterators.unfilteredPartition(command.metadata()); + + UnfilteredRowIterator inMemoryIter = partition.unfilteredIterator(command.columnFilter(), Slices.ALL, command.isReversed()); + + if (overflow == null) + return new SingletonUnfilteredPartitionIterator(command.metadata(), inMemoryIter); + + // Deserialize the overflow and concatenate with the in-memory part. 
+ // The input buffer is deliberately not closed here: deserialization is lazy and the + // returned iterator keeps reading from the buffer after this method returns. + DataInputBuffer in = new DataInputBuffer(overflow, true); + try + { + UnfilteredRowIterator overflowIter = UnfilteredRowIteratorSerializer.serializer.deserialize( + in, MessagingService.current_version, command.metadata(), command.columnFilter(), DeserializationHelper.Flag.LOCAL); + UnfilteredRowIterator combined = UnfilteredRowIterators.concat(inMemoryIter, overflowIter); + return new SingletonUnfilteredPartitionIterator(command.metadata(), combined); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + protected ByteBuffer getSerializedData() + { + throw new UnsupportedOperationException("InMemoryDataResponse cannot be serialized over the network"); + } + } + + private static class SingletonUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator + { + private final TableMetadata metadata; + private final UnfilteredRowIterator partition; + private boolean returned = false; + + private SingletonUnfilteredPartitionIterator(TableMetadata metadata, UnfilteredRowIterator partition) + { + this.metadata = metadata; + this.partition = partition; + } + + public TableMetadata metadata() { return metadata; } + + public boolean hasNext() + { + return !returned; + } + + public UnfilteredRowIterator next() + { + if (returned) throw new NoSuchElementException(); + returned = true; + return partition; + } + + @Override + public void close() + { + partition.close(); + } } private static class Serializer implements IVersionedSerializer @@ -378,7 +568,7 @@ public void serialize(ReadResponse response, DataOutputPlus out, int version) th ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out); out.writeBoolean(response.isRepairedDigestConclusive()); - ByteBuffer data = ((DataResponse)response).data; + ByteBuffer data = ((DataResponse)response).getSerializedData(); ByteBufferUtil.writeWithVIntLength(data, out); } } @@ -418,7 +608,7 @@ public long serializedSize(ReadResponse response, int version) // In theory, we should deserialize/re-serialize if the version asked is different from the current // version as the content could have a different serialization format. So far though, we haven't made // change to partition iterators serialization since 3.0 so we skip this.
- ByteBuffer data = ((DataResponse)response).data; + ByteBuffer data = ((DataResponse)response).getSerializedData(); size += ByteBufferUtil.serializedSizeWithVIntLength(data); } return size; diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 5c1ebc9ed5f..c267123b95d 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -2748,7 +2748,10 @@ protected void runMayThrow() try (ReadExecutionController controller = command.executionController(trackRepairedStatus); UnfilteredPartitionIterator iterator = command.executeLocally(controller)) { - response = command.createResponse(iterator, controller.getRepairedDataInfo()); + if (command.isLimitedToOnePartition() && !command.isDigestQuery()) + response = command.createLocalObjectResponse(iterator, controller.getRepairedDataInfo()); + else + response = command.createResponse(iterator, controller.getRepairedDataInfo()); } catch (RejectException e) { diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java index ebe1cf0322b..99705312abb 100644 --- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java +++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import org.junit.Before; @@ -27,16 +29,25 @@ import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.WrappingUnfilteredRowIterator; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -50,6 +61,7 @@ public class ReadResponseTest { private final Random random = new Random(); private TableMetadata metadata; + private TableMetadata metadataWithClustering; @BeforeClass public static void beforeClass() @@ -60,13 +72,20 @@ public static void beforeClass() @Before public void setup() { - metadata = TableMetadata.builder("ks", "t1") .offline() .addPartitionKeyColumn("p", Int32Type.instance) .addRegularColumn("v", Int32Type.instance) .partitioner(Murmur3Partitioner.instance) .build(); + + metadataWithClustering = TableMetadata.builder("ks", "t2") + .offline() + .addPartitionKeyColumn("p", Int32Type.instance) + .addClusteringColumn("c", Int32Type.instance) + 
.addRegularColumn("v", Int32Type.instance) + .partitioner(Murmur3Partitioner.instance) + .build(); } @Test @@ -172,6 +191,212 @@ public void makeDigestDoesntConsiderRepairedDataInfo() assertEquals(response1.digest(command1), response2.digest(command2)); } + @Test + public void inMemoryResponseEmptyIteratorMatchesLocalDataResponse() + { + ReadCommand command = command(key(), metadata); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + ReadResponse localResponse = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi); + ReadResponse inMemoryResponse = command.createLocalObjectResponse(EmptyIterators.unfilteredPartition(metadata), rdi); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + @Test + public void inMemoryResponseWithRowsMatchesLocalDataResponse() + { + int key = key(); + ReadCommand command = command(key, metadata); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + DecoratedKey dk = metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)); + Row row = buildRow(metadata, dk); + PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, row); + + ReadResponse localResponse = command.createResponse(singlePartitionIterator(update), rdi); + ReadResponse inMemoryResponse = command.createLocalObjectResponse(singlePartitionIterator(update), rdi); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + @Test(expected = UnsupportedOperationException.class) + public void inMemoryResponseCannotBeSerialized() + { + ReadCommand command = command(key(), metadata); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + ReadResponse response = command.createLocalObjectResponse(EmptyIterators.unfilteredPartition(metadata), rdi); + + try (DataOutputBuffer out = new DataOutputBuffer()) + { + ReadResponse.serializer.serialize(response, out, MessagingService.current_version); + } + catch (IOException e) + { + fail("Unexpected IOException: " + e.getMessage()); + } + } + + @Test + public void inMemoryResponseWithOverflowMatchesLocalDataResponse() + { + // 5 rows, only 2 fit in memory — 3 overflow into the serialized buffer + testMultipleRows(2, 5); + } + + @Test + public void inMemoryResponseAllRowsInMemoryWhenUnderLimit() + { + // 3 rows, limit is 10 — all fit in memory with no overflow + testMultipleRows(10, 3); + } + + @Test + public void inMemoryResponseWithZeroMaxRowsUsesOnlyOverflow() + { + // all rows go directly into the overflow buffer + testMultipleRows(0, 3); + } + + private void testMultipleRows(int maxRows, int rows) + { + int key = key(); + ReadCommand command = command(key, metadataWithClustering); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + PartitionUpdate update = buildMultiRowUpdate(metadataWithClustering, key, rows); + + ReadResponse localResponse = command.createResponse(singlePartitionIterator(update), rdi); + ReadResponse inMemoryResponse = ReadResponse.createInMemoryDataResponse(singlePartitionIterator(update), command, rdi, maxRows); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + @Test + public void inMemoryResponseCapturesRepairedDigestAfterIteratorIsConsumed() + { + // RepairedDataInfo updates its digest lazily as rows are consumed via withRepairedDataInfo + // transformations; this test uses a stub that simulates the same timing. 
+ int key = key(); + ReadCommand command = command(key, metadataWithClustering); + PartitionUpdate update = buildMultiRowUpdate(metadataWithClustering, key, 3); + + ByteBuffer expectedDigest = digest(); + // Returns EMPTY before any iteration; returns expectedDigest once the iterator has been consumed. + LazyRepairedDataInfo rdi = new LazyRepairedDataInfo(expectedDigest); + + ReadResponse response = ReadResponse.createInMemoryDataResponse(lazyRdiWrappedIterator(update, rdi), command, rdi, 10); + + assertTrue("digest should be non-empty after iterator was consumed", rdi.wasConsumedBeforeDigestCaptured()); + assertEquals(expectedDigest, response.repairedDataDigest()); + assertTrue(response.isRepairedDigestConclusive()); + } + + @Test + public void inMemoryResponseWithRangeTombstoneOnlyAllInMemory() + { + // Partition contains only a range tombstone (no rows); limit is large enough that nothing overflows + testWithTombstones(10, 0, 0, 5); + } + + @Test + public void inMemoryResponseWithRangeTombstoneOnlyOverflow() + { + // 5 rows at clusterings 0-4 plus a range tombstone covering [6, 9]; + // rows 0-1 go in-memory, rows 2-4 and the RT go to overflow + testWithTombstones(2, 5, 6, 9); + } + + @Test + public void inMemoryResponseWithRangeTombstoneBetweenInMemoryAndOverflow() + { + // RT at [1, 2] sits between the in-memory rows (0) and the overflow rows (3-4) + testWithTombstones(1, 5, 1, 2); + } + + private void testWithTombstones(int maxRows, int rows, int rtStart, int rtEnd) + { + int key = key(); + ReadCommand command = command(key, metadataWithClustering); + StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + PartitionUpdate update = buildUpdateWithRowsAndRangeTombstone(metadataWithClustering, key, rows, rtStart, rtEnd); + + ReadResponse localResponse = command.createResponse(singlePartitionIterator(update), rdi); + ReadResponse inMemoryResponse = ReadResponse.createInMemoryDataResponse(singlePartitionIterator(update), command, rdi, maxRows); + + assertIteratorsEqual(command, localResponse, inMemoryResponse); + } + + + private PartitionUpdate buildUpdateWithRowsAndRangeTombstone(TableMetadata metadata, int partitionKey, + int rowCount, int rtStart, int rtEnd) + { + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(metadata, ByteBufferUtil.bytes(partitionKey)).timestamp(0); + for (int i = 0; i < rowCount; i++) + builder.row(i).add("v", i); + builder.addRangeTombstone().start(rtStart).end(rtEnd); + return builder.build(); + } + + private void assertIteratorsEqual(ReadCommand command, ReadResponse expected, ReadResponse actual) + { + List expectedUnfiltered = collectUnfiltered(command, expected); + List actualUnfiltered = collectUnfiltered(command, actual); + assertEquals(expectedUnfiltered, actualUnfiltered); + } + + private List collectUnfiltered(ReadCommand command, ReadResponse response) + { + List result = new ArrayList<>(); + try (UnfilteredPartitionIterator iter = response.makeIterator(command)) + { + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + while (partition.hasNext()) + result.add(partition.next().toString(partition.metadata(), true)); + } + } + } + return result; + } + + private Row buildRow(TableMetadata metadata, DecoratedKey key) + { + ColumnMetadata col = metadata.getColumn(ByteBufferUtil.bytes("v")); + Clustering clustering = Clustering.EMPTY; + return BTreeRow.singleCellRow(clustering, BufferCell.live(col, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(42))); + } + 
+ private PartitionUpdate buildMultiRowUpdate(TableMetadata metadata, int partitionKey, int rowCount) + { + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(metadata, ByteBufferUtil.bytes(partitionKey)).timestamp(0); + for (int i = 0; i < rowCount; i++) + builder.row(i).add("v", i); + return builder.build(); + } + + private UnfilteredPartitionIterator singlePartitionIterator(PartitionUpdate update) + { + UnfilteredRowIterator rowIter = update.unfilteredIterator(); + return new AbstractUnfilteredPartitionIterator() + { + private boolean returned = false; + + public TableMetadata metadata() { return update.metadata(); } + + public boolean hasNext() { return !returned; } + + public UnfilteredRowIterator next() + { + returned = true; + return rowIter; + } + }; + } + private void verifySerDe(ReadResponse response) { // check that roundtripping through ReadResponse.serializer behaves as expected for (MessagingService.Version version : MessagingService.Version.supportedVersions()) @@ -222,6 +447,11 @@ private ReadCommand command(int key, TableMetadata metadata) return new StubReadCommand(key, metadata, false); } + private ReadCommand command(int key) + { + return command(key, metadata); + } + private static class StubRepairedDataInfo extends RepairedDataInfo { private final ByteBuffer repairedDigest; @@ -247,6 +477,81 @@ public boolean isConclusive() } } + /** + * Simulates a RepairedDataInfo that updates its digest lazily as rows are consumed. + * Returns EMPTY_BYTE_BUFFER until the partition iterator wrapping it is fully consumed, + * at which point getDigest() returns the provided expected digest. + * This lets us verify that InMemoryDataResponse captures the digest *after* consumption. + */ + private static class LazyRepairedDataInfo extends RepairedDataInfo + { + private final ByteBuffer finalDigest; + private boolean iteratorConsumed = false; + private boolean digestCapturedAfterConsumption = false; + + LazyRepairedDataInfo(ByteBuffer finalDigest) + { + super(null); + this.finalDigest = finalDigest; + } + + void markConsumed() + { + iteratorConsumed = true; + } + + boolean wasConsumedBeforeDigestCaptured() + { + return digestCapturedAfterConsumption; + } + + @Override + public ByteBuffer getDigest() + { + if (iteratorConsumed) + digestCapturedAfterConsumption = true; + return iteratorConsumed ? finalDigest : ByteBufferUtil.EMPTY_BYTE_BUFFER; + } + + @Override + public boolean isConclusive() + { + return true; + } + } + + private UnfilteredPartitionIterator lazyRdiWrappedIterator(PartitionUpdate update, LazyRepairedDataInfo rdi) + { + // Wraps the row iterator so that close() marks rdi as consumed, simulating RepairedDataInfo + // updating its digest lazily after the partition has been fully read. 
+ UnfilteredRowIterator baseRowIter = update.unfilteredIterator(); + UnfilteredRowIterator wrappedRowIter = new WrappingUnfilteredRowIterator() + { + public UnfilteredRowIterator wrapped() { return baseRowIter; } + + @Override + public void close() + { + rdi.markConsumed(); + baseRowIter.close(); + } + }; + return new AbstractUnfilteredPartitionIterator() + { + private boolean returned = false; + + public TableMetadata metadata() { return update.metadata(); } + + public boolean hasNext() { return !returned; } + + public UnfilteredRowIterator next() + { + returned = true; + return wrappedRowIter; + } + }; + } + private static class StubReadCommand extends SinglePartitionReadCommand { StubReadCommand(int key, TableMetadata metadata, boolean isDigest) @@ -262,11 +567,11 @@ private static class StubReadCommand extends SinglePartitionReadCommand RowFilter.none(), DataLimits.NONE, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), - null, + new ClusteringIndexSliceFilter(Slices.ALL, false), null, false, null); - + } @Override