diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 43270a1aa..112d19c53 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; final class GapicUnbufferedReadableByteChannel implements UnbufferedReadableByteChannel, ScatteringByteChannel { @@ -258,6 +259,11 @@ ApiFuture getResult() { private void ensureStreamOpen() { if (readObjectObserver == null) { + java.lang.Object peek = queue.peek(); + if (peek instanceof Throwable || peek == EOF_MARKER) { + // If our queue has an error or EOF, do not send another request + return; + } readObjectObserver = Retrying.run( retryingDeps, @@ -326,13 +332,15 @@ protected void onResponseImpl(ReadObjectResponse response) { @Override protected void onErrorImpl(Throwable t) { - open.setException(t); - if (!alg.shouldRetry(t, null)) { - result.setException(StorageException.coalesce(t)); - } if (t instanceof CancellationException) { cancellation.set(t); } + if (!open.isDone()) { + open.setException(t); + if (!alg.shouldRetry(t, null)) { + result.setException(StorageException.coalesce(t)); + } + } try { queue.offer(t); } catch (InterruptedException e) { @@ -369,6 +377,11 @@ public boolean nonEmpty() { return !queue.isEmpty(); } + @Nullable + public T peek() { + return queue.peek(); + } + @NonNull public T poll() throws InterruptedException { return queue.take(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 35ba536ae..6dff8996e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -723,8 +723,7 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption.. public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); ReadObjectRequest request = getReadObjectRequest(blob, opts); - Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request)); - GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes); + GrpcCallContext grpcCallContext = Retrying.newCallContext(); return new GrpcBlobReadChannel( storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), @@ -1708,10 +1707,7 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( Opts opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts); - Set codes = - resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest)); - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes)); + GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(Retrying.newCallContext()); return ResumableMedia.gapic() .read() .byteChannel(