diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 4875ce43e22..b2015a8e3b0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -29,7 +29,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import org.apache.commons.lang3.ArrayUtils; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; @@ -323,7 +322,7 @@ protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions writeOption } } - public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] prefix, BiConsumer callback) + public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] prefix, IteratorCallback callback) throws RocksDBException { if (ArrayUtils.isEmpty(prefix)) { @@ -334,7 +333,7 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] prefix, } public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix, - final byte[] start, final byte[] end, BiConsumer callback) throws RocksDBException { + final byte[] start, final byte[] end, IteratorCallback callback) throws RocksDBException { if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(start)) { throw new RocksDBException( @@ -358,7 +357,11 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix, try { readOptions = new ReadOptions(); readOptions.setTotalOrderSeek(true); - readOptions.setReadaheadSize(4L * 1024 * 1024); + // Preserve 4MB readahead for scans without an upper bound, while avoiding over-prefetch for bounded ranges. + if (ArrayUtils.isEmpty(end)) { + readOptions.setReadaheadSize(4L * 1024 * 1024); + } + boolean hasStart = !ArrayUtils.isEmpty(start); boolean hasPrefix = !ArrayUtils.isEmpty(prefix); @@ -389,9 +392,13 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix, if (hasPrefix && !checkPrefix(key, prefix)) { break; } - callback.accept(iterator.key(), iterator.value()); + byte[] value = iterator.value(); + if (!callback.accept(key, value)) { + break; + } iterator.next(); } + iterator.status(); } finally { if (startSlice != null) { startSlice.close(); @@ -761,4 +768,9 @@ void recursiveDelete(File file) { file.delete(); } } + + public interface IteratorCallback { + boolean accept(byte[] key, byte[] value); + } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java index dc3712663c7..8dd161fbd8b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java @@ -161,6 +161,7 @@ public Set scanAllQueueIdInTopic(String topic) throws RocksDBException keyBuffer.position(prefix.length); int queueId = keyBuffer.getInt(); queueIdSet.add(queueId); + return true; }); return queueIdSet; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java index deee0295706..f8fc00da063 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java @@ -19,6 +19,8 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.rocketmq.common.BoundaryType; @@ -123,44 +125,28 @@ public ByteBuffer getCQInKV(final String topic, final int queueId, final long cq } public List rangeQuery(final String topic, final int queueId, final long startIndex, final int num) throws RocksDBException { - final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8); - final List defaultCFHList = new ArrayList<>(num); - final ByteBuffer[] resultList = new ByteBuffer[num]; - final List kvIndexList = new ArrayList<>(num); - final List kvKeyList = new ArrayList<>(num); - for (int i = 0; i < num; i++) { - final ByteBuffer keyBB = buildCQKeyByteBuffer(topicBytes, queueId, startIndex + i); - kvIndexList.add(i); - kvKeyList.add(keyBB.array()); - defaultCFHList.add(this.defaultCFH); + if (startIndex < 0 || num <= 0) { + return Collections.emptyList(); } - int keyNum = kvIndexList.size(); - if (keyNum > 0) { - List kvValueList = this.rocksDBStorage.multiGet(defaultCFHList, kvKeyList); - final int valueNum = kvValueList.size(); - if (keyNum != valueNum) { - throw new RocksDBException("rocksdb bug, multiGet"); + final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8); + final byte[] startKeyBytes = buildCQKeyByteBuffer(topicBytes, queueId, startIndex).array(); + final byte[] endKeyBytes = buildCQKeyByteBuffer(topicBytes, queueId, startIndex + num).array(); + final byte[] prefix = Arrays.copyOf(startKeyBytes, startKeyBytes.length - Long.BYTES); + List results = new ArrayList<>(); + this.rocksDBStorage.iterate(this.defaultCFH, prefix, startKeyBytes, endKeyBytes, (key, value) -> { + if (key.length < Long.BYTES || value == null || value.length != CQ_UNIT_SIZE) { + return false; } - for (int i = 0; i < valueNum; i++) { - byte[] value = kvValueList.get(i); - if (value == null) { - continue; - } - ByteBuffer byteBuffer = ByteBuffer.wrap(value); - resultList[kvIndexList.get(i)] = byteBuffer; + long expectedOffset = startIndex + results.size(); + long currentOffset = ByteBuffer.wrap(key).getLong(key.length - Long.BYTES); + // ConsumeQueue offsets are expected to be continuous. + if (currentOffset != expectedOffset) { + return false; } - } - - final int resultSize = resultList.length; - List bbValueList = new ArrayList<>(resultSize); - for (int i = 0; i < resultSize; i++) { - ByteBuffer byteBuffer = resultList[i]; - if (byteBuffer == null) { - break; - } - bbValueList.add(byteBuffer); - } - return bbValueList; + results.add(ByteBuffer.wrap(value)); + return true; + }); + return results; } /** diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java index 9431f7ed048..2aba065ac2e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java @@ -17,7 +17,10 @@ package org.apache.rocketmq.store.queue; import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -133,6 +136,56 @@ public void testPutMessagePositionInfoWrapper_basic() throws Exception { ); } + @Test + public void testRangeQueryReturnsContinuousOffsets() throws Exception { + RocksDBConsumeQueueStore store = (RocksDBConsumeQueueStore) messageStore.getQueueStore(); + String topic = "test-topic-" + UUID.randomUUID(); + int queueId = 0; + int msgNum = 20; + int msgSize = 100; + List requests = new ArrayList<>(msgNum); + for (int i = 0; i < msgNum; i++) { + requests.add(new DispatchRequest(topic, queueId, i, (long) i * msgSize, msgSize, i)); + } + + store.putMessagePosition(requests); + + List buffers = store.rangeQuery(topic, queueId, 0, msgNum); + assertEquals(msgNum, buffers.size()); + for (int i = 0; i < msgNum; i++) { + ByteBuffer buffer = buffers.get(i); + assertEquals((long) i * msgSize, buffer.getLong()); + assertEquals(msgSize, buffer.getInt()); + assertEquals(i, buffer.getLong()); + } + } + + @Test + public void testRangeQueryStopsAtMissingOffset() throws Exception { + RocksDBConsumeQueueStore store = (RocksDBConsumeQueueStore) messageStore.getQueueStore(); + String topic = "test-topic-" + UUID.randomUUID(); + int queueId = 0; + int msgSize = 100; + List requests = new ArrayList<>(20); + for (int i = 0; i <= 20; i++) { + if (i == 2) { + continue; + } + requests.add(new DispatchRequest(topic, queueId, i, (long) i * msgSize, msgSize, i)); + } + + store.putMessagePosition(requests); + + List buffers = store.rangeQuery(topic, queueId, 0, 20); + assertEquals(2, buffers.size()); + for (int i = 0; i < buffers.size(); i++) { + ByteBuffer buffer = buffers.get(i); + assertEquals((long) i * msgSize, buffer.getLong()); + assertEquals(msgSize, buffer.getInt()); + assertEquals(i, buffer.getLong()); + } + } + @Test public void testPutMessagePositionInfoWrapper_lmq() throws Exception { String topic = "test-topic-" + UUID.randomUUID();