diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java index 6502b7946f..7c8cb0ad5b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java @@ -139,7 +139,6 @@ public LogScanner createLogScanner() { tableInfo, conn.getMetadataUpdater(), conn.getClientMetricGroup(), - conn.getOrCreateRemoteFileDownloader(), projectedColumns, schemaGetter, recordBatchFilter); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java index 12928eda4e..4bb175ed73 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java @@ -109,20 +109,31 @@ void tryComplete(TableBucket tableBucket) { while (pendings != null && !pendings.isEmpty()) { PendingFetch pendingFetch = pendings.peek(); if (pendingFetch.isCompleted()) { - CompletedFetch completedFetch = pendingFetch.toCompletedFetch(); - completedFetches.add(completedFetch); pendings.poll(); - hasCompleted = true; + try { + CompletedFetch completedFetch = pendingFetch.toCompletedFetch(); + completedFetches.add(completedFetch); + hasCompleted = true; + } catch (Throwable t) { + // If toCompletedFetch() fails (e.g. the underlying chunk + // future completed exceptionally), discard this entry so + // the queue is not blocked. The bucket will become fetchable + // again and the server can re-issue a remote fetch. 
+ LOG.warn( + "Discarding failed pending fetch for bucket {}", + tableBucket, + t); + } } else { break; } } if (hasCompleted) { notEmptyCondition.signalAll(); - // clear the bucket entry if there is no pending fetches for the bucket. - if (pendings.isEmpty()) { - this.pendingFetches.remove(tableBucket); - } + } + // clear the bucket entry if there are no pending fetches for the bucket. + if (pendings != null && pendings.isEmpty()) { + this.pendingFetches.remove(tableBucket); } }); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index 2625716cde..1c8850ce30 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -21,7 +21,6 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; -import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -126,7 +125,6 @@ public LogFetcher( Configuration conf, MetadataUpdater metadataUpdater, ScannerMetricGroup scannerMetricGroup, - RemoteFileDownloader remoteFileDownloader, SchemaGetter schemaGetter) { this.tablePath = tableInfo.getTablePath(); this.isPartitioned = tableInfo.isPartitioned(); @@ -163,8 +161,7 @@ public LogFetcher( this.logFetchCollector = new LogFetchCollector(tablePath, logScannerStatus, conf, metadataUpdater); this.scannerMetricGroup = scannerMetricGroup; - this.remoteLogDownloader = - new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup); + this.remoteLogDownloader = new RemoteLogDownloader(tablePath, conf, scannerMetricGroup); 
remoteLogDownloader.start(); } @@ -480,22 +477,37 @@ private void pendRemoteFetches( fetchOffset = segment.remoteLogStartOffset(); } RemoteLogDownloadFuture downloadFuture = - remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment); - RemotePendingFetch pendingFetch = - new RemotePendingFetch( - segment, - downloadFuture, - posInLogSegment, - fetchOffset, - highWatermark, - remoteReadContext, - logScannerStatus, - isCheckCrcs); - logFetchBuffer.pend(pendingFetch); - downloadFuture.onComplete(() -> logFetchBuffer.tryComplete(segment.tableBucket())); + remoteLogDownloader.requestRemoteLog( + remoteLogTabletDir, segment, posInLogSegment); + pendRemoteChunkFetch(segment, downloadFuture, fetchOffset, highWatermark); } } + /** + * Registers a pending fetch for a single chunk and sets up the callback chain so that + * subsequent chunks of the same segment are automatically pended. + */ + private void pendRemoteChunkFetch( + RemoteLogSegment segment, + RemoteLogDownloadFuture downloadFuture, + long fetchOffset, + long highWatermark) { + downloadFuture.setNextChunkCallback( + nextFuture -> + pendRemoteChunkFetch(segment, nextFuture, fetchOffset, highWatermark)); + RemotePendingFetch pendingFetch = + new RemotePendingFetch( + segment, + downloadFuture, + fetchOffset, + highWatermark, + remoteReadContext, + logScannerStatus, + isCheckCrcs); + logFetchBuffer.pend(pendingFetch); + downloadFuture.onComplete(() -> logFetchBuffer.tryComplete(segment.tableBucket())); + } + @VisibleForTesting Map prepareFetchLogRequests() { Map> fetchLogReqForBuckets = new HashMap<>(); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index 9a2dbf0b4c..eb61bcbebf 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -20,7 +20,6 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; -import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.WakeupException; import org.apache.fluss.metadata.SchemaGetter; @@ -80,7 +79,6 @@ public LogScannerImpl( TableInfo tableInfo, MetadataUpdater metadataUpdater, ClientMetricGroup clientMetricGroup, - RemoteFileDownloader remoteFileDownloader, @Nullable int[] projectedFields, SchemaGetter schemaGetter, @Nullable Predicate recordBatchFilter) { @@ -102,7 +100,6 @@ public LogScannerImpl( conf, metadataUpdater, scannerMetricGroup, - remoteFileDownloader, schemaGetter); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java index 0ac0643078..52db99a053 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java @@ -19,27 +19,23 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.record.FileLogRecords; import org.apache.fluss.record.LogRecordReadContext; +import org.apache.fluss.record.LogRecords; import org.apache.fluss.rpc.protocol.ApiError; -import java.io.IOException; - /** * {@link RemoteCompletedFetch} is a {@link CompletedFetch} that represents a completed fetch that - * the log records are fetched from remote log storage. + * the log records are fetched from remote log storage as file-backed or in-memory chunks. 
*/ @Internal class RemoteCompletedFetch extends CompletedFetch { - private final FileLogRecords fileLogRecords; - - // recycle to clean up the fetched remote log files and increment the prefetch semaphore + // recycle to notify the downloader that this chunk has been consumed private final Runnable recycleCallback; RemoteCompletedFetch( TableBucket tableBucket, - FileLogRecords fileLogRecords, + LogRecords logRecords, long highWatermark, LogRecordReadContext readContext, LogScannerStatus logScannerStatus, @@ -49,28 +45,27 @@ class RemoteCompletedFetch extends CompletedFetch { super( tableBucket, ApiError.NONE, - fileLogRecords.sizeInBytes(), + logRecords.sizeInBytes(), highWatermark, - fileLogRecords.batches().iterator(), + logRecords.batches().iterator(), readContext, logScannerStatus, isCheckCrc, fetchOffset, CompletedFetch.NO_FILTERED_END_OFFSET); - this.fileLogRecords = fileLogRecords; this.recycleCallback = recycleCallback; } @Override void drain() { - super.drain(); - // close file channel only, don't need to flush the file which is very heavy - try { - fileLogRecords.closeHandlers(); - } catch (IOException e) { - LOG.warn("Failed to close file channel for remote log records", e); + // Guard against double-drain: super.drain() is idempotent (checks isConsumed internally), + // but recycleCallback must only be called once to avoid double-incrementing chunksConsumed + // which would trigger a spurious semaphore release and corrupt flow-control counters. 
+ if (isConsumed()) { + return; } - // call recycle to remove the fetched files and increment the prefetch semaphore + super.drain(); + // call recycle to notify the downloader and trigger next chunk read recycleCallback.run(); } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloadFuture.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloadFuture.java index 339df422cc..e07bcd1336 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloadFuture.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloadFuture.java @@ -17,40 +17,39 @@ package org.apache.fluss.client.table.scanner.log; -import org.apache.fluss.exception.FlussRuntimeException; -import org.apache.fluss.record.FileLogRecords; +import org.apache.fluss.record.LogRecords; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; -/** Represents the future of a remote log download request. */ +/** + * Represents the future of a single chunk read from a remote log segment. Each chunk is delivered + * as a {@link LogRecords} via a {@link CompletableFuture}. 
+ */ public class RemoteLogDownloadFuture { - private final CompletableFuture logFileFuture; + private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloadFuture.class); + + private final CompletableFuture chunkFuture; private final Runnable recycleCallback; + private Consumer nextChunkCallback; public RemoteLogDownloadFuture( - CompletableFuture logFileFuture, Runnable recycleCallback) { - this.logFileFuture = logFileFuture; + CompletableFuture chunkFuture, Runnable recycleCallback) { + this.chunkFuture = chunkFuture; this.recycleCallback = recycleCallback; } public boolean isDone() { - return logFileFuture.isDone(); + return chunkFuture.isDone(); } - public FileLogRecords getFileLogRecords(int startPosition) { - try { - FileLogRecords fileLogRecords = FileLogRecords.open(logFileFuture.join(), false); - if (startPosition > 0) { - return fileLogRecords.slice(startPosition, Integer.MAX_VALUE); - } else { - return fileLogRecords; - } - } catch (IOException e) { - throw new FlussRuntimeException(e); - } + /** Returns the chunk data. Blocks until the chunk is ready. */ + public LogRecords getLogRecords() { + return chunkFuture.join(); } public Runnable getRecycleCallback() { @@ -58,6 +57,29 @@ public Runnable getRecycleCallback() { } public void onComplete(Runnable callback) { - logFileFuture.thenRun(callback); + // Use whenComplete (instead of thenRun) so the callback fires regardless of whether + // the chunkFuture completed successfully or exceptionally. This is critical: if a chunk + // read fails, the LogFetchBuffer still needs tryComplete() to be called so the failed + // PendingFetch can be drained (otherwise the bucket would be stuck permanently). + chunkFuture.whenComplete( + (result, throwable) -> { + try { + callback.run(); + } catch (Throwable t) { + LOG.error("Exception in chunk completion callback", t); + } + }); + } + + /** + * Sets a callback that will be invoked when the next chunk's future is created. 
This allows the + * {@link LogFetcher} to register a new {@link RemotePendingFetch} for each subsequent chunk. + */ + public void setNextChunkCallback(Consumer callback) { + this.nextChunkCallback = callback; + } + + public Consumer getNextChunkCallback() { + return nextChunkCallback; } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java index b30458983c..f717d462bc 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java @@ -20,16 +20,17 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.metrics.ScannerMetricGroup; -import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FSDataInputStream; +import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FsPath; -import org.apache.fluss.fs.FsPathAndFileName; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.FileLogRecords; +import org.apache.fluss.record.LogRecords; +import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.remote.RemoteLogSegment; -import org.apache.fluss.utils.ExceptionUtils; -import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.concurrent.ShutdownableThread; import org.slf4j.Logger; @@ -40,9 +41,6 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -50,13 
+48,21 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; -import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly; -import static org.apache.fluss.utils.FlussPaths.LOG_FILE_SUFFIX; import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir; import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentFile; -/** Downloader to read remote log files to local disk. */ +/** + * Reads remote log segment files in chunks via streaming I/O. Each chunk (configurable via {@code + * client.scanner.remote-log.chunk-size}, default 8 MB) is read from the remote filesystem, appended + * to a local temporary file, and delivered as a {@link FileLogRecords} slice. This reduces JVM heap + * pressure by leveraging OS page cache. + * + *

Flow control: each segment has at most {@code maxPrefetchChunks} unconsumed chunks. The + * downloader pauses when this limit is reached and resumes when chunks are consumed. + */ @ThreadSafe @Internal public class RemoteLogDownloader implements Closeable { @@ -64,8 +70,6 @@ public class RemoteLogDownloader implements Closeable { private static final long POLL_TIMEOUT = 5000L; - private final Path localLogDir; - /** * A queue to hold the remote log segment files to be fetched. The queue is ordered by the * max_timestamp of the remote log segment. So we download the remote log segments from the @@ -73,47 +77,50 @@ public class RemoteLogDownloader implements Closeable { */ private final PriorityBlockingQueue segmentsToFetch; - private final BlockingQueue segmentsToRecycle; + /** Queue for continuation chunks of segments that are already being streamed. */ + private final BlockingQueue continuationQueue; private final Semaphore prefetchSemaphore; private final DownloadRemoteLogThread downloadThread; - private final RemoteFileDownloader remoteFileDownloader; - private final ScannerMetricGroup scannerMetricGroup; private final long pollTimeout; + private final File tmpDir; + + private final int chunkSize; + + private final int maxPrefetchChunks; + public RemoteLogDownloader( - TablePath tablePath, - Configuration conf, - RemoteFileDownloader remoteFileDownloader, - ScannerMetricGroup scannerMetricGroup) { + TablePath tablePath, Configuration conf, ScannerMetricGroup scannerMetricGroup) { // default we give a 5s long interval to avoid frequent loop - this(tablePath, conf, remoteFileDownloader, scannerMetricGroup, POLL_TIMEOUT); + this(tablePath, conf, scannerMetricGroup, POLL_TIMEOUT); } @VisibleForTesting RemoteLogDownloader( TablePath tablePath, Configuration conf, - RemoteFileDownloader remoteFileDownloader, ScannerMetricGroup scannerMetricGroup, long pollTimeout) { this.segmentsToFetch = new PriorityBlockingQueue<>(); - this.segmentsToRecycle = new 
LinkedBlockingQueue<>(); - this.remoteFileDownloader = remoteFileDownloader; + this.continuationQueue = new LinkedBlockingQueue<>(); this.scannerMetricGroup = scannerMetricGroup; this.pollTimeout = pollTimeout; this.prefetchSemaphore = new Semaphore(conf.getInt(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM)); - // The local tmp dir to store the fetched log segment files, - // add UUID to avoid conflict between tasks. - this.localLogDir = - Paths.get( - conf.get(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR), - "remote-logs-" + UUID.randomUUID()); + this.chunkSize = + (int) conf.get(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE).getBytes(); + this.maxPrefetchChunks = + conf.getInt(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS); + this.tmpDir = new File(conf.getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR)); + if (!tmpDir.exists() && !tmpDir.mkdirs()) { + throw new IllegalStateException( + "Failed to create temp directory for remote log chunks: " + tmpDir); + } this.downloadThread = new DownloadRemoteLogThread(tablePath); } @@ -121,106 +128,255 @@ public void start() { downloadThread.start(); } - /** Request to fetch remote log segment to local. This method is non-blocking. */ - public RemoteLogDownloadFuture requestRemoteLog(FsPath logTabletDir, RemoteLogSegment segment) { - RemoteLogDownloadRequest request = new RemoteLogDownloadRequest(segment, logTabletDir); + /** + * Request to read a remote log segment in chunks starting from the given position. This method + * is non-blocking and returns a future for the first chunk. 
+ */ + public RemoteLogDownloadFuture requestRemoteLog( + FsPath logTabletDir, RemoteLogSegment segment, int startPosition) { + CompletableFuture chunkFuture = new CompletableFuture<>(); + RemoteLogDownloadRequest request = + new RemoteLogDownloadRequest(segment, logTabletDir, startPosition, chunkFuture); + RemoteLogDownloadFuture downloadFuture = + new RemoteLogDownloadFuture(chunkFuture, () -> onChunkConsumed(request)); + // Assign downloadFuture before publishing the request to segmentsToFetch, so that the + // download thread never sees a null downloadFuture when reading request.downloadFuture + // inside createAndQueueNextChunk. + request.downloadFuture = downloadFuture; segmentsToFetch.add(request); - return new RemoteLogDownloadFuture(request.future, () -> recycleRemoteLog(segment)); + return downloadFuture; + } + + /** + * Called when a chunk has been consumed (drained). Increments the consumed counter and attempts + * to schedule the next chunk download or cleanup. + */ + private void onChunkConsumed(RemoteLogDownloadRequest request) { + request.chunksConsumed.incrementAndGet(); + tryScheduleNextChunk(request); + } + + /** + * Core scheduling logic. Called from both the download thread (after writing a chunk) and the + * consumer thread (after consuming a chunk). Uses synchronized(request) to avoid race + * conditions between the two threads. + * + *

Lock ordering note: To avoid deadlock with {@code LogFetchBuffer}'s internal lock + * (which may call {@code drain()} → {@code recycleCallback} → this method), we must never + * invoke external callbacks while holding {@code synchronized(request)}. All callbacks are + * captured inside the lock and invoked after releasing it. + */ + private void tryScheduleNextChunk(RemoteLogDownloadRequest request) { + // Capture any pending callback to fire outside the lock. + Consumer pendingCallback = null; + RemoteLogDownloadFuture pendingNextFuture = null; + boolean shouldCleanupAndRelease = false; + + synchronized (request) { + if (request.queuedForContinuation || request.cleanedUp) { + return; + } + + boolean exhausted = request.reader == null || request.reader.isExhausted(); + + if (!exhausted) { + int unconsumed = request.chunksWritten.get() - request.chunksConsumed.get(); + if (unconsumed < maxPrefetchChunks) { + // Build the next future inside the lock, but defer the callback invocation. + RemoteLogDownloadFuture nextFuture = buildNextChunkFuture(request); + pendingCallback = nextFuture != null ? nextFuture.getNextChunkCallback() : null; + pendingNextFuture = nextFuture; + request.queuedForContinuation = true; + // Re-queue as a continuation (higher priority than new segments). + continuationQueue.add(request); + } + // else: too many unconsumed chunks, pause downloading + } else if (request.chunksConsumed.get() >= request.chunksWritten.get()) { + // All chunks consumed and segment exhausted: cleanup + cleanupRequest(request); + shouldCleanupAndRelease = true; + } + } + + // Fire the external callback outside the lock to prevent lock-ordering deadlocks. + if (pendingCallback != null && pendingNextFuture != null) { + pendingCallback.accept(pendingNextFuture); + } + if (shouldCleanupAndRelease) { + prefetchSemaphore.release(); + } } /** - * Recycle the consumed remote log. The removal of the log file is async in the {@link - * #downloadThread}. 
+ * Builds a new {@link RemoteLogDownloadFuture} for the next chunk and updates {@code + * request.chunkFuture} and {@code request.downloadFuture}. Must be called within {@code + * synchronized(request)}. Does NOT invoke any external callbacks. + * + * @return the newly created future, with the inherited {@code nextChunkCallback} already set. */ - void recycleRemoteLog(RemoteLogSegment segment) { - segmentsToRecycle.add(segment); - prefetchSemaphore.release(); + private RemoteLogDownloadFuture buildNextChunkFuture(RemoteLogDownloadRequest request) { + Consumer callback = + request.downloadFuture != null + ? request.downloadFuture.getNextChunkCallback() + : null; + + CompletableFuture nextChunkFuture = new CompletableFuture<>(); + request.chunkFuture = nextChunkFuture; + + RemoteLogDownloadFuture nextFuture = + new RemoteLogDownloadFuture(nextChunkFuture, () -> onChunkConsumed(request)); + if (callback != null) { + nextFuture.setNextChunkCallback(callback); + } + request.downloadFuture = nextFuture; + return nextFuture; + } + + /** Cleans up resources for a completed request: closes reader, deletes temp file. */ + private void cleanupRequest(RemoteLogDownloadRequest request) { + if (request.cleanedUp) { + return; + } + request.cleanedUp = true; + if (request.reader != null) { + request.reader.close(); + request.reader = null; + } + if (request.localFileRecords != null) { + try { + request.localFileRecords.deleteIfExists(); + } catch (IOException e) { + LOG.warn( + "Failed to delete temp file for segment {}.", + request.segment.remoteLogSegmentId(), + e); + } + request.localFileRecords = null; + } } /** - * Fetch a remote log segment file to local. This method will block until there is a log segment - * to fetch. + * Reads one chunk from a remote log segment. Continuations (subsequent chunks of an + * already-opened segment) are processed first without acquiring the semaphore. New segments + * require a semaphore permit. 
*/ void fetchOnce() throws Exception { - // blocks until there is capacity (the fetched file is consumed) - prefetchSemaphore.acquire(); + // Priority 1: process continuations (no semaphore needed). + RemoteLogDownloadRequest request = continuationQueue.poll(); + if (request != null) { + synchronized (request) { + request.queuedForContinuation = false; + } + processChunkRead(request); + return; + } - // wait until there is a remote fetch request - RemoteLogDownloadRequest request = segmentsToFetch.poll(pollTimeout, TimeUnit.MILLISECONDS); + // Priority 2: process new segments (semaphore needed). + // Use tryAcquire with timeout so the thread can periodically loop back and check + // the continuation queue for new chunk requests from already-active segments. + if (!prefetchSemaphore.tryAcquire(pollTimeout, TimeUnit.MILLISECONDS)) { + return; + } + try { + request = segmentsToFetch.poll(pollTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Release the permit before propagating the interrupt, so that + // shutdown does not leak semaphore permits. + prefetchSemaphore.release(); + throw e; + } if (request == null) { prefetchSemaphore.release(); return; } + processChunkRead(request); + } + private void processChunkRead(RemoteLogDownloadRequest request) { TableBucket tableBucket = request.getTableBucket(); + // Capture the chunkFuture early. A concurrent consumer thread may call + // createAndQueueNextChunk (via onChunkConsumed) and replace request.chunkFuture + // while we are reading/writing the chunk data. + final CompletableFuture currentChunkFuture = request.chunkFuture; try { - // 1. cleanup the finished logs first to free up disk space - cleanupRemoteLogs(); + // Lazily initialize the chunk reader and local temp file on first access. 
+ if (request.reader == null) { + FsPath remotePath = + getRemoteLogFilePath(request.remoteLogTabletDir, request.segment); + FileSystem fs = remotePath.getFileSystem(); + FSDataInputStream inputStream = fs.open(remotePath); + request.reader = + new RemoteSegmentChunkReader( + inputStream, + request.startPosition, + request.segment.segmentSizeInBytes()); + + // Create a temp file for this segment. The File reference is owned by + // localFileRecords; deleteIfExists() in cleanupRequest handles removal. + File tempFile = + new File( + tmpDir, + "remote-chunk-" + + request.segment.remoteLogSegmentId() + + "-" + + UUID.randomUUID() + + ".tmp"); + request.localFileRecords = FileLogRecords.open(tempFile, true, false, 0, false); + request.localFileWrittenPosition = 0; + } + + long startTime = System.currentTimeMillis(); + + // Read chunk from remote into memory (temporary). + MemoryLogRecords chunkData = request.reader.readNextChunk(chunkSize); + + // Handle empty chunk (EOF or segment exhausted). This can happen when: + // 1. The segment has 0 bytes (degenerate case) + // 2. A continuation read reaches EOF (e.g., segment size metadata was larger + // than the actual file, or the previous chunk already read all data) + // + // For the empty sentinel we do NOT increment chunksWritten. Instead, we directly + // invoke tryScheduleNextChunk which will find exhausted=true and + // chunksConsumed >= chunksWritten, triggering cleanup and semaphore release on the + // download thread. The consumer's later recycleCallback (from drain()) is harmless + // because cleanedUp=true will cause tryScheduleNextChunk to return immediately. + if (chunkData.sizeInBytes() == 0) { + currentChunkFuture.complete(MemoryLogRecords.EMPTY); + tryScheduleNextChunk(request); + return; + } - // 2. 
do the actual download work - FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName(); scannerMetricGroup.remoteFetchRequestCount().inc(); - long startTime = System.currentTimeMillis(); - // download the remote file to local - remoteFileDownloader - .downloadFileAsync(fsPathAndFileName, localLogDir) - .whenComplete( - (bytes, throwable) -> { - if (throwable != null) { - LOG.error( - "Failed to download remote log segment file {} for table bucket {}.", - fsPathAndFileName.getFileName(), - tableBucket, - ExceptionUtils.stripExecutionException(throwable)); - // release the semaphore for the failed request - prefetchSemaphore.release(); - // add back the request to the queue, - // so we do not complete the request.future here - segmentsToFetch.add(request); - scannerMetricGroup.remoteFetchErrorCount().inc(); - } else { - LOG.info( - "Successfully downloaded remote log segment file {} to local for " - + "table bucket {} cost {} ms.", - fsPathAndFileName.getFileName(), - tableBucket, - System.currentTimeMillis() - startTime); - File localFile = - new File( - localLogDir.toFile(), - fsPathAndFileName.getFileName()); - scannerMetricGroup.remoteFetchBytes().inc(bytes); - request.future.complete(localFile); - } - }); + // Append to local temp file and create a slice for this chunk. + int chunkStart = request.localFileWrittenPosition; + int written = request.localFileRecords.append(chunkData); + request.localFileWrittenPosition += written; + + FileLogRecords slice = request.localFileRecords.slice(chunkStart, written); + request.chunksWritten.incrementAndGet(); + + scannerMetricGroup.remoteFetchBytes().inc(written); + LOG.debug( + "Read remote log chunk of {} bytes for bucket {} in {} ms.", + written, + tableBucket, + System.currentTimeMillis() - startTime); + + currentChunkFuture.complete(slice); + + // Try to pre-fetch next chunk if flow control allows. 
+ tryScheduleNextChunk(request); } catch (Throwable t) { + synchronized (request) { + cleanupRequest(request); + } + currentChunkFuture.completeExceptionally(t); + // Release semaphore: cleanedUp flag prevents double-release via + // tryScheduleNextChunk when the consumer later calls recycle. prefetchSemaphore.release(); - // add back the request to the queue - segmentsToFetch.add(request); scannerMetricGroup.remoteFetchErrorCount().inc(); - // log the error and continue instead of shutdown the download thread - LOG.error("Failed to download remote log segment for table bucket {}.", tableBucket, t); - } - } - - private void cleanupRemoteLogs() { - RemoteLogSegment segment; - while ((segment = segmentsToRecycle.poll()) != null) { - cleanupFinishedRemoteLog(segment); - } - } - - private void cleanupFinishedRemoteLog(RemoteLogSegment segment) { - try { - Path logFile = localLogDir.resolve(getLocalFileNameOfRemoteSegment(segment)); - Files.deleteIfExists(logFile); - LOG.info( - "Consumed and deleted the fetched log segment file {} for bucket {}.", - logFile.getFileName(), - segment.tableBucket()); - } catch (IOException e) { - LOG.warn("Failed to delete the local fetch segment file {}.", localLogDir, e); + LOG.error("Failed to read remote log chunk for table bucket {}.", tableBucket, t); } } @@ -231,8 +387,17 @@ public void close() throws IOException { } catch (InterruptedException e) { // ignore } - - deleteDirectoryQuietly(localLogDir.toFile()); + // Cleanup all pending requests. 
+ for (RemoteLogDownloadRequest req : segmentsToFetch) { + synchronized (req) { + cleanupRequest(req); + } + } + for (RemoteLogDownloadRequest req : continuationQueue) { + synchronized (req) { + cleanupRequest(req); + } + } } @VisibleForTesting @@ -240,44 +405,22 @@ Semaphore getPrefetchSemaphore() { return prefetchSemaphore; } - @VisibleForTesting - Path getLocalLogDir() { - return localLogDir; - } - @VisibleForTesting int getSizeOfSegmentsToFetch() { return segmentsToFetch.size(); } - protected static FsPathAndFileName getFsPathAndFileName( - FsPath remoteLogTabletDir, RemoteLogSegment segment) { - FsPath remotePath = - remoteLogSegmentFile( - remoteLogSegmentDir(remoteLogTabletDir, segment.remoteLogSegmentId()), - segment.remoteLogStartOffset()); - return new FsPathAndFileName(remotePath, getLocalFileNameOfRemoteSegment(segment)); - } - - /** - * Get the local file name of the remote log segment. - * - *

The file name is in pattern: - * - *

-     *     ${remote_segment_id}_${offset_prefix}.log
-     * 
- */ - private static String getLocalFileNameOfRemoteSegment(RemoteLogSegment segment) { - return segment.remoteLogSegmentId() - + "_" - + FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset()) - + LOG_FILE_SUFFIX; + /** Returns the remote file path for the given segment. */ + @VisibleForTesting + static FsPath getRemoteLogFilePath(FsPath remoteLogTabletDir, RemoteLogSegment segment) { + return remoteLogSegmentFile( + remoteLogSegmentDir(remoteLogTabletDir, segment.remoteLogSegmentId()), + segment.remoteLogStartOffset()); } /** - * Thread to download remote log files to local. The thread will keep fetching remote log files - * until it is interrupted. + * Thread to read remote log chunks. The thread will keep reading chunks until it is + * interrupted. */ private class DownloadRemoteLogThread extends ShutdownableThread { public DownloadRemoteLogThread(TablePath tablePath) { @@ -287,23 +430,42 @@ public DownloadRemoteLogThread(TablePath tablePath) { @Override public void doWork() throws Exception { fetchOnce(); - cleanupRemoteLogs(); } } - /** Represents a request to download a remote log segment file to local. */ + @VisibleForTesting + int getMaxPrefetchChunks() { + return maxPrefetchChunks; + } + + /** Represents a request to read a remote log segment in chunks. */ static class RemoteLogDownloadRequest implements Comparable { final RemoteLogSegment segment; final FsPath remoteLogTabletDir; - final CompletableFuture future = new CompletableFuture<>(); - - public RemoteLogDownloadRequest(RemoteLogSegment segment, FsPath remoteLogTabletDir) { + final int startPosition; + CompletableFuture chunkFuture; + RemoteSegmentChunkReader reader; + RemoteLogDownloadFuture downloadFuture; + + // Local temp file state. + FileLogRecords localFileRecords; + int localFileWrittenPosition; + + // Flow control counters. 
+ final AtomicInteger chunksWritten = new AtomicInteger(0); + final AtomicInteger chunksConsumed = new AtomicInteger(0); + volatile boolean queuedForContinuation; + boolean cleanedUp; + + RemoteLogDownloadRequest( + RemoteLogSegment segment, + FsPath remoteLogTabletDir, + int startPosition, + CompletableFuture chunkFuture) { this.segment = segment; this.remoteLogTabletDir = remoteLogTabletDir; - } - - public FsPathAndFileName getFsPathAndFileName() { - return RemoteLogDownloader.getFsPathAndFileName(remoteLogTabletDir, segment); + this.startPosition = startPosition; + this.chunkFuture = chunkFuture; } public TableBucket getTableBucket() { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemotePendingFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemotePendingFetch.java index ba4ffd8c82..f6a2308d58 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemotePendingFetch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemotePendingFetch.java @@ -18,8 +18,8 @@ package org.apache.fluss.client.table.scanner.log; import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.record.FileLogRecords; import org.apache.fluss.record.LogRecordReadContext; +import org.apache.fluss.record.LogRecords; import org.apache.fluss.remote.RemoteLogSegment; /** @@ -31,7 +31,6 @@ class RemotePendingFetch implements PendingFetch { final RemoteLogSegment remoteLogSegment; private final RemoteLogDownloadFuture downloadFuture; - private final int posInLogSegment; private final long fetchOffset; private final long highWatermark; private final LogRecordReadContext readContext; @@ -41,7 +40,6 @@ class RemotePendingFetch implements PendingFetch { RemotePendingFetch( RemoteLogSegment remoteLogSegment, RemoteLogDownloadFuture downloadFuture, - int posInLogSegment, long fetchOffset, long highWatermark, LogRecordReadContext readContext, @@ -49,7 +47,6 @@ class 
RemotePendingFetch implements PendingFetch { boolean isCheckCrc) { this.remoteLogSegment = remoteLogSegment; this.downloadFuture = downloadFuture; - this.posInLogSegment = posInLogSegment; this.fetchOffset = fetchOffset; this.highWatermark = highWatermark; this.readContext = readContext; @@ -69,10 +66,10 @@ public boolean isCompleted() { @Override public CompletedFetch toCompletedFetch() { - FileLogRecords fileLogRecords = downloadFuture.getFileLogRecords(posInLogSegment); + LogRecords logRecords = downloadFuture.getLogRecords(); return new RemoteCompletedFetch( remoteLogSegment.tableBucket(), - fileLogRecords, + logRecords, highWatermark, readContext, logScannerStatus, @@ -88,8 +85,6 @@ public String toString() { + remoteLogSegment + ", fetchOffset=" + fetchOffset - + ", posInLogSegment=" - + posInLogSegment + ", highWatermark=" + highWatermark + '}'; diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteSegmentChunkReader.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteSegmentChunkReader.java new file mode 100644 index 0000000000..2eebbaa90a --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteSegmentChunkReader.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.fs.FSDataInputStream; +import org.apache.fluss.record.MemoryLogRecords; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD; + +/** + * Reads a remote log segment file in chunks via {@link FSDataInputStream}. Each call to {@link + * #readNextChunk(int)} reads up to {@code maxChunkSize} bytes, finds the last complete batch + * boundary, and returns a {@link MemoryLogRecords} containing only complete batches. + * + *

If a single batch is larger than the chunk size, the entire batch is read in one go. + */ +@Internal +class RemoteSegmentChunkReader implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RemoteSegmentChunkReader.class); + + private final FSDataInputStream inputStream; + private long currentPosition; + private final long endPosition; + + RemoteSegmentChunkReader(FSDataInputStream inputStream, long startPosition, long endPosition) { + this.inputStream = inputStream; + this.currentPosition = startPosition; + this.endPosition = endPosition; + } + + /** + * Reads the next chunk of data from the remote file, truncating to complete batch boundaries. + * + * @param maxChunkSize the maximum number of bytes to read in one chunk + * @return a {@link MemoryLogRecords} containing complete batches, or {@link + * MemoryLogRecords#EMPTY} if no complete batch could be read + */ + MemoryLogRecords readNextChunk(int maxChunkSize) throws IOException { + if (isExhausted()) { + return MemoryLogRecords.EMPTY; + } + + long remaining = endPosition - currentPosition; + int bytesToRead = (int) Math.min(maxChunkSize, remaining); + + // Seek to the current position and read bytes. + inputStream.seek(currentPosition); + byte[] buffer = new byte[bytesToRead]; + int totalRead = readFully(buffer, 0, bytesToRead); + + if (totalRead == 0) { + // No data read, mark as exhausted. + currentPosition = endPosition; + return MemoryLogRecords.EMPTY; + } + + // Find the last complete batch boundary. + int validBytes = findLastCompleteBatchEnd(buffer, totalRead); + + if (validBytes == 0) { + // No complete batch in the chunk. This means a single batch is larger than + // maxChunkSize. Read the entire oversized batch. + return readOversizedBatch(buffer, totalRead); + } + + currentPosition += validBytes; + return MemoryLogRecords.pointToBytes(buffer, 0, validBytes); + } + + /** + * Scans through the buffer to find the end position of the last complete batch. 
+ * + * @return the number of bytes up to and including the last complete batch, or 0 if no complete + * batch fits in the buffer + */ + private int findLastCompleteBatchEnd(byte[] buffer, int bufferLength) { + int position = 0; + int lastCompleteBatchEnd = 0; + + while (position + LOG_OVERHEAD <= bufferLength) { + // Read the batch length from the LENGTH_OFFSET within this batch. + // Fluss uses little-endian byte order for the batch header. + int batchLength = + ByteBuffer.wrap(buffer, position + LENGTH_OFFSET, LENGTH_LENGTH) + .order(ByteOrder.LITTLE_ENDIAN) + .getInt(); + int totalBatchSize = LOG_OVERHEAD + batchLength; + + if (totalBatchSize <= 0) { + // Invalid batch length, stop parsing. + break; + } + + if (position + totalBatchSize > bufferLength) { + // This batch extends beyond the buffer boundary. + break; + } + + position += totalBatchSize; + lastCompleteBatchEnd = position; + } + + return lastCompleteBatchEnd; + } + + /** + * Handles the case where a single batch is larger than the chunk size. Reads the batch header + * to determine the full batch size, then reads the entire batch into memory. + */ + private MemoryLogRecords readOversizedBatch(byte[] partialBuffer, int partialLength) + throws IOException { + if (partialLength < LOG_OVERHEAD) { + // Not enough data even for a batch header, mark as exhausted. + currentPosition = endPosition; + return MemoryLogRecords.EMPTY; + } + + // Read the batch length from the partial buffer. + // Fluss uses little-endian byte order for the batch header. 
+ int batchLength = + ByteBuffer.wrap(partialBuffer, LENGTH_OFFSET, LENGTH_LENGTH) + .order(ByteOrder.LITTLE_ENDIAN) + .getInt(); + int totalBatchSize = LOG_OVERHEAD + batchLength; + + if (totalBatchSize <= 0 || currentPosition + totalBatchSize > endPosition) { + LOG.warn( + "Encountered invalid batch size {} at position {}, skipping remaining segment.", + totalBatchSize, + currentPosition); + currentPosition = endPosition; + return MemoryLogRecords.EMPTY; + } + + // Allocate a buffer for the full batch and copy existing data. + byte[] fullBuffer = new byte[totalBatchSize]; + System.arraycopy(partialBuffer, 0, fullBuffer, 0, partialLength); + + // Read the remaining bytes of the oversized batch. + int remainingToRead = totalBatchSize - partialLength; + int totalRead = readFully(fullBuffer, partialLength, remainingToRead); + + if (totalRead < remainingToRead) { + LOG.warn( + "Unexpected EOF reading oversized batch at position {}, " + + "expected {} more bytes but got {}.", + currentPosition, + remainingToRead, + totalRead); + currentPosition = endPosition; + return MemoryLogRecords.EMPTY; + } + + currentPosition += totalBatchSize; + return MemoryLogRecords.pointToBytes(fullBuffer); + } + + /** Reads exactly {@code length} bytes into the buffer, retrying on partial reads. */ + private int readFully(byte[] buffer, int offset, int length) throws IOException { + int totalRead = 0; + while (totalRead < length) { + int read = inputStream.read(buffer, offset + totalRead, length - totalRead); + if (read < 0) { + break; + } + totalRead += read; + } + return totalRead; + } + + /** Returns true if the entire segment has been read. 
*/ + boolean isExhausted() { + return currentPosition >= endPosition; + } + + @Override + public void close() { + try { + inputStream.close(); + } catch (IOException e) { + LOG.warn("Failed to close remote segment input stream.", e); + } + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java index ac64fbdbdb..2122b152de 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java @@ -20,7 +20,6 @@ import org.apache.fluss.client.admin.ClientToServerITCaseBase; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.TestingScannerMetricGroup; -import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.metadata.TableBucket; @@ -116,7 +115,6 @@ protected void setup() throws Exception { clientConf, metadataUpdater, scannerMetricGroup, - new RemoteFileDownloader(1), TEST_SCHEMA_GETTER); } @@ -438,7 +436,6 @@ private LogFetcher createFetcherWithBuckets(Map scanBuckets) clientConf, metadataUpdater, scannerMetricGroup, - new RemoteFileDownloader(1), TEST_SCHEMA_GETTER); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java index 50addfcfe0..5094551cdf 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java @@ -22,7 +22,6 @@ import org.apache.fluss.client.metadata.ClientSchemaGetter; import 
org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.TestingScannerMetricGroup; -import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; @@ -108,7 +107,6 @@ protected void setup() throws Exception { clientConf, metadataUpdater, TestingScannerMetricGroup.newInstance(), - new RemoteFileDownloader(1), clientSchemaGetter); } @@ -183,7 +181,6 @@ void testFetchWithSchemaChange() throws Exception { clientConf, metadataUpdater, TestingScannerMetricGroup.newInstance(), - new RemoteFileDownloader(1), clientSchemaGetter); newSchemaLogFetcher.sendFetches(); // The fetcher is async to fetch data, so we need to wait the result write to the @@ -282,7 +279,6 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception { clientConf, metadataUpdater, TestingScannerMetricGroup.newInstance(), - new RemoteFileDownloader(1), clientSchemaGetter); // send fetches to fetch data, should have no available fetch. 
@@ -323,7 +319,6 @@ void testFetchWithInvalidTableOrPartitions() throws Exception { clientConf, metadataUpdater1, TestingScannerMetricGroup.newInstance(), - new RemoteFileDownloader(1), clientSchemaGetter); ExecutorService executor = Executors.newSingleThreadExecutor(); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java index 576eab5dae..335b147719 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java @@ -21,7 +21,6 @@ import org.apache.fluss.client.metadata.TestingClientSchemaGetter; import org.apache.fluss.client.metadata.TestingMetadataUpdater; import org.apache.fluss.client.metrics.TestingScannerMetricGroup; -import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.NotLeaderOrFollowerException; @@ -84,7 +83,6 @@ public void setup() { new Configuration(), metadataUpdater, TestingScannerMetricGroup.newInstance(), - new RemoteFileDownloader(1), clientSchemaGetter); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java index e5fe7290cf..c8651a2bba 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java @@ -18,44 +18,38 @@ package org.apache.fluss.client.table.scanner.log; import org.apache.fluss.client.table.scanner.ScanRecord; -import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.LogFormat; -import 
org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.record.ChangeType; -import org.apache.fluss.record.FileLogRecords; import org.apache.fluss.record.LogRecordReadContext; +import org.apache.fluss.record.LogRecords; +import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.record.TestingSchemaGetter; -import org.apache.fluss.remote.RemoteLogSegment; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.DataTypes; -import org.apache.fluss.types.RowType; import org.apache.fluss.utils.Projection; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; -import java.io.File; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; import static org.apache.fluss.record.TestData.DATA2; -import static org.apache.fluss.record.TestData.DATA2_PHYSICAL_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE; import static org.apache.fluss.record.TestData.DATA2_SCHEMA; import static org.apache.fluss.record.TestData.DATA2_TABLE_ID; @@ -63,15 +57,13 @@ import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH; import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR; import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; -import static org.apache.fluss.testutils.DataTestUtils.genLogFile; -import static 
org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir; +import static org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link org.apache.fluss.client.table.scanner.log.RemoteCompletedFetch}. */ class RemoteCompletedFetchTest { private LogScannerStatus logScannerStatus; private LogRecordReadContext remoteReadContext; - private @TempDir File tempDir; private TableInfo tableInfo; private SchemaGetter schemaGetter; @@ -103,13 +95,11 @@ void testSimple() throws Exception { long fetchOffset = 0L; TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); AtomicBoolean recycleCalled = new AtomicBoolean(false); - FileLogRecords fileLogRecords = - createFileLogRecords( - tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, LogFormat.ARROW); + MemoryLogRecords memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch( tableBucket, - fileLogRecords, + memoryLogRecords, fetchOffset, null, () -> recycleCalled.set(true)); @@ -122,9 +112,7 @@ void testSimple() throws Exception { scanRecords = completedFetch.fetchRecords(8); assertThat(scanRecords.size()).isEqualTo(2); assertThat(scanRecords.get(0).logOffset()).isEqualTo(8L); - // when read finish, the file channel should be closed. - assertThat(fileLogRecords.channel().isOpen()).isFalse(); - // and recycle should be called. + // recycle should be called when read finishes. 
assertThat(recycleCalled.get()).isTrue(); // no more records can be read @@ -137,15 +125,10 @@ void testFetchForPartitionTable() throws Exception { long fetchOffset = 0L; TableBucket tb = new TableBucket(DATA2_TABLE_ID, (long) 0, 0); AtomicBoolean recycleCalled = new AtomicBoolean(false); - FileLogRecords fileLogRecords = - createFileLogRecords( - tb, - PhysicalTablePath.of(DATA2_TABLE_PATH, "20240904"), - DATA2, - LogFormat.ARROW); + MemoryLogRecords memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch( - tb, fileLogRecords, fetchOffset, null, () -> recycleCalled.set(true)); + tb, memoryLogRecords, fetchOffset, null, () -> recycleCalled.set(true)); List scanRecords = completedFetch.fetchRecords(8); assertThat(scanRecords.size()).isEqualTo(8); @@ -155,9 +138,7 @@ void testFetchForPartitionTable() throws Exception { scanRecords = completedFetch.fetchRecords(8); assertThat(scanRecords.size()).isEqualTo(2); assertThat(scanRecords.get(0).logOffset()).isEqualTo(8L); - // when read finish, the file channel should be closed. - assertThat(fileLogRecords.channel().isOpen()).isFalse(); - // and recycle should be called. + // recycle should be called when read finishes. 
assertThat(recycleCalled.get()).isTrue(); // no more records can be read @@ -169,11 +150,9 @@ void testFetchForPartitionTable() throws Exception { void testNegativeFetchCount() throws Exception { long fetchOffset = 0L; TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); - FileLogRecords fileLogRecords = - createFileLogRecords( - tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, LogFormat.ARROW); + MemoryLogRecords memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.ARROW); RemoteCompletedFetch completedFetch = - makeCompletedFetch(tableBucket, fileLogRecords, fetchOffset, null); + makeCompletedFetch(tableBucket, memoryLogRecords, fetchOffset, null); List scanRecords = completedFetch.fetchRecords(-10); assertThat(scanRecords.size()).isEqualTo(0); @@ -183,14 +162,10 @@ void testNegativeFetchCount() throws Exception { void testNoRecordsInFetch() throws Exception { long fetchOffset = 0L; TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); - FileLogRecords fileLogRecords = - createFileLogRecords( - tableBucket, - DATA2_PHYSICAL_TABLE_PATH, - Collections.emptyList(), - LogFormat.ARROW); + MemoryLogRecords memoryLogRecords = + createMemoryLogRecords(Collections.emptyList(), LogFormat.ARROW); RemoteCompletedFetch completedFetch = - makeCompletedFetch(tableBucket, fileLogRecords, fetchOffset, null); + makeCompletedFetch(tableBucket, memoryLogRecords, fetchOffset, null); List scanRecords = completedFetch.fetchRecords(10); assertThat(scanRecords.size()).isEqualTo(0); @@ -224,11 +199,13 @@ void testProjection(String format) throws Exception { System.currentTimeMillis()); long fetchOffset = 0L; TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); - FileLogRecords fileLogRecords = - createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, logFormat); + MemoryLogRecords memoryLogRecords = createMemoryLogRecords(DATA2, logFormat); RemoteCompletedFetch completedFetch = makeCompletedFetch( - tableBucket, fileLogRecords, fetchOffset, 
Projection.of(new int[] {0, 2})); + tableBucket, + memoryLogRecords, + fetchOffset, + Projection.of(new int[] {0, 2})); List scanRecords = completedFetch.fetchRecords(8); List expectedObjects = @@ -254,7 +231,10 @@ void testProjection(String format) throws Exception { completedFetch = makeCompletedFetch( - tableBucket, fileLogRecords, fetchOffset, Projection.of(new int[] {2, 0})); + tableBucket, + memoryLogRecords, + fetchOffset, + Projection.of(new int[] {2, 0})); scanRecords = completedFetch.fetchRecords(8); assertThat(scanRecords.size()).isEqualTo(8); for (int i = 0; i < scanRecords.size(); i++) { @@ -268,37 +248,30 @@ void testProjection(String format) throws Exception { } } - private FileLogRecords createFileLogRecords( - TableBucket tableBucket, - PhysicalTablePath physicalTablePath, - List objects, - LogFormat logFormat) + private MemoryLogRecords createMemoryLogRecords(List objects, LogFormat logFormat) throws Exception { - UUID segmentId = UUID.randomUUID(); - RemoteLogSegment remoteLogSegment = - RemoteLogSegment.Builder.builder() - .tableBucket(tableBucket) - .physicalTablePath(physicalTablePath) - .remoteLogSegmentId(segmentId) - .remoteLogStartOffset(0L) - .remoteLogEndOffset(9L) - .segmentSizeInBytes(Integer.MAX_VALUE) - .build(); - File logFile = - genRemoteLogSegmentFile( - DATA2_ROW_TYPE, tempDir, remoteLogSegment, objects, 0L, logFormat); - return FileLogRecords.open(logFile, false); + if (objects.isEmpty()) { + return MemoryLogRecords.EMPTY; + } + return createRecordsWithoutBaseLogOffset( + DATA2_ROW_TYPE, + DEFAULT_SCHEMA_ID, + 0L, + System.currentTimeMillis(), + CURRENT_LOG_MAGIC_VALUE, + objects, + logFormat); } private RemoteCompletedFetch makeCompletedFetch( TableBucket tableBucket, - FileLogRecords fileLogRecords, + LogRecords logRecords, long fetchOffset, @Nullable Projection projection, Runnable recycle) { return new RemoteCompletedFetch( tableBucket, - fileLogRecords, + logRecords, 10L, LogRecordReadContext.createReadContext(tableInfo, 
true, projection, schemaGetter), logScannerStatus, @@ -309,25 +282,9 @@ private RemoteCompletedFetch makeCompletedFetch( private RemoteCompletedFetch makeCompletedFetch( TableBucket tableBucket, - FileLogRecords fileLogRecords, + LogRecords logRecords, long fetchOffset, @Nullable Projection projection) { - return makeCompletedFetch(tableBucket, fileLogRecords, fetchOffset, projection, () -> {}); - } - - private static File genRemoteLogSegmentFile( - RowType rowType, - File remoteLogTabletDir, - RemoteLogSegment remoteLogSegment, - List objects, - long baseOffset, - LogFormat logFormat) - throws Exception { - FsPath remoteLogSegmentDir = - remoteLogSegmentDir( - new FsPath(remoteLogTabletDir.getAbsolutePath()), - remoteLogSegment.remoteLogSegmentId()); - return genLogFile( - rowType, new File(remoteLogSegmentDir.toString()), objects, baseOffset, logFormat); + return makeCompletedFetch(tableBucket, logRecords, fetchOffset, projection, () -> {}); } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java index 407756a7eb..6812e8add4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java @@ -19,15 +19,17 @@ import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.metrics.TestingScannerMetricGroup; -import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.client.table.scanner.log.RemoteLogDownloader.RemoteLogDownloadRequest; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.MemorySize; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; 
+import org.apache.fluss.record.FileLogRecords; +import org.apache.fluss.record.LogRecords; import org.apache.fluss.remote.RemoteLogSegment; -import org.apache.fluss.utils.FileUtils; +import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.IOUtils; import org.junit.jupiter.api.BeforeEach; @@ -35,28 +37,29 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; -import java.io.IOException; -import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; +import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithBaseOffset; import static org.apache.fluss.testutils.DataTestUtils.genRemoteLogSegmentFile; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.apache.fluss.utils.FlussPaths.remoteLogDir; +import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir; import static org.apache.fluss.utils.FlussPaths.remoteLogTabletDir; import static org.assertj.core.api.Assertions.assertThat; @@ -64,7 +67,7 @@ class RemoteLogDownloaderTest { private @TempDir File remoteDataDir; - private @TempDir File localDir; + private @TempDir File tmpDir; private FsPath remoteLogDir; private Configuration conf; private ScannerMetricGroup scannerMetricGroup; @@ -73,23 +76,20 @@ class RemoteLogDownloaderTest { void 
beforeEach() { conf = new Configuration(); conf.set(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir.getAbsolutePath()); - conf.set(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR, localDir.getAbsolutePath()); conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 4); + conf.set(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR, tmpDir.getAbsolutePath()); remoteLogDir = remoteLogDir(conf); scannerMetricGroup = TestingScannerMetricGroup.newInstance(); } @Test void testPrefetchNum() throws Exception { - RemoteFileDownloader remoteFileDownloader = new RemoteFileDownloader(1); RemoteLogDownloader remoteLogDownloader = - new RemoteLogDownloader( - DATA1_TABLE_PATH, conf, remoteFileDownloader, scannerMetricGroup, 10L); + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); try { // trigger auto download. remoteLogDownloader.start(); - Path localLogDir = remoteLogDownloader.getLocalLogDir(); TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); List remoteLogSegments = buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 5, conf, 10); @@ -98,7 +98,7 @@ void testPrefetchNum() throws Exception { List futures = requestRemoteLogs(remoteLogDownloader, remoteLogTabletDir, remoteLogSegments); - // the first 4 segments should success. + // the first 4 segments should success (each segment is small, fully read in one chunk). retry( Duration.ofMinutes(1), () -> { @@ -107,66 +107,40 @@ void testPrefetchNum() throws Exception { } }); - assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4); assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(4); assertThat(scannerMetricGroup.remoteFetchBytes().getCount()) .isEqualTo( remoteLogSegmentFilesLength(remoteLogSegments, remoteLogTabletDir, 4)); assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0); + // Consume the first chunk to release semaphore and allow 5th segment. futures.get(0).getRecycleCallback().run(); // the 5th segment should success. 
retry(Duration.ofMinutes(1), () -> assertThat(futures.get(4).isDone()).isTrue()); - assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4); assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(5); assertThat(scannerMetricGroup.remoteFetchBytes().getCount()) .isEqualTo( remoteLogSegmentFilesLength(remoteLogSegments, remoteLogTabletDir, 5)); assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0); + // consume remaining first 4 segment chunks to release semaphore futures.get(1).getRecycleCallback().run(); futures.get(2).getRecycleCallback().run(); - assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(2); - // the removal of log files are async, so we need to wait for the removal. - retry( - Duration.ofMinutes(1), - () -> assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(2)); - - // test cleanup + futures.get(3).getRecycleCallback().run(); + futures.get(4).getRecycleCallback().run(); + // Stop the download thread before checking permits. The idle thread + // briefly holds a permit during its poll() loop which causes flakiness. 
remoteLogDownloader.close(); - assertThat(localLogDir.toFile().exists()).isFalse(); + assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(4); } finally { IOUtils.closeQuietly(remoteLogDownloader); - IOUtils.closeQuietly(remoteFileDownloader); } } @Test - void testDownloadLogInParallelAndInPriority() throws Exception { - class TestRemoteFileDownloader extends RemoteFileDownloader { - final Set threadNames = Collections.synchronizedSet(new HashSet<>()); - - private TestRemoteFileDownloader(int threadNum) { - super(threadNum); - } - - @Override - protected long downloadFile(Path targetFilePath, FsPath remoteFilePath) - throws IOException { - threadNames.add(Thread.currentThread().getName()); - return super.downloadFile(targetFilePath, remoteFilePath); - } - } - - // prepare the environment, 4 download threads, pre-fetch 4 segments, 10 segments to fetch. - TestRemoteFileDownloader fileDownloader = new TestRemoteFileDownloader(4); + void testDownloadInPriority() throws Exception { RemoteLogDownloader remoteLogDownloader = - new RemoteLogDownloader( - DATA1_TABLE_PATH, - conf, // max 4 pre-fetch num - fileDownloader, - scannerMetricGroup, - 10L); + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); TableBucket bucket1 = new TableBucket(DATA1_TABLE_ID, 1); TableBucket bucket2 = new TableBucket(DATA1_TABLE_ID, 2); TableBucket bucket3 = new TableBucket(DATA1_TABLE_ID, 3); @@ -191,7 +165,7 @@ protected long downloadFile(Path targetFilePath, FsPath remoteFilePath) remoteLogTabletDir( remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, segment.tableBucket()); RemoteLogDownloadFuture future = - remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment); + remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment, 0); futures.put(segment.remoteLogSegmentId(), future); } @@ -214,8 +188,6 @@ protected long downloadFile(Path targetFilePath, FsPath remoteFilePath) assertThat(future.isDone()).isTrue(); } }); - // make sure 4 
threads are used. - assertThat(fileDownloader.threadNames.size()).isEqualTo(4); // only 4 segments are pre-fetched. assertThat(remoteLogDownloader.getSizeOfSegmentsToFetch()).isEqualTo(totalSegments - 4); @@ -230,7 +202,6 @@ protected long downloadFile(Path targetFilePath, FsPath remoteFilePath) // all segments are fetched. assertThat(remoteLogDownloader.getSizeOfSegmentsToFetch()).isEqualTo(0); } finally { - IOUtils.closeQuietly(fileDownloader); IOUtils.closeQuietly(remoteLogDownloader); } } @@ -282,6 +253,117 @@ void testOrderOfRemoteLogDownloadRequest() { assertThat(results).isEqualTo(expected); } + @Test + void testMaxPrefetchChunks() throws Exception { + // Set maxPrefetchChunks = 2 to verify flow control. + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS, 2); + // Set prefetchNum = 1 so only one segment is active. + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 1); + RemoteLogDownloader remoteLogDownloader = + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); + try { + remoteLogDownloader.start(); + + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + // Build a large segment with multiple records so multiple chunks are produced. + List remoteLogSegments = + buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 1, conf, 10); + FsPath remoteLogTabletDir = + remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb); + List futures = + requestRemoteLogs(remoteLogDownloader, remoteLogTabletDir, remoteLogSegments); + + // The first chunk should be done. + retry(Duration.ofMinutes(1), () -> assertThat(futures.get(0).isDone()).isTrue()); + + // Verify the maxPrefetchChunks is correctly set. + assertThat(remoteLogDownloader.getMaxPrefetchChunks()).isEqualTo(2); + + // Consume to release the segment. + futures.get(0).getRecycleCallback().run(); + + // Stop the download thread before checking permits. 
+ remoteLogDownloader.close(); + assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(1); + } finally { + IOUtils.closeQuietly(remoteLogDownloader); + } + } + + @Test + void testTempFileCleanup() throws Exception { + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 1); + RemoteLogDownloader remoteLogDownloader = + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); + try { + remoteLogDownloader.start(); + + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + List remoteLogSegments = + buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 1, conf, 10); + FsPath remoteLogTabletDir = + remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb); + List futures = + requestRemoteLogs(remoteLogDownloader, remoteLogTabletDir, remoteLogSegments); + + // Wait for first chunk. + retry(Duration.ofMinutes(1), () -> assertThat(futures.get(0).isDone()).isTrue()); + + // Verify temp files exist before cleanup. + File[] tmpFiles = tmpDir.listFiles((dir, name) -> name.startsWith("remote-chunk-")); + assertThat(tmpFiles).isNotNull(); + assertThat(tmpFiles.length).isGreaterThanOrEqualTo(1); + + // Consume the chunk to trigger cleanup. + futures.get(0).getRecycleCallback().run(); + + // Stop the download thread before checking permits. + remoteLogDownloader.close(); + assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(1); + + // Verify temp files are cleaned up. 
+ tmpFiles = tmpDir.listFiles((dir, name) -> name.startsWith("remote-chunk-")); + assertThat(tmpFiles == null || tmpFiles.length == 0).isTrue(); + } finally { + IOUtils.closeQuietly(remoteLogDownloader); + } + } + + @Test + void testChunkDataIsReadableFromSlice() throws Exception { + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 1); + RemoteLogDownloader remoteLogDownloader = + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); + try { + remoteLogDownloader.start(); + + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + List remoteLogSegments = + buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 1, conf, 10); + FsPath remoteLogTabletDir = + remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb); + List futures = + requestRemoteLogs(remoteLogDownloader, remoteLogTabletDir, remoteLogSegments); + + // Wait for first chunk. + retry(Duration.ofMinutes(1), () -> assertThat(futures.get(0).isDone()).isTrue()); + + // Verify the returned LogRecords is readable and has data. + LogRecords logRecords = futures.get(0).getLogRecords(); + assertThat(logRecords.sizeInBytes()).isGreaterThan(0); + assertThat(logRecords.batches().iterator().hasNext()).isTrue(); + + // Consume to cleanup. + futures.get(0).getRecycleCallback().run(); + + // Stop the download thread before checking permits. 
+ remoteLogDownloader.close(); + assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(1); + } finally { + IOUtils.closeQuietly(remoteLogDownloader); + } + } + private RemoteLogDownloadRequest createDownloadRequest( TableBucket tableBucket, long startOffset, long maxTimestamp) { RemoteLogSegment remoteLogSegment = @@ -294,7 +376,8 @@ private RemoteLogDownloadRequest createDownloadRequest( .maxTimestamp(maxTimestamp) .segmentSizeInBytes(Integer.MAX_VALUE) .build(); - return new RemoteLogDownloadRequest(remoteLogSegment, remoteLogDir); + return new RemoteLogDownloadRequest( + remoteLogSegment, remoteLogDir, 0, new CompletableFuture<>()); } private List requestRemoteLogs( @@ -304,7 +387,7 @@ private List requestRemoteLogs( List futures = new ArrayList<>(); for (RemoteLogSegment segment : remoteLogSegments) { RemoteLogDownloadFuture future = - remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment); + remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment, 0); futures.add(future); } return futures; @@ -320,12 +403,11 @@ private static List buildRemoteLogSegmentList( List remoteLogSegmentList = new ArrayList<>(); for (int i = 0; i < num; i++) { long baseOffset = i * 10L; - UUID segmentId = UUID.randomUUID(); RemoteLogSegment remoteLogSegment = RemoteLogSegment.Builder.builder() .tableBucket(tableBucket) .physicalTablePath(physicalTablePath) - .remoteLogSegmentId(segmentId) + .remoteLogSegmentId(UUID.randomUUID()) .remoteLogStartOffset(baseOffset) .remoteLogEndOffset(baseOffset + 9) .maxTimestamp(maxTimestamp) @@ -338,6 +420,311 @@ private static List buildRemoteLogSegmentList( return remoteLogSegmentList; } + /** + * Tests chunked download with a small chunk size, verifying that: 1. A large segment is split + * into multiple chunks. 2. Chunks can be consumed while subsequent chunks are still being + * downloaded. 3. Flow control (maxPrefetchChunks) pauses downloading when unconsumed + * chunks reach the limit. 
+ */ + @Test + void testChunkedDownloadAndReadWhileDownloading() throws Exception { + // Use a small chunk size (200 bytes) to force multiple chunks from one segment. + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE, MemorySize.parse("200b")); + // Allow at most 2 unconsumed chunks ahead. + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS, 2); + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 1); + + RemoteLogDownloader downloader = + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); + try { + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + // Build a segment with 20 batches so total size > 200 bytes * multiple chunks. + RemoteLogSegment segment = buildLargeRemoteLogSegment(tb, 20); + FsPath tabletDir = remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb); + + // Collect all chunk futures via a self-referencing callback chain. + List chunkFutures = + Collections.synchronizedList(new ArrayList<>()); + RemoteLogDownloadFuture firstFuture = + downloader.requestRemoteLog(tabletDir, segment, 0); + chunkFutures.add(firstFuture); + + Consumer chunkCollector = + new Consumer() { + @Override + public void accept(RemoteLogDownloadFuture nextFuture) { + chunkFutures.add(nextFuture); + nextFuture.setNextChunkCallback(this); + } + }; + firstFuture.setNextChunkCallback(chunkCollector); + + // Start after callback is set to avoid race condition. + downloader.start(); + + // Wait for the first chunk to be ready. + retry(Duration.ofMinutes(1), () -> assertThat(chunkFutures.get(0).isDone()).isTrue()); + + // With maxPrefetchChunks=2, we expect at most 2 chunks downloaded before consumption. + retry( + Duration.ofSeconds(5), + () -> assertThat(chunkFutures.size()).isGreaterThanOrEqualTo(2)); + + // Read chunks while downloading continues in the background. 
+ int totalBytesRead = 0; + int chunksConsumed = 0; + while (true) { + if (chunksConsumed >= chunkFutures.size()) { + // Wait for next chunk if available. + Thread.sleep(50); + if (chunksConsumed >= chunkFutures.size()) { + break; + } + } + RemoteLogDownloadFuture chunkFuture = chunkFutures.get(chunksConsumed); + waitUntil(chunkFuture::isDone, Duration.ofMinutes(1), "chunk download timeout"); + LogRecords records = chunkFuture.getLogRecords(); + if (records.sizeInBytes() == 0) { + chunkFuture.getRecycleCallback().run(); + chunksConsumed++; + break; + } + totalBytesRead += records.sizeInBytes(); + // Consuming a chunk triggers more downloads. + chunkFuture.getRecycleCallback().run(); + chunksConsumed++; + } + + // Verify we got multiple chunks (not a single monolithic download). + assertThat(chunksConsumed).isGreaterThan(1); + + // Verify total bytes matches the segment file size. + File segFile = + new File( + RemoteLogDownloader.getRemoteLogFilePath(tabletDir, segment).getPath()); + assertThat(totalBytesRead).isEqualTo((int) segFile.length()); + } finally { + IOUtils.closeQuietly(downloader); + } + } + + /** + * Tests segment switching: after all chunks of segment 1 are consumed, the downloader + * automatically starts downloading segment 2. + */ + @Test + void testSegmentSwitchingAfterChunksConsumed() throws Exception { + // Small chunk size to produce multiple chunks per segment. + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE, MemorySize.parse("200b")); + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS, 3); + // Only 1 segment can be active at a time. + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 1); + + RemoteLogDownloader downloader = + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); + try { + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + // Create 2 segments, each with multiple batches. 
+ RemoteLogSegment segment1 = buildLargeRemoteLogSegment(tb, 10); + RemoteLogSegment segment2 = buildLargeRemoteLogSegment(tb, 10); + FsPath tabletDir = remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb); + + // Request both segments. + List seg1Chunks = + Collections.synchronizedList(new ArrayList<>()); + RemoteLogDownloadFuture seg1First = downloader.requestRemoteLog(tabletDir, segment1, 0); + seg1Chunks.add(seg1First); + Consumer seg1Collector = + new Consumer() { + @Override + public void accept(RemoteLogDownloadFuture f) { + seg1Chunks.add(f); + f.setNextChunkCallback(this); + } + }; + seg1First.setNextChunkCallback(seg1Collector); + + List seg2Chunks = + Collections.synchronizedList(new ArrayList<>()); + RemoteLogDownloadFuture seg2First = downloader.requestRemoteLog(tabletDir, segment2, 0); + seg2Chunks.add(seg2First); + Consumer seg2Collector = + new Consumer() { + @Override + public void accept(RemoteLogDownloadFuture f) { + seg2Chunks.add(f); + f.setNextChunkCallback(this); + } + }; + seg2First.setNextChunkCallback(seg2Collector); + + // Start after all callbacks are set to avoid race condition. + downloader.start(); + + // Segment 1 should start downloading first (prefetchNum=1 blocks segment 2). + retry(Duration.ofMinutes(1), () -> assertThat(seg1Chunks.get(0).isDone()).isTrue()); + // Segment 2 should NOT have started yet. + assertThat(seg2First.isDone()).isFalse(); + + // Consume all chunks from segment 1. + int seg1Index = 0; + while (true) { + if (seg1Index >= seg1Chunks.size()) { + Thread.sleep(50); + if (seg1Index >= seg1Chunks.size()) { + break; + } + } + RemoteLogDownloadFuture chunkFuture = seg1Chunks.get(seg1Index); + waitUntil(chunkFuture::isDone, Duration.ofMinutes(1), "seg1 chunk timeout"); + LogRecords records = chunkFuture.getLogRecords(); + chunkFuture.getRecycleCallback().run(); + seg1Index++; + if (records.sizeInBytes() == 0) { + break; + } + } + + // After segment 1 is fully consumed, segment 2 should start downloading. 
+ retry(Duration.ofMinutes(1), () -> assertThat(seg2Chunks.get(0).isDone()).isTrue()); + + // Verify segment 2 data is readable. + LogRecords seg2Records = seg2Chunks.get(0).getLogRecords(); + assertThat(seg2Records.sizeInBytes()).isGreaterThan(0); + + // Consume segment 2 chunks to cleanup. + int seg2Index = 0; + while (true) { + if (seg2Index >= seg2Chunks.size()) { + Thread.sleep(50); + if (seg2Index >= seg2Chunks.size()) { + break; + } + } + RemoteLogDownloadFuture chunkFuture = seg2Chunks.get(seg2Index); + waitUntil(chunkFuture::isDone, Duration.ofMinutes(1), "seg2 chunk timeout"); + // Read data BEFORE recycling: recycleCallback may trigger cleanup of the backing + // file, making subsequent getLogRecords() calls unsafe. + boolean isEmpty = chunkFuture.getLogRecords().sizeInBytes() == 0; + chunkFuture.getRecycleCallback().run(); + seg2Index++; + if (isEmpty) { + break; + } + } + } finally { + IOUtils.closeQuietly(downloader); + } + } + + /** + * Tests that the configurable chunk size option works correctly by comparing the number of + * chunks produced with different chunk sizes. + */ + @Test + void testConfigurableChunkSize() throws Exception { + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 1); + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS, 100); + + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + RemoteLogSegment segment = buildLargeRemoteLogSegment(tb, 30); + FsPath tabletDir = remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb); + + // Test with a very small chunk size (100 bytes). + conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE, MemorySize.parse("100b")); + int smallChunkCount = countChunksForSegment(segment, tabletDir); + + // Test with a large chunk size (10 MB) - should produce 1 chunk. 
+ conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE, MemorySize.parse("10mb")); + int largeChunkCount = countChunksForSegment(segment, tabletDir); + + // Small chunk size should produce more chunks. + assertThat(smallChunkCount).isGreaterThan(largeChunkCount); + // Large chunk size should read the entire segment in one chunk. + assertThat(largeChunkCount).isEqualTo(1); + } + + private int countChunksForSegment(RemoteLogSegment segment, FsPath tabletDir) throws Exception { + RemoteLogDownloader downloader = + new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L); + try { + List chunks = Collections.synchronizedList(new ArrayList<>()); + RemoteLogDownloadFuture first = downloader.requestRemoteLog(tabletDir, segment, 0); + chunks.add(first); + Consumer collector = + new Consumer() { + @Override + public void accept(RemoteLogDownloadFuture f) { + chunks.add(f); + f.setNextChunkCallback(this); + } + }; + first.setNextChunkCallback(collector); + + // Start after callback is set. + downloader.start(); + + int nonEmptyChunks = 0; + int consumed = 0; + while (true) { + if (consumed >= chunks.size()) { + Thread.sleep(50); + if (consumed >= chunks.size()) { + break; + } + } + RemoteLogDownloadFuture chunkFuture = chunks.get(consumed); + waitUntil(chunkFuture::isDone, Duration.ofMinutes(1), "chunk timeout"); + LogRecords records = chunkFuture.getLogRecords(); + chunkFuture.getRecycleCallback().run(); + consumed++; + if (records.sizeInBytes() == 0) { + break; + } + nonEmptyChunks++; + } + return nonEmptyChunks; + } finally { + IOUtils.closeQuietly(downloader); + } + } + + /** + * Builds a remote log segment file with multiple batches to ensure the segment is large enough + * to span multiple chunks when a small chunk size is configured. 
+ */ + private RemoteLogSegment buildLargeRemoteLogSegment(TableBucket tb, int numBatches) + throws Exception { + UUID segmentId = UUID.randomUUID(); + RemoteLogSegment segment = + RemoteLogSegment.Builder.builder() + .tableBucket(tb) + .physicalTablePath(DATA1_PHYSICAL_TABLE_PATH) + .remoteLogSegmentId(segmentId) + .remoteLogStartOffset(0) + .remoteLogEndOffset((long) numBatches * 10 - 1) + .maxTimestamp(10) + .segmentSizeInBytes(Integer.MAX_VALUE) + .build(); + + FsPath tabletDir = remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb); + FsPath segDir = remoteLogSegmentDir(tabletDir, segmentId); + File segDirFile = new File(segDir.toString()); + if (!segDirFile.exists()) { + segDirFile.mkdirs(); + } + + File logFile = FlussPaths.logFile(segDirFile, 0); + FileLogRecords fileLogRecords = + FileLogRecords.open(logFile, false, 10 * 1024 * 1024, false); + for (int i = 0; i < numBatches; i++) { + fileLogRecords.append(genMemoryLogRecordsWithBaseOffset(i * 10L, DATA1)); + } + fileLogRecords.flush(); + fileLogRecords.close(); + return segment; + } + private static Long remoteLogSegmentFilesLength( List remoteLogSegments, FsPath remoteLogTabletDir, int segmentNum) { return remoteLogSegments.stream() @@ -345,9 +732,8 @@ private static Long remoteLogSegmentFilesLength( .mapToLong( segment -> new File( - RemoteLogDownloader.getFsPathAndFileName( + RemoteLogDownloader.getRemoteLogFilePath( remoteLogTabletDir, segment) - .getPath() .getPath()) .length()) .sum(); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java index 84b50af2c7..9194b72805 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java @@ -245,6 +245,51 @@ void testPartitionTableFetchFromRemote() 
throws Exception { table, DATA2_ROW_TYPE, expectPartitionAppendRows); } + @Test + void testScanManyRemoteSegments() throws Exception { + TablePath tablePath = TablePath.of("test_db_many_seg", "test_many_remote_segments"); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build(); + long tableId = createTable(tablePath, tableDescriptor); + + // Write many records to generate many remote log segments. + // With LOG_SEGMENT_FILE_SIZE = 1kb, each segment holds only a few records, + // so 500 records will produce dozens of segments that exercise the chunk + // download pipeline: prefetch semaphore, continuation queue, temp file + // lifecycle and cleanup. + int recordSize = 500; + List expectedRows = new ArrayList<>(); + Table table = conn.getTable(tablePath); + AppendWriter appendWriter = table.newAppend().createWriter(); + for (int i = 0; i < recordSize; i++) { + GenericRow row = row(i, "val_" + i + "_padding_to_fill_segments_faster"); + expectedRows.add(row); + appendWriter.append(row); + if (i % 10 == 0) { + appendWriter.flush(); + } + } + appendWriter.flush(); + + FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(new TableBucket(tableId, 0)); + + // Scan from beginning and verify all records are downloaded correctly. 
+ LogScanner logScanner = table.newScan().createLogScanner(); + logScanner.subscribeFromBeginning(0); + List rowList = new ArrayList<>(); + while (rowList.size() < recordSize) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + for (ScanRecord scanRecord : scanRecords) { + assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY); + InternalRow row = scanRecord.getRow(); + rowList.add(row(row.getInt(0), row.getString(1))); + } + } + assertThat(rowList).hasSize(recordSize); + assertThat(rowList).containsExactlyInAnyOrderElementsOf(expectedRows); + logScanner.close(); + } + @AfterEach protected void teardown() throws Exception { if (admin != null) { diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index df21129db6..2d8c9fb843 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1271,6 +1271,24 @@ public class ConfigOptions { "The number of remote log segments to keep in local temp file for LogScanner, " + "which download from remote storage. The default setting is 4."); + public static final ConfigOption CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE = + key("client.scanner.remote-log.chunk-size") + .memoryType() + .defaultValue(MemorySize.parse("8mb")) + .withDescription( + "The size of each chunk when downloading remote log segments. " + + "A larger chunk size reduces the number of remote I/O requests but " + + "increases memory usage per chunk read. The default setting is 8MB."); + + public static final ConfigOption CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS = + key("client.scanner.remote-log.max-prefetch-chunks") + .intType() + .defaultValue(5) + .withDescription( + "The maximum number of pre-fetched but unconsumed chunks per remote log segment. 
" + + "The downloader pauses when this limit is reached " + + "and resumes when chunks are consumed."); + public static final ConfigOption CLIENT_SCANNER_IO_TMP_DIR = key("client.scanner.io.tmpdir") .stringType()