Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
@Testcontainers
public class TestS3InputStream {
@Container private static final MinIOContainer MINIO = MinioUtil.createContainer();
private static final int EOF = -1;

private final S3Client s3 = MinioUtil.createS3Client(MINIO);
private final Random random = new Random(1);
Expand Down Expand Up @@ -92,6 +93,41 @@ protected void testRead(S3Client s3Client) throws Exception {
}
}

@Test
public void testReadSingle() throws Exception {
S3URI uri = new S3URI("s3://bucket/path/to/read.dat");
int i0 = 1;
int i1 = 255;
byte[] data = {(byte) i0, (byte) i1};

writeS3Data(uri, data);

try (SeekableInputStream in = newInputStream(s3, uri)) {
assertThat(in.read()).isEqualTo(i0);
assertThat(in.read()).isEqualTo(i1);
assertThat(in.read()).isEqualTo(EOF);
}
}

@Test
public void testReadBufferedEOF() throws Exception {
S3URI uri = new S3URI("s3://bucket/path/to/read.dat");
int dataSize = 8;
byte[] expected = randomData(dataSize);
byte[] actual = new byte[dataSize + 1];

writeS3Data(uri, expected);

try (SeekableInputStream in = newInputStream(s3, uri)) {
int bytesRead = in.read(actual, 0, dataSize + 1);
assertThat(bytesRead).isEqualTo(dataSize);
assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected);

assertThat(in.read(actual, 0, 10)).isEqualTo(EOF);
assertThat(in.getPos()).isEqualTo(dataSize);
}
}

private void readAndCheck(
SeekableInputStream in, long rangeStart, int size, byte[] original, boolean buffered)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public int read() throws IOException {
positionStream();
try {
int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read());
if (bytesRead == -1) {
return -1;
}

pos += 1;
next += 1;
readBytes.increment();
Expand All @@ -144,6 +148,10 @@ public int read(byte[] b, int off, int len) throws IOException {

try {
int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read(b, off, len));
if (bytesRead == -1) {
return -1;
}

pos += bytesRead;
next += bytesRead;
readBytes.increment(bytesRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
public class TestADLSInputStream extends AzuriteTestBase {

private static final String FILE_PATH = "path/to/file";
private static final int EOF = -1;

private final Random random = new Random(1);
private final AzureProperties azureProperties = new AzureProperties();
Expand Down Expand Up @@ -99,6 +100,28 @@ public void testReadSingle() throws Exception {
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
assertThat(in.read()).isEqualTo(i0);
assertThat(in.read()).isEqualTo(i1);
assertThat(in.read()).isEqualTo(EOF);
}
}

@Test
public void testReadBufferedEOF() throws Exception {
int dataSize = 8;
byte[] expected = randomData(dataSize);
byte[] actual = new byte[dataSize + 1];

setupData(expected);

try (SeekableInputStream in =
new ADLSInputStream(
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
int bytesRead = in.read(actual, 0, dataSize + 1);
assertThat(bytesRead).isEqualTo(dataSize);
assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected);

// Pos is in the end of data, any read should EOF
assertThat(in.read(actual, 0, actual.length)).isEqualTo(EOF);
assertThat(in.getPos()).isEqualTo(dataSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,17 @@ public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

int bytesRead = stream.read();
if (bytesRead == -1) {
return -1;
}

pos += 1;
next += 1;
readBytes.increment();
readOperations.increment();

return stream.read();
return bytesRead;
}

@Override
Expand All @@ -128,6 +133,10 @@ public int read(byte[] b, int off, int len) throws IOException {
positionStream();

int bytesRead = stream.read(b, off, len);
if (bytesRead == -1) {
return -1;
}

pos += bytesRead;
next += bytesRead;
readBytes.increment(bytesRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.iceberg.azure.adlsv2;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.implementation.models.InternalDataLakeFileOpenInputStreamResult;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import org.apache.iceberg.metrics.MetricsContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -35,6 +39,7 @@

@ExtendWith(MockitoExtension.class)
class TestADLSInputStream {
private static final int EOF = -1;

@Mock private DataLakeFileClient fileClient;
@Mock private InputStream inputStream;
Expand All @@ -52,7 +57,57 @@ void before() {
fileClient,
0L,
mock(),
mock());
MetricsContext.nullMetrics());
}

@Test
void testReadSingle() throws IOException {
int i0 = 1;
int i1 = 255;
byte[] data = {(byte) i0, (byte) i1};
InputStream byteStream = new ByteArrayInputStream(data);
InternalDataLakeFileOpenInputStreamResult openInputStreamResult =
new InternalDataLakeFileOpenInputStreamResult(byteStream, mock());
when(fileClient.openInputStream(any())).thenReturn(openInputStreamResult);

try (ADLSInputStream in =
new ADLSInputStream(
"abfs://container@account.dfs.core.windows.net/path/to/file",
fileClient,
2L,
mock(),
MetricsContext.nullMetrics())) {

assertThat(in.read()).isEqualTo(i0);
assertThat(in.read()).isEqualTo(i1);
assertThat(in.read()).isEqualTo(EOF);
}
}

@Test
void testReadBufferedEOF() throws IOException {
byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8};
InputStream byteStream = new ByteArrayInputStream(data);
InternalDataLakeFileOpenInputStreamResult openInputStreamResult =
new InternalDataLakeFileOpenInputStreamResult(byteStream, mock());
when(fileClient.openInputStream(any())).thenReturn(openInputStreamResult);

try (ADLSInputStream in =
new ADLSInputStream(
"abfs://container@account.dfs.core.windows.net/path/to/file",
fileClient,
8L,
mock(),
MetricsContext.nullMetrics())) {

byte[] actual = new byte[10];
int bytesRead = in.read(actual, 0, 10);
assertThat(bytesRead).isEqualTo(8);
assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(data);

assertThat(in.read(actual, 0, 10)).isEqualTo(EOF);
assertThat(in.getPos()).isEqualTo(8);
}
}

@Test
Expand Down
20 changes: 14 additions & 6 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,24 +126,32 @@ public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
singleByteBuffer.position(0);

pos += 1;
try {
channel.read(singleByteBuffer);
int bytesRead = channel.read(singleByteBuffer);
if (bytesRead == -1) {
return -1;
}

pos += 1;
readBytes.increment();
readOperations.increment();

return singleByteBuffer.array()[0] & 0xFF;
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
throw e;
}
readBytes.increment();
readOperations.increment();

return singleByteBuffer.array()[0] & 0xFF;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
byteBuffer = byteBuffer != null && byteBuffer.array() == b ? byteBuffer : ByteBuffer.wrap(b);
int bytesRead = read(channel, byteBuffer, off, len);
if (bytesRead == -1) {
return -1;
}

pos += bytesRead;
readBytes.increment(bytesRead);
readOperations.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.jupiter.api.Test;

public class TestGCSInputStream {
private static final int EOF = -1;

private final Random random = new Random(1);

Expand All @@ -54,7 +55,6 @@ public void testRead() throws Exception {
try (SeekableInputStream in =
new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
int readSize = 1024;
byte[] actual = new byte[readSize];

readAndCheck(in, in.getPos(), readSize, data, false);
readAndCheck(in, in.getPos(), readSize, data, true);
Expand Down Expand Up @@ -92,6 +92,27 @@ public void testReadSingle() throws Exception {
new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
assertThat(in.read()).isEqualTo(i0);
assertThat(in.read()).isEqualTo(i1);
assertThat(in.read()).isEqualTo(EOF);
}
}

@Test
public void testReadBufferedEOF() throws Exception {
BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
int dataSize = 8;
byte[] expected = randomData(dataSize);
byte[] actual = new byte[dataSize + 1];

writeGCSData(uri, expected);

try (SeekableInputStream in =
new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
int bytesRead = in.read(actual, 0, dataSize + 1);
assertThat(bytesRead).isEqualTo(dataSize);
assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected);

assertThat(in.read(actual, 0, 10)).isEqualTo(EOF);
assertThat(in.getPos()).isEqualTo(dataSize);
}
}

Expand Down
Loading