-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rt): add InputStream adapter for ByteStream #945
Conversation
public fun SdkByteReadChannel.toInputStream(): InputStream = InputAdapter(this) | ||
|
||
private const val DEFAULT_READ_BYTES = 8192L | ||
private class InputAdapter(private val ch: SdkByteReadChannel) : InputStream() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we support mark
/ reset
? I think it's not supported by default (markSupported()
defaults to false
). It can probably be additive work if requested
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'd vote to make it additive based on customer feedback/requests. Anyone feel different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me.
} | ||
override fun read(b: ByteArray, off: Int, len: Int): Int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Missing line break
private fun readBlocking(): Long = | ||
runBlocking { | ||
ch.read(buffer, DEFAULT_READ_BYTES) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Looks like we only ever read DEFAULT_READ_BYTES
(8K) bytes at a time from the channel, even when the read
call may have had a len
higher than that. Given that runBlocking
may be expensive, is it worth passing the requested length through to the channel read? Or making the chunk size configurable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you feel that runBlocking
is expensive?
The len argument is documented as
len - the maximum number of bytes to read.
We could change it but I chose 8K because thats the underlying segment size used by okio. I'm not sure this will matter all that much in practice but if you disagree I'm happy to change it.
public fun SdkByteReadChannel.toInputStream(): InputStream = InputAdapter(this) | ||
|
||
private const val DEFAULT_READ_BYTES = 8192L | ||
private class InputAdapter(private val ch: SdkByteReadChannel) : InputStream() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: This adapter works via runBlocking
on every read
. That will work obvi but seems like it may be slow. Would it be better to spin up a background coroutine to handle reads that lives for the lifetime of the adapter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InputStream.read
is a blocking call. runBlocking
is designed to bridge blocking and non-blocking worlds (it does so by blocking the current thread). Spinning up a coroutine will result in tying up two threads and/or more context switches to accomplish the same thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I understand (could be wrong, ofc) runBlocking
starts a new coroutine and blocks the current thread. Doing that on every 8KB chunk sounds slower than starting a new coroutine once. Sure, it'll still block on every read
call but the cost of setting up a new coroutine was already paid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TL;DR I'm not at all worried about the overhead of runBlocking
or starting a new coroutine.
runBlocking
does start a new coroutine and blocks the current thread. This is fine though, coroutines are extremely lightweight. The only benchmarking I can find is here which would suggest 140 byte allocation and 100 nanoseconds.
Introducing a background coroutine will likely introduce additional context switches and a more complicated implementation to synchronize the coroutine and blocking read
call. It also needs a CoroutineScope
to launch into which would also mean taking an additional parameter and dealing with cancellation, error propagation, etc.
class ByteStreamBufferInputStreamTest : ByteStreamInputStreamTest(ByteStreamFactory.BYTE_ARRAY) | ||
class ByteStreamSourceStreamInputStreamTest : ByteStreamInputStreamTest(ByteStreamFactory.SDK_SOURCE) | ||
class ByteStreamChannelSourceInputStreamTest : ByteStreamInputStreamTest(ByteStreamFactory.SDK_CHANNEL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment: Nice abstraction.
Kudos, SonarCloud Quality Gate passed! 0 Bugs No Coverage information |
Issue #
closes awslabs/aws-sdk-kotlin#617
Description of changes
Adds a conversion to go from
ByteStream
tojava.io.InputStream
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.