
Wrapped ByteStreams fail when used as streaming request bodies #613

Closed
ianbotsf opened this issue Mar 28, 2022 · 3 comments · Fixed by #620
Labels: bug (This issue is a bug.)

@ianbotsf (Contributor) commented Mar 28, 2022

Describe the bug

Streaming request operations (e.g., S3 PutObject) require ByteStream instances. ByteStream instances whose readers directly wrap Ktor channels (e.g., ByteStream.fromFile(...)) work correctly. Wrapping those streams, even without adding any additional logic, fails: specifically, the stream appears to hang and never completes signing.

Expected Behavior

Wrapped ByteStream instances should be usable as streaming request bodies.

Steps to Reproduce

The following example demonstrates the problem:

val file = File("...")
val underlyingStream = ByteStream.fromFile(file) as ByteStream.ReplayableStream

val wrappedStream = object : ByteStream.ReplayableStream() {
    override fun newReader(): SdkByteReadChannel {
        val underlyingReader = underlyingStream.newReader()
        return object : SdkByteReadChannel by underlyingReader {}
    }
}

val s3 = S3Client.fromEnvironment { }
s3.putObject {
    bucket = "..."
    key = "..."
    body = wrappedStream
}
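For context, the wrapper above relies on Kotlin's interface delegation in an object expression. A self-contained sketch of the same pattern (the `Reader` interface is illustrative, not an SDK type); note the trailing `{}` that an object expression requires:

```kotlin
// Illustrative stand-in for SdkByteReadChannel: a minimal reader interface.
interface Reader {
    fun read(): Int
}

fun main() {
    val underlying = object : Reader {
        var next = 0
        override fun read(): Int = next++
    }

    // An object expression that delegates every member to `underlying`
    // without adding any logic -- the same shape as the wrapped stream above.
    val wrapped = object : Reader by underlying {}

    println(wrapped.read()) // 0, delegated to underlying
    println(wrapped.read()) // 1
}
```

Since the wrapper adds no members of its own, the body is empty, but the `{}` is still mandatory syntax.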
@ianbotsf added the bug label on Mar 28, 2022
@ianbotsf (Contributor, Author) commented:

As noted by @aajtodd, the reason for the hang during signing is likely an incomplete implementation of the fallback case for stream reading: the read channel needs to be read into the tmp buffer before that buffer is written out to dest.

Updating the code to this:

internal suspend fun SdkByteReadChannel.readAvailableFallback(dest: SdkByteBuffer, limit: Long): Long {
    if (availableForRead == 0) awaitContent()
    // channel was closed while waiting and no further content was made available
    if (availableForRead == 0 && isClosedForRead) return -1
    val tmp = ByteArray(minOf(availableForRead.toLong(), limit, Int.MAX_VALUE.toLong()).toInt())

    readFully(tmp) // *** NEW LINE ***

    dest.writeFully(tmp)
    return tmp.size.toLong()
}

causes signing to complete successfully and sending to begin, but the request still does not succeed. The read channel is now being cancelled somehow after reading a subset of the stream contents. A CancellationException is being caught in the Ktor proxy job:

java.util.concurrent.CancellationException: Channel has been cancelled
        at io.ktor.utils.io.ByteBufferChannel.cancel(ByteBufferChannel.kt:180)
        at io.ktor.utils.io.ByteReadChannelKt.cancel(ByteReadChannel.kt:231)
        at io.ktor.utils.io.jvm.javaio.InputAdapter.close(Blocking.kt:73)
        at okio.InputStreamSource.close(JvmOkio.kt:108)
        at kotlin.io.CloseableKt.closeFinally(Closeable.kt:60)
        at io.ktor.client.engine.okhttp.StreamRequestBody.writeTo(StreamRequestBody.kt:20)
        at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:59)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:34)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
        at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:517)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

This exception appears to be thrown in ByteBufferChannel when cancelling a channel without a cause. The stack trace is murky from here, but it appears that okhttp's StreamRequestBody is closing the channel via the use method. This suggests that writeTo is perhaps intended to be terminal but is being invoked too early, or that some other underlying cancellation is happening without bubbling up.
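The fallback bug described above reduces to a simple pattern: a temporary buffer is allocated to the size of the available bytes but never filled before being copied out. A plain-Kotlin sketch of that pattern using standard streams (names are illustrative, not SDK types):

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream

// Buggy shape: tmp is allocated but never filled, so dest receives zeros
// and the source is never drained.
fun copyAvailableBuggy(src: InputStream, dest: ByteArrayOutputStream): Int {
    val tmp = ByteArray(src.available())
    // missing: src.read(tmp)
    dest.write(tmp)
    return tmp.size
}

// Fixed shape: read into tmp before writing it out, mirroring the
// readFully(tmp) line added to readAvailableFallback above.
fun copyAvailableFixed(src: InputStream, dest: ByteArrayOutputStream): Int {
    val tmp = ByteArray(src.available())
    val read = src.read(tmp)
    dest.write(tmp, 0, read)
    return read
}

fun main() {
    val data = byteArrayOf(1, 2, 3)

    val buggy = ByteArrayOutputStream()
    copyAvailableBuggy(ByteArrayInputStream(data), buggy)
    println(buggy.toByteArray().contentEquals(byteArrayOf(0, 0, 0))) // true: zeros, not data

    val fixed = ByteArrayOutputStream()
    copyAvailableFixed(ByteArrayInputStream(data), fixed)
    println(fixed.toByteArray().contentEquals(data)) // true
}
```

In the buggy shape the caller still sees a positive byte count, which is why the failure shows up downstream (here, during signing) rather than at the read site.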

@aajtodd (Contributor) commented Apr 5, 2022

After fixing the fallback implementation I was seeing a different issue, but I suspect it has a similar root cause.

The wrapped stream still has to supply a contentLength to get proxied through correctly, e.g.

val wrappedStream = object : ByteStream.ReplayableStream() {
+    override val contentLength: Long = file.length()
     override fun newReader(): SdkByteReadChannel {
         val underlyingReader = underlyingStream.newReader()
         return object : SdkByteReadChannel by underlyingReader {}
     }
}

@ianbotsf can you verify adding an explicit content length to the wrapped stream resolves the issue you were seeing?

@ianbotsf (Contributor, Author) commented Apr 5, 2022

Yes, it does, although it was not apparent that custom/proxied streams must provide a content length, because the field is open (not abstract) and defaults to null. Moreover, I believe there will be times when custom streams need to provide content of unknown length (e.g., streaming from another source or building content on the fly), and we'll need some way to support that. Filing #621 to track that.
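The pitfall described here (an open property defaulting to null that wrappers silently fail to forward) can be sketched without SDK types; `Stream` and `FileBacked` below are illustrative stand-ins, not the real ByteStream API:

```kotlin
// Illustrative stand-in for ByteStream.ReplayableStream: contentLength is
// open with a null default, so subclasses are not forced to provide it.
abstract class Stream {
    open val contentLength: Long? = null
}

// Stand-in for a file-backed stream that knows its length.
class FileBacked(length: Long) : Stream() {
    override val contentLength: Long = length
}

fun main() {
    val underlying = FileBacked(1024)

    // Wrapper that forgets to override contentLength: compiles fine,
    // but the length silently reverts to the null default.
    val forgetful = object : Stream() {}
    println(forgetful.contentLength) // null

    // Wrapper that forwards the underlying stream's length, matching the
    // fix suggested in the comment above.
    val forwarding = object : Stream() {
        override val contentLength: Long? = underlying.contentLength
    }
    println(forwarding.contentLength) // 1024
}
```

Because the compiler never flags the forgetful wrapper, the only symptom is a downstream failure like the one in this issue, which is the argument for tracking unknown-length support separately in #621.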
