fix(rt): add special-casing for CrtHttpEngine aws-chunked requests #760

Merged
merged 17 commits into from
Dec 12, 2022
8 changes: 8 additions & 0 deletions .changes/f0ec11be-3875-498a-8e33-74c49416f3b8.json
@@ -0,0 +1,8 @@
{
  "id": "f0ec11be-3875-498a-8e33-74c49416f3b8",
  "type": "bugfix",
  "description": "Fix `aws-chunked` requests in the CRT HTTP engine",
  "issues": [
    "https://github.com/awslabs/smithy-kotlin/issues/759"
  ]
}
2 changes: 1 addition & 1 deletion gradle.properties
@@ -44,4 +44,4 @@ kotlinLoggingVersion=2.1.21
slf4jVersion=1.7.36

# crt
-crtKotlinVersion=0.6.6
+crtKotlinVersion=0.6.7-SNAPSHOT
@@ -10,21 +10,28 @@ import aws.sdk.kotlin.crt.io.*
import aws.smithy.kotlin.runtime.ClientException
import aws.smithy.kotlin.runtime.client.ExecutionContext
import aws.smithy.kotlin.runtime.crt.SdkDefaultIO
import aws.smithy.kotlin.runtime.http.HttpBody
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
import aws.smithy.kotlin.runtime.http.engine.callContext
import aws.smithy.kotlin.runtime.http.operation.getLogger
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.response.HttpCall
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.buffer
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import aws.smithy.kotlin.runtime.io.readToByteArray
import aws.smithy.kotlin.runtime.logging.Logger
import aws.smithy.kotlin.runtime.time.Instant
import kotlinx.coroutines.job
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull

internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024
internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024

/**
* [HttpClientEngine] based on the AWS Common Runtime HTTP client
@@ -100,6 +107,12 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
val stream = conn.makeRequest(engineRequest, respHandler)
stream.activate()

if (request.isChunked) {
Contributor

This doesn't seem like it belongs here; this chunked send logic should probably be moved to its own function. (Making it its own function that takes in a Stream will also allow you to unit test it independently.)

question: This whole function blocks the thread until the entire chunked body is sent. Is that what CRT expects?
* makeRequest() -> stream.activate() -> send entire chunked body?

fix: Assuming it is correct to block until the body is sent, we should move the entirety of this work to SdkDispatchers.IO

stream.activate()

if (request.isChunked) {
    withContext(SdkDispatchers.IO) {
        sendChunkedBody(...)
    }
}
...

internal fun sendChunkedBody(...) { 
    // your new code here
}

Contributor

comment/no-op: It is unfortunate that toCrtRequest() doesn't actually do the complete conversion anymore. I suppose this is more CRT's fault than anything, because of how they've structured the APIs. I don't have a better suggestion at the moment, but it feels "wrong" that we call makeRequest and then have to separately check for chunked bodies and handle them differently.

Contributor Author

Discussed offline: since this writeChunk method is only intended for HTTP/1.1, we do not have to consider doing this in a non-blocking manner because there are no duplex streams.

I updated to use SdkDispatchers.IO, refactored to its own function and added some tests for it.

withContext(SdkDispatchers.IO) {
stream.sendChunkedBody(request.body)
}
}

val resp = respHandler.waitForResponse()

return HttpCall(request, resp, reqTime, Instant.now(), callContext)
@@ -131,3 +144,32 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
}
}
}

internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
Contributor

nit: This extension doesn't feel like it belongs in this file, maybe move to RequestUtil.kt or something.

when (body) {
is HttpBody.SourceContent -> {
val source = body.readFrom()
val bufferedSource = source.buffer()

while (!bufferedSource.exhausted()) {
bufferedSource.request(CHUNK_BUFFER_SIZE)
this.writeChunk(bufferedSource.buffer.readByteArray(), isFinalChunk = bufferedSource.exhausted())
}
}
is HttpBody.ChannelContent -> {
val chan = body.readFrom()
var buffer = SdkBuffer()
val nextBuffer = SdkBuffer()

while (!chan.isClosedForRead) {
chan.read(buffer, CHUNK_BUFFER_SIZE)
Contributor

Question: Is it possible on the initial iteration for isClosedForRead to return false but then the first read to return -1 as a result of suspension?

Contributor

Yes, it's possible, although wouldn't that be an empty body? We wouldn't be able to handle that, if I understood the CRT requirements on sending empty chunks correctly this morning.

Contributor Author

Yeah, it is possible. It's not necessarily an empty body; it could be a nonempty body that got closed prematurely for some reason. CRT wouldn't be able to handle it either way, though. Should I try to handle this case and throw an exception, or just let it blow up? It seems like a very unlikely thing to happen.

Contributor

I advocate for additional handling of this case. When we have to debug a failure here, it'll be nice to have a clear indication what happened rather than a cryptic CRT exception.

Contributor Author

Alright, I added some logic to handle this case where the initial chunk hasn't been sent but the channel is closed. Thanks!
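
For reference, the guard discussed in this thread could look roughly like the sketch below. It is a minimal, synchronous illustration under stated assumptions: `ChunkReader`, `ChunkWriter`, `ListChunkReader`, and `sendChunks` are hypothetical stand-ins, not the smithy-kotlin or CRT types, and this is not the code that was added to the PR. It fails with a descriptive message when the channel closes before the first chunk can be read, and otherwise reads one chunk ahead so that the last chunk can be flagged as final.

```kotlin
// Hypothetical stand-ins for illustration only; NOT the smithy-kotlin or CRT types.
interface ChunkReader {
    /** Reads up to [limit] bytes; returns null once the underlying channel is closed. */
    fun read(limit: Int): ByteArray?
}

fun interface ChunkWriter {
    fun writeChunk(chunkData: ByteArray, isFinalChunk: Boolean)
}

/** Test helper that replays pre-split chunks and ignores [limit]. */
class ListChunkReader(chunks: List<ByteArray>) : ChunkReader {
    private val iter = chunks.iterator()
    override fun read(limit: Int): ByteArray? = if (iter.hasNext()) iter.next() else null
}

fun sendChunks(reader: ChunkReader, writer: ChunkWriter, limit: Int = 64 * 1024) {
    // Guard: surface a clear error instead of a cryptic downstream failure
    // when the channel is closed before any data could be read.
    var current = reader.read(limit)
        ?: error("channel was closed before the first chunk could be read")

    while (true) {
        // Look ahead one chunk so we know whether `current` is the last one.
        val next = reader.read(limit)
        writer.writeChunk(current, isFinalChunk = next == null)
        current = next ?: break
    }
}
```

The look-ahead keeps the final-chunk decision out of the writer, at the cost of holding two chunks in memory at once.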


val isFinalChunk = chan.read(nextBuffer, CHUNK_BUFFER_SIZE) == -1L

this.writeChunk(buffer.readToByteArray(), isFinalChunk)
if (isFinalChunk) break else buffer = nextBuffer
}
}
else -> {}
Contributor

Nit: else -> error("sendChunkedBody shouldn't be called for non-chunked body types") or something like that.
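
A small sketch of what this nit suggests, assuming a simplified body hierarchy. `Body` and `chunksToSend` are hypothetical names used only for illustration and are not the real `HttpBody` API. Kotlin's standard `error(...)` throws an `IllegalStateException`, so a misuse shows up immediately rather than being silently ignored by an empty `else` branch.

```kotlin
// Hypothetical, simplified body hierarchy for illustration; not the real HttpBody.
sealed class Body {
    object Empty : Body()
    class Bytes(val data: ByteArray) : Body()
    class Streaming(val chunks: List<ByteArray>) : Body()
}

// Returns the chunks that would be sent for a streaming body and
// fails loudly for body types that should never reach this function.
fun chunksToSend(body: Body): List<ByteArray> = when (body) {
    is Body.Streaming -> body.chunks
    else -> error("sendChunkedBody shouldn't be called for non-chunked body types")
}
```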

}
}
@@ -34,7 +34,7 @@ internal val HttpRequest.uri: Uri
internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest {
val body = this.body
check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" }
-val bodyStream = when (body) {
+val bodyStream = if (isChunked) null else when (body) {
is HttpBody.Empty -> null
is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes())
is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext)
@@ -61,3 +61,11 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko

return aws.sdk.kotlin.crt.http.HttpRequest(method.name, url.encodedPath, crtHeaders.build(), bodyStream)
}

/**
* @return whether this HttpRequest is a chunked request.
* Specifically, this means return `true` if a request contains a `Transfer-Encoding` header with the value `chunked`,
* and the body is either [HttpBody.SourceContent] or [HttpBody.ChannelContent].
*/
internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) &&
this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true
Contributor

nit: headers.contains("Transfer-Encoding", "chunked") already checks whether any header contains the value.

nit 2: You can generally omit the explicit this qualifier in extension functions.
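
Applying both nits, the property might read roughly as in the sketch below. `SimpleHeaders`, `BodyKind`, and `SimpleRequest` are hypothetical minimal stand-ins so the example is self-contained; only the shape of the `isChunked` getter is the point, and the two-argument `contains` here is modeled on the call the reviewer mentions rather than copied from the library.

```kotlin
// Hypothetical minimal stand-ins; the real Headers and HttpBody types live in smithy-kotlin.
class SimpleHeaders(entries: Map<String, List<String>>) {
    // Header names are case-insensitive, so normalize them once up front.
    private val values = entries.mapKeys { it.key.lowercase() }

    /** True if any value stored under [name] equals [value]. */
    fun contains(name: String, value: String): Boolean =
        values[name.lowercase()]?.contains(value) == true
}

enum class BodyKind { EMPTY, BYTES, SOURCE, CHANNEL }

class SimpleRequest(val headers: SimpleHeaders, val bodyKind: BodyKind)

// No explicit `this.` qualifier is needed inside the extension property.
val SimpleRequest.isChunked: Boolean
    get() = (bodyKind == BodyKind.SOURCE || bodyKind == BodyKind.CHANNEL) &&
        headers.contains("Transfer-Encoding", "chunked")
```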

@@ -11,12 +11,12 @@ import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.content.ByteArrayContent
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
+import aws.smithy.kotlin.runtime.io.SdkSource
+import aws.smithy.kotlin.runtime.io.source
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlin.coroutines.EmptyCoroutineContext
-import kotlin.test.Test
-import kotlin.test.assertEquals
-import kotlin.test.assertFalse
+import kotlin.test.*

class RequestConversionTest {
private fun byteStreamFromContents(contents: String): ByteStream =
@@ -103,4 +103,44 @@ class RequestConversionTest {
val crtRequest = request.toCrtRequest(testContext)
assertFalse(crtRequest.headers.contains("Content-Length"))
}

@Test
fun testEngineSetsNullBodyForChannelContentChunkedRequests() {
val testData = ByteArray(1024) { 0 }

val request = HttpRequest(
HttpMethod.POST,
Url.parse("https://test.aws.com?foo=bar"),
Headers.invoke { append("Transfer-Encoding", "chunked") },
object : HttpBody.ChannelContent() {
override val contentLength: Long = testData.size.toLong()
override fun readFrom(): SdkByteReadChannel = SdkByteReadChannel(testData)
},
)

val testContext = EmptyCoroutineContext + Job()
val crtRequest = request.toCrtRequest(testContext)
assertNotNull(request.body)
assertNull(crtRequest.body)
}

@Test
fun testEngineSetsNullBodyForSourceContentChunkedRequests() {
val testData = ByteArray(1024) { 0 }

val request = HttpRequest(
HttpMethod.POST,
Url.parse("https://test.aws.com?foo=bar"),
Headers.invoke { append("Transfer-Encoding", "chunked") },
object : HttpBody.SourceContent() {
override val contentLength: Long = testData.size.toLong()
override fun readFrom(): SdkSource = testData.source()
},
)

val testContext = EmptyCoroutineContext + Job()
val crtRequest = request.toCrtRequest(testContext)
assertNotNull(request.body)
assertNull(crtRequest.body)
}
}
@@ -28,6 +28,7 @@ class SdkStreamResponseHandlerTest {
override fun activate() {}
override fun close() { closed = true }
override fun incrementWindow(size: Int) {}
override fun writeChunk(chunkData: ByteArray, isFinalChunk: Boolean) {}
}

private class MockHttpClientConnection : HttpClientConnection {
@@ -0,0 +1,105 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package aws.smithy.kotlin.runtime.http.engine.crt

import aws.sdk.kotlin.crt.http.HttpStream
import aws.smithy.kotlin.runtime.http.toHttpBody
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.io.readToByteArray
import aws.smithy.kotlin.runtime.io.source
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import kotlin.test.*

@OptIn(ExperimentalCoroutinesApi::class)
class SendChunkedBodyTest {
private class MockHttpStream(override val responseStatusCode: Int) : HttpStream {
var closed: Boolean = false
var numChunksWritten = 0
override fun activate() {}
override fun close() { closed = true }
override fun incrementWindow(size: Int) {}
override fun writeChunk(chunkData: ByteArray, isFinalChunk: Boolean) { numChunksWritten += 1 }
}

@Test
fun testSourceContent() = runTest {
val stream = MockHttpStream(200)

val chunkedBytes = """
100;chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(256)}\r\n\r\n
""".trimIndent().toByteArray()

val source = chunkedBytes.source()

stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()))

// source should be fully consumed with 1 chunk written
assertEquals(0, source.readToByteArray().size)
assertEquals(1, stream.numChunksWritten)
}

@Test
fun testChannelContentMultipleChunks() = runTest {
val stream = MockHttpStream(200)

val chunkSize = (CHUNK_BUFFER_SIZE * 5).toInt()

val chunkedBytes = """
${chunkSize.toString(16)};chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(chunkSize)}\r\n\r\n
""".trimIndent().toByteArray()

val source = chunkedBytes.source()

stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()))

// source should be fully consumed
assertEquals(0, source.readToByteArray().size)

// there should definitely be more than 1 call to `writeChunk`, but in practice we don't care how many there are
assertTrue(stream.numChunksWritten > 1)
}

@Test
fun testChannelContent() = runTest {
val stream = MockHttpStream(200)

val chunkedBytes = """
100;chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(256)}\r\n\r\n
""".trimIndent().toByteArray()

val channel = SdkByteReadChannel(chunkedBytes)

stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()))

// channel should be fully consumed with 1 chunk written
assertEquals(0, channel.availableForRead)
assertTrue(channel.isClosedForRead)
assertEquals(1, stream.numChunksWritten)
}

@Test
fun testSourceContentMultipleChunks() = runTest {
val stream = MockHttpStream(200)

val chunkSize = (CHUNK_BUFFER_SIZE * 5).toInt()

val chunkedBytes = """
${chunkSize.toString(16)};chunk-signature=${"0".repeat(64)}\r\n${"0".repeat(chunkSize)}\r\n\r\n
""".trimIndent().toByteArray()

val channel = SdkByteReadChannel(chunkedBytes)

stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()))

// source should be fully consumed
assertEquals(0, channel.availableForRead)
assertTrue(channel.isClosedForRead)

// there should definitely be more than 1 call to `writeChunk`, but in practice we don't care how many there are
assertTrue(stream.numChunksWritten > 1)
}
}