From 1b082c212dfe576e0df71ced0dc74991cee15725 Mon Sep 17 00:00:00 2001 From: Nidhi Nandwani Date: Wed, 3 Jun 2026 16:46:34 +0000 Subject: [PATCH 1/6] fix(storage): resolve flaky integration tests and Surefire crashes in CI --- .../storage/BidiUploadStreamingStream.java | 5 + ...apicBidiUnbufferedWritableByteChannel.java | 6 + ...icUnbufferedDirectWritableByteChannel.java | 5 +- .../storage/AsyncAppendingQueueTest.java | 23 +- ...efaultBufferedWritableByteChannelTest.java | 3 - .../google/cloud/storage/FakeHttpServer.java | 7 +- .../storage/ITAppendableUploadFakeTest.java | 56 ++--- ...BidiUnbufferedWritableByteChannelTest.java | 213 +++++++++-------- ...unkedResumableWritableByteChannelTest.java | 226 ++++++++++-------- ...bufferedDirectWritableByteChannelTest.java | 38 ++- ...CloseResumableWritableByteChannelTest.java | 59 +++-- ...apicUnbufferedReadableByteChannelTest.java | 16 +- ...ompositeUploadWritableByteChannelTest.java | 94 ++++---- .../storage/it/runner/registry/Registry.java | 19 +- .../storage/it/runner/registry/TestBench.java | 23 +- 15 files changed, 416 insertions(+), 377 deletions(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java index f6b9ae399e34..1e39f96c73f1 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java @@ -209,6 +209,11 @@ public void sendClose() { if (tmp != null) { tmp.closeSend(); } + if (pendingReconciliation != null) { + pendingReconciliation.cancel(true); + pendingReconciliation = null; + } + retryContext.reset(); } finally { lock.unlock(); } diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java index a5b4904a0f28..9c4e65ec5a44 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java @@ -251,6 +251,12 @@ private void flush(@NonNull List segments) { responseObserver.await(); return null; } catch (Throwable t) { + if (stream != null) { + try { + stream.onError(io.grpc.Status.CANCELLED.withCause(t).asRuntimeException()); + } catch (Exception ignored) { + } + } stream = null; first = true; t.addSuppressed(new AsyncStorageTaskException()); diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java index 95f6472acae7..e04020c0bc63 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java @@ -134,6 +134,10 @@ public boolean isOpen() { @Override public void close() throws IOException { + if (!open) { + return; + } + open = false; ApiStreamObserver openedStream = openedStream(); if (!finished) { WriteObjectRequest message = finishMessage(); @@ -154,7 +158,6 @@ public void close() throws IOException { throw e; } } - open = false; responseObserver.await(); } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/AsyncAppendingQueueTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/AsyncAppendingQueueTest.java index d4c107ef6659..2798bfb32a79 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/AsyncAppendingQueueTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/AsyncAppendingQueueTest.java @@ -106,17 +106,18 @@ public void getResultAlwaysReturnsTheSameFuture() { @Test public void closingWithoutAppending_throwNoSuchElementException() { Executor exec = MoreExecutors.newDirectExecutorService(); - //noinspection resource - AsyncAppendingQueue q = AsyncAppendingQueue.of(exec, 3, AsyncAppendingQueueTest::agg); - - ApiFuture result = q.getResult(); - NoSuchElementException nse1 = assertThrows(NoSuchElementException.class, q::close); - NoSuchElementException nse2 = - assertThrows( - NoSuchElementException.class, () -> ApiExceptions.callAndTranslateApiException(result)); - - assertThat(nse1).hasMessageThat().contains("Never appended to"); - assertThat(nse2).hasMessageThat().contains("Never appended to"); + try (AsyncAppendingQueue q = + AsyncAppendingQueue.of(exec, 3, AsyncAppendingQueueTest::agg)) { + ApiFuture result = q.getResult(); + NoSuchElementException nse1 = assertThrows(NoSuchElementException.class, q::close); + NoSuchElementException nse2 = + assertThrows( + NoSuchElementException.class, + () -> ApiExceptions.callAndTranslateApiException(result)); + + assertThat(nse1).hasMessageThat().contains("Never appended to"); + assertThat(nse2).hasMessageThat().contains("Never appended to"); + } } @SuppressWarnings("resource") diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java index 954d2917fd53..eba571002baf 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java @@ -556,7 +556,6 @@ void illegalStateExceptionIfWrittenLt0_slice_eqBuffer() { ChecksummedTestContent all = ChecksummedTestContent.gen(11); IllegalStateException ise = assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 4).asByteBuffer())); - ise.printStackTrace(System.out); } @Example @@ -568,7 +567,6 @@ void illegalStateExceptionIfWrittenLt0_slice_gtBuffer() { ChecksummedTestContent all = ChecksummedTestContent.gen(11); IllegalStateException ise = assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 5).asByteBuffer())); - ise.printStackTrace(System.out); } @Example @@ -587,7 +585,6 @@ void illegalStateExceptionIfWrittenLt0_slice_ltBuffer() { c.write(all.slice(3, 3).asByteBuffer()); fail("should have errored in previous write call"); }); - ise.printStackTrace(System.out); } @Example diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java index 566a7765ebd5..c93690d8b233 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java @@ -51,13 +51,18 @@ import java.time.Duration; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + final class FakeHttpServer implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(FakeHttpServer.class); private final URI endpoint; private final Channel channel; private final Runnable shutdown; private final HttpStorageOptions httpStorageOptions; + private FakeHttpServer( URI endpoint, Channel channel, Runnable shutdown, HttpStorageOptions httpStorageOptions) { this.endpoint = endpoint; @@ -195,7 +200,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws E @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); + LOGGER.warn("Exception caught in pipeline", cause); ctx.close(); } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java index 78d07c4d9425..9d4d3efb3274 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java @@ -271,10 +271,10 @@ public void bidiWriteObjectRedirectedError_maxAttempts() throws Exception { assertThrows( IOException.class, () -> { - AppendableUploadWriteableByteChannel channel = b.open(); - ByteBuffer wrap = ByteBuffer.wrap(content.getBytes()); - Buffers.emptyTo(wrap, channel); - channel.close(); + try (AppendableUploadWriteableByteChannel channel = b.open()) { + ByteBuffer wrap = ByteBuffer.wrap(content.getBytes()); + Buffers.emptyTo(wrap, channel); + } }); assertAll( @@ -466,7 +466,7 @@ public void testFlushMultipleSegments_failsHalfway_partialFlush() throws Excepti GrpcStorageImpl storage = (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) { SettableApiFuture done = SettableApiFuture.create(); - BidiAppendableUnbufferedWritableByteChannel channel = + try (BidiAppendableUnbufferedWritableByteChannel channel = new BidiAppendableUnbufferedWritableByteChannel( new BidiUploadStreamingStream( BidiUploadState.appendableNew( @@ -490,11 +490,11 @@ public void testFlushMultipleSegments_failsHalfway_partialFlush() throws Excepti storage.storageDataClient.retryContextProvider.create()), smallSegmenter, 3, - 0); - ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10); - StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel); - channel.nextWriteShouldFinalize(); - channel.close(); + 0)) { + ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10); + StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel); + channel.nextWriteShouldFinalize(); + } assertThat(done.get(777, TimeUnit.MILLISECONDS).getResource().getSize()).isEqualTo(10); assertThat(map.get(req1)).isEqualTo(1); @@ -616,7 +616,7 @@ public void testFlushMultipleSegmentsTwice_firstSucceeds_secondFailsHalfway_part GrpcStorageImpl storage = (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) { SettableApiFuture done = SettableApiFuture.create(); - BidiAppendableUnbufferedWritableByteChannel channel = + try (BidiAppendableUnbufferedWritableByteChannel channel = new BidiAppendableUnbufferedWritableByteChannel( new BidiUploadStreamingStream( BidiUploadState.appendableNew( @@ -640,13 +640,13 @@ public void testFlushMultipleSegmentsTwice_firstSucceeds_secondFailsHalfway_part storage.storageDataClient.retryContextProvider.create()), smallSegmenter, 3, - 0); - ChecksummedTestContent content1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10); - ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10); - StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content1.getBytes()), channel); - StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content2.getBytes()), channel); - channel.nextWriteShouldFinalize(); - channel.close(); + 0)) { + ChecksummedTestContent content1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10); + ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10); + StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content1.getBytes()), channel); + StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content2.getBytes()), channel); + channel.nextWriteShouldFinalize(); + } assertThat(done.get(777, TimeUnit.MILLISECONDS).getResource().getSize()).isEqualTo(20); assertThat(map.get(reconnect)).isEqualTo(1); @@ -792,12 +792,12 @@ public void testFlushMultipleSegments_200ResponsePartialFlushHalfway() throws Ex storage.storageClient.bidiWriteObjectCallable(), 3, storage.storageDataClient.retryContextProvider.create()); - BidiAppendableUnbufferedWritableByteChannel channel = - new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 3, 0); ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10); - StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel); - channel.nextWriteShouldFinalize(); - channel.close(); + try (BidiAppendableUnbufferedWritableByteChannel channel = + new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 3, 0)) { + StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel); + channel.nextWriteShouldFinalize(); + } assertThat(stream.getResultFuture().get(777, TimeUnit.MILLISECONDS).getResource().getSize()) .isEqualTo(10); @@ -1116,11 +1116,11 @@ private static void runTestFlushMultipleSegments(FakeStorage fake) throws Except storage.storageClient.bidiWriteObjectCallable(), 3, storage.storageDataClient.retryContextProvider.create()); - BidiAppendableUnbufferedWritableByteChannel channel = - new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 32, 0); - StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel); - channel.nextWriteShouldFinalize(); - channel.close(); + try (BidiAppendableUnbufferedWritableByteChannel channel = + new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 32, 0)) { + StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel); + channel.nextWriteShouldFinalize(); + } BidiWriteObjectResponse response = stream.getResultFuture().get(777, TimeUnit.MILLISECONDS); assertThat(response.getResource().getSize()).isEqualTo(10); assertThat(response.getResource().getChecksums().getCrc32C()).isEqualTo(content.getCrc32c()); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java index 475ba6b35c9f..11fcb3f450db 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java @@ -128,23 +128,23 @@ public void scenario1() throws Exception { BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); SettableApiFuture done = SettableApiFuture.create(); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); - StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("invalid"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -210,22 +210,22 @@ public void scenario2() throws Exception { writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("invalid"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -291,22 +291,22 @@ public void scenario3() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -374,19 +374,20 @@ public void scenario4() throws Exception { writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); + GrpcCallContext::createDefault)) { - channel.close(); + channel.close(); - BidiWriteObjectResponse BidiWriteObjectResponse = done.get(2, TimeUnit.SECONDS); - assertThat(BidiWriteObjectResponse).isEqualTo(resp1); + BidiWriteObjectResponse BidiWriteObjectResponse = done.get(2, TimeUnit.SECONDS); + assertThat(BidiWriteObjectResponse).isEqualTo(resp1); + } } } @@ -454,22 +455,22 @@ public void scenario4_1() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -537,22 +538,22 @@ public void scenario4_2() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -630,23 +631,23 @@ public void scenario5() throws Exception { writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); - StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -694,23 +695,23 @@ public void scenario7() throws Exception { BidiResumableWrite resumableWrite = getResumableWrite(uploadId); BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); - StorageException se = assertThrows(StorageException.class, () -> channel.write(buf)); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(buf)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -744,23 +745,23 @@ public void incremental_success() throws Exception { BidiResumableWrite resumableWrite = getResumableWrite(uploadId); BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, CHUNK_SEGMENTER, writeCtx, - GrpcCallContext::createDefault); - - ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); - int written = channel.write(buf); - assertAll( - () -> assertThat(buf.remaining()).isEqualTo(0), - () -> assertThat(written).isEqualTo(_256KiB), - () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_256KiB), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB)); + GrpcCallContext::createDefault)) { + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(0), + () -> assertThat(written).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB)); + } } } @@ -796,29 +797,29 @@ public void incremental_partialSuccess() throws Exception { ChunkSegmenter chunkSegmenter = new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _512KiB, _256KiB); - //noinspection resource - GapicBidiUnbufferedWritableByteChannel channel = + try (GapicBidiUnbufferedWritableByteChannel channel = new GapicBidiUnbufferedWritableByteChannel( storageClient.bidiWriteObjectCallable(), RetrierWithAlg.attemptOnce(), done, chunkSegmenter, writeCtx, - GrpcCallContext::createDefault); - - ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_512KiB); - int written = channel.write(buf); - assertAll( - () -> assertThat(buf.remaining()).isEqualTo(_256KiB), - () -> assertThat(written).isEqualTo(_256KiB), - () -> - assertWithMessage("totalSentBytes") - .that(writeCtx.getTotalSentBytes().get()) - .isEqualTo(_256KiB), - () -> - assertWithMessage("confirmedBytes") - .that(writeCtx.getConfirmedBytes().get()) - .isEqualTo(_256KiB)); + GrpcCallContext::createDefault)) { + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_512KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(_256KiB), + () -> assertThat(written).isEqualTo(_256KiB), + () -> + assertWithMessage("totalSentBytes") + .that(writeCtx.getTotalSentBytes().get()) + .isEqualTo(_256KiB), + () -> + assertWithMessage("confirmedBytes") + .that(writeCtx.getConfirmedBytes().get()) + .isEqualTo(_256KiB)); + } } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java index 473e73a9c1e5..8f624dbaa8e4 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java @@ -119,23 +119,22 @@ public void scenario1() throws Exception { WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); SettableApiFuture done = SettableApiFuture.create(); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); - StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("invalid"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -201,22 +200,21 @@ public void scenario2() throws Exception { writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("invalid"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -282,22 +280,21 @@ public void scenario3() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -447,22 +444,21 @@ public void scenario4_1() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -531,22 +527,21 @@ public void scenario4_2() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(_512KiB); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -621,23 +616,22 @@ public void scenario5() throws Exception { writeCtx.getTotalSentBytes().set(_256KiB); writeCtx.getConfirmedBytes().set(_256KiB); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); - StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -680,40 +674,51 @@ public void scenario7() throws Exception { ResumableWrite resumableWrite = getResumableWrite(uploadId); WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); - StorageException se = assertThrows(StorageException.class, () -> channel.write(buf)); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + GrpcCallContext::createDefault)) { + StorageException se = assertThrows(StorageException.class, () -> channel.write(buf)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @Test public void incremental_success() throws Exception { String uploadId = "uploadId"; + byte[] data = DataGenerator.base64Characters().genBytes(_256KiB); + long crc32c = HASHER.hash(ByteBuffer.wrap(data)).getValue(); WriteObjectRequest req1 = WriteObjectRequest.newBuilder() .setUploadId(uploadId) - .setChecksummedData( - ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(_256KiB)) - .asChecksummedData()) + .setChecksummedData(ChecksummedTestContent.of(data).asChecksummedData()) .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + WriteObjectRequest req2 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C((int) crc32c).build()) + .build(); + WriteObjectResponse resp2 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_256KiB).build()) + .build(); + ImmutableMap, WriteObjectResponse> map = - ImmutableMap.of(ImmutableList.of(req1), resp1); + ImmutableMap.of(ImmutableList.of(req1), resp1, ImmutableList.of(req2), resp2); DirectWriteService service1 = new DirectWriteService(map); try (FakeServer fakeServer = FakeServer.of(service1); @@ -725,23 +730,22 @@ public void incremental_success() throws Exception { ResumableWrite resumableWrite = getResumableWrite(uploadId); WriteCtx writeCtx = WriteCtx.of(resumableWrite, HASHER); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); - int written = channel.write(buf); - assertAll( - () -> assertThat(buf.remaining()).isEqualTo(0), - () -> assertThat(written).isEqualTo(_256KiB), - () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_256KiB), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB)); + GrpcCallContext::createDefault)) { + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(0), + () -> assertThat(written).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB)); + } } } @@ -759,8 +763,19 @@ public void incremental_partialSuccess() throws Exception { .build(); WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + WriteObjectRequest req2 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp2 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_256KiB).build()) + .build(); + ImmutableMap, WriteObjectResponse> map = - ImmutableMap.of(ImmutableList.of(req1), resp1); + ImmutableMap.of(ImmutableList.of(req1), resp1, ImmutableList.of(req2), resp2); DirectWriteService service1 = new DirectWriteService(map); try (FakeServer fakeServer = FakeServer.of(service1); @@ -774,29 +789,28 @@ public void incremental_partialSuccess() throws Exception { ChunkSegmenter chunkSegmenter = new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _512KiB, _256KiB); - //noinspection resource - GapicUnbufferedChunkedResumableWritableByteChannel channel = + try (GapicUnbufferedChunkedResumableWritableByteChannel channel = new GapicUnbufferedChunkedResumableWritableByteChannel( done, chunkSegmenter, storageClient.writeObjectCallable(), writeCtx, RetrierWithAlg.attemptOnce(), - GrpcCallContext::createDefault); - - ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_512KiB); - int written = channel.write(buf); - assertAll( - () -> assertThat(buf.remaining()).isEqualTo(_256KiB), - () -> assertThat(written).isEqualTo(_256KiB), - () -> - assertWithMessage("totalSentBytes") - .that(writeCtx.getTotalSentBytes().get()) - .isEqualTo(_256KiB), - () -> - assertWithMessage("confirmedBytes") - .that(writeCtx.getConfirmedBytes().get()) - .isEqualTo(_256KiB)); + GrpcCallContext::createDefault)) { + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_512KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(_256KiB), + () -> assertThat(written).isEqualTo(_256KiB), + () -> + assertWithMessage("totalSentBytes") + .that(writeCtx.getTotalSentBytes().get()) + .isEqualTo(_256KiB), + () -> + assertWithMessage("confirmedBytes") + .that(writeCtx.getConfirmedBytes().get()) + .isEqualTo(_256KiB)); + } } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java index e8b9cd7d3b11..5fd050bc20cd 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedDirectWritableByteChannelTest.java @@ -117,17 +117,16 @@ public void ack_lt() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); - //noinspection resource - GapicUnbufferedDirectWritableByteChannel channel = + try (GapicUnbufferedDirectWritableByteChannel channel = new GapicUnbufferedDirectWritableByteChannel( - done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -160,17 +159,16 @@ public void ack_gt() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); - //noinspection resource - GapicUnbufferedDirectWritableByteChannel channel = + try (GapicUnbufferedDirectWritableByteChannel channel = new GapicUnbufferedDirectWritableByteChannel( - done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java index e9dd60f7fec6..87fbe72a2319 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java @@ -73,18 +73,17 @@ public void incrementalResponseForFinalizingRequest() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); - //noinspection resource - GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = + try (GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("invalid"), - () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_512KiB), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_512KiB), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -230,17 +229,16 @@ public void scenario4_1() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); - //noinspection resource - GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = + try (GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } @@ -309,17 +307,16 @@ public void scenario4_2() throws Exception { writeCtx.getTotalSentBytes().set(_512KiB); writeCtx.getConfirmedBytes().set(0); - //noinspection resource - GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = + try (GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); - - StorageException se = assertThrows(StorageException.class, channel::close); - assertAll( - () -> assertThat(se.getCode()).isEqualTo(0), - () -> assertThat(se.getReason()).isEqualTo("dataLoss"), - () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), - () -> assertThat(channel.isOpen()).isFalse()); + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx)) { + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java index 1e1c05915ab5..793209365b15 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java @@ -319,14 +319,14 @@ public void readObject( retrier, retryOnly(DataLossException.class))); byte[] actualBytes = new byte[41]; - //noinspection resource - UnbufferedReadableByteChannel c = session.open(); - ByteBuffer buf = ByteBuffer.wrap(actualBytes); - int read1 = c.read(buf); - assertThat(read1).isAtLeast(1); - int read2 = c.read(buf); - assertThat(read2).isEqualTo(-1); - assertThrows(ClosedChannelException.class, () -> c.read(buf)); + try (UnbufferedReadableByteChannel c = session.open()) { + ByteBuffer buf = ByteBuffer.wrap(actualBytes); + int read1 = c.read(buf); + assertThat(read1).isAtLeast(1); + int read2 = c.read(buf); + assertThat(read2).isEqualTo(-1); + assertThrows(ClosedChannelException.class, () -> c.read(buf)); + } } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java index 2990aae5dd69..8b0eed565502 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java @@ -435,8 +435,7 @@ public void callingFlushWhileBufferIsEmptyIsANoOp() throws Exception { @Test public void creatingAnEmptyObjectWhichFailsIsSetAsResultFailureAndThrowFromClose() throws Exception { - //noinspection resource - ParallelCompositeUploadWritableByteChannel pcu = + try (ParallelCompositeUploadWritableByteChannel pcu = new ParallelCompositeUploadWritableByteChannel( bufferHandlePool, MoreExecutors.directExecutor(), @@ -455,22 +454,22 @@ public BlobInfo internalDirectUpload( } }, info, - opts); - StorageException se1 = assertThrows(StorageException.class, pcu::close); - StorageException se2 = - assertThrows(StorageException.class, () -> ApiFutureUtils.await(finalObject)); + opts)) { + StorageException se1 = assertThrows(StorageException.class, pcu::close); + StorageException se2 = + assertThrows(StorageException.class, () -> ApiFutureUtils.await(finalObject)); - assertAll( - () -> assertThat(se1).hasMessageThat().isEqualTo("Error: PERMISSION_DENIED"), - () -> assertThat(se2).hasMessageThat().isEqualTo("Error: PERMISSION_DENIED"), - () -> assertThat(se1.getCode()).isEqualTo(403), - () -> assertThat(se2.getCode()).isEqualTo(403)); + assertAll( + () -> assertThat(se1).hasMessageThat().isEqualTo("Error: PERMISSION_DENIED"), + () -> assertThat(se2).hasMessageThat().isEqualTo("Error: PERMISSION_DENIED"), + () -> assertThat(se1.getCode()).isEqualTo(403), + () -> assertThat(se2.getCode()).isEqualTo(403)); + } } @Test public void badServerCrc32cResultsInException() throws Exception { - //noinspection resource - ParallelCompositeUploadWritableByteChannel pcu = + try (ParallelCompositeUploadWritableByteChannel pcu = new ParallelCompositeUploadWritableByteChannel( bufferHandlePool, MoreExecutors.directExecutor(), @@ -488,18 +487,19 @@ public BlobInfo compose(ComposeRequest composeRequest) { } }, info, - opts); - pcu.write(DataGenerator.base64Characters().genByteBuffer(3)); + opts)) { + pcu.write(DataGenerator.base64Characters().genByteBuffer(3)); - AsynchronousCloseException se1 = assertThrows(AsynchronousCloseException.class, pcu::close); - StorageException se2 = - assertThrows(StorageException.class, () -> ApiFutureUtils.await(finalObject)); + AsynchronousCloseException se1 = assertThrows(AsynchronousCloseException.class, pcu::close); + StorageException se2 = + assertThrows(StorageException.class, () -> ApiFutureUtils.await(finalObject)); - assertAll( - () -> assertThat(se1).hasCauseThat().isInstanceOf(StorageException.class), - () -> assertThat(se1).hasCauseThat().hasMessageThat().contains("Checksum mismatch"), - () -> assertThat(se2).hasMessageThat().contains("Checksum mismatch"), - () -> assertThat(se2.getCode()).isEqualTo(400)); + assertAll( + () -> assertThat(se1).hasCauseThat().isInstanceOf(StorageException.class), + () -> assertThat(se1).hasCauseThat().hasMessageThat().contains("Checksum mismatch"), + () -> assertThat(se2).hasMessageThat().contains("Checksum mismatch"), + () -> assertThat(se2.getCode()).isEqualTo(400)); + } } @Test @@ -656,8 +656,7 @@ public BlobInfo internalDirectUpload( @Test public void errorContextIsPopulated() throws Exception { - //noinspection resource - ParallelCompositeUploadWritableByteChannel pcu = + try (ParallelCompositeUploadWritableByteChannel pcu = new ParallelCompositeUploadWritableByteChannel( bufferHandlePool, MoreExecutors.directExecutor(), @@ -675,31 +674,32 @@ public BlobInfo compose(ComposeRequest composeRequest) { } }, info, - opts); - pcu.write(DataGenerator.base64Characters().genByteBuffer(3)); + opts)) { + pcu.write(DataGenerator.base64Characters().genByteBuffer(3)); - AsynchronousCloseException se1 = assertThrows(AsynchronousCloseException.class, pcu::close); - StorageException se2 = - assertThrows(StorageException.class, () -> ApiFutureUtils.await(finalObject)); + AsynchronousCloseException se1 = assertThrows(AsynchronousCloseException.class, pcu::close); + StorageException se2 = + assertThrows(StorageException.class, () -> ApiFutureUtils.await(finalObject)); - String name = info.getName(); - // parts 1-4 - BlobId p1 = id(partNamingStrategy.fmtName(name, PartRange.of(1)), 1L); - // ultimate object - BlobId expectedId = id(name, 2L); + String name = info.getName(); + // parts 1-4 + BlobId p1 = id(partNamingStrategy.fmtName(name, PartRange.of(1)), 1L); + // ultimate object + BlobId expectedId = id(name, 2L); - assertAll( - () -> assertThat(se1).hasCauseThat().isInstanceOf(StorageException.class), - () -> assertThat(se1).hasCauseThat().hasMessageThat().contains("Checksum mismatch"), - () -> assertThat(se2).hasMessageThat().contains("Checksum mismatch"), - () -> { - assertThat(se2).hasCauseThat().isInstanceOf(ParallelCompositeUploadException.class); - ParallelCompositeUploadException pcue = (ParallelCompositeUploadException) se2.getCause(); - // since we fail client side with a checksum validation, we expect the object to have been - // created - assertThat(pcue.getCreatedObjects().get()).containsExactly(p1, expectedId); - }, - () -> assertThat(se2.getCode()).isEqualTo(400)); + assertAll( + () -> assertThat(se1).hasCauseThat().isInstanceOf(StorageException.class), + () -> assertThat(se1).hasCauseThat().hasMessageThat().contains("Checksum mismatch"), + () -> assertThat(se2).hasMessageThat().contains("Checksum mismatch"), + () -> { + assertThat(se2).hasCauseThat().isInstanceOf(ParallelCompositeUploadException.class); + ParallelCompositeUploadException pcue = (ParallelCompositeUploadException) se2.getCause(); + // since we fail client side with a checksum validation, we expect the object to have been + // created + assertThat(pcue.getCreatedObjects().get()).containsExactly(p1, expectedId); + }, + () -> assertThat(se2.getCode()).isEqualTo(400)); + } } @Test diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java index c232cd32de6c..0152ef1e911b 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java @@ -74,7 +74,15 @@ public final class Registry extends RunListener { new ThreadFactoryBuilder().setDaemon(true).setNameFormat("test-run-%d").build())); private final TestRunScopedInstance testBench = - TestRunScopedInstance.of("fixture/TEST_BENCH", () -> TestBench.newBuilder().build()); + TestRunScopedInstance.of("fixture/TEST_BENCH", () -> { + int httpPort = findFreePort(); + int grpcPort = findFreePort(); + return TestBench.newBuilder() + .setBaseUri("http://localhost:" + httpPort) + .setGRPCBaseUri("http://localhost:" + grpcPort) + .setContainerName("fork-" + httpPort) + .build(); + }); private final TestRunScopedInstance generator = TestRunScopedInstance.of("fixture/GENERATOR", Generator::new); @@ -325,4 +333,13 @@ private static boolean isClassAnnotatedSingleBackendTestBench(Description descri } }); } + + private static int findFreePort() { + try (java.net.ServerSocket socket = new java.net.ServerSocket(0)) { + socket.setReuseAddress(true); + return socket.getLocalPort(); + } catch (java.io.IOException e) { + throw new RuntimeException("Failed to allocate a free port", e); + } + } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java index 4d9340762093..4dcbed0fd392 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java @@ -383,21 +383,16 @@ public boolean shouldRetry(Throwable previousThrowable, List previousResponse } private void dumpServerLogs(Path outFile, Path errFile) throws IOException { + LOGGER.warn("TestBench container failed or timeout occurred. Server stdout: {}, stderr: {}", + outFile.toAbsolutePath(), errFile.toAbsolutePath()); try { - LOGGER.warn("Dumping contents of stdout"); - dumpServerLog("stdout", outFile.toFile()); - } finally { - LOGGER.warn("Dumping contents of stderr"); - dumpServerLog("stderr", errFile.toFile()); - } - } - - private void dumpServerLog(String prefix, File out) throws IOException { - try (BufferedReader reader = new BufferedReader(new FileReader(out))) { - String line; - while ((line = reader.readLine()) != null) { - LOGGER.warn("<{}> {}", prefix, line); - } + Path targetLogsDir = java.nio.file.Paths.get("target/testbench-logs"); + java.nio.file.Files.createDirectories(targetLogsDir); + java.nio.file.Files.copy(outFile, targetLogsDir.resolve(outFile.getFileName()), java.nio.file.StandardCopyOption.REPLACE_EXISTING); + java.nio.file.Files.copy(errFile, targetLogsDir.resolve(errFile.getFileName()), java.nio.file.StandardCopyOption.REPLACE_EXISTING); + LOGGER.warn("Copied Testbench logs to target directory: {}", targetLogsDir.toAbsolutePath()); + } catch (Exception e) { + // ignore failures to copy } } From 7545a1979761a7c910cc702c8811aeba0a7e3efb Mon Sep 17 00:00:00 2001 From: Nidhi Nandwani Date: Wed, 3 Jun 2026 17:43:45 +0000 Subject: [PATCH 2/6] test(storage): fix mock finalization mapping in ITGapicBidiUnbufferedWritableByteChannelTest --- .../google/cloud/storage/FakeHttpServer.java | 2 -- ...BidiUnbufferedWritableByteChannelTest.java | 24 +++++++++++++++++-- ...ompositeUploadWritableByteChannelTest.java | 6 +++-- .../storage/it/runner/registry/Registry.java | 20 +++++++++------- .../storage/it/runner/registry/TestBench.java | 18 +++++++++----- 5 files changed, 49 insertions(+), 21 deletions(-) diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java index c93690d8b233..44b9f9e27c6d 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java @@ -50,7 +50,6 @@ import java.net.URI; import java.time.Duration; import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +61,6 @@ final class FakeHttpServer implements AutoCloseable { private final Runnable shutdown; private final HttpStorageOptions httpStorageOptions; - private FakeHttpServer( URI endpoint, Channel channel, Runnable shutdown, HttpStorageOptions httpStorageOptions) { this.endpoint = endpoint; diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java index 11fcb3f450db..a74c9bc672d0 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java @@ -732,8 +732,18 @@ public void incremental_success() throws Exception { BidiWriteObjectResponse resp1 = BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + BidiWriteObjectRequest req2 = + BidiWriteObjectRequest.newBuilder().setWriteOffset(_256KiB).setFinishWrite(true).build(); + BidiWriteObjectResponse resp2 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_256KiB).build()) + .build(); + ImmutableMap, BidiWriteObjectResponse> map = - ImmutableMap.of(ImmutableList.of(req1), resp1); + ImmutableMap.of( + ImmutableList.of(req1), resp1, + ImmutableList.of(req2), resp2, + ImmutableList.of(req1, req2), resp2); BidiWriteService service1 = new BidiWriteService(map); try (FakeServer fakeServer = FakeServer.of(service1); @@ -782,8 +792,18 @@ public void incremental_partialSuccess() throws Exception { BidiWriteObjectResponse resp1 = BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + BidiWriteObjectRequest req2 = + BidiWriteObjectRequest.newBuilder().setWriteOffset(_256KiB).setFinishWrite(true).build(); + BidiWriteObjectResponse resp2 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_256KiB).build()) + .build(); + ImmutableMap, BidiWriteObjectResponse> map = - ImmutableMap.of(ImmutableList.of(req1), resp1); + ImmutableMap.of( + ImmutableList.of(req1), resp1, + ImmutableList.of(req2), resp2, + ImmutableList.of(req1, req2), resp2); BidiWriteService service1 = new BidiWriteService(map); try (FakeServer fakeServer = FakeServer.of(service1); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java index 8b0eed565502..71849e9064b5 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java @@ -693,8 +693,10 @@ public BlobInfo compose(ComposeRequest composeRequest) { () -> assertThat(se2).hasMessageThat().contains("Checksum mismatch"), () -> { assertThat(se2).hasCauseThat().isInstanceOf(ParallelCompositeUploadException.class); - ParallelCompositeUploadException pcue = (ParallelCompositeUploadException) se2.getCause(); - // since we fail client side with a checksum validation, we expect the object to have been + ParallelCompositeUploadException pcue = + (ParallelCompositeUploadException) se2.getCause(); + // since we fail client side with a checksum validation, we expect the object to have + // been // created assertThat(pcue.getCreatedObjects().get()).containsExactly(p1, expectedId); }, diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java index 0152ef1e911b..1d0d7ffe0f1e 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java @@ -74,15 +74,17 @@ public final class Registry extends RunListener { new ThreadFactoryBuilder().setDaemon(true).setNameFormat("test-run-%d").build())); private final TestRunScopedInstance testBench = - TestRunScopedInstance.of("fixture/TEST_BENCH", () -> { - int httpPort = findFreePort(); - int grpcPort = findFreePort(); - return TestBench.newBuilder() - .setBaseUri("http://localhost:" + httpPort) - .setGRPCBaseUri("http://localhost:" + grpcPort) - .setContainerName("fork-" + httpPort) - .build(); - }); + TestRunScopedInstance.of( + "fixture/TEST_BENCH", + () -> { + int httpPort = findFreePort(); + int grpcPort = findFreePort(); + return TestBench.newBuilder() + .setBaseUri("http://localhost:" + httpPort) + .setGRPCBaseUri("http://localhost:" + grpcPort) + .setContainerName("fork-" + httpPort) + .build(); + }); private final TestRunScopedInstance generator = TestRunScopedInstance.of("fixture/GENERATOR", Generator::new); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java index 4dcbed0fd392..7936b1668918 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java @@ -42,9 +42,7 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -383,13 +381,21 @@ public boolean shouldRetry(Throwable previousThrowable, List previousResponse } private void dumpServerLogs(Path outFile, Path errFile) throws IOException { - LOGGER.warn("TestBench container failed or timeout occurred. Server stdout: {}, stderr: {}", - outFile.toAbsolutePath(), errFile.toAbsolutePath()); + LOGGER.warn( + "TestBench container failed or timeout occurred. Server stdout: {}, stderr: {}", + outFile.toAbsolutePath(), + errFile.toAbsolutePath()); try { Path targetLogsDir = java.nio.file.Paths.get("target/testbench-logs"); java.nio.file.Files.createDirectories(targetLogsDir); - java.nio.file.Files.copy(outFile, targetLogsDir.resolve(outFile.getFileName()), java.nio.file.StandardCopyOption.REPLACE_EXISTING); - java.nio.file.Files.copy(errFile, targetLogsDir.resolve(errFile.getFileName()), java.nio.file.StandardCopyOption.REPLACE_EXISTING); + java.nio.file.Files.copy( + outFile, + targetLogsDir.resolve(outFile.getFileName()), + java.nio.file.StandardCopyOption.REPLACE_EXISTING); + java.nio.file.Files.copy( + errFile, + targetLogsDir.resolve(errFile.getFileName()), + java.nio.file.StandardCopyOption.REPLACE_EXISTING); LOGGER.warn("Copied Testbench logs to target directory: {}", targetLogsDir.toAbsolutePath()); } catch (Exception e) { // ignore failures to copy From 1bf425208276e078a94d6baa760f256b824126cc Mon Sep 17 00:00:00 2001 From: Nidhi Nandwani Date: Wed, 3 Jun 2026 18:18:19 +0000 Subject: [PATCH 3/6] test(storage): resolve flakiness and state machine mismatch in integration tests --- .../java/com/google/cloud/storage/BidiUploadState.java | 3 +++ ...ufferedChunkedResumableWritableByteChannelTest.java | 10 ++++++++-- .../cloud/storage/ITStorageDataClientFakeTest.java | 4 ++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java index 7894e35f9934..6168277ba946 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java @@ -726,6 +726,9 @@ ApiFuture beginReconciliation() { final void sendVia(Consumer consumer) { lock.lock(); try { + if (state == State.TERMINAL_SUCCESS || state == State.TERMINAL_ERROR) { + return; + } validateCurrentStateIsOneOf( State.INITIALIZING, State.RUNNING, State.RETRYING, State.TAKEOVER); BidiWriteObjectRequest prev = null; diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java index 8f624dbaa8e4..513eb8834a55 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java @@ -718,7 +718,10 @@ public void incremental_success() throws Exception { .build(); ImmutableMap, WriteObjectResponse> map = - ImmutableMap.of(ImmutableList.of(req1), resp1, ImmutableList.of(req2), resp2); + ImmutableMap.of( + ImmutableList.of(req1), resp1, + ImmutableList.of(req2), resp2, + ImmutableList.of(req1, req2), resp2); DirectWriteService service1 = new DirectWriteService(map); try (FakeServer fakeServer = FakeServer.of(service1); @@ -775,7 +778,10 @@ public void incremental_partialSuccess() throws Exception { .build(); ImmutableMap, WriteObjectResponse> map = - ImmutableMap.of(ImmutableList.of(req1), resp1, ImmutableList.of(req2), resp2); + ImmutableMap.of( + ImmutableList.of(req1), resp1, + ImmutableList.of(req2), resp2, + ImmutableList.of(req1, req2), resp2); DirectWriteService service1 = new DirectWriteService(map); try (FakeServer fakeServer = FakeServer.of(service1); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITStorageDataClientFakeTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITStorageDataClientFakeTest.java index 276889ab44c5..146735edc00c 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITStorageDataClientFakeTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITStorageDataClientFakeTest.java @@ -55,7 +55,7 @@ public final class ITStorageDataClientFakeTest { public void fastOpen_futureBytes() throws Exception { doTest( ReadProjectionConfigs.asFutureBytes().withRangeSpec(RangeSpec.of(10, 20)), - f -> f.get(10, TimeUnit.MILLISECONDS)); + f -> f.get(3, TimeUnit.SECONDS)); } @Test @@ -74,7 +74,7 @@ public void fastOpen_futureByteString() throws Exception { doTest( ReadProjectionConfigs.asFutureByteString().withRangeSpec(RangeSpec.of(10, 20)), f -> { - try (DisposableByteString disposableByteString = f.get(10, TimeUnit.MILLISECONDS)) { + try (DisposableByteString disposableByteString = f.get(3, TimeUnit.SECONDS)) { ByteString byteString = disposableByteString.byteString(); return byteString.toByteArray(); } From d7b26b7261380bbe1c9cd94a723527e4a9f88856 Mon Sep 17 00:00:00 2001 From: Nidhi Nandwani Date: Thu, 4 Jun 2026 07:13:37 +0000 Subject: [PATCH 4/6] test(storage): resolve thread leak in OtelSdkShim shutdown causing build hang --- .../cloud/storage/it/runner/registry/TestRunScopedInstance.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java index 4973e5fe9525..abb8b343f79e 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java @@ -91,7 +91,7 @@ public void shutdown() throws Exception { synchronized (this) { instance = null; } - if (name.equals("OTEL_SDK")) { + if (name.equals("fixture/OTEL_SDK")) { tmp.stop(); } else { Span span = From 5ae3892d666c9b6f25a7be55e301739b8b1bced7 Mon Sep 17 00:00:00 2001 From: Nidhi Nandwani Date: Thu, 4 Jun 2026 10:16:02 +0000 Subject: [PATCH 5/6] test: replace Truth assertThat with assertArrayEquals for byte array validation to prevent hang on mismatch --- .../conformance/retry/RpcMethodMappings.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java index 9ed0177e354a..e441410e771e 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java @@ -1372,8 +1372,7 @@ private static void get(ArrayList a) { (tmpOutFile) -> { state.getBlob().downloadTo(tmpOutFile); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - assertThat(downloadedBytes) - .isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); a.add( @@ -1391,8 +1390,7 @@ private static void get(ArrayList a) { .downloadTo( tmpOutFile, Blob.BlobSourceOption.generationMatch()); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - assertThat(downloadedBytes) - .isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); a.add( @@ -1404,7 +1402,7 @@ private static void get(ArrayList a) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); state.getBlob().downloadTo(baos); byte[] downloadedBytes = baos.toByteArray(); - assertThat(downloadedBytes).isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1418,7 +1416,7 @@ private static void get(ArrayList a) { .getBlob() .downloadTo(baos, Blob.BlobSourceOption.generationMatch()); byte[] downloadedBytes = baos.toByteArray(); - assertThat(downloadedBytes).isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1428,7 +1426,7 @@ private static void get(ArrayList a) { ctx.peek( state -> { byte[] downloadedBytes = state.getBlob().getContent(); - assertThat(downloadedBytes).isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1441,7 +1439,7 @@ private static void get(ArrayList a) { state .getBlob() .getContent(Blob.BlobSourceOption.metagenerationMatch()); - assertThat(downloadedBytes).isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1480,8 +1478,7 @@ private static void get(ArrayList a) { (tmpOutFile) -> { ctx.getStorage().downloadTo(state.getBlobId(), tmpOutFile); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - assertThat(downloadedBytes) - .isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); a.add( @@ -1498,8 +1495,7 @@ private static void get(ArrayList a) { new FileOutputStream(tmpOutFile.toFile()); ctx.getStorage().downloadTo(state.getBlobId(), fos); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - assertThat(downloadedBytes) - .isEqualTo(c.getHelloWorldUtf8Bytes()); + org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); } From 7b33bfc1b220ac1afc16030c2857e6b1cb5e6462 Mon Sep 17 00:00:00 2001 From: Nidhi Nandwani Date: Thu, 4 Jun 2026 11:30:14 +0000 Subject: [PATCH 6/6] Fix intermittent CI hang by correctly shutting down test thread pools --- ...ableUnbufferedWritableByteChannelTest.java | 153 +++++++++--------- .../cloud/storage/BufferHandleTest.java | 34 ++-- .../conformance/retry/RpcMethodMappings.java | 24 ++- .../it/ITMultipartUploadClientTest.java | 69 ++++---- 4 files changed, 150 insertions(+), 130 deletions(-) diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java index 0364ddaf18c2..d6ab1a4b4d0a 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java @@ -41,81 +41,86 @@ public final class BidiAppendableUnbufferedWritableByteChannelTest { @Test public void appendAndFinalizeOnlyPerformedIfAllBytesConsumed() throws IOException { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - ChecksummedTestContent ctc = ChecksummedTestContent.gen(27); - AppendableUploadState state = - BidiUploadState.appendableNew( - BidiUploadTest.appendRequestNew, - GrpcCallContext::createDefault, - 16, - SettableApiFuture.create(), - Crc32cValue.zero()); - AtomicLong finishWriteOffset = new AtomicLong(-1); - BidiUploadStreamingStream stream = - new BidiUploadStreamingStream( - state, - executor, - BidiUploadTestUtils.adaptOnlySend( - respond -> - request -> { - if (request.getFinishWrite()) { - finishWriteOffset.set( - request.getWriteOffset() - + request.getChecksummedData().getContent().size()); - } - executor.submit( - () -> { - switch ((int) request.getWriteOffset()) { - case 0: - respond.onResponse(BidiUploadTest.resourceWithSize(0)); - break; - case 4: - case 8: - // do not ack any bytes until we receive 16, this simulates - // latency on the bytes being ack'd. - break; - case 12: - respond.onResponse(BidiUploadTestUtils.incremental(8)); - break; - case 16: - respond.onResponse(BidiUploadTestUtils.incremental(12)); - break; - case 20: - respond.onResponse(BidiUploadTestUtils.incremental(16)); - break; - case 24: - BidiWriteObjectResponse.Builder b = - BidiUploadTest.resourceFor(ctc).toBuilder(); - b.getResourceBuilder() - .setFinalizeTime( - Conversions.grpc() - .timestampCodec - .encode(OffsetDateTime.now())); - respond.onResponse(b.build()); - break; - default: - respond.onError( - FakeStorage.unexpectedRequest(request, ImmutableList.of())); - break; - } - }); - }), - 3, - RetryContext.neverRetry()); - ChunkSegmenter chunkSegmenter = - new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 4, 2); - BidiAppendableUnbufferedWritableByteChannel channel = - new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 4, 0); + try { + ChecksummedTestContent ctc = ChecksummedTestContent.gen(27); + AppendableUploadState state = + BidiUploadState.appendableNew( + BidiUploadTest.appendRequestNew, + GrpcCallContext::createDefault, + 16, + SettableApiFuture.create(), + Crc32cValue.zero()); + AtomicLong finishWriteOffset = new AtomicLong(-1); + BidiUploadStreamingStream stream = + new BidiUploadStreamingStream( + state, + executor, + BidiUploadTestUtils.adaptOnlySend( + respond -> + request -> { + if (request.getFinishWrite()) { + finishWriteOffset.set( + request.getWriteOffset() + + request.getChecksummedData().getContent().size()); + } + executor.submit( + () -> { + switch ((int) request.getWriteOffset()) { + case 0: + respond.onResponse(BidiUploadTest.resourceWithSize(0)); + break; + case 4: + case 8: + // do not ack any bytes until we receive 16, this simulates + // latency on the bytes being ack'd. + break; + case 12: + respond.onResponse(BidiUploadTestUtils.incremental(8)); + break; + case 16: + respond.onResponse(BidiUploadTestUtils.incremental(12)); + break; + case 20: + respond.onResponse(BidiUploadTestUtils.incremental(16)); + break; + case 24: + BidiWriteObjectResponse.Builder b = + BidiUploadTest.resourceFor(ctc).toBuilder(); + b.getResourceBuilder() + .setFinalizeTime( + Conversions.grpc() + .timestampCodec + .encode(OffsetDateTime.now())); + respond.onResponse(b.build()); + break; + default: + respond.onError( + FakeStorage.unexpectedRequest(request, ImmutableList.of())); + break; + } + }); + }), + 3, + RetryContext.neverRetry()); + ChunkSegmenter chunkSegmenter = + new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 4, 2); + BidiAppendableUnbufferedWritableByteChannel channel = + new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 4, 0); - ByteBuffer buf = ctc.asByteBuffer(); - int written1 = channel.write(buf); - // fill up the outbound queue - assertThat(written1).isEqualTo(16); + ByteBuffer buf = ctc.asByteBuffer(); + int written1 = channel.write(buf); + // fill up the outbound queue + assertThat(written1).isEqualTo(16); - // asynchronously bytes will be ack'd 4 at a time, eventually there will be enough space in the - // outbound queue to allow writeAndClose to start consuming bytes. - channel.nextWriteShouldFinalize(); - int written2 = channel.writeAndClose(buf); - assertThat(written2).isEqualTo(11); - assertThat(finishWriteOffset.get()).isEqualTo(ctc.length()); + // asynchronously bytes will be ack'd 4 at a time, eventually there will be enough space in + // the + // outbound queue to allow writeAndClose to start consuming bytes. + channel.nextWriteShouldFinalize(); + int written2 = channel.writeAndClose(buf); + assertThat(written2).isEqualTo(11); + assertThat(finishWriteOffset.get()).isEqualTo(ctc.length()); + } finally { + executor.shutdownNow(); + } } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferHandleTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferHandleTest.java index d88b56b2d6b9..08b48a225dc8 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferHandleTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferHandleTest.java @@ -76,21 +76,25 @@ public void lazyBufferHandle_afterAllocationOnlyBackingIsReferenced() { public void lazyBufferHandle_initIsThreadSafe() throws ExecutionException, InterruptedException { int capacity = 10; ExecutorService exec = Executors.newFixedThreadPool(2); - AtomicBoolean alloc = new AtomicBoolean(false); - LazyBufferHandle handle = - new LazyBufferHandle( - capacity, - i -> { - alloc.compareAndSet(false, true); - return ByteBuffer.allocate(capacity); - }); - - Future f1 = exec.submit(handle::get); - Future f2 = exec.submit(handle::get); - - assertThat(f1.get()).isSameInstanceAs(f2.get()); - - assertThat(handle.get().capacity()).isEqualTo(capacity); + try { + AtomicBoolean alloc = new AtomicBoolean(false); + LazyBufferHandle handle = + new LazyBufferHandle( + capacity, + i -> { + alloc.compareAndSet(false, true); + return ByteBuffer.allocate(capacity); + }); + + Future f1 = exec.submit(handle::get); + Future f2 = exec.submit(handle::get); + + assertThat(f1.get()).isSameInstanceAs(f2.get()); + + assertThat(handle.get().capacity()).isEqualTo(capacity); + } finally { + exec.shutdownNow(); + } } @Test diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java index e441410e771e..7b554cc01278 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java @@ -1372,7 +1372,8 @@ private static void get(ArrayList a) { (tmpOutFile) -> { state.getBlob().downloadTo(tmpOutFile); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); a.add( @@ -1390,7 +1391,8 @@ private static void get(ArrayList a) { .downloadTo( tmpOutFile, Blob.BlobSourceOption.generationMatch()); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); a.add( @@ -1402,7 +1404,8 @@ private static void get(ArrayList a) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); state.getBlob().downloadTo(baos); byte[] downloadedBytes = baos.toByteArray(); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1416,7 +1419,8 @@ private static void get(ArrayList a) { .getBlob() .downloadTo(baos, Blob.BlobSourceOption.generationMatch()); byte[] downloadedBytes = baos.toByteArray(); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1426,7 +1430,8 @@ private static void get(ArrayList a) { ctx.peek( state -> { byte[] downloadedBytes = state.getBlob().getContent(); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1439,7 +1444,8 @@ private static void get(ArrayList a) { state .getBlob() .getContent(Blob.BlobSourceOption.metagenerationMatch()); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); })) .build()); a.add( @@ -1478,7 +1484,8 @@ private static void get(ArrayList a) { (tmpOutFile) -> { ctx.getStorage().downloadTo(state.getBlobId(), tmpOutFile); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); a.add( @@ -1495,7 +1502,8 @@ private static void get(ArrayList a) { new FileOutputStream(tmpOutFile.toFile()); ctx.getStorage().downloadTo(state.getBlobId(), fos); byte[] downloadedBytes = Files.readAllBytes(tmpOutFile); - org.junit.Assert.assertArrayEquals(c.getHelloWorldUtf8Bytes(), downloadedBytes); + org.junit.Assert.assertArrayEquals( + c.getHelloWorldUtf8Bytes(), downloadedBytes); }))) .build()); } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITMultipartUploadClientTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITMultipartUploadClientTest.java index 1b60312750f9..7b963498c649 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITMultipartUploadClientTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITMultipartUploadClientTest.java @@ -402,41 +402,44 @@ private List parallelUpload( throws ExecutionException, InterruptedException { int numThreads = Runtime.getRuntime().availableProcessors(); ExecutorService executor = Executors.newFixedThreadPool(numThreads); - List> futures = new ArrayList<>(); - - long numParts = (objectSize + partSize - 1) / partSize; - - for (int i = 0; i < numParts; i++) { - final int partNumber = i + 1; - final long offset = (long) i * partSize; - final long len = Math.min(partSize, objectSize - offset); - - Callable uploadTask = - () -> { - ByteBuffer partBuffer = ByteBuffer.allocate((int) len); - try (FileChannel fileChannel = FileChannel.open(localFile, StandardOpenOption.READ)) { - fileChannel.read(partBuffer, offset); - } - partBuffer.flip(); - RequestBody partBody = RequestBody.of(partBuffer); - UploadPartResponse uploadPartResponse = - uploadPart(info, uploadId, partNumber, partBody); - return CompletedPart.builder() - .partNumber(partNumber) - .eTag(uploadPartResponse.eTag()) - .build(); - }; - futures.add(executor.submit(uploadTask)); - } + try { + List> futures = new ArrayList<>(); + + long numParts = (objectSize + partSize - 1) / partSize; + + for (int i = 0; i < numParts; i++) { + final int partNumber = i + 1; + final long offset = (long) i * partSize; + final long len = Math.min(partSize, objectSize - offset); + + Callable uploadTask = + () -> { + ByteBuffer partBuffer = ByteBuffer.allocate((int) len); + try (FileChannel fileChannel = FileChannel.open(localFile, StandardOpenOption.READ)) { + fileChannel.read(partBuffer, offset); + } + partBuffer.flip(); + RequestBody partBody = RequestBody.of(partBuffer); + UploadPartResponse uploadPartResponse = + uploadPart(info, uploadId, partNumber, partBody); + return CompletedPart.builder() + .partNumber(partNumber) + .eTag(uploadPartResponse.eTag()) + .build(); + }; + futures.add(executor.submit(uploadTask)); + } - List completedParts = new ArrayList<>(); - for (Future future : futures) { - completedParts.add(future.get()); - } - executor.shutdown(); + List completedParts = new ArrayList<>(); + for (Future future : futures) { + completedParts.add(future.get()); + } - completedParts.sort(Comparator.comparingInt(CompletedPart::partNumber)); - return completedParts; + completedParts.sort(Comparator.comparingInt(CompletedPart::partNumber)); + return completedParts; + } finally { + executor.shutdownNow(); + } } private void verifyContents(BlobInfo info, Path expectedFile) throws IOException {