Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
Expand Down Expand Up @@ -323,7 +322,7 @@ protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions writeOption
}
}

public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] prefix, BiConsumer<byte[], byte[]> callback)
public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] prefix, IteratorCallback callback)
throws RocksDBException {

if (ArrayUtils.isEmpty(prefix)) {
Expand All @@ -334,7 +333,7 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] prefix,
}

public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix,
final byte[] start, final byte[] end, BiConsumer<byte[], byte[]> callback) throws RocksDBException {
final byte[] start, final byte[] end, IteratorCallback callback) throws RocksDBException {

if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(start)) {
throw new RocksDBException(
Expand All @@ -358,7 +357,11 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix,
try {
readOptions = new ReadOptions();
readOptions.setTotalOrderSeek(true);
readOptions.setReadaheadSize(4L * 1024 * 1024);
// Preserve 4MB readahead for scans without an upper bound, while avoiding over-prefetch for bounded ranges.
if (ArrayUtils.isEmpty(end)) {
readOptions.setReadaheadSize(4L * 1024 * 1024);
}

boolean hasStart = !ArrayUtils.isEmpty(start);
boolean hasPrefix = !ArrayUtils.isEmpty(prefix);

Expand Down Expand Up @@ -389,9 +392,13 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix,
if (hasPrefix && !checkPrefix(key, prefix)) {
break;
}
callback.accept(iterator.key(), iterator.value());
byte[] value = iterator.value();
if (!callback.accept(key, value)) {
break;
}
iterator.next();
}
iterator.status();
} finally {
if (startSlice != null) {
startSlice.close();
Expand Down Expand Up @@ -761,4 +768,9 @@ void recursiveDelete(File file) {
file.delete();
}
}

public interface IteratorCallback {
boolean accept(byte[] key, byte[] value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public Set<Integer> scanAllQueueIdInTopic(String topic) throws RocksDBException
keyBuffer.position(prefix.length);
int queueId = keyBuffer.getInt();
queueIdSet.add(queueId);
return true;
});
return queueIdSet;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.rocketmq.common.BoundaryType;
Expand Down Expand Up @@ -123,44 +125,28 @@ public ByteBuffer getCQInKV(final String topic, final int queueId, final long cq
}

public List<ByteBuffer> rangeQuery(final String topic, final int queueId, final long startIndex, final int num) throws RocksDBException {
final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
final List<ColumnFamilyHandle> defaultCFHList = new ArrayList<>(num);
final ByteBuffer[] resultList = new ByteBuffer[num];
final List<Integer> kvIndexList = new ArrayList<>(num);
final List<byte[]> kvKeyList = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
final ByteBuffer keyBB = buildCQKeyByteBuffer(topicBytes, queueId, startIndex + i);
kvIndexList.add(i);
kvKeyList.add(keyBB.array());
defaultCFHList.add(this.defaultCFH);
if (startIndex < 0 || num <= 0) {
return Collections.emptyList();
}
int keyNum = kvIndexList.size();
if (keyNum > 0) {
List<byte[]> kvValueList = this.rocksDBStorage.multiGet(defaultCFHList, kvKeyList);
final int valueNum = kvValueList.size();
if (keyNum != valueNum) {
throw new RocksDBException("rocksdb bug, multiGet");
final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
final byte[] startKeyBytes = buildCQKeyByteBuffer(topicBytes, queueId, startIndex).array();
final byte[] endKeyBytes = buildCQKeyByteBuffer(topicBytes, queueId, startIndex + num).array();
final byte[] prefix = Arrays.copyOf(startKeyBytes, startKeyBytes.length - Long.BYTES);
List<ByteBuffer> results = new ArrayList<>();
this.rocksDBStorage.iterate(this.defaultCFH, prefix, startKeyBytes, endKeyBytes, (key, value) -> {
if (key.length < Long.BYTES || value == null || value.length != CQ_UNIT_SIZE) {
return false;
}
for (int i = 0; i < valueNum; i++) {
byte[] value = kvValueList.get(i);
if (value == null) {
continue;
}
ByteBuffer byteBuffer = ByteBuffer.wrap(value);
resultList[kvIndexList.get(i)] = byteBuffer;
long expectedOffset = startIndex + results.size();
long currentOffset = ByteBuffer.wrap(key).getLong(key.length - Long.BYTES);
// ConsumeQueue offsets are expected to be continuous.
if (currentOffset != expectedOffset) {
return false;
}
}

final int resultSize = resultList.length;
List<ByteBuffer> bbValueList = new ArrayList<>(resultSize);
for (int i = 0; i < resultSize; i++) {
ByteBuffer byteBuffer = resultList[i];
if (byteBuffer == null) {
break;
}
bbValueList.add(byteBuffer);
}
return bbValueList;
results.add(ByteBuffer.wrap(value));
return true;
});
return results;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package org.apache.rocketmq.store.queue;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -133,6 +136,56 @@ public void testPutMessagePositionInfoWrapper_basic() throws Exception {
);
}

@Test
public void testRangeQueryReturnsContinuousOffsets() throws Exception {
RocksDBConsumeQueueStore store = (RocksDBConsumeQueueStore) messageStore.getQueueStore();
String topic = "test-topic-" + UUID.randomUUID();
int queueId = 0;
int msgNum = 20;
int msgSize = 100;
List<DispatchRequest> requests = new ArrayList<>(msgNum);
for (int i = 0; i < msgNum; i++) {
requests.add(new DispatchRequest(topic, queueId, i, (long) i * msgSize, msgSize, i));
}

store.putMessagePosition(requests);

List<ByteBuffer> buffers = store.rangeQuery(topic, queueId, 0, msgNum);
assertEquals(msgNum, buffers.size());
for (int i = 0; i < msgNum; i++) {
ByteBuffer buffer = buffers.get(i);
assertEquals((long) i * msgSize, buffer.getLong());
assertEquals(msgSize, buffer.getInt());
assertEquals(i, buffer.getLong());
}
}

@Test
public void testRangeQueryStopsAtMissingOffset() throws Exception {
RocksDBConsumeQueueStore store = (RocksDBConsumeQueueStore) messageStore.getQueueStore();
String topic = "test-topic-" + UUID.randomUUID();
int queueId = 0;
int msgSize = 100;
List<DispatchRequest> requests = new ArrayList<>(20);
for (int i = 0; i <= 20; i++) {
if (i == 2) {
continue;
}
requests.add(new DispatchRequest(topic, queueId, i, (long) i * msgSize, msgSize, i));
}

store.putMessagePosition(requests);

List<ByteBuffer> buffers = store.rangeQuery(topic, queueId, 0, 20);
assertEquals(2, buffers.size());
for (int i = 0; i < buffers.size(); i++) {
ByteBuffer buffer = buffers.get(i);
assertEquals((long) i * msgSize, buffer.getLong());
assertEquals(msgSize, buffer.getInt());
assertEquals(i, buffer.getLong());
}
}

@Test
public void testPutMessagePositionInfoWrapper_lmq() throws Exception {
String topic = "test-topic-" + UUID.randomUUID();
Expand Down
Loading