From 64c5ed83a290bb55847adb98263d635b3cb4f9ab Mon Sep 17 00:00:00 2001 From: SJZ <734609160@qq.com> Date: Tue, 25 Jul 2023 22:57:37 +0800 Subject: [PATCH] fix: S3OutputStream write/close checks should be thread-safe. (#721) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jason.Song-宋金泽 --- .../cloud/storage/s3fs/S3OutputStream.java | 43 +++++-- .../storage/s3fs/S3OutputStreamTest.java | 109 ++++++++++++++++++ 2 files changed, 143 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/carlspring/cloud/storage/s3fs/S3OutputStream.java b/src/main/java/org/carlspring/cloud/storage/s3fs/S3OutputStream.java index 33f3c402..fef162c8 100644 --- a/src/main/java/org/carlspring/cloud/storage/s3fs/S3OutputStream.java +++ b/src/main/java/org/carlspring/cloud/storage/s3fs/S3OutputStream.java @@ -10,6 +10,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -77,7 +78,7 @@ public final class S3OutputStream /** * Indicates if the stream has been closed. */ - private volatile boolean closed; + private volatile AtomicBoolean closed = new AtomicBoolean(false); /** * Internal buffer. May be {@code null} if no bytes are buffered. @@ -220,12 +221,24 @@ public void write(final int bytes) write(new byte[]{ (byte) bytes }); } + @Override + public void write(byte[] bytes) throws IOException + { + write(bytes, 0, bytes.length); + } + @Override public void write(final byte[] bytes, final int offset, final int length) throws IOException { + + if (closed.get()) + { + throw new StreamAlreadyClosedException(); + } + if ((offset < 0) || (offset > bytes.length) || (length < 0) || ((offset + length) > bytes.length) || ((offset + length) < 0)) { @@ -237,11 +250,6 @@ public void write(final byte[] bytes, return; } - if (closed) - { - throw new IOException("Already closed"); - } - synchronized (this) { if (uploadId != null && partETags.size() >= MAX_ALLOWED_UPLOAD_PARTS) @@ -267,11 +275,19 @@ public void write(final byte[] bytes, } } + /** + * @return True if the stream has been closed, false if the stream is still open. + */ + public boolean isClosed() + { + return this.closed.get(); + } + @Override public void close() throws IOException { - if (closed) + if (closed.get()) { return; } @@ -292,7 +308,7 @@ public void close() completeMultipartUpload(); } - closed = true; + closed.set(true); } } @@ -374,13 +390,14 @@ private void uploadPart(final long contentLength, { if (!success) { - closed = true; + closed.set(true); abortMultipartUpload(); } } if (partNumber >= MAX_ALLOWED_UPLOAD_PARTS) { + LOGGER.warn("Uploaded part is out of max allowed parts, stream closed."); close(); } } @@ -525,4 +542,12 @@ private String getValueFromMetadata(final String key) return null; } + + public static class StreamAlreadyClosedException extends IOException + { + public StreamAlreadyClosedException() { + super("Stream has already been closed."); + } + } + } diff --git a/src/test/java/org/carlspring/cloud/storage/s3fs/S3OutputStreamTest.java b/src/test/java/org/carlspring/cloud/storage/s3fs/S3OutputStreamTest.java index e480a66f..c165eda5 100644 --- a/src/test/java/org/carlspring/cloud/storage/s3fs/S3OutputStreamTest.java +++ b/src/test/java/org/carlspring/cloud/storage/s3fs/S3OutputStreamTest.java @@ -1,5 +1,6 @@ package org.carlspring.cloud.storage.s3fs; +import org.assertj.core.api.Assertions; import org.carlspring.cloud.storage.s3fs.util.S3ClientMock; import org.carlspring.cloud.storage.s3fs.util.S3MockFactory; import org.junit.jupiter.api.Test; @@ -22,10 +23,16 @@ import java.util.HashMap; import java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.UUID.randomUUID; +import static org.assertj.core.api.Assertions.assertThat; import static org.carlspring.cloud.storage.s3fs.S3OutputStream.MAX_ALLOWED_UPLOAD_PARTS; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; @@ -76,6 +83,108 @@ void openAndCloseProducesEmptyObject() assertThatBytesHaveBeenPut(client, data); } + @Test + void writeToClosedStreamShouldProduceExceptionThreadSafe() + throws ExecutionException, InterruptedException, IOException + { + //given + final String key = getTestBasePath() + "/" + randomUUID(); + final S3ClientMock client = S3MockFactory.getS3ClientMock(); + client.bucket(BUCKET_NAME).file(key); + final S3ObjectId objectId = S3ObjectId.builder().bucket(BUCKET_NAME).key(key).build(); + + final S3OutputStream outputStream = new S3OutputStream(client, objectId); + + // Simulate closing the outputStream from another thread. + Runnable closeStreamRunnable = () -> { + try + { + outputStream.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }; + closeStreamRunnable.run(); + + CountDownLatch count = new CountDownLatch(1); + AtomicBoolean alreadyClosedException = new AtomicBoolean(false); + Runnable runnable = () -> { + count.countDown(); + try + { + outputStream.write(new byte[0]); + } + catch (S3OutputStream.StreamAlreadyClosedException e) + { + alreadyClosedException.set(true); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }; + + CompletableFuture[] futures = new CompletableFuture[] { + CompletableFuture.runAsync(runnable), + CompletableFuture.runAsync(runnable), + CompletableFuture.runAsync(runnable), + CompletableFuture.runAsync(runnable), + }; + count.countDown(); + + CompletableFuture.allOf(futures).get(); + assertTrue(alreadyClosedException.get()); + } + + @Test + void closingStreamShouldBeThreadSafe() + throws ExecutionException, InterruptedException, IOException + { + //given + final String key = getTestBasePath() + "/" + randomUUID(); + final S3ClientMock client = S3MockFactory.getS3ClientMock(); + client.bucket(BUCKET_NAME).file(key); + final S3ObjectId objectId = S3ObjectId.builder().bucket(BUCKET_NAME).key(key).build(); + + final S3OutputStream outputStream = new S3OutputStream(client, objectId); + + // Simulate closing the outputStream from another thread. + Runnable closeStreamRunnable = () -> { + try + { + outputStream.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }; + closeStreamRunnable.run(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger counter = new AtomicInteger(); + Runnable runnable = () -> { + latch.countDown(); + if(outputStream.isClosed()) { + counter.incrementAndGet(); + } + }; + + CompletableFuture[] futures = new CompletableFuture[] { + CompletableFuture.runAsync(runnable), + CompletableFuture.runAsync(runnable), + CompletableFuture.runAsync(runnable), + CompletableFuture.runAsync(runnable), + CompletableFuture.runAsync(runnable), + }; + latch.countDown(); + + CompletableFuture.allOf(futures).get(); + assertThat(counter).hasValue(5); + } + @Test void zeroBytesWrittenProduceEmptyObject() throws IOException