From 7a1504056518e0619a42efb647a4dc76c2686a68 Mon Sep 17 00:00:00 2001 From: Vladislav Sidorovich Date: Mon, 20 Apr 2026 00:25:43 +0200 Subject: [PATCH 1/5] Handle EOF in GCS inputStream --- .../iceberg/gcp/gcs/GCSInputStream.java | 14 +++++++++-- .../iceberg/gcp/gcs/TestGCSInputStream.java | 23 ++++++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java index 910e97e0c178..10f8569563f6 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java @@ -126,13 +126,19 @@ public int read() throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); singleByteBuffer.position(0); - pos += 1; + int bytesRead; try { - channel.read(singleByteBuffer); + bytesRead = channel.read(singleByteBuffer); } catch (IOException e) { GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId); throw e; } + + if (bytesRead == -1) { + return -1; + } + + pos += 1; readBytes.increment(); readOperations.increment(); @@ -144,6 +150,10 @@ public int read(byte[] b, int off, int len) throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); byteBuffer = byteBuffer != null && byteBuffer.array() == b ? 
byteBuffer : ByteBuffer.wrap(b); int bytesRead = read(channel, byteBuffer, off, len); + if (bytesRead == -1) { + return -1; + } + pos += bytesRead; readBytes.increment(bytesRead); readOperations.increment(); diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java index 8cc85fad72fd..aa61e37a5f60 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; public class TestGCSInputStream { + private static final int EOF_FLAG = -1; private final Random random = new Random(1); @@ -54,7 +55,6 @@ public void testRead() throws Exception { try (SeekableInputStream in = new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) { int readSize = 1024; - byte[] actual = new byte[readSize]; readAndCheck(in, in.getPos(), readSize, data, false); readAndCheck(in, in.getPos(), readSize, data, true); @@ -92,6 +92,27 @@ public void testReadSingle() throws Exception { new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) { assertThat(in.read()).isEqualTo(i0); assertThat(in.read()).isEqualTo(i1); + assertThat(in.read()).isEqualTo(EOF_FLAG); + } + } + + @Test + public void testReadBufferedEOF() throws Exception { + BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read_buffered_eof.dat"); + int dataSize = 8; + byte[] expected = randomData(dataSize); + byte[] actual = new byte[dataSize + 1]; + + writeGCSData(uri, expected); + + try (SeekableInputStream in = + new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) { + int bytesRead = in.read(actual, 0, dataSize + 1); + assertThat(bytesRead).isEqualTo(dataSize); + assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected); + + assertThat(in.read(actual, 0, 10)).isEqualTo(EOF_FLAG); + 
assertThat(in.getPos()).isEqualTo(dataSize); } } From 987bd3382ce944d0fc2e8b7487fe0c9787069fb8 Mon Sep 17 00:00:00 2001 From: Vladislav Sidorovich Date: Mon, 20 Apr 2026 00:34:28 +0200 Subject: [PATCH 2/5] Align test path with other tests in TestGCSInputStream --- .../java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java index aa61e37a5f60..d53c96a1c511 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java @@ -98,7 +98,7 @@ public void testReadSingle() throws Exception { @Test public void testReadBufferedEOF() throws Exception { - BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read_buffered_eof.dat"); + BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat"); int dataSize = 8; byte[] expected = randomData(dataSize); byte[] actual = new byte[dataSize + 1]; From 1979c575c10ffe1f6ff2164da0e1e06846145b68 Mon Sep 17 00:00:00 2001 From: Vladislav Sidorovich Date: Mon, 20 Apr 2026 20:06:19 +0200 Subject: [PATCH 3/5] Rename constant in TestGCSInputStream --- .../java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java index d53c96a1c511..107b668fa94c 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java @@ -37,7 +37,7 @@ import org.junit.jupiter.api.Test; public class TestGCSInputStream { - private static final int EOF_FLAG = -1; + private static final int EOF = -1; private final Random random = new Random(1); @@ -92,7 +92,7 @@ public void 
testReadSingle() throws Exception { new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) { assertThat(in.read()).isEqualTo(i0); assertThat(in.read()).isEqualTo(i1); - assertThat(in.read()).isEqualTo(EOF_FLAG); + assertThat(in.read()).isEqualTo(EOF); } } @@ -111,7 +111,7 @@ public void testReadBufferedEOF() throws Exception { assertThat(bytesRead).isEqualTo(dataSize); assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected); - assertThat(in.read(actual, 0, 10)).isEqualTo(EOF_FLAG); + assertThat(in.read(actual, 0, 10)).isEqualTo(EOF); assertThat(in.getPos()).isEqualTo(dataSize); } } From 66db6a8c24b55fd9b3aaf1164f2150ca0ba2c847 Mon Sep 17 00:00:00 2001 From: Vladislav Sidorovich Date: Mon, 20 Apr 2026 20:34:34 +0200 Subject: [PATCH 4/5] Handle EOF in GCS, S3, ADLS input streams --- .../iceberg/aws/s3/TestS3InputStream.java | 36 ++++++++++++ .../apache/iceberg/aws/s3/S3InputStream.java | 8 +++ .../azure/adlsv2/TestADLSInputStream.java | 22 +++++++ .../iceberg/azure/adlsv2/ADLSInputStream.java | 11 +++- .../azure/adlsv2/TestADLSInputStream.java | 57 ++++++++++++++++++- .../iceberg/gcp/gcs/GCSInputStream.java | 22 ++++--- 6 files changed, 142 insertions(+), 14 deletions(-) diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index f8903842df37..b5fec2665192 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -42,6 +42,7 @@ @Testcontainers public class TestS3InputStream { @Container private static final MinIOContainer MINIO = MinioUtil.createContainer(); + private static final int EOF = -1; private final S3Client s3 = MinioUtil.createS3Client(MINIO); private final Random random = new Random(1); @@ -92,6 +93,41 @@ protected void testRead(S3Client s3Client) throws Exception { } } + @Test + public void 
testReadSingle() throws Exception { + S3URI uri = new S3URI("s3://bucket/path/to/read.dat"); + int i0 = 1; + int i1 = 255; + byte[] data = {(byte) i0, (byte) i1}; + + writeS3Data(uri, data); + + try (SeekableInputStream in = newInputStream(s3, uri)) { + assertThat(in.read()).isEqualTo(i0); + assertThat(in.read()).isEqualTo(i1); + assertThat(in.read()).isEqualTo(EOF); + } + } + + @Test + public void testReadBufferedEOF() throws Exception { + S3URI uri = new S3URI("s3://bucket/path/to/read.dat"); + int dataSize = 8; + byte[] expected = randomData(dataSize); + byte[] actual = new byte[dataSize + 1]; + + writeS3Data(uri, expected); + + try (SeekableInputStream in = newInputStream(s3, uri)) { + int bytesRead = in.read(actual, 0, dataSize + 1); + assertThat(bytesRead).isEqualTo(dataSize); + assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected); + + assertThat(in.read(actual, 0, 10)).isEqualTo(EOF); + assertThat(in.getPos()).isEqualTo(dataSize); + } + } + private void readAndCheck( SeekableInputStream in, long rangeStart, int size, byte[] original, boolean buffered) throws IOException { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index 9c91cc58f8d5..ef889d23639b 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -122,6 +122,10 @@ public int read() throws IOException { positionStream(); try { int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read()); + if (bytesRead == -1) { + return -1; + } + pos += 1; next += 1; readBytes.increment(); @@ -144,6 +148,10 @@ public int read(byte[] b, int off, int len) throws IOException { try { int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read(b, off, len)); + if (bytesRead == -1) { + return -1; + } + pos += bytesRead; next += bytesRead; readBytes.increment(bytesRead); diff --git 
a/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java b/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java index 1edf48eaec35..5af8e43b5b5f 100644 --- a/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java +++ b/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java @@ -35,6 +35,7 @@ public class TestADLSInputStream extends AzuriteTestBase { private static final String FILE_PATH = "path/to/file"; + private static final int EOF = -1; private final Random random = new Random(1); private final AzureProperties azureProperties = new AzureProperties(); @@ -99,6 +100,27 @@ public void testReadSingle() throws Exception { location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) { assertThat(in.read()).isEqualTo(i0); assertThat(in.read()).isEqualTo(i1); + assertThat(in.read()).isEqualTo(EOF); + } + } + + @Test + public void testReadBufferedEOF() throws Exception { + int dataSize = 8; + byte[] expected = randomData(dataSize); + byte[] actual = new byte[dataSize + 1]; + + setupData(expected); + + try (SeekableInputStream in = + new ADLSInputStream( + location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) { + int bytesRead = in.read(actual, 0, dataSize + 1); + assertThat(bytesRead).isEqualTo(dataSize); + assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected); + + assertThat(in.read(actual, 0, 10)).isEqualTo(EOF); + assertThat(in.getPos()).isEqualTo(dataSize); } } diff --git a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java index b1a2d3abfa32..bb96fc890010 100644 --- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java +++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java @@ -114,12 +114,17 @@ public int read() throws IOException { 
Preconditions.checkState(!closed, "Cannot read: already closed"); positionStream(); + int bytesRead = stream.read(); + if (bytesRead == -1) { + return -1; + } + pos += 1; next += 1; readBytes.increment(); readOperations.increment(); - return stream.read(); + return bytesRead; } @Override @@ -128,6 +133,10 @@ public int read(byte[] b, int off, int len) throws IOException { positionStream(); int bytesRead = stream.read(b, off, len); + if (bytesRead == -1) { + return -1; + } + pos += bytesRead; next += bytesRead; readBytes.increment(bytesRead); diff --git a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java index e98061846a88..a1047934553b 100644 --- a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java +++ b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.azure.adlsv2; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -25,8 +26,11 @@ import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.implementation.models.InternalDataLakeFileOpenInputStreamResult; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import org.apache.iceberg.metrics.MetricsContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -35,6 +39,7 @@ @ExtendWith(MockitoExtension.class) class TestADLSInputStream { + private static final int EOF = -1; @Mock private DataLakeFileClient fileClient; @Mock private InputStream inputStream; @@ -52,7 +57,57 @@ void before() { fileClient, 0L, mock(), - mock()); + MetricsContext.nullMetrics()); + } + + @Test + void testReadSingle() 
throws IOException { + int i0 = 1; + int i1 = 255; + byte[] data = {(byte) i0, (byte) i1}; + InputStream byteStream = new ByteArrayInputStream(data); + InternalDataLakeFileOpenInputStreamResult openInputStreamResult = + new InternalDataLakeFileOpenInputStreamResult(byteStream, mock()); + when(fileClient.openInputStream(any())).thenReturn(openInputStreamResult); + + try (ADLSInputStream in = + new ADLSInputStream( + "abfs://container@account.dfs.core.windows.net/path/to/file", + fileClient, + 2L, + mock(), + MetricsContext.nullMetrics())) { + + assertThat(in.read()).isEqualTo(i0); + assertThat(in.read()).isEqualTo(i1); + assertThat(in.read()).isEqualTo(EOF); + } + } + + @Test + void testReadBufferedEOF() throws IOException { + byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8}; + InputStream byteStream = new ByteArrayInputStream(data); + InternalDataLakeFileOpenInputStreamResult openInputStreamResult = + new InternalDataLakeFileOpenInputStreamResult(byteStream, mock()); + when(fileClient.openInputStream(any())).thenReturn(openInputStreamResult); + + try (ADLSInputStream in = + new ADLSInputStream( + "abfs://container@account.dfs.core.windows.net/path/to/file", + fileClient, + 8L, + mock(), + MetricsContext.nullMetrics())) { + + byte[] actual = new byte[10]; + int bytesRead = in.read(actual, 0, 10); + assertThat(bytesRead).isEqualTo(8); + assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(data); + + assertThat(in.read(actual, 0, 10)).isEqualTo(EOF); + assertThat(in.getPos()).isEqualTo(8); + } } @Test diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java index 10f8569563f6..1f59b8ffa2cb 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java @@ -126,23 +126,21 @@ public int read() throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); 
singleByteBuffer.position(0); - int bytesRead; try { - bytesRead = channel.read(singleByteBuffer); + int bytesRead = channel.read(singleByteBuffer); + if (bytesRead == -1) { + return -1; + } + + pos += 1; + readBytes.increment(); + readOperations.increment(); + + return singleByteBuffer.array()[0] & 0xFF; } catch (IOException e) { GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId); throw e; } - - if (bytesRead == -1) { - return -1; - } - - pos += 1; - readBytes.increment(); - readOperations.increment(); - - return singleByteBuffer.array()[0] & 0xFF; } @Override From 93739f6086655ff2695646de30e976c342ff8001 Mon Sep 17 00:00:00 2001 From: Vladislav Sidorovich Date: Mon, 20 Apr 2026 22:11:42 +0200 Subject: [PATCH 5/5] Fix ADLS test for EOF --- .../org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java b/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java index 5af8e43b5b5f..b06848524f51 100644 --- a/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java +++ b/azure/src/integration/java/org/apache/iceberg/azure/adlsv2/TestADLSInputStream.java @@ -119,7 +119,8 @@ public void testReadBufferedEOF() throws Exception { assertThat(bytesRead).isEqualTo(dataSize); assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected); - assertThat(in.read(actual, 0, 10)).isEqualTo(EOF); + // The position is at the end of the data, so any read should return EOF + assertThat(in.read(actual, 0, actual.length)).isEqualTo(EOF); assertThat(in.getPos()).isEqualTo(dataSize); } }