fix(rt): add special-casing for CrtHttpEngine aws-chunked requests #760
@@ -100,6 +103,40 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
val stream = conn.makeRequest(engineRequest, respHandler)
stream.activate()

if (request.isChunked) {
    val bytesToRead = 65536L
Question: Why is this the right number? I suggest making this a file-level `private const val` and adding a comment or KDoc describing how this value was derived.

There is no "right" number here; it is just the size of the buffer used to move data from the aws-chunked body to the CRT engine. I can clarify that and make it a `private const val`.
val source = (request.body as HttpBody.SourceContent).readFrom()
val nextBuffer = SdkBuffer()
var buffer = SdkBuffer()

while (true) {
    source.read(buffer, bytesToRead)

    // read another set of bytes into a second buffer -- this is how we can "peek" to decide if this is the final chunk or not
    val isFinalChunk = source.read(nextBuffer, bytesToRead) == -1L

    stream.writeChunk(buffer.readToByteArray(), isFinalChunk)

    if (isFinalChunk) break else buffer = nextBuffer
}
Question: Am I reading this correctly that the first `writeChunk` will get 64K, subsequent `writeChunk`s will get 128K (read, swap, read again), and the final `writeChunk` will get whatever's left? If so, does the initial chunk's smaller size matter?

That's right, and the sizes are all arbitrary. We should select a size that we are comfortable loading into memory all at once, and I think 128K is fine.
    val source = body.readFrom()
    callContext.job.invokeOnCompletion {
        source.close()
    }
    SdkSourceBodyStream(source)
}
else -> null
Question: What could `else` be if not one of the above cases? Should the stream be `null` or should we actually throw an error?

Probably throw an error. I wish there were a better way to format this `when` block; ideally it would enumerate all the `HttpBody` types and nothing else, but I needed access to the `HttpRequest` to call `isChunked`.

This should throw an error on the `else` clause now.

I refactored to get rid of the dangling `else`:
val bodyStream = if (isChunked) null else when (body) {
    is HttpBody.Empty -> null
    ...
}
@@ -100,6 +103,40 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
val stream = conn.makeRequest(engineRequest, respHandler)
stream.activate()

if (request.isChunked) {
This doesn't seem like it belongs here; this chunked send logic should probably be moved to its own function. (Making it its own function and taking in a `Stream` will also allow you to unit test it independently.)
question: This whole function blocks the thread until the entire chunked body is sent; is that the way CRT expects this?
* `makeRequest()` -> `stream.activate()` -> send entire chunked body?
fix: Assuming it is correct to block until the body is sent, we should move the entirety of this work to `SdkDispatchers.IO`:
stream.activate()
if (request.isChunked) {
    withContext(SdkDispatchers.IO) {
        sendChunkedBody(...)
    }
}
...
internal fun sendChunkedBody(...) {
    // your new code here
}
comment/no-op: It is unfortunate that `toCrtRequest()` doesn't actually do the complete conversion anymore. I suppose this is more CRT's fault than anything because of how they've structured the APIs. I don't have a better suggestion at the moment, but it feels "wrong" that we call `makeRequest` and then have to separately check for chunked bodies and handle them differently.

Discussed offline: since this `writeChunk` method is only intended for HTTP/1.1, we do not have to consider doing this in a non-blocking manner because there are no duplex streams.
I updated to use `SdkDispatchers.IO`, refactored to its own function, and added some tests for it.

while (!chan.isClosedForRead) {
    val bytesRead = chan.read(buffer, bytesToRead)
    check(bytesRead != -1L) { "unexpected exhaustion of channel" }
fix: This isn't right: we could check `isClosedForRead` and then the channel could be closed between reading the boolean and when we next call `read()`. In other words, the bytes read could absolutely be `-1` in that case and be valid.
question: Can CRT accept a 0-byte chunk with `isFinalChunk` set to `true` and do the right thing, or does it require the last chunk and flag to come together?

We can't send a 0-byte chunk as the final chunk; it has to have some data. This is because CRT does a check on the size of the chunk and skips processing if it's 0.
I see the issue; I will refactor it.
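A minimal sketch of one way to satisfy both constraints raised in this thread: treat end-of-input as a normal result of the read itself rather than pre-checking a closed flag, and hold one chunk back so the final flag always travels with a non-empty chunk. `ChunkReader`, `ChunkWriter`, `sendChunks`, and `readerOf` are hypothetical stand-ins written for illustration; they are not the SDK's channel type or the CRT stream API.

```kotlin
// Hypothetical stand-ins for the body channel and the CRT stream; not the SDK's real types.
fun interface ChunkReader {
    /** Returns the next chunk of at most [limit] bytes, or null once the input is exhausted. */
    fun readChunk(limit: Int): ByteArray?
}

fun interface ChunkWriter {
    fun writeChunk(data: ByteArray, isFinalChunk: Boolean)
}

fun sendChunks(reader: ChunkReader, writer: ChunkWriter, chunkSize: Int = 65536) {
    // Fail with a clear message if the body ends before anything could be sent.
    var pending = reader.readChunk(chunkSize)
        ?: error("body was exhausted before the first chunk could be sent")
    while (true) {
        // Look one chunk ahead: if nothing follows, the pending chunk is the final one.
        val next = reader.readChunk(chunkSize)
        writer.writeChunk(pending, next == null)
        pending = next ?: break
    }
}

// Helper for trying the sketch out: serves an in-memory array in slices.
fun readerOf(data: ByteArray): ChunkReader {
    var offset = 0
    return ChunkReader { limit ->
        if (offset >= data.size) null
        else data.copyOfRange(offset, minOf(offset + limit, data.size)).also { offset += it.size }
    }
}
```

With a 10-byte body and a 4-byte chunk size, this writes chunks of 4, 4, and 2 bytes and flags only the last one. An empty body fails fast with a descriptive error instead of reaching the writer.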
    }
}
is HttpBody.ChannelContent -> {
    val chan = (request.body as HttpBody.ChannelContent).readFrom()
This cast seems like it should be caught by compiler smart casting and not be required explicitly; we might be able to remove it and just do `request.body.readFrom()`.

I agree, but I tried it earlier and got this warning:
Smart cast to 'HttpBody.ChannelContent' is impossible, because 'request.body' is a public API property declared in different module

The compiler can't know this is safe because `body` is a `val` which, while always read-only, may be non-constant.
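One common way around this error, sketched under simplified assumptions: bind the property to a local `val` in the `when` subject so the compiler has a stable value to smart cast. `Body`, `Request`, and `describe` below are hypothetical stand-ins for illustration, not the SDK's `HttpBody` or `HttpRequest`.

```kotlin
// Hypothetical, simplified stand-ins for the SDK's body hierarchy and request type.
sealed class Body {
    object Empty : Body()
    class Source(private val text: String) : Body() { fun readFrom(): String = text }
    class Channel(private val text: String) : Body() { fun readFrom(): String = text }
}

class Request(val body: Body)

fun describe(request: Request): String =
    // `body` is a local value read exactly once, so each branch can smart cast it
    // without an explicit `as`, regardless of which module declares the property.
    when (val body = request.body) {
        is Body.Source -> "source:" + body.readFrom()
        is Body.Channel -> "channel:" + body.readFrom()
        Body.Empty -> "empty"
    }
```

Because `Body` is sealed, the `when` is exhaustive and needs no `else` branch.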

when (request.body) {
    is HttpBody.SourceContent -> {
        val source = (request.body as HttpBody.SourceContent).readFrom()
Same comment about smart casting; it might be worth checking if we can simplify to `request.body.readFrom()`.
while (true) {
    source.read(buffer, bytesToRead)

    // read another set of bytes into a second buffer -- this is how we can "peek" to decide if this is the final chunk or not
fix: This double buffering logic is unnecessary.
Instead you can structure this like:
val source = request.body.readFrom()
val bufferedSource = source.buffer()
while (true) {
    bufferedSource.request(bytesToRead)
    bufferedSource.buffer.readByteArray()
    ...
    val isFinalChunk = bufferedSource.exhausted()
    ...
}
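The same peek idea can be illustrated with only `java.io`, as a rough analogue rather than the SDK's buffered source API: read up to one chunk, then peek a single byte to learn whether more data follows. `chunkWithPeek` and its `emit` callback are hypothetical names, and `readNBytes` assumes Java 11 or later. Note that an empty input would still produce one empty final chunk here, which a real caller would need to reject.

```kotlin
import java.io.InputStream
import java.io.PushbackInputStream

// Hypothetical helper: splits [input] into chunks and reports which one is last.
fun chunkWithPeek(input: InputStream, chunkSize: Int, emit: (ByteArray, Boolean) -> Unit) {
    // A one-byte pushback buffer is enough to look ahead without losing data.
    val source = PushbackInputStream(input, 1)
    while (true) {
        // Blocks until chunkSize bytes are available or the stream ends.
        val chunk = source.readNBytes(chunkSize)
        // Peek one byte: -1 means nothing follows, so this chunk is the final one.
        val peeked = source.read()
        val isFinalChunk = peeked == -1
        if (!isFinalChunk) source.unread(peeked)
        emit(chunk, isFinalChunk)
        if (isFinalChunk) break
    }
}
```

Unlike the two-buffer approach, every chunk except the last has the same size, because the lookahead consumes at most one byte and returns it to the stream.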
        if (isFinalChunk) break else buffer = nextBuffer
    }
}
else -> {}
Nit: `else -> error("sendChunkedBody shouldn't be called for non-chunked body types")` or something like that.
while (!chan.isClosedForRead) {
    chan.read(buffer, CHUNK_BUFFER_SIZE)
Question: Is it possible on the initial iteration for `isClosedForRead` to return `false` but then the first `read` to return `-1` as a result of suspension?

Yes, it's possible, although wouldn't that be an empty body? We wouldn't be able to handle that, if I understood the CRT requirements on sending empty chunks correctly this morning.

Yeah, it is possible. It's not necessarily an empty body; it could be a nonempty body that got closed prematurely for some reason. CRT wouldn't be able to handle it either way, though. Should I try to handle this case and throw an exception, or just let it blow up? It seems like a very unlikely thing to happen.

I advocate for additional handling of this case. When we have to debug a failure here, it'll be nice to have a clear indication of what happened rather than a cryptic CRT exception.

Alright, I added some logic to handle this case where the initial chunk hasn't been sent but the channel is closed. Thanks!
 * and the body is either [HttpBody.SourceContent] or [HttpBody.ChannelContent].
 */
internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.SourceContent || this.body is HttpBody.ChannelContent) &&
    this.headers.getAll("Transfer-Encoding")?.contains("chunked") == true
nit: `headers.contains("Transfer-Encoding", "chunked")` already checks if any header contains the value.
nit 2: You can generally leave off the `this` qualifier in extension functions.
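A small sketch of what a `contains(name, value)` style check does, using a hypothetical `SimpleHeaders` class rather than the SDK's own `Headers` type: it looks up all values stored under a name and reports whether any of them equals the given value. Treating header names as case-insensitive is an assumption of this sketch.

```kotlin
// Hypothetical minimal header container; the SDK's own Headers type is not reproduced here.
class SimpleHeaders(entries: Map<String, List<String>>) {
    // Assumption: header names compare case-insensitively, so normalize them once.
    private val values = entries.mapKeys { it.key.lowercase() }

    /** True if any value stored under [name] equals [value]. */
    fun contains(name: String, value: String): Boolean =
        values[name.lowercase()]?.contains(value) == true
}

// Inside an extension, members can be called without qualifying them with `this`.
val SimpleHeaders.isChunked: Boolean
    get() = contains("Transfer-Encoding", "chunked")
```

The two-argument form folds the null check and the value search into one call, which is what makes the `getAll(...)?.contains(...) == true` chain redundant.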
@@ -131,3 +144,32 @@ public class CrtHttpEngine(public val config: CrtHttpEngineConfig) : HttpClientE
        }
    }
}

internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
nit: This extension doesn't feel like it belongs in this file; maybe move it to `RequestUtil.kt` or something.
CRT requires `aws-chunked` requests to use a specific API instead of a normal streaming request body. This PR adds a special case to the CRT engine, which will use this new API instead of sending an `aws-chunked` body.
Issue #759
Description of changes
Currently `aws-chunked` requests do not work when using the `CrtHttpEngine`.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.