diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 0455f0efa8bb6..a83023360076b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -20,6 +20,7 @@ import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; @@ -764,6 +765,28 @@ default ManagedLedgerAttributes getManagedLedgerAttributes() { void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx); + /** + * Read entries from the managed ledger starting from the provided position. + * + *
The start position is inclusive when it points to an existing entry. If it points to a non-existing entry, + * the read starts from the next valid entry in ledger order. {@link PositionFactory#EARLIEST} starts from the first + * available entry, while {@link PositionFactory#LATEST} starts from the position after the current last confirmed + * entry. This method does not wait for future writes and will complete with fewer entries, or an empty list, when + * there are not enough currently readable entries. + * + *
The returned entries are raw ledger entries and are not filtered by any cursor acknowledgement state. Callers
+ * are responsible for releasing returned entries.
+ *
+ * @param maxPosition the maximum position to read (inclusive). The read will not return entries beyond this
+ * position. When {@code null}, defaults to no upper bound (equivalent to
+ * {@link PositionFactory#LATEST}).
+ */
+ CompletableFuture> asyncReadEntries(Position startPosition, int numberOfEntries, Position maxPosition);
+
+ default CompletableFuture
> asyncReadEntries(Position startPosition, int numberOfEntries) {
+ return asyncReadEntries(startPosition, numberOfEntries, PositionFactory.LATEST);
+ }
+
/**
* Get all the managed ledgers.
*/
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4a1a3d12ab075..d1015df327ee3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import com.google.common.annotations.VisibleForTesting;
@@ -2330,6 +2331,49 @@ public void asyncReadEntry(Position position, ReadEntryCallback callback, Object
}
+ @Override
+ public CompletableFuture
> asyncReadEntries(Position startPosition, int numberOfEntries,
+ Position maxPosition) {
+ if (startPosition == null || numberOfEntries <= 0 || maxPosition == null) {
+ return CompletableFuture.failedFuture(new IllegalArgumentException("Invalid parameters"));
+ }
+ startPosition = getStartPosition(startPosition);
+ if (startPosition == null) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ if (maxPosition.compareTo(startPosition) < 0) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
+ CompletableFuture
> promise = new CompletableFuture<>();
+ OpReadEntries.create(this, startPosition, numberOfEntries, maxPosition, promise).readEntries();
+ return promise;
+ }
+
+ private Position getStartPosition(Position startPosition) {
+ if (PositionFactory.EARLIEST.equals(startPosition)) {
+ if (ledgers.isEmpty()) {
+ return null;
+ }
+ Long firstLedger = ledgers.firstKey();
+ if (firstLedger == null) {
+ return null;
+ }
+ return PositionFactory.create(firstLedger, 0);
+ }
+ if (PositionFactory.LATEST.equals(startPosition)) {
+ Position lastPosition = getLastPosition();
+ return lastPosition == null ? null : lastPosition.getNext();
+ }
+ if (startPosition.compareTo(lastConfirmedEntry) > 0) {
+ return null;
+ }
+ if (isValidPosition(startPosition)) {
+ return startPosition;
+ }
+ return getNextValidPosition(startPosition);
+ }
+
private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
@@ -2451,6 +2495,22 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry
}
}
+ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, ReadEntriesCallback callback,
+ Object ctx) {
+ IntSupplier expectedReadCount = () -> 0;
+ if (config.getReadEntryTimeoutSeconds() > 0) {
+ // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
+ long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+ long createdTime = System.nanoTime();
+ ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
+ callback, readOpCount, createdTime, ctx);
+ lastReadCallback = readCallback;
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount);
+ } else {
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, callback, ctx);
+ }
+ }
+
static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {
volatile ReadEntryCallback readEntryCallback;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntries.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntries.java
new file mode 100644
index 0000000000000..d2b80c71d2349
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntries.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static java.lang.Math.min;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.CustomLog;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
+import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo;
+
+@CustomLog
+class OpReadEntries implements ReadEntriesCallback {
+ ManagedLedgerImpl ledger;
+ Position readPosition;
+ Position maxPosition;
+ private int count;
+ private CompletableFuture
> promise;
+
+ private List
> promise) {
+ OpReadEntries op = new OpReadEntries();
+ op.ledger = ledger;
+ op.readPosition = ledger.startReadOperationOnLedger(readPosition);
+ op.count = count;
+ op.maxPosition = maxPosition;
+ op.promise = promise;
+ op.entries = new ArrayList<>(count);
+ op.nextReadPosition = op.readPosition;
+ return op;
+ }
+
+ void readEntries() {
+ final State state = ManagedLedgerImpl.STATE_UPDATER.get(ledger);
+ if (state.isFenced() || state == State.Closed) {
+ readEntriesFailed(new ManagedLedgerFencedException(), null);
+ return;
+ }
+
+ if (readPosition.compareTo(maxPosition) > 0) {
+ checkReadCompletion();
+ return;
+ }
+
+ long ledgerId = readPosition.getLedgerId();
+ LedgerHandle currentLedger = ledger.currentLedger;
+
+ if (currentLedger != null && ledgerId == currentLedger.getId()) {
+ // Current writing ledger is not in the cache (since we don't want
+ // it to be automatically evicted), and we cannot use 2 different
+ // ledger handles (read & write)for the same ledger.
+ internalReadFromLedger(currentLedger);
+ } else {
+ LedgerInfo ledgerInfo = ledger.ledgers.get(ledgerId);
+ if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
+ updateReadPosition(getNextLedgerPosition(ledgerId));
+ checkReadCompletion();
+ return;
+ }
+
+ ledger.getLedgerHandle(ledgerId).thenAccept(this::internalReadFromLedger)
+ .exceptionally(ex -> {
+ ledger.log.error().attr("position", readPosition).exceptionMessage(ex)
+ .log("Error opening ledger for reading");
+ readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), null);
+ return null;
+ });
+ }
+ }
+
+ private void internalReadFromLedger(ReadHandle readHandle) {
+ long firstEntry = readPosition.getEntryId();
+ long lastEntryInLedger;
+
+ Position lastPosition = ledger.lastConfirmedEntry;
+
+ if (readHandle.getId() == lastPosition.getLedgerId()) {
+ // For the current ledger, we only give read visibility to the last entry we have received a confirmation in
+ // the managed ledger layer
+ lastEntryInLedger = lastPosition.getEntryId();
+ } else {
+ // For other ledgers, already closed the BK lastAddConfirmed is appropriate
+ lastEntryInLedger = readHandle.getLastAddConfirmed();
+ }
+
+ if (readHandle.getId() == maxPosition.getLedgerId()) {
+ lastEntryInLedger = min(maxPosition.getEntryId(), lastEntryInLedger);
+ }
+
+ if (firstEntry > lastEntryInLedger) {
+ log.debug().attr("ledgerId", readHandle.getId())
+ .attr("lastEntry", lastEntryInLedger)
+ .attr("readEntry", firstEntry)
+ .log("No more messages to read from ledger");
+
+ LedgerHandle currentLedger = ledger.currentLedger;
+ if (currentLedger == null || readHandle.getId() != currentLedger.getId()) {
+ updateReadPosition(getNextLedgerPosition(readHandle.getId()));
+ } else {
+ updateReadPosition(readPosition);
+ }
+
+ checkReadCompletion();
+ return;
+ }
+
+ long lastEntry = min(firstEntry + getNumberOfEntriesToRead() - 1, lastEntryInLedger);
+
+ log.debug().attr("ledgerId", readHandle.getId())
+ .attr("firstEntry", firstEntry)
+ .attr("lastEntry", lastEntry)
+ .log("Reading entries from ledger");
+ ledger.asyncReadEntry(readHandle, firstEntry, lastEntry, this, null);
+ }
+
+ private Position getNextLedgerPosition(long ledgerId) {
+ Long nextLedgerId = ledger.ledgers.ceilingKey(ledgerId + 1);
+ return PositionFactory.create(nextLedgerId != null ? nextLedgerId : ledgerId + 1, 0);
+ }
+
+ @Override
+ public void readEntriesComplete(List
> readAfterLast = ledger.asyncReadEntries(p0.getNext(), 10);
+ CompletableFuture
> readLatest = ledger.asyncReadEntries(PositionFactory.LATEST, 10);
+
+ assertEquals(readAfterLast.get(5, TimeUnit.SECONDS), Collections.emptyList());
+ assertEquals(readLatest.get(5, TimeUnit.SECONDS), Collections.emptyList());
+
+ ledger.addEntry("entry-1".getBytes(Encoding));
+ assertEquals(readAfterLast.get(5, TimeUnit.SECONDS), Collections.emptyList());
+ assertEquals(readLatest.get(5, TimeUnit.SECONDS), Collections.emptyList());
+
+ ledger.close();
+ }
+
+ @Test(timeOut = 20000)
+ public void testManagedLedgerAsyncReadEntriesExactCountBoundaries() throws Exception {
+ ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesExactCountBoundaries",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
+
+ Position p0 = ledger.addEntry("entry-0".getBytes(Encoding));
+ Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
+ Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
+ Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));
+ Position p4 = ledger.addEntry("entry-4".getBytes(Encoding));
+
+ assertEquals(p0.getLedgerId(), p2.getLedgerId());
+ assertNotEquals(p2.getLedgerId(), p3.getLedgerId());
+
+ assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 1).get(5, TimeUnit.SECONDS), p1);
+ assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 2).get(5, TimeUnit.SECONDS), p1, p2);
+ assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 10).get(5, TimeUnit.SECONDS), p1, p2, p3, p4);
+
+ ledger.close();
+ }
+
+ @Test(timeOut = 20000)
+ public void testManagedLedgerAsyncReadEntriesSkipsEmptyLedgers() throws Exception {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(
+ "testManagedLedgerAsyncReadEntriesSkipsEmptyLedgers",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
+
+ Position p0 = ledger.addEntry("entry-0".getBytes(Encoding));
+ ledger.ledgerClosed(ledger.currentLedger, 0L);
+ Awaitility.await().untilAsserted(() -> assertEquals(ManagedLedgerImpl.STATE_UPDATER.get(ledger),
+ ManagedLedgerImpl.State.LedgerOpened));
+ LedgerHandle emptyLedger = ledger.currentLedger;
+ ledger.ledgerClosed(emptyLedger, -1L);
+ Awaitility.await().untilAsserted(() -> assertEquals(ManagedLedgerImpl.STATE_UPDATER.get(ledger),
+ ManagedLedgerImpl.State.LedgerOpened));
+ Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
+
+ assertNotEquals(p0.getLedgerId(), p1.getLedgerId());
+ assertFalse(ledger.getLedgersInfo().containsKey(emptyLedger.getId()));
+ assertEntryPositionsAndRelease(
+ ledger.asyncReadEntries(PositionFactory.create(emptyLedger.getId(), 0), 1).get(5, TimeUnit.SECONDS),
+ p1);
+
+ ledger.close();
+ }
+
+ @Test(timeOut = 20000)
+ public void testManagedLedgerAsyncReadEntriesStopsOnErrorAndReturnsPartialEntries() throws Exception {
+ ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesStopsOnErrorAndReturnsPartialEntries",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+ Position p0 = ledger.addEntry("entry-0".getBytes(Encoding));
+ Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
+ Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
+
+ assertNotEquals(p0.getLedgerId(), p1.getLedgerId());
+ assertNotEquals(p1.getLedgerId(), p2.getLedgerId());
+
+ bkc.deleteLedger(p1.getLedgerId());
+
+ assertEntryPositionsAndRelease(ledger.asyncReadEntries(p0, 3).get(5, TimeUnit.SECONDS), p0);
+
+ assertTrue(expectFutureFailure(ledger.asyncReadEntries(p1, 3)) instanceof ManagedLedgerException);
+
+ ledger.close();
+ }
+
+ @Test(timeOut = 20000)
+ public void testManagedLedgerAsyncReadEntriesFailsWhenClosed() throws Exception {
+ ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesFailsWhenClosed");
+ Position position = ledger.addEntry("entry-0".getBytes(Encoding));
+
+ ledger.close();
+
+ assertTrue(expectFutureFailure(ledger.asyncReadEntries(position, 1))
+ instanceof ManagedLedgerException.ManagedLedgerFencedException);
+ }
+
+ @Test(timeOut = 20000)
+ public void testManagedLedgerAsyncReadEntriesIgnoresCursorAckState() throws Exception {
+ ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesIgnoresCursorAckState");
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ Position p0 = ledger.addEntry("entry-0".getBytes(Encoding));
+ Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
+
+ List
> asyncReadEntries(Position startPosition, int numberOfEntries,
+ Position maxPosition) {
+ return delegate.asyncReadEntries(startPosition, numberOfEntries, maxPosition);
+ }
+
@Override
public NavigableMap
> asyncReadEntries(Position startPosition, int numberOfEntries,
+ Position maxPosition) {
+ return CompletableFuture.completedFuture(List.of());
+ }
+
@Override
public NavigableMap