Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public enum CassandraRelevantProperties
DATA_OUTPUT_STREAM_PLUS_TEMP_BUFFER_SIZE("cassandra.data_output_stream_plus_temp_buffer_size", "8192"),
DATA_RESPONSE_BUFFER_INITIAL_SIZE_MAX("cassandra.data_response_buffer_initial_size_max", "4096"),
DATA_RESPONSE_BUFFER_INITIAL_SIZE_MIN("cassandra.data_response_buffer_initial_size_min", "128"),
DATA_RESPONSE_IN_MEMORY_MAX_ROWS("cassandra.data_response_in_memory_max_rows", "128"),
DECAYING_ESTIMATED_HISTOGRAM_RESERVOIR_STRIPE_COUNT("cassandra.dehr_stripe_count", "2"),
DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES("default.provide.overlapping.tombstones"),
/** determinism properties for testing */
Expand Down
18 changes: 15 additions & 3 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,26 @@ public ReadCommand copyAsDigestQuery(Iterable<Replica> replicas)
public abstract boolean isReversed();

/**
 * Builds a response for this command over the given partition data.
 * Delegates to the three-argument overload with {@code localRead == false}, i.e. produces a
 * serialized (network-capable) response rather than an in-memory object response.
 */
public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi)
{
    return createResponse(iterator, rdi, false);
}
/**
 * Builds a response for this command over the given partition data.
 *
 * @param iterator  the partition data produced by executing this command
 * @param rdi       repaired-data tracking info used to compute the repaired data digest
 * @param localRead true when the response is consumed on this node only; such responses may
 *                  keep rows as in-memory objects (see ReadResponse.IN_MEMORY_MAX_ROWS) to
 *                  avoid a serialize/deserialize round-trip
 */
public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi, boolean localRead)
{
    // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
    // ends equal, and there are no dangling RT bound in any partition.
    iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true);

    if (isDigestQuery())
        return ReadResponse.createDigestResponse(iterator, this);

    // A non-positive IN_MEMORY_MAX_ROWS disables the in-memory optimization entirely.
    if (localRead && ReadResponse.IN_MEMORY_MAX_ROWS > 0)
        return ReadResponse.createInMemoryDataResponse(iterator, this, rdi);

    return ReadResponse.createDataResponse(iterator, this, rdi);
}

/**
 * Builds a response for a read that is both executed and consumed on this node, allowing the
 * in-memory (object) response representation to be used when enabled.
 */
public ReadResponse createLocalObjectResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi)
{
    return createResponse(iterator, rdi, true);
}

public ReadResponse createEmptyResponse()
Expand Down
206 changes: 198 additions & 8 deletions src/java/org/apache/cassandra/db/ReadResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
Expand All @@ -49,6 +56,8 @@ public abstract class ReadResponse
// Serializer for single partition read response
public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();

static final int IN_MEMORY_MAX_ROWS = CassandraRelevantProperties.DATA_RESPONSE_IN_MEMORY_MAX_ROWS.getInt();

protected ReadResponse()
{
    // No state at this level; concrete subclasses carry the response payload.
}
Expand All @@ -63,9 +72,15 @@ public static ReadResponse createDataResponse(UnfilteredPartitionIterator data,
return new LocalDataResponse(data, command, NO_OP_REPAIRED_DATA_INFO);
}

public static ReadResponse createSimpleDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
static ReadResponse createInMemoryDataResponse(UnfilteredPartitionIterator data, ReadCommand command, RepairedDataInfo rdi)
{
return createInMemoryDataResponse(data, command, rdi, IN_MEMORY_MAX_ROWS);
}

@VisibleForTesting
static ReadResponse createInMemoryDataResponse(UnfilteredPartitionIterator data, ReadCommand command, RepairedDataInfo rdi, int maxRows)
{
return new LocalDataResponse(data, selection);
return new InMemoryDataResponse(data, command, rdi, maxRows);
}

@VisibleForTesting
Expand Down Expand Up @@ -229,6 +244,9 @@ private static class LocalDataResponse extends DataResponse
{
// Exponential moving average of response sizes, used to set initial size of output buffer.
private static final MovingAverage estimatedResponseBytes = ExpMovingAverage.decayBy1000();

// Exponential moving average of overflow(!) sizes, used to set initial size of output buffer.
private static final MovingAverage estimatedOverflowBytes = ExpMovingAverage.decayBy1000();
private static final int bufferInitialSizeMin = CassandraRelevantProperties.DATA_RESPONSE_BUFFER_INITIAL_SIZE_MIN.getInt();
private static final int bufferInitialSizeMax = CassandraRelevantProperties.DATA_RESPONSE_BUFFER_INITIAL_SIZE_MAX.getInt();

Expand All @@ -240,9 +258,28 @@ private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command,
DeserializationHelper.Flag.LOCAL);
}

private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter selection)
private static ByteBuffer buildOverflow(UnfilteredRowIterator rowIter, ColumnFilter selection)
{
super(build(iter, selection), null, false, MessagingService.current_version, DeserializationHelper.Flag.LOCAL);
int initialBufferSize = bufferInitialSizeMin;

if (bufferInitialSizeMax > bufferInitialSizeMin)
{
double estimatedOverflowSize = estimatedOverflowBytes.get();
double bufferSizeEstimate = Double.isNaN(estimatedOverflowSize) ? bufferInitialSizeMin : estimatedOverflowSize * 1.1;
initialBufferSize = Math.min((int) bufferSizeEstimate, bufferInitialSizeMax);
}

try (DataOutputBuffer buffer = new DataOutputBuffer(initialBufferSize))
{
// NOTE: we use UnfilteredRowIteratorSerializer here, not UnfilteredPartitionIterators
UnfilteredRowIteratorSerializer.serializer.serialize(rowIter, selection, buffer, MessagingService.current_version);
estimatedOverflowBytes.update(buffer.position());
return buffer.buffer(false);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}

private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
Expand Down Expand Up @@ -288,8 +325,8 @@ static abstract class DataResponse extends ReadResponse
// TODO: can the digest be calculated over the raw bytes now?
// The response, serialized in the current messaging version
private final ByteBuffer data;
private final ByteBuffer repairedDataDigest;
private final boolean isRepairedDigestConclusive;
protected ByteBuffer repairedDataDigest;
protected boolean isRepairedDigestConclusive;
private final int dataSerializationVersion;
private final DeserializationHelper.Flag flag;

Expand Down Expand Up @@ -354,6 +391,159 @@ public boolean isDigestResponse()
{
return false;
}

/**
 * Returns the response payload serialized at {@code dataSerializationVersion}; overridden by
 * subclasses (e.g. in-memory responses) that have no serialized form.
 */
protected ByteBuffer getSerializedData()
{
    return data;
}
}

// built on the coordinator for local single-partition reads; never sent over the network.
// NOTE(review): the constructor reads at most one partition from the iterator — this relies on
// callers only using it for single-partition reads; confirm against the call sites.
private static class InMemoryDataResponse extends DataResponse
{
    // rows up to the max-rows limit stored as an in-memory partition; null if the response was empty
    private final ImmutableBTreePartition partition;
    // rows beyond the max-rows limit serialized to a buffer; null if all rows fit in memory
    private final ByteBuffer overflow;

    private InMemoryDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, RepairedDataInfo rdi, int maxRows)
    {
        // pass fake values for rdi.getDigest(), rdi.isConclusive(), they will be calculated and set later
        super(null, null, false, MessagingService.current_version, DeserializationHelper.Flag.LOCAL);

        if (!iter.hasNext())
        {
            // empty result: no partition, nothing to spill
            this.partition = null;
            this.overflow = null;
        }
        else
        {
            try (UnfilteredRowIterator rowIter = iter.next())
            {
                // First materialize up to maxRows unfiltereds in memory, then serialize whatever
                // remains on the same underlying iterator (the limiter does not close it).
                LimitedUnfilteredRowIterator limitedIter = new LimitedUnfilteredRowIterator(rowIter, maxRows);
                this.partition = ImmutableBTreePartition.create(limitedIter);
                // Uses buildOverflow (row-iterator level) to match the UnfilteredRowIteratorSerializer
                // deserializer used in makeIterator.
                this.overflow = limitedIter.hasOverflow()
                              ? LocalDataResponse.buildOverflow(rowIter, command.columnFilter())
                              : null;
            }
        }

        // Capture digest after consuming the iterator so that any updates made by
        // RepairedDataInfo transformations are reflected in the digest.
        this.repairedDataDigest = rdi.getDigest();
        this.isRepairedDigestConclusive = rdi.isConclusive();
    }

    // Decorating iterator that stops after maxRows Unfiltered objects and records whether overflow occurred.
    // Does NOT close the wrapped iterator on close() so the caller can continue reading overflow rows.
    // NOTE(review): the counter counts every Unfiltered, including range tombstone markers; if the
    // limit falls between an RT open and its close, the in-memory partition and the overflow are
    // split across that bound — verify the split is rejoined correctly by makeIterator's concat.
    private static class LimitedUnfilteredRowIterator extends AbstractUnfilteredRowIterator
    {
        private final UnfilteredRowIterator wrapped;
        private final int maxRows;
        // number of Unfiltered objects handed out so far
        private int rowCount = 0;
        // set on the first computeNext() call at/after the limit while rows remain
        private boolean hasOverflow = false;

        private LimitedUnfilteredRowIterator(UnfilteredRowIterator wrapped, int maxRows)
        {
            super(wrapped.metadata(),
                  wrapped.partitionKey(),
                  wrapped.partitionLevelDeletion(),
                  wrapped.columns(),
                  wrapped.staticRow(),
                  wrapped.isReverseOrder(),
                  wrapped.stats());
            this.wrapped = wrapped;
            this.maxRows = maxRows;
        }

        // true iff the wrapped iterator still had content when the limit was reached
        boolean hasOverflow()
        {
            return hasOverflow;
        }

        @Override
        protected Unfiltered computeNext()
        {
            if (rowCount >= maxRows)
            {
                hasOverflow = wrapped.hasNext();
                return endOfData();
            }
            if (!wrapped.hasNext())
                return endOfData();
            rowCount++;
            return wrapped.next();
        }
    }

    /**
     * Rebuilds the partition iterator from the in-memory rows, concatenated (when present) with
     * the deserialized overflow rows.
     */
    @Override
    public UnfilteredPartitionIterator makeIterator(ReadCommand command)
    {
        if (partition == null)
            return EmptyIterators.unfilteredPartition(command.metadata());

        UnfilteredRowIterator inMemoryIter = partition.unfilteredIterator(command.columnFilter(), Slices.ALL, command.isReversed());

        if (overflow == null)
            return new SingletonUnfilteredPartitionIterator(command.metadata(), inMemoryIter);

        // Deserialize the overflow and concatenate with the in-memory part.
        // Buffer closing is not needed and actually is incorrect
        // because the result iterator uses the buffer context
        DataInputBuffer in = new DataInputBuffer(overflow, true);
        try
        {
            UnfilteredRowIterator overflowIter = UnfilteredRowIteratorSerializer.serializer.deserialize(
                in, MessagingService.current_version, command.metadata(), command.columnFilter(), DeserializationHelper.Flag.LOCAL);
            // in-memory rows were produced before the overflow rows, so this order preserves the
            // original iteration order
            UnfilteredRowIterator combined = UnfilteredRowIterators.concat(inMemoryIter, overflowIter);
            return new SingletonUnfilteredPartitionIterator(command.metadata(), combined);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected ByteBuffer getSerializedData()
    {
        // this response type exists only on the local node; serialization is deliberately unsupported
        throw new UnsupportedOperationException("InMemoryDataResponse cannot be serialized over the network");
    }
}

/**
 * An {@link UnfilteredPartitionIterator} over exactly one partition. Closing it closes the
 * wrapped row iterator.
 */
private static class SingletonUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator
{
    private final TableMetadata metadata;
    private final UnfilteredRowIterator partition;
    // true once next() has handed out the single partition
    private boolean returned = false;

    private SingletonUnfilteredPartitionIterator(TableMetadata metadata, UnfilteredRowIterator partition)
    {
        this.metadata = metadata;
        this.partition = partition;
    }

    @Override
    public TableMetadata metadata()
    {
        return metadata;
    }

    @Override
    public boolean hasNext()
    {
        return !returned;
    }

    @Override
    public UnfilteredRowIterator next()
    {
        if (returned)
            throw new NoSuchElementException();
        returned = true;
        return partition;
    }

    @Override
    public void close()
    {
        partition.close();
    }
}

private static class Serializer implements IVersionedSerializer<ReadResponse>
Expand All @@ -378,7 +568,7 @@ public void serialize(ReadResponse response, DataOutputPlus out, int version) th
ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out);
out.writeBoolean(response.isRepairedDigestConclusive());

ByteBuffer data = ((DataResponse)response).data;
ByteBuffer data = ((DataResponse)response).getSerializedData();
ByteBufferUtil.writeWithVIntLength(data, out);
}
}
Expand Down Expand Up @@ -418,7 +608,7 @@ public long serializedSize(ReadResponse response, int version)
// In theory, we should deserialize/re-serialize if the version asked is different from the current
// version as the content could have a different serialization format. So far though, we haven't made
// change to partition iterators serialization since 3.0 so we skip this.
ByteBuffer data = ((DataResponse)response).data;
ByteBuffer data = ((DataResponse)response).getSerializedData();
size += ByteBufferUtil.serializedSizeWithVIntLength(data);
}
return size;
Expand Down
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -2748,7 +2748,10 @@ protected void runMayThrow()
try (ReadExecutionController controller = command.executionController(trackRepairedStatus);
UnfilteredPartitionIterator iterator = command.executeLocally(controller))
{
response = command.createResponse(iterator, controller.getRepairedDataInfo());
if (command.isLimitedToOnePartition() && !command.isDigestQuery())
response = command.createLocalObjectResponse(iterator, controller.getRepairedDataInfo());
else
response = command.createResponse(iterator, controller.getRepairedDataInfo());
}
catch (RejectException e)
{
Expand Down
Loading