diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 0455f0efa8bb6..a83023360076b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -20,6 +20,7 @@ import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; @@ -764,6 +765,28 @@ default ManagedLedgerAttributes getManagedLedgerAttributes() { void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx); + /** + * Read entries from the managed ledger starting from the provided position. + * + *

The start position is inclusive when it points to an existing entry. If it points to a non-existing entry, + * the read starts from the next valid entry in ledger order. {@link PositionFactory#EARLIEST} starts from the first + * available entry, while {@link PositionFactory#LATEST} starts from the position after the current last confirmed + * entry. This method does not wait for future writes and will complete with fewer entries, or an empty list, when + * there are not enough currently readable entries. + * + *

The returned entries are raw ledger entries and are not filtered by any cursor acknowledgement state. Callers + * are responsible for releasing returned entries. + * + * @param maxPosition the maximum position to read (inclusive). The read will not return entries beyond this + * position. When {@code null}, defaults to no upper bound (equivalent to + * {@link PositionFactory#LATEST}). + */ + CompletableFuture> asyncReadEntries(Position startPosition, int numberOfEntries, Position maxPosition); + + default CompletableFuture> asyncReadEntries(Position startPosition, int numberOfEntries) { + return asyncReadEntries(startPosition, numberOfEntries, PositionFactory.LATEST); + } + /** * Get all the managed ledgers. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4a1a3d12ab075..d1015df327ee3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static java.lang.Math.max; import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import com.google.common.annotations.VisibleForTesting; @@ -2330,6 +2331,49 @@ public void asyncReadEntry(Position position, ReadEntryCallback callback, Object } + @Override + public CompletableFuture> asyncReadEntries(Position startPosition, int numberOfEntries, + Position maxPosition) { + if (startPosition == null || numberOfEntries <= 0 || maxPosition == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException("Invalid parameters")); + } + startPosition = getStartPosition(startPosition); + if (startPosition == null) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + if (maxPosition.compareTo(startPosition) < 0) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + + CompletableFuture> promise = new CompletableFuture<>(); + OpReadEntries.create(this, startPosition, numberOfEntries, maxPosition, promise).readEntries(); + return promise; + } + + private Position getStartPosition(Position startPosition) { + if (PositionFactory.EARLIEST.equals(startPosition)) { + if (ledgers.isEmpty()) { + return null; + } + Long firstLedger = ledgers.firstKey(); + if (firstLedger == null) { + return null; + } + return PositionFactory.create(firstLedger, 0); + } + if (PositionFactory.LATEST.equals(startPosition)) { + Position lastPosition = getLastPosition(); + return lastPosition == null ? null : lastPosition.getNext(); + } + if (startPosition.compareTo(lastConfirmedEntry) > 0) { + return null; + } + if (isValidPosition(startPosition)) { + return startPosition; + } + return getNextValidPosition(startPosition); + } + private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) { if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) { @@ -2451,6 +2495,22 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry } } + protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, ReadEntriesCallback callback, + Object ctx) { + IntSupplier expectedReadCount = () -> 0; + if (config.getReadEntryTimeoutSeconds() > 0) { + // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled + long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); + long createdTime = System.nanoTime(); + ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, + callback, readOpCount, createdTime, ctx); + lastReadCallback = readCallback; + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount); + } else { + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, callback, ctx); + } + } + static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback { volatile ReadEntryCallback readEntryCallback; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntries.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntries.java new file mode 100644 index 0000000000000..d2b80c71d2349 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntries.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static java.lang.Math.min; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.CustomLog; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State; +import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo; + +@CustomLog +class OpReadEntries implements ReadEntriesCallback { + ManagedLedgerImpl ledger; + Position readPosition; + Position maxPosition; + private int count; + private CompletableFuture> promise; + + private List entries; + private Position nextReadPosition; + + static OpReadEntries create(ManagedLedgerImpl ledger, Position readPosition, int count, + Position maxPosition, CompletableFuture> promise) { + OpReadEntries op = new OpReadEntries(); + op.ledger = ledger; + op.readPosition = ledger.startReadOperationOnLedger(readPosition); + op.count = count; + op.maxPosition = maxPosition; + op.promise = promise; + op.entries = new ArrayList<>(count); + op.nextReadPosition = op.readPosition; + return op; + } + + void readEntries() { + final State state = ManagedLedgerImpl.STATE_UPDATER.get(ledger); + if (state.isFenced() || state == State.Closed) { + readEntriesFailed(new ManagedLedgerFencedException(), null); + return; + } + + if (readPosition.compareTo(maxPosition) > 0) { + checkReadCompletion(); + return; + } + + long ledgerId = readPosition.getLedgerId(); + LedgerHandle currentLedger = ledger.currentLedger; + + if (currentLedger != null && ledgerId == currentLedger.getId()) { + // Current writing ledger is not in the cache (since we don't want + // it to be automatically evicted), and we cannot use 2 different + // ledger handles (read & write)for the same ledger. + internalReadFromLedger(currentLedger); + } else { + LedgerInfo ledgerInfo = ledger.ledgers.get(ledgerId); + if (ledgerInfo == null || ledgerInfo.getEntries() == 0) { + updateReadPosition(getNextLedgerPosition(ledgerId)); + checkReadCompletion(); + return; + } + + ledger.getLedgerHandle(ledgerId).thenAccept(this::internalReadFromLedger) + .exceptionally(ex -> { + ledger.log.error().attr("position", readPosition).exceptionMessage(ex) + .log("Error opening ledger for reading"); + readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), null); + return null; + }); + } + } + + private void internalReadFromLedger(ReadHandle readHandle) { + long firstEntry = readPosition.getEntryId(); + long lastEntryInLedger; + + Position lastPosition = ledger.lastConfirmedEntry; + + if (readHandle.getId() == lastPosition.getLedgerId()) { + // For the current ledger, we only give read visibility to the last entry we have received a confirmation in + // the managed ledger layer + lastEntryInLedger = lastPosition.getEntryId(); + } else { + // For other ledgers, already closed the BK lastAddConfirmed is appropriate + lastEntryInLedger = readHandle.getLastAddConfirmed(); + } + + if (readHandle.getId() == maxPosition.getLedgerId()) { + lastEntryInLedger = min(maxPosition.getEntryId(), lastEntryInLedger); + } + + if (firstEntry > lastEntryInLedger) { + log.debug().attr("ledgerId", readHandle.getId()) + .attr("lastEntry", lastEntryInLedger) + .attr("readEntry", firstEntry) + .log("No more messages to read from ledger"); + + LedgerHandle currentLedger = ledger.currentLedger; + if (currentLedger == null || readHandle.getId() != currentLedger.getId()) { + updateReadPosition(getNextLedgerPosition(readHandle.getId())); + } else { + updateReadPosition(readPosition); + } + + checkReadCompletion(); + return; + } + + long lastEntry = min(firstEntry + getNumberOfEntriesToRead() - 1, lastEntryInLedger); + + log.debug().attr("ledgerId", readHandle.getId()) + .attr("firstEntry", firstEntry) + .attr("lastEntry", lastEntry) + .log("Reading entries from ledger"); + ledger.asyncReadEntry(readHandle, firstEntry, lastEntry, this, null); + } + + private Position getNextLedgerPosition(long ledgerId) { + Long nextLedgerId = ledger.ledgers.ceilingKey(ledgerId + 1); + return PositionFactory.create(nextLedgerId != null ? nextLedgerId : ledgerId + 1, 0); + } + + @Override + public void readEntriesComplete(List returnedEntries, Object ctx) { + try { + internalReadEntriesComplete(returnedEntries); + } catch (Throwable throwable) { + log.error().attr("op", this).exception(throwable) + .log("Fallback to readEntriesFailed for exception in readEntriesComplete"); + readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx); + } + } + + private void internalReadEntriesComplete(List returnedEntries) { + if (returnedEntries.isEmpty()) { + log.warn().attr("op", this).log("Read no entries unexpectedly"); + checkReadCompletion(); + return; + } + + log.debug() + .attr("managedLedger", ledger.getName()) + .attr("batchSize", returnedEntries.size()) + .attr("cumulativeSize", entries.size()) + .attr("requestedCount", count) + .log("Read entries succeeded"); + + entries.addAll(returnedEntries); + updateReadPosition(returnedEntries.get(returnedEntries.size() - 1).getPosition().getNext()); + checkReadCompletion(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + try { + internalReadEntriesFailed(exception); + } catch (Throwable throwable) { + promise.completeExceptionally(ManagedLedgerException.getManagedLedgerException(throwable)); + } + } + + private void internalReadEntriesFailed(ManagedLedgerException exception) { + if (!entries.isEmpty()) { + promise.complete(entries); + return; + } + + log.warn() + .attr("managedLedger", ledger.getName()) + .attr("readPosition", readPosition) + .exception(exception) + .log("Read failed from ledger"); + promise.completeExceptionally(exception); + } + + void updateReadPosition(Position newReadPosition) { + nextReadPosition = newReadPosition; + } + + void checkReadCompletion() { + if (entries.size() < count + && ledger.hasMoreEntries(nextReadPosition) + && maxPosition.compareTo(readPosition) > 0) { + ledger.getExecutor().execute(() -> { + readPosition = ledger.startReadOperationOnLedger(nextReadPosition); + readEntries(); + }); + } else { + promise.complete(entries); + } + } + + int getNumberOfEntriesToRead() { + return count - entries.size(); + } + + + @Override + public String toString() { + final var ledger = this.ledger; + final var readPosition = this.readPosition; + final var maxPosition = this.maxPosition; + final var nextReadPosition = this.nextReadPosition; + final var entries = this.entries; + final var count = this.count; + if (ledger != null) { + return ledger.getName() + "{ readPosition: " + + (readPosition != null ? readPosition : "(null)") + ", maxPosition: " + + (maxPosition != null ? maxPosition : "(null)") + ", nextReadPosition: " + + (nextReadPosition != null ? nextReadPosition : "(null)") + ", entries count: " + + (entries != null ? entries.size() : "(null)") + ", count: " + count + " }"; + } else { + return "(null)"; + } + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index bef94ecec5e64..8e67861e7ab34 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -76,6 +76,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; @@ -202,6 +203,38 @@ public static void makeReadEntryProbFail(ManagedLedgerImpl ml, Supplier future) throws Exception { + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected future to fail"); + } catch (ExecutionException e) { + return e.getCause(); + } + return null; + } + + private static void assertEntryPositionsAndRelease(List entries, Position... expectedPositions) { + try { + assertEquals(entries.size(), expectedPositions.length); + for (int i = 0; i < expectedPositions.length; i++) { + assertEquals(entries.get(i).getPosition(), expectedPositions[i]); + } + } finally { + entries.forEach(Entry::release); + } + } + + private static void assertEntryDataAndRelease(List entries, String... expectedData) { + try { + assertEquals(entries.size(), expectedData.length); + for (int i = 0; i < expectedData.length; i++) { + assertEquals(new String(entries.get(i).getData(), Encoding), expectedData[i]); + } + } finally { + entries.forEach(Entry::release); + } + } + @Data private static class DeleteLedgerInfo{ volatile boolean hasCalled; @@ -333,6 +366,282 @@ public void managedLedgerApi() throws Exception { ledger.close(); } + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntries() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntries", + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); + + assertEquals(ledger.asyncReadEntries(PositionFactory.EARLIEST, 10).get(5, TimeUnit.SECONDS), + Collections.emptyList()); + assertEquals(ledger.asyncReadEntries(PositionFactory.LATEST, 10).get(5, TimeUnit.SECONDS), + Collections.emptyList()); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + ledger.addEntry("entry-3".getBytes(Encoding)); + Position p4 = ledger.addEntry("entry-4".getBytes(Encoding)); + + assertEntryPositionsAndRelease(ledger.asyncReadEntries(PositionFactory.EARLIEST, 3).get(5, TimeUnit.SECONDS), + p0, p1, p2); + assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 10).get(5, TimeUnit.SECONDS), p1, p2, + PositionFactory.create(p2.getLedgerId(), p2.getEntryId() + 1), p4); + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(PositionFactory.create(p1.getLedgerId(), p1.getEntryId() + 100), 10) + .get(5, TimeUnit.SECONDS), + p2, PositionFactory.create(p2.getLedgerId(), p2.getEntryId() + 1), p4); + + assertEquals(ledger.asyncReadEntries(p4.getNext(), 10).get(5, TimeUnit.SECONDS), Collections.emptyList()); + assertEquals(ledger.asyncReadEntries(PositionFactory.LATEST, 10).get(5, TimeUnit.SECONDS), + Collections.emptyList()); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesRejectsInvalidArguments() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesRejectsInvalidArguments"); + + assertTrue(expectFutureFailure(ledger.asyncReadEntries(null, 1)) instanceof IllegalArgumentException); + assertTrue(expectFutureFailure(ledger.asyncReadEntries(PositionFactory.EARLIEST, 0)) + instanceof IllegalArgumentException); + assertTrue(expectFutureFailure(ledger.asyncReadEntries(PositionFactory.EARLIEST, -1)) + instanceof IllegalArgumentException); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesPositionBoundaryValidation() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesPositionBoundaryValidation", + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + + assertNotEquals(p1.getLedgerId(), p2.getLedgerId()); + + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(PositionFactory.create(p0.getLedgerId(), -1), 1).get(5, TimeUnit.SECONDS), + p0); + assertEntryPositionsAndRelease(ledger.asyncReadEntries(PositionFactory.create(-2, 0), 1) + .get(5, TimeUnit.SECONDS), p0); + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(PositionFactory.create(p0.getLedgerId(), Long.MAX_VALUE), 1) + .get(5, TimeUnit.SECONDS), + p2); + assertEquals(ledger.asyncReadEntries(PositionFactory.create(Long.MAX_VALUE, 0), 1).get(5, TimeUnit.SECONDS), + Collections.emptyList()); + assertEquals(ledger.asyncReadEntries(PositionFactory.create(p2.getLedgerId(), Long.MAX_VALUE), 1) + .get(5, TimeUnit.SECONDS), Collections.emptyList()); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesDoesNotWaitForFutureWrites() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesDoesNotWaitForFutureWrites"); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + CompletableFuture> readAfterLast = ledger.asyncReadEntries(p0.getNext(), 10); + CompletableFuture> readLatest = ledger.asyncReadEntries(PositionFactory.LATEST, 10); + + assertEquals(readAfterLast.get(5, TimeUnit.SECONDS), Collections.emptyList()); + assertEquals(readLatest.get(5, TimeUnit.SECONDS), Collections.emptyList()); + + ledger.addEntry("entry-1".getBytes(Encoding)); + assertEquals(readAfterLast.get(5, TimeUnit.SECONDS), Collections.emptyList()); + assertEquals(readLatest.get(5, TimeUnit.SECONDS), Collections.emptyList()); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesExactCountBoundaries() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesExactCountBoundaries", + new ManagedLedgerConfig().setMaxEntriesPerLedger(3)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + Position p3 = ledger.addEntry("entry-3".getBytes(Encoding)); + Position p4 = ledger.addEntry("entry-4".getBytes(Encoding)); + + assertEquals(p0.getLedgerId(), p2.getLedgerId()); + assertNotEquals(p2.getLedgerId(), p3.getLedgerId()); + + assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 1).get(5, TimeUnit.SECONDS), p1); + assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 2).get(5, TimeUnit.SECONDS), p1, p2); + assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 10).get(5, TimeUnit.SECONDS), p1, p2, p3, p4); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesSkipsEmptyLedgers() throws Exception { + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open( + "testManagedLedgerAsyncReadEntriesSkipsEmptyLedgers", + new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + ledger.ledgerClosed(ledger.currentLedger, 0L); + Awaitility.await().untilAsserted(() -> assertEquals(ManagedLedgerImpl.STATE_UPDATER.get(ledger), + ManagedLedgerImpl.State.LedgerOpened)); + LedgerHandle emptyLedger = ledger.currentLedger; + ledger.ledgerClosed(emptyLedger, -1L); + Awaitility.await().untilAsserted(() -> assertEquals(ManagedLedgerImpl.STATE_UPDATER.get(ledger), + ManagedLedgerImpl.State.LedgerOpened)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + + assertNotEquals(p0.getLedgerId(), p1.getLedgerId()); + assertFalse(ledger.getLedgersInfo().containsKey(emptyLedger.getId())); + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(PositionFactory.create(emptyLedger.getId(), 0), 1).get(5, TimeUnit.SECONDS), + p1); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesStopsOnErrorAndReturnsPartialEntries() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesStopsOnErrorAndReturnsPartialEntries", + new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + + assertNotEquals(p0.getLedgerId(), p1.getLedgerId()); + assertNotEquals(p1.getLedgerId(), p2.getLedgerId()); + + bkc.deleteLedger(p1.getLedgerId()); + + assertEntryPositionsAndRelease(ledger.asyncReadEntries(p0, 3).get(5, TimeUnit.SECONDS), p0); + + assertTrue(expectFutureFailure(ledger.asyncReadEntries(p1, 3)) instanceof ManagedLedgerException); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesFailsWhenClosed() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesFailsWhenClosed"); + Position position = ledger.addEntry("entry-0".getBytes(Encoding)); + + ledger.close(); + + assertTrue(expectFutureFailure(ledger.asyncReadEntries(position, 1)) + instanceof ManagedLedgerException.ManagedLedgerFencedException); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesIgnoresCursorAckState() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesIgnoresCursorAckState"); + ManagedCursor cursor = ledger.openCursor("c1"); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + + List cursorEntries = cursor.readEntries(2); + cursor.markDelete(cursorEntries.get(cursorEntries.size() - 1).getPosition()); + cursorEntries.forEach(Entry::release); + + assertEntryDataAndRelease(ledger.asyncReadEntries(p0, 2).get(5, TimeUnit.SECONDS), "entry-0", "entry-1"); + assertEntryPositionsAndRelease(ledger.asyncReadEntries(p1, 1).get(5, TimeUnit.SECONDS), p1); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesMaxPositionBeforeStartReturnsEmpty() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesMaxPositionBeforeStartReturnsEmpty"); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + + // maxPosition is p0, startPosition is p1 -- nothing to read + assertEquals(ledger.asyncReadEntries(p1, 10, p0).get(5, TimeUnit.SECONDS), + Collections.emptyList()); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesMaxPositionCapsSameLedger() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesMaxPositionCapsSameLedger", + new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + Position p3 = ledger.addEntry("entry-3".getBytes(Encoding)); + + // Request 10 entries starting at p0 but capped at p1 -- should only get p0, p1 + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(p0, 10, p1).get(5, TimeUnit.SECONDS), + p0, p1); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesMaxPositionInclusive() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesMaxPositionInclusive", + new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + + // maxPosition == p1 means p1 is the last readable entry + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(p0, 10, p1).get(5, TimeUnit.SECONDS), + p0, p1); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesMaxPositionCrossesLedger() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesMaxPositionCrossesLedger", + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + // ledger roll + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + Position p3 = ledger.addEntry("entry-3".getBytes(Encoding)); + + assertNotEquals(p1.getLedgerId(), p2.getLedgerId()); + + // maxPosition in the second ledger, should only read entries up to maxPosition + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(p0, 10, p2).get(5, TimeUnit.SECONDS), + p0, p1, p2); + + ledger.close(); + } + + @Test(timeOut = 20000) + public void testManagedLedgerAsyncReadEntriesMaxPositionRespectsBothConstraints() throws Exception { + ManagedLedger ledger = factory.open("testManagedLedgerAsyncReadEntriesMaxPositionRespectsBothConstraints", + new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); + + Position p0 = ledger.addEntry("entry-0".getBytes(Encoding)); + Position p1 = ledger.addEntry("entry-1".getBytes(Encoding)); + Position p2 = ledger.addEntry("entry-2".getBytes(Encoding)); + Position p3 = ledger.addEntry("entry-3".getBytes(Encoding)); + Position p4 = ledger.addEntry("entry-4".getBytes(Encoding)); + + // Count=2 is tighter than maxPosition, should get only 2 entries + assertEntryPositionsAndRelease( + ledger.asyncReadEntries(p0, 2, p4).get(5, TimeUnit.SECONDS), + p0, p1); + + ledger.close(); + } + @Test(timeOut = 20000) public void simple() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java index ff9d3ea8b44d9..0dbeb8f3ca3df 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java @@ -652,6 +652,12 @@ public void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback c delegate.asyncReadEntry(position, callback, ctx); } + @Override + public CompletableFuture> asyncReadEntries(Position startPosition, int numberOfEntries, + Position maxPosition) { + return delegate.asyncReadEntries(startPosition, numberOfEntries, maxPosition); + } + @Override public NavigableMap getLedgersInfo() { return delegate.getLedgersInfo(); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 348fcce86f211..2f5e33060dc71 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -20,6 +20,7 @@ import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; @@ -395,6 +396,12 @@ public void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback c } + @Override + public CompletableFuture> asyncReadEntries(Position startPosition, int numberOfEntries, + Position maxPosition) { + return CompletableFuture.completedFuture(List.of()); + } + @Override public NavigableMap getLedgersInfo() { return null;