Skip to content

Commit

Permalink
WASI Preview 2: rewrite streams and pollable implementation (bytecode…
Browse files Browse the repository at this point in the history
…alliance/wasmtime#6556)

* preview2: make everything but streams/io and poll/poll synchronous

* streams: get rid of as_any method, which is no longer used

* delete legacy sched and pollable concepts

* more code motion and renaming

* make tokio a workspace dep, because we need it directly in wasmtime-wasi

* HostPollable exists

* more fixes

* pollable can trap, and implement clock properly

* HostPollable is now a generator of futures

because we need to be able to poll a pollable many times

* explain various todo!s

* Synchronous version of the wasi-preview2-components tests

* Change with_tokio to accept the future as an argument

* Store futures in the PollOneoff struct instead, to avoid dropping them

* Remove TODO for HostOutputStream impl for WritePipe

* Implement pollable for ReadPipe

* Use a Notify when ReadPipe is ready

* wip

* wip

* Read/write pipe ends with tokio channels

* Empty reader/writer wrappers

* EmptyStream, and warning cleanup

* Wrapped reader/writer structs

* Rework stdio in terms of wrapped read/write

* Add MemoryOutputPipe and update tests

* Remove todo

* rewrite nearly everything

* implement the pipe stuff

* wibble

* fix MemoryOutputPipe just enough to make the tests compile

* Move the table iteration into a helper function

* AsyncFd stream implementation to fix stdin on unix

* Rename Wrapped{Read,Write} streams to Async{Read,Write}Stream

* Move async io wrappers into stream.rs

* Fix the sync tests

* fix test uses of pipes, juggle tokio context for stdin construction

* add some fixmes

* the future i named Never is defined in futures-util as pending

which is a better name

* i believe this is a correct implementation of one global stdin resource

* move unix stdin to its own file

* make most of the mods private

* fix build - we are skipping rust 1.70

due to llvm regressions in s390x and riscv64 which are fixed in 1.71 and
will not be backported

* preview1-in-preview2: use async funcs for io, and the async io interface

prtest:full

* windows stdin support

* done!

* table ext functions: fix tests

* tests: expect poll_oneoff_{files,stdio} to pass on all platforms

* export the bindings under wasmtime_wasi::preview2::bindings

rather than preview2::wasi.

and command moves to wasmtime_wasi::preview2::command as well.

* fix renaming of wasi to bindings in tests

* use block_in_place throughout filesystem

and move block_on and block_in_place to be pub crate at the root

* AsyncFdStream: ensure file is nonblocking

* tests: block_in_place requires multi-threaded runtime

* actually, use fcntl_setfl to make the asyncfd file nonblocking

* fix windows block_on

* docs, remove unnecessary methods

* more docs

* Add a workspace dependency on bytes-1.4

* Remove vectored stream operations

* Rework the read/write stream traits

* Add a size parameter to `read`, and switch to usize for traits

* Pipe through the bool -> stream-status change in wit

* Plumb stream-status through write operations in wit

* write host trait also gives streamstate

* hook new stream host read/write back up to the wit bindgen

* sketchy AsyncReadStream impl

* Fill out implementations for AsyncReadStream and AsyncWriteStream

* some reasonable read tests

* more

* first smoke test for AsyncWriteStream

* bunch of AsyncWriteStream tests

* half-baked idea that the output-stream interface will need a flush mechanism

* adapter: fixes for changes to stream wit

* fix new rust 1.71 warnings

* make stdin work on unix without using AsyncFdStream

inline the tokio docs example of how to impl AsyncRead for an AsyncFd,
except theres some "minor" changes because stdin doesnt impl Read on
&Stdin whereas tcpstream from the example does

* delete AsyncFdStream for now

it turns out to be kinda hard and we can always work on adding it back
in later.

* Implement some memory pipe operations, and move async wrappers to the pipe mod

* Make blocking_write actually block until everything is written

* Remove debug print

* Adapter stdio should use blocking write

Rust guests will panic if the write returns less than the number of
bytes sent with stdio.

* Clean up implementations of {blocking_}write_zeros and skip

* Remove debug macro usage

* Move EmptyStream to pipe, and split it into four variants

Use EmptyInputStream and SinkOutputStream as the defaults for stdin and
stdout/stderr respectively.

* Add a big warning about resource lifetime tracking in pollables

* Start working through changes to the filesystem implementation

* Remove todos in the filesystem implementation

* Avoid lifetime errors by moving blocking operations to File and Dir

* Fix more lifetime issues with `block`

* Finish filling out translation impl

* fix warnings

* we can likely eliminate block_in_place in the stdin implementations

* sync command uses sync filesystem, start of translation layer

* symc filesystem: all the trait boilerplate is in place

just need to finish the from impl boilerplate

* finish type conversion boilerplate

* Revert "half-baked idea that the output-stream interface will need a flush mechanism"

This reverts commit 3eb762e3330a7228318bfe01296483b52d0fdc16.

* cargo fmt

* test type fixes

* renames and comments

* refactor stream table internals so we can have a blocking variant...

* preview1 host adapter: stdout/stderr use blocking_write here too

* filesystem streams are blocking now

* fixes

* satisfy cargo doc

* cargo vet: dep upgrades taken care of by imports from mozilla

* unix stdio: eliminate block_in_place

* replace private in_tokio with spawn, since its only used for spawning

* comments

* worker thread stdin implementation can be tested on linux, i guess

and start outlining a test plan

* eliminate tokio boilerplate - no longer using tokios lock

* rename our private block_on to in_tokio

* fill in missing file input skip

* code review: fix MemoryInputPipe. Closed status is always available immediately.

* code review: empty input stream is not essential, closed input stream is a better fi for stdin

* code review: unreachable

* turn worker thread (windows) stdin off

* expect preview2-based poll_oneoff_stdio to fail on windows

* command directory_list test: no need to inherit stdin

* preview1 in preview2: turn off inherit_stdio except for poll_oneoff_stdio

* wasi-preview2-components: apparently inherit_stdio was on everywhere here as well. turn it off

except for poll_oneoff_stdio

* extend timeout for riscv64 i suppose

---------

Co-authored-by: Trevor Elliott <[email protected]>
  • Loading branch information
2 people authored and rvolosatovs committed Aug 24, 2023
1 parent 00d4a4c commit cd72925
Showing 1 changed file with 53 additions and 20 deletions.
73 changes: 53 additions & 20 deletions streams.wit
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,22 @@ interface streams {
/// doesn't provide any additional information.
record stream-error {}

/// Streams provide a sequence of data and then end; once they end, they
/// no longer provide any further data.
///
/// For example, a stream reading from a file ends when the stream reaches
/// the end of the file. For another example, a stream reading from a
/// socket ends when the socket is closed.
enum stream-status {
/// The stream is open and may produce further data.
open,
/// When reading, this indicates that the stream will not produce
/// further data.
/// When writing, this indicates that the stream will no longer be read.
/// Further writes are still permitted.
ended,
}

/// An input bytestream. In the future, this will be replaced by handle
/// types.
///
Expand All @@ -31,12 +47,12 @@ interface streams {
/// This [represents a resource](https://github.com/WebAssembly/WASI/blob/main/docs/WitInWasi.md#Resources).
type input-stream = u32

/// Read bytes from a stream.
/// Perform a non-blocking read from the stream.
///
/// This function returns a list of bytes containing the data that was
/// read, along with a bool which, when true, indicates that the end of the
/// stream was reached. The returned list will contain up to `len` bytes; it
/// may return fewer than requested, but not more.
/// read, along with a `stream-status` which, indicates whether further
/// reads are expected to produce data. The returned list will contain up to
/// `len` bytes; it may return fewer than requested, but not more.
///
/// Once a stream has reached the end, subsequent calls to read or
/// `skip` will always report end-of-stream rather than producing more
Expand All @@ -49,11 +65,17 @@ interface streams {
/// The len here is a `u64`, but some callees may not be able to allocate
/// a buffer as large as that would imply.
/// FIXME: describe what happens if allocation fails.
///
/// When the returned `stream-status` is `open`, the length of the returned
/// value may be less than `len`. When an empty list is returned, this
/// indicates that no more bytes were available from the stream at that
/// time. In that case the subscribe-to-input-stream pollable will indicate
/// when additional bytes are available for reading.
read: func(
this: input-stream,
/// The maximum number of bytes to read
len: u64
) -> result<tuple<list<u8>, bool>, stream-error>
) -> result<tuple<list<u8>, stream-status>, stream-error>

/// Read bytes from a stream, with blocking.
///
Expand All @@ -63,7 +85,7 @@ interface streams {
this: input-stream,
/// The maximum number of bytes to read
len: u64
) -> result<tuple<list<u8>, bool>, stream-error>
) -> result<tuple<list<u8>, stream-status>, stream-error>

/// Skip bytes from a stream.
///
Expand All @@ -81,7 +103,7 @@ interface streams {
this: input-stream,
/// The maximum number of bytes to skip.
len: u64,
) -> result<tuple<u64, bool>, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Skip bytes from a stream, with blocking.
///
Expand All @@ -91,7 +113,7 @@ interface streams {
this: input-stream,
/// The maximum number of bytes to skip.
len: u64,
) -> result<tuple<u64, bool>, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Create a `pollable` which will resolve once either the specified stream
/// has bytes available to read or the other end of the stream has been
Expand All @@ -113,23 +135,33 @@ interface streams {
/// always return promptly, after the number of bytes that can be written
/// promptly, which could even be zero. To wait for the stream to be ready to
/// accept data, the `subscribe-to-output-stream` function to obtain a
/// `pollable` which can be polled for using `wasi_poll`.
/// `pollable` which can be polled for using `wasi:poll`.
///
/// And at present, it is a `u32` instead of being an actual handle, until
/// the wit-bindgen implementation of handles and resources is ready.
///
/// This [represents a resource](https://github.com/WebAssembly/WASI/blob/main/docs/WitInWasi.md#Resources).
type output-stream = u32

/// Write bytes to a stream.
/// Perform a non-blocking write of bytes to a stream.
///
/// This function returns a `u64` and a `stream-status`. The `u64` indicates
/// the number of bytes from `buf` that were written, which may be less than
/// the length of `buf`. The `stream-status` indicates if further writes to
/// the stream are expected to be read.
///
/// This function returns a `u64` indicating the number of bytes from
/// `buf` that were written; it may be less than the full list.
/// When the returned `stream-status` is `open`, the `u64` return value may
/// be less than the length of `buf`. This indicates that no more bytes may
/// be written to the stream promptly. In that case the
/// subscribe-to-output-stream pollable will indicate when additional bytes
/// may be promptly written.
///
/// TODO: document what happens when an empty list is written
write: func(
this: output-stream,
/// Data to write
buf: list<u8>
) -> result<u64, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Write bytes to a stream, with blocking.
///
Expand All @@ -139,7 +171,7 @@ interface streams {
this: output-stream,
/// Data to write
buf: list<u8>
) -> result<u64, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Write multiple zero bytes to a stream.
///
Expand All @@ -149,7 +181,7 @@ interface streams {
this: output-stream,
/// The number of zero bytes to write
len: u64
) -> result<u64, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Write multiple zero bytes to a stream, with blocking.
///
Expand All @@ -159,7 +191,7 @@ interface streams {
this: output-stream,
/// The number of zero bytes to write
len: u64
) -> result<u64, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Read from one stream and write to another.
///
Expand All @@ -174,7 +206,7 @@ interface streams {
src: input-stream,
/// The number of bytes to splice
len: u64,
) -> result<tuple<u64, bool>, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Read from one stream and write to another, with blocking.
///
Expand All @@ -186,7 +218,7 @@ interface streams {
src: input-stream,
/// The number of bytes to splice
len: u64,
) -> result<tuple<u64, bool>, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Forward the entire contents of an input stream to an output stream.
///
Expand All @@ -198,12 +230,13 @@ interface streams {
/// of the input stream is seen and all the data has been written to
/// the output stream.
///
/// This function returns the number of bytes transferred.
/// This function returns the number of bytes transferred, and the status of
/// the output stream.
forward: func(
this: output-stream,
/// The stream to read from
src: input-stream
) -> result<u64, stream-error>
) -> result<tuple<u64, stream-status>, stream-error>

/// Create a `pollable` which will resolve once either the specified stream
/// is ready to accept bytes or the other end of the stream has been closed.
Expand Down

0 comments on commit cd72925

Please sign in to comment.