From 1a79215e1c5e8bccf3884c98e82ce2b75bc93d0f Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 13:16:22 -0600 Subject: [PATCH 01/17] add `writeChunk` override implementation --- .../runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt index 53be4b0eb..8dfda77cb 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt @@ -28,6 +28,7 @@ class SdkStreamResponseHandlerTest { override fun activate() {} override fun close() { closed = true } override fun incrementWindow(size: Int) {} + override fun writeChunk(chunkData: ByteArray, isFinalChunk: Boolean) {} } private class MockHttpClientConnection : HttpClientConnection { From 6afcee817c117358f68e6b28d8a082761385a304 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 13:16:32 -0600 Subject: [PATCH 02/17] bump CRT version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 77dd8e61c..2c6e732f6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -44,4 +44,4 @@ kotlinLoggingVersion=2.1.21 slf4jVersion=1.7.36 # crt -crtKotlinVersion=0.6.6 \ No newline at end of file +crtKotlinVersion=0.6.7-SNAPSHOT \ No newline at end of file From 7a15e064d7beb3c40dd78458f765d35dd6b5aace Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 13:23:03 -0600 Subject: [PATCH 03/17] special-case `aws-chunked` requests --- .../runtime/http/engine/crt/CrtHttpEngine.kt | 49 +++++++++++++++++++ .../runtime/http/engine/crt/RequestUtil.kt | 18 +++++-- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index dcc17dab5..4b7b0a9d0 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -10,6 +10,7 @@ import aws.sdk.kotlin.crt.io.* import aws.smithy.kotlin.runtime.ClientException import aws.smithy.kotlin.runtime.client.ExecutionContext import aws.smithy.kotlin.runtime.crt.SdkDefaultIO +import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase import aws.smithy.kotlin.runtime.http.engine.ProxyConfig @@ -17,6 +18,7 @@ import aws.smithy.kotlin.runtime.http.engine.callContext import aws.smithy.kotlin.runtime.http.operation.getLogger import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.http.response.HttpCall +import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.logging.Logger import aws.smithy.kotlin.runtime.time.Instant import kotlinx.coroutines.job @@ -100,6 +102,53 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE val stream = conn.makeRequest(engineRequest, respHandler) stream.activate() + if (request.isAwsChunked) { + when (request.body) { + is HttpBody.SourceContent -> { + val source = (request.body as HttpBody.SourceContent).readFrom() + + val bytesToRead = 65536L + val buffer = SdkBuffer() + + while (true) { + val bytesRead = source.read(buffer, bytesToRead) + check(bytesRead != -1L) { "unexpected exhaustion of source" } + + if (bytesRead < bytesToRead) { + // we've read fewer bytes than expected. try to read again. + val nextBytesRead = source.read(buffer, bytesToRead) + + // if we read -1, that means we've truly exhausted the underlying source + if (nextBytesRead == -1L) { + stream.writeChunk(buffer.readByteArray(bytesRead), true) + break + } + + stream.writeChunk(buffer.readByteArray(bytesRead + nextBytesRead), isFinalChunk = nextBytesRead < bytesToRead) + + // if we read fewer bytes than expected _again_, that means we just fully consumed the final chunk + if (nextBytesRead < bytesToRead) { break } + } else { + // if we read some more bytes, that means there are still more chunks available + stream.writeChunk(buffer.readByteArray(bytesRead), false) + } + } + } + is HttpBody.ChannelContent -> { + val chan = (request.body as HttpBody.ChannelContent).readFrom() + val bytesToRead = 65536L + val buffer = SdkBuffer() + + while (!chan.isClosedForRead) { + val bytesRead = chan.read(buffer, bytesToRead) + check(bytesRead != -1L) { "unexpected exhaustion of channel" } + stream.writeChunk(buffer.readByteArray(bytesRead), isFinalChunk = chan.isClosedForRead) + } + } + else -> {} + } + } + val resp = respHandler.waitForResponse() return HttpCall(request, resp, reqTime, Instant.now(), callContext) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index 2359100cb..e24d537ad 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -34,17 +34,19 @@ internal val HttpRequest.uri: Uri internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest { val body = this.body check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" } - val bodyStream = when (body) { - is HttpBody.Empty -> null - is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) - is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) - is HttpBody.SourceContent -> { + val bodyStream = when { + body is HttpBody.Empty -> null + body is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) + body is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) + body is HttpBody.SourceContent && this.isAwsChunked -> null // aws-chunked bodies must be null + body is HttpBody.SourceContent -> { val source = body.readFrom() callContext.job.invokeOnCompletion { source.close() } SdkSourceBodyStream(source) } + else -> null } val crtHeaders = HeadersBuilder() @@ -61,3 +63,9 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko return aws.sdk.kotlin.crt.http.HttpRequest(method.name, url.encodedPath, crtHeaders.build(), bodyStream) } + +/** + * @return whether this HttpRequest is an aws-chunked request. + * Specifically, this means return `true` if a request contains a `Transfer-Encoding` header with the value `chunked`. + */ +internal val HttpRequest.isAwsChunked: Boolean get() = this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true From e728eb35906d06b28e5cb4d0432166bd61beb77f Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 15:18:04 -0600 Subject: [PATCH 04/17] fix edge case and simplify `SourceContent` logic --- .../runtime/http/engine/crt/CrtHttpEngine.kt | 39 +++++++------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 4b7b0a9d0..733ae7506 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -19,6 +19,7 @@ import aws.smithy.kotlin.runtime.http.operation.getLogger import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.http.response.HttpCall import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.readToByteArray import aws.smithy.kotlin.runtime.logging.Logger import aws.smithy.kotlin.runtime.time.Instant import kotlinx.coroutines.job @@ -103,40 +104,28 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE stream.activate() if (request.isAwsChunked) { + val bytesToRead = 65536L + when (request.body) { is HttpBody.SourceContent -> { val source = (request.body as HttpBody.SourceContent).readFrom() - - val bytesToRead = 65536L - val buffer = SdkBuffer() + val nextBuffer = SdkBuffer() + var buffer = SdkBuffer() while (true) { - val bytesRead = source.read(buffer, bytesToRead) - check(bytesRead != -1L) { "unexpected exhaustion of source" } - - if (bytesRead < bytesToRead) { - // we've read fewer bytes than expected. try to read again. - val nextBytesRead = source.read(buffer, bytesToRead) - - // if we read -1, that means we've truly exhausted the underlying source - if (nextBytesRead == -1L) { - stream.writeChunk(buffer.readByteArray(bytesRead), true) - break - } - - stream.writeChunk(buffer.readByteArray(bytesRead + nextBytesRead), isFinalChunk = nextBytesRead < bytesToRead) - - // if we read fewer bytes than expected _again_, that means we just fully consumed the final chunk - if (nextBytesRead < bytesToRead) { break } - } else { - // if we read some more bytes, that means there are still more chunks available - stream.writeChunk(buffer.readByteArray(bytesRead), false) - } + source.read(buffer, bytesToRead) + + // read another set of bytes into a second buffer -- this is how we can "peek" to decide if this is the final chunk or not + val isFinalChunk = source.read(nextBuffer, bytesToRead) == -1L + + stream.writeChunk(buffer.readToByteArray(), isFinalChunk) + + if (isFinalChunk) { break } + else { buffer = nextBuffer } } } is HttpBody.ChannelContent -> { val chan = (request.body as HttpBody.ChannelContent).readFrom() - val bytesToRead = 65536L val buffer = SdkBuffer() while (!chan.isClosedForRead) { From cde19cfe9f198478d27494ebc535f3ab799ff719 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 15:45:42 -0600 Subject: [PATCH 05/17] simplify CRT request conversion logic --- .../smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index e24d537ad..f40e94ecc 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -35,10 +35,9 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko val body = this.body check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" } val bodyStream = when { - body is HttpBody.Empty -> null + body is HttpBody.Empty || this.isAwsChunked -> null body is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) body is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) - body is HttpBody.SourceContent && this.isAwsChunked -> null // aws-chunked bodies must be null body is HttpBody.SourceContent -> { val source = body.readFrom() callContext.job.invokeOnCompletion { @@ -68,4 +67,5 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko * @return whether this HttpRequest is an aws-chunked request. * Specifically, this means return `true` if a request contains a `Transfer-Encoding` header with the value `chunked`. */ -internal val HttpRequest.isAwsChunked: Boolean get() = this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true +internal val HttpRequest.isAwsChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) && + this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true From 69758ae484c352c53e95982107370c03a9264530 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 15:47:14 -0600 Subject: [PATCH 06/17] add changelog --- .changes/f0ec11be-3875-498a-8e33-74c49416f3b8.json | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changes/f0ec11be-3875-498a-8e33-74c49416f3b8.json diff --git a/.changes/f0ec11be-3875-498a-8e33-74c49416f3b8.json b/.changes/f0ec11be-3875-498a-8e33-74c49416f3b8.json new file mode 100644 index 000000000..43a51c755 --- /dev/null +++ b/.changes/f0ec11be-3875-498a-8e33-74c49416f3b8.json @@ -0,0 +1,8 @@ +{ + "id": "f0ec11be-3875-498a-8e33-74c49416f3b8", + "type": "bugfix", + "description": "Fix `aws-chunked` requests in the CRT HTTP engine", + "issues": [ + "https://github.com/awslabs/smithy-kotlin/issues/759" + ] +} \ No newline at end of file From 88b606b998f18eb13d0c816bcc90aa40b7645182 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 15:48:21 -0600 Subject: [PATCH 07/17] ktlint --- .../aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt | 3 +-- .../aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 733ae7506..5d761ada2 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -120,8 +120,7 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE stream.writeChunk(buffer.readToByteArray(), isFinalChunk) - if (isFinalChunk) { break } - else { buffer = nextBuffer } + if (isFinalChunk) break else buffer = nextBuffer } } is HttpBody.ChannelContent -> { diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index f40e94ecc..a2016f828 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -68,4 +68,4 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko * Specifically, this means return `true` if a request contains a `Transfer-Encoding` header with the value `chunked`. */ internal val HttpRequest.isAwsChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) && - this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true + this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true From 8d079840134723da48442f15356e379b0bf1efe0 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 16:10:05 -0600 Subject: [PATCH 08/17] Add tests --- .../http/engine/crt/RequestConversionTest.kt | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt index afbccc720..23eacb377 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt @@ -11,12 +11,12 @@ import aws.smithy.kotlin.runtime.http.* import aws.smithy.kotlin.runtime.http.content.ByteArrayContent import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.SdkByteReadChannel +import aws.smithy.kotlin.runtime.io.SdkSource +import aws.smithy.kotlin.runtime.io.source import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlin.coroutines.EmptyCoroutineContext -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse +import kotlin.test.* class RequestConversionTest { private fun byteStreamFromContents(contents: String): ByteStream = @@ -103,4 +103,45 @@ class RequestConversionTest { val crtRequest = request.toCrtRequest(testContext) assertFalse(crtRequest.headers.contains("Content-Length")) } + + @Test + fun testEngineSetsNullBodyForChannelContentChunkedRequests() { + val testData = ByteArray(1024) { 0 } + + val request = HttpRequest( + HttpMethod.POST, + Url.parse("https://test.aws.com?foo=bar"), + Headers.invoke { append("Transfer-Encoding", "chunked") }, + object : HttpBody.ChannelContent() { + override val contentLength: Long = testData.size.toLong() + override fun readFrom(): SdkByteReadChannel = SdkByteReadChannel(testData) + } + ) + + val testContext = EmptyCoroutineContext + Job() + val crtRequest = request.toCrtRequest(testContext) + assertNotNull(request.body) + assertNull(crtRequest.body) + } + + @Test + fun testEngineSetsNullBodyForSourceContentChunkedRequests() { + val testData = ByteArray(1024) { 0 } + + val request = HttpRequest( + HttpMethod.POST, + Url.parse("https://test.aws.com?foo=bar"), + Headers.invoke { append("Transfer-Encoding", "chunked") }, + object : HttpBody.SourceContent() { + override val contentLength: Long = testData.size.toLong() + override fun readFrom(): SdkSource = testData.source() + } + ) + + val testContext = EmptyCoroutineContext + Job() + val crtRequest = request.toCrtRequest(testContext) + assertNotNull(request.body) + assertNull(crtRequest.body) + } + } From 93c12a63a2a1f33e2b0a099b95b25d7c24e4ad42 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 16:10:21 -0600 Subject: [PATCH 09/17] rename `isAwsChunked` to `isChunked` --- .../kotlin/runtime/http/engine/crt/CrtHttpEngine.kt | 2 +- .../smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 5d761ada2..09f1e5730 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -103,7 +103,7 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE val stream = conn.makeRequest(engineRequest, respHandler) stream.activate() - if (request.isAwsChunked) { + if (request.isChunked) { val bytesToRead = 65536L when (request.body) { diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index a2016f828..2d48d0082 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -35,7 +35,7 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko val body = this.body check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" } val bodyStream = when { - body is HttpBody.Empty || this.isAwsChunked -> null + body is HttpBody.Empty || this.isChunked -> null body is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) body is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) body is HttpBody.SourceContent -> { @@ -64,8 +64,9 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko } /** - * @return whether this HttpRequest is an aws-chunked request. - * Specifically, this means return `true` if a request contains a `Transfer-Encoding` header with the value `chunked`. + * @return whether this HttpRequest is a chunked request. + * Specifically, this means return `true` if a request contains a `Transfer-Encoding` header with the value `chunked`, + * and the body is either [HttpBody.SourceContent] or [HttpBody.ChannelContent]. */ -internal val HttpRequest.isAwsChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) && +internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) && this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true From 8306147a93e9afd7e3ae4c0324ffd23539750d18 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Fri, 9 Dec 2022 16:11:43 -0600 Subject: [PATCH 10/17] ktlint --- .../kotlin/runtime/http/engine/crt/RequestConversionTest.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt index 23eacb377..73a163eb0 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt @@ -115,7 +115,7 @@ class RequestConversionTest { object : HttpBody.ChannelContent() { override val contentLength: Long = testData.size.toLong() override fun readFrom(): SdkByteReadChannel = SdkByteReadChannel(testData) - } + }, ) val testContext = EmptyCoroutineContext + Job() @@ -135,7 +135,7 @@ class RequestConversionTest { object : HttpBody.SourceContent() { override val contentLength: Long = testData.size.toLong() override fun readFrom(): SdkSource = testData.source() - } + }, ) val testContext = EmptyCoroutineContext + Job() @@ -143,5 +143,4 @@ class RequestConversionTest { assertNotNull(request.body) assertNull(crtRequest.body) } - } From f171fdc53906e984e5a14d7b8f5817b8c44deaee Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Sun, 11 Dec 2022 00:11:37 -0600 Subject: [PATCH 11/17] clean up `when` block --- .../kotlin/runtime/http/engine/crt/RequestUtil.kt | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index 2d48d0082..e1980430b 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -34,18 +34,17 @@ internal val HttpRequest.uri: Uri internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest { val body = this.body check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" } - val bodyStream = when { - body is HttpBody.Empty || this.isChunked -> null - body is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) - body is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) - body is HttpBody.SourceContent -> { + val bodyStream = if (isChunked) null else when (body) { + is HttpBody.Empty -> null + is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) + is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) + is HttpBody.SourceContent -> { val source = body.readFrom() callContext.job.invokeOnCompletion { source.close() } SdkSourceBodyStream(source) } - else -> null } val crtHeaders = HeadersBuilder() From eff5c16bfe7d568b032fb3fe204213232f5ccea2 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Mon, 12 Dec 2022 09:56:36 -0600 Subject: [PATCH 12/17] simplify `SourceContent` chunked upload --- .../runtime/http/engine/crt/CrtHttpEngine.kt | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 09f1e5730..1e76fada9 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -19,7 +19,7 @@ import aws.smithy.kotlin.runtime.http.operation.getLogger import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.http.response.HttpCall import aws.smithy.kotlin.runtime.io.SdkBuffer -import aws.smithy.kotlin.runtime.io.readToByteArray +import aws.smithy.kotlin.runtime.io.buffer import aws.smithy.kotlin.runtime.logging.Logger import aws.smithy.kotlin.runtime.time.Instant import kotlinx.coroutines.job @@ -28,6 +28,7 @@ import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withTimeoutOrNull internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024 +internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024 /** * [HttpClientEngine] based on the AWS Common Runtime HTTP client @@ -104,23 +105,14 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE stream.activate() if (request.isChunked) { - val bytesToRead = 65536L - when (request.body) { is HttpBody.SourceContent -> { val source = (request.body as HttpBody.SourceContent).readFrom() - val nextBuffer = SdkBuffer() - var buffer = SdkBuffer() - - while (true) { - source.read(buffer, bytesToRead) - - // read another set of bytes into a second buffer -- this is how we can "peek" to decide if this is the final chunk or not - val isFinalChunk = source.read(nextBuffer, bytesToRead) == -1L - - stream.writeChunk(buffer.readToByteArray(), isFinalChunk) + val bufferedSource = source.buffer() - if (isFinalChunk) break else buffer = nextBuffer + while (!bufferedSource.exhausted()) { + bufferedSource.request(CHUNK_BUFFER_SIZE) + stream.writeChunk(bufferedSource.buffer.readByteArray(), isFinalChunk = bufferedSource.exhausted()) } } is HttpBody.ChannelContent -> { @@ -128,7 +120,7 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE val buffer = SdkBuffer() while (!chan.isClosedForRead) { - val bytesRead = chan.read(buffer, bytesToRead) + val bytesRead = chan.read(buffer, CHUNK_BUFFER_SIZE) check(bytesRead != -1L) { "unexpected exhaustion of channel" } stream.writeChunk(buffer.readByteArray(bytesRead), isFinalChunk = chan.isClosedForRead) } From 4a9cf27c2095a4a7f1b66a42068c8fdc6a4cea79 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Mon, 12 Dec 2022 11:47:49 -0600 Subject: [PATCH 13/17] refactor chunked writes to HttpStream extension function. use two buffers for ChannelContent --- .../runtime/http/engine/crt/CrtHttpEngine.kt | 55 ++++++++++++------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 1e76fada9..9d7b36763 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -20,11 +20,14 @@ import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.http.response.HttpCall import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.io.buffer +import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers +import aws.smithy.kotlin.runtime.io.readToByteArray import aws.smithy.kotlin.runtime.logging.Logger import aws.smithy.kotlin.runtime.time.Instant import kotlinx.coroutines.job import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024 @@ -105,27 +108,8 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE stream.activate() if (request.isChunked) { - when (request.body) { - is HttpBody.SourceContent -> { - val source = (request.body as HttpBody.SourceContent).readFrom() - val bufferedSource = source.buffer() - - while (!bufferedSource.exhausted()) { - bufferedSource.request(CHUNK_BUFFER_SIZE) - stream.writeChunk(bufferedSource.buffer.readByteArray(), isFinalChunk = bufferedSource.exhausted()) - } - } - is HttpBody.ChannelContent -> { - val chan = (request.body as HttpBody.ChannelContent).readFrom() - val buffer = SdkBuffer() - - while (!chan.isClosedForRead) { - val bytesRead = chan.read(buffer, CHUNK_BUFFER_SIZE) - check(bytesRead != -1L) { "unexpected exhaustion of channel" } - stream.writeChunk(buffer.readByteArray(bytesRead), isFinalChunk = chan.isClosedForRead) - } - } - else -> {} + withContext(SdkDispatchers.IO) { + stream.sendChunkedBody(request.body) } } @@ -160,3 +144,32 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE } } } + +internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { + when (body) { + is HttpBody.SourceContent -> { + val source = body.readFrom() + val bufferedSource = source.buffer() + + while (!bufferedSource.exhausted()) { + bufferedSource.request(CHUNK_BUFFER_SIZE) + this.writeChunk(bufferedSource.buffer.readByteArray(), isFinalChunk = bufferedSource.exhausted()) + } + } + is HttpBody.ChannelContent -> { + val chan = body.readFrom() + var buffer = SdkBuffer() + val nextBuffer = SdkBuffer() + + while (!chan.isClosedForRead) { + chan.read(buffer, CHUNK_BUFFER_SIZE) + + val isFinalChunk = chan.read(nextBuffer, CHUNK_BUFFER_SIZE) == -1L + + this.writeChunk(buffer.readToByteArray(), isFinalChunk) + if (isFinalChunk) break else buffer = nextBuffer + } + } + else -> {} + } +} From e4f9f63a9020f72776a8767679f36e457f3b08ad Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Mon, 12 Dec 2022 11:49:57 -0600 Subject: [PATCH 14/17] add tests --- .../http/engine/crt/SendChunkedBodyTest.kt | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt new file mode 100644 index 000000000..a0464db7f --- /dev/null +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt @@ -0,0 +1,103 @@ +package aws.smithy.kotlin.runtime.http.engine.crt + +import aws.sdk.kotlin.crt.http.HttpStream +import aws.smithy.kotlin.runtime.http.toHttpBody +import aws.smithy.kotlin.runtime.io.SdkByteReadChannel +import aws.smithy.kotlin.runtime.io.readToByteArray +import aws.smithy.kotlin.runtime.io.source +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runTest +import kotlin.test.* + + +@OptIn(ExperimentalCoroutinesApi::class) +class SendChunkedBodyTest { + private class MockHttpStream(override val responseStatusCode: Int) : HttpStream { + var closed: Boolean = false + var numChunksWritten = 0 + override fun activate() {} + override fun close() { closed = true } + override fun incrementWindow(size: Int) {} + override fun writeChunk(chunkData: ByteArray, isFinalChunk: Boolean) { numChunksWritten += 1 } + } + + @Test + fun testSourceContent() = runTest { + val stream = MockHttpStream(200) + + val chunkedBytes = """ + 100;chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(256)}\r\n\r\n + """.trimIndent().toByteArray() + + val source = chunkedBytes.source() + + stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong())) + + // source should be fully consumed with 1 chunk written + assertEquals(0, source.readToByteArray().size) + assertEquals(1, stream.numChunksWritten) + } + + @Test + fun testChannelContentMultipleChunks() = runTest { + val stream = MockHttpStream(200) + + val chunkSize = (CHUNK_BUFFER_SIZE * 5).toInt() + + val chunkedBytes = """ + ${chunkSize.toString(16)};chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(chunkSize)}\r\n\r\n + """.trimIndent().toByteArray() + + val source = chunkedBytes.source() + + stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong())) + + // source should be fully consumed + assertEquals(0, source.readToByteArray().size) + + // there should definitely be more than 1 call to `writeChunk`, but in practice we don't care how many there are + assertTrue(stream.numChunksWritten > 1) + } + + @Test + fun testChannelContent() = runTest { + val stream = MockHttpStream(200) + + val chunkedBytes = """ + 100;chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(256)}\r\n\r\n + """.trimIndent().toByteArray() + + val channel = SdkByteReadChannel(chunkedBytes) + + stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong())) + + // channel should be fully consumed with 1 chunk written + assertEquals(0, channel.availableForRead) + assertTrue(channel.isClosedForRead) + assertEquals(1, stream.numChunksWritten) + } + + @Test + fun testSourceContentMultipleChunks() = runTest { + val stream = MockHttpStream(200) + + val chunkSize = (CHUNK_BUFFER_SIZE * 5).toInt() + + val chunkedBytes = """ + ${chunkSize.toString(16)};chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(chunkSize)}\r\n\r\n + """.trimIndent().toByteArray() + + val channel = SdkByteReadChannel(chunkedBytes) + + stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong())) + + // source should be fully consumed + assertEquals(0, channel.availableForRead) + assertTrue(channel.isClosedForRead) + + // there should definitely be more than 1 call to `writeChunk`, but in practice we don't care how many there are + assertTrue(stream.numChunksWritten > 1) + } + + +} \ No newline at end of file From d33223d55efa30423198845c5e3d1953931589de Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Mon, 12 Dec 2022 11:51:08 -0600 Subject: [PATCH 15/17] ktlint --- .../runtime/http/engine/crt/SendChunkedBodyTest.kt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt index a0464db7f..265843c8b 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt @@ -1,3 +1,8 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.http.HttpStream @@ -9,7 +14,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import kotlin.test.* - @OptIn(ExperimentalCoroutinesApi::class) class SendChunkedBodyTest { private class MockHttpStream(override val responseStatusCode: Int) : HttpStream { @@ -98,6 +102,4 @@ class SendChunkedBodyTest { // there should definitely be more than 1 call to `writeChunk`, but in practice we don't care how many there are assertTrue(stream.numChunksWritten > 1) } - - -} \ No newline at end of file +} From 0df400febf7c333f9bb256752881ce343f13e0d5 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Mon, 12 Dec 2022 13:32:05 -0600 Subject: [PATCH 16/17] move `sendChunkedBody` function. remove unnecessary usage of `this` --- .../runtime/http/engine/crt/CrtHttpEngine.kt | 33 ---------------- .../runtime/http/engine/crt/RequestUtil.kt | 39 ++++++++++++++++++- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 9d7b36763..e2f22185b 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -10,7 +10,6 @@ import aws.sdk.kotlin.crt.io.* import aws.smithy.kotlin.runtime.ClientException import aws.smithy.kotlin.runtime.client.ExecutionContext import aws.smithy.kotlin.runtime.crt.SdkDefaultIO -import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase import aws.smithy.kotlin.runtime.http.engine.ProxyConfig @@ -18,10 +17,7 @@ import aws.smithy.kotlin.runtime.http.engine.callContext import aws.smithy.kotlin.runtime.http.operation.getLogger import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.http.response.HttpCall -import aws.smithy.kotlin.runtime.io.SdkBuffer -import aws.smithy.kotlin.runtime.io.buffer import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers -import aws.smithy.kotlin.runtime.io.readToByteArray import aws.smithy.kotlin.runtime.logging.Logger import aws.smithy.kotlin.runtime.time.Instant import kotlinx.coroutines.job @@ -144,32 +140,3 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE } } } - -internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { - when (body) { - is HttpBody.SourceContent -> { - val source = body.readFrom() - val bufferedSource = source.buffer() - - while (!bufferedSource.exhausted()) { - bufferedSource.request(CHUNK_BUFFER_SIZE) - this.writeChunk(bufferedSource.buffer.readByteArray(), isFinalChunk = bufferedSource.exhausted()) - } - } - is HttpBody.ChannelContent -> { - val chan = body.readFrom() - var buffer = SdkBuffer() - val nextBuffer = SdkBuffer() - - while (!chan.isClosedForRead) { - chan.read(buffer, CHUNK_BUFFER_SIZE) - - val isFinalChunk = chan.read(nextBuffer, CHUNK_BUFFER_SIZE) == -1L - - this.writeChunk(buffer.readToByteArray(), isFinalChunk) - if (isFinalChunk) break else buffer = nextBuffer - } - } - else -> {} - } -} diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index e1980430b..d17061a14 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -7,6 +7,7 @@ package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.http.HeadersBuilder import aws.sdk.kotlin.crt.http.HttpRequestBodyStream +import aws.sdk.kotlin.crt.http.HttpStream import aws.sdk.kotlin.crt.io.Protocol import aws.sdk.kotlin.crt.io.Uri import aws.sdk.kotlin.crt.io.UserInfo @@ -14,6 +15,9 @@ import aws.smithy.kotlin.runtime.crt.ReadChannelBodyStream import aws.smithy.kotlin.runtime.crt.SdkSourceBodyStream import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.request.HttpRequest +import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.buffer +import aws.smithy.kotlin.runtime.io.readToByteArray import kotlinx.coroutines.job import kotlin.coroutines.CoroutineContext @@ -68,4 +72,37 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko * and the body is either [HttpBody.SourceContent] or [HttpBody.ChannelContent]. */ internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) && - this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true + headers.contains("Transfer-Encoding", "chunked") + +/** + * Send a chunked body using the CRT writeChunk bindings. + * @param body an HTTP body that has a chunked content encoding. Must be [HttpBody.SourceContent] or [HttpBody.ChannelContent] + */ +internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { + when (body) { + is HttpBody.SourceContent -> { + val source = body.readFrom() + val bufferedSource = source.buffer() + + while (!bufferedSource.exhausted()) { + bufferedSource.request(CHUNK_BUFFER_SIZE) + writeChunk(bufferedSource.buffer.readByteArray(), isFinalChunk = bufferedSource.exhausted()) + } + } + is HttpBody.ChannelContent -> { + val chan = body.readFrom() + var buffer = SdkBuffer() + val nextBuffer = SdkBuffer() + + while (!chan.isClosedForRead) { + chan.read(buffer, CHUNK_BUFFER_SIZE) + + val isFinalChunk = chan.read(nextBuffer, CHUNK_BUFFER_SIZE) == -1L + + writeChunk(buffer.readToByteArray(), isFinalChunk) + if (isFinalChunk) break else buffer = nextBuffer + } + } + else -> error("sendChunkedBody should not be called for non-chunked body types") + } +} From 54f70c8dd1fd0bfb40dd06af2a7af36fc55b6437 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Mon, 12 Dec 2022 13:48:52 -0600 Subject: [PATCH 17/17] handle case when channel is exhausted but initial chunk has not been written --- .../aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index d17061a14..1d2375421 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/common/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -93,14 +93,17 @@ internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { val chan = body.readFrom() var buffer = SdkBuffer() val nextBuffer = SdkBuffer() + var sentFirstChunk = false while (!chan.isClosedForRead) { - chan.read(buffer, CHUNK_BUFFER_SIZE) + val bytesRead = chan.read(buffer, CHUNK_BUFFER_SIZE) + if (!sentFirstChunk && bytesRead == -1L) { throw RuntimeException("CRT does not support empty chunked bodies.") } val isFinalChunk = chan.read(nextBuffer, CHUNK_BUFFER_SIZE) == -1L writeChunk(buffer.readToByteArray(), isFinalChunk) if (isFinalChunk) break else buffer = nextBuffer + sentFirstChunk = true } } else -> error("sendChunkedBody should not be called for non-chunked body types")