Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UnsubmittedRead and Link API #294

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ keywords = ["async", "fs", "io-uring"]
tokio = { version = "1.2", features = ["net", "rt", "sync"] }
slab = "0.4.2"
libc = "0.2.80"
io-uring = "0.6.0"
io-uring = { git = "https://github.com/ileixe/io-uring" }
socket2 = { version = "0.4.4", features = ["all"] }
bytes = { version = "1.0", optional = true }
futures-util = { version = "0.3.26", default-features = false, features = ["std"] }
pin-project-lite = "0.2.13"

[dev-dependencies]
tempfile = "3.2.0"
Expand Down
4 changes: 2 additions & 2 deletions examples/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
{env, io},
};

use tokio_uring::fs::File;
use tokio_uring::{fs::File, Submit};

fn main() {
// The file to `cat` is passed as a CLI argument
Expand All @@ -29,7 +29,7 @@ fn main() {

loop {
// Read a chunk
let (res, b) = file.read_at(buf, pos).await;
let (res, b) = file.read_at(buf, pos).submit().await;
let n = res.unwrap();

if n == 0 {
Expand Down
4 changes: 2 additions & 2 deletions examples/mix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::env;

use tokio_uring::{fs::File, net::TcpListener};
use tokio_uring::{fs::File, net::TcpListener, Submit};

fn main() {
// The file to serve over TCP is passed as a CLI argument
Expand Down Expand Up @@ -34,7 +34,7 @@ fn main() {

loop {
// Read a chunk
let (res, b) = file.read_at(buf, pos).await;
let (res, b) = file.read_at(buf, pos).submit().await;
let n = res.unwrap();

if n == 0 {
Expand Down
2 changes: 1 addition & 1 deletion examples/tcp_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{env, net::SocketAddr};

use tokio_uring::net::TcpStream;
use tokio_uring::{net::TcpStream, Submit};

fn main() {
let args: Vec<_> = env::args().collect();
Expand Down
2 changes: 1 addition & 1 deletion examples/unix_listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::env;

use tokio_uring::net::UnixListener;
use tokio_uring::{net::UnixListener, Submit};

fn main() {
let args: Vec<_> = env::args().collect();
Expand Down
2 changes: 1 addition & 1 deletion examples/unix_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::env;

use tokio_uring::net::UnixStream;
use tokio_uring::{net::UnixStream, Submit};

fn main() {
let args: Vec<_> = env::args().collect();
Expand Down
1 change: 1 addition & 0 deletions examples/wrk-bench.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io;
use std::rc::Rc;
use tokio::task::JoinHandle;
use tokio_uring::Submit;

pub const RESPONSE: &'static [u8] =
b"HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: 12\n\nHello world!";
Expand Down
44 changes: 19 additions & 25 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::buf::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut, Slice};
use crate::fs::OpenOptions;
use crate::io::SharedFd;

use crate::runtime::driver::op::Op;
use crate::{UnsubmittedOneshot, UnsubmittedWrite};
use crate::runtime::driver::op::{Op, Submit};
use crate::{
UnsubmittedOneshot, UnsubmittedRead, UnsubmittedReadv, UnsubmittedWrite, UnsubmittedWritev,
};
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
Expand Down Expand Up @@ -32,6 +34,7 @@ use std::path::Path;
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -158,14 +161,15 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
/// let f = File::open("foo.txt").await?;
/// let buffer = vec![0; 10];
///
/// // Read up to 10 bytes
/// let (res, buffer) = f.read_at(buffer, 0).await;
/// let (res, buffer) = f.read_at(buffer, 0).submit().await;
/// let n = res?;
///
/// println!("The bytes: {:?}", &buffer[..n]);
Expand All @@ -176,10 +180,8 @@ impl File {
/// })
/// }
/// ```
pub async fn read_at<T: BoundedBufMut>(&self, buf: T, pos: u64) -> crate::BufResult<usize, T> {
// Submit the read operation
let op = Op::read_at(&self.fd, buf, pos).unwrap();
op.await
pub fn read_at<T: BoundedBufMut>(&self, buf: T, pos: u64) -> UnsubmittedRead<T> {
UnsubmittedOneshot::read_at(&self.fd, buf, pos)
}

/// Read some bytes at the specified offset from the file into the specified
Expand Down Expand Up @@ -216,7 +218,7 @@ impl File {
/// let buffers = vec![Vec::<u8>::with_capacity(10), Vec::<u8>::with_capacity(10)];
///
/// // Read up to 20 bytes
/// let (res, buffer) = f.readv_at(buffers, 0).await;
/// let (res, buffer) = f.readv_at(buffers, 0).submit().await;
/// let n = res?;
///
/// println!("Read {} bytes", n);
Expand All @@ -227,14 +229,8 @@ impl File {
/// })
/// }
/// ```
pub async fn readv_at<T: BoundedBufMut>(
&self,
bufs: Vec<T>,
pos: u64,
) -> crate::BufResult<usize, Vec<T>> {
// Submit the read operation
let op = Op::readv_at(&self.fd, bufs, pos).unwrap();
op.await
pub fn readv_at<T: BoundedBufMut>(&self, bufs: Vec<T>, pos: u64) -> UnsubmittedReadv<T> {
UnsubmittedOneshot::readv_at(&self.fd, bufs, pos)
}

/// Write data from buffers into this file at the specified offset,
Expand Down Expand Up @@ -271,7 +267,7 @@ impl File {
///
/// // Writes some prefix of the byte string, not necessarily all of it.
/// let bufs = vec!["some".to_owned().into_bytes(), " bytes".to_owned().into_bytes()];
/// let (res, _) = file.writev_at(bufs, 0).await;
/// let (res, _) = file.writev_at(bufs, 0).submit().await;
/// let n = res?;
///
/// println!("wrote {} bytes", n);
Expand All @@ -284,13 +280,8 @@ impl File {
/// ```
///
/// [`Ok(n)`]: Ok
pub async fn writev_at<T: BoundedBuf>(
&self,
buf: Vec<T>,
pos: u64,
) -> crate::BufResult<usize, Vec<T>> {
let op = Op::writev_at(&self.fd, buf, pos).unwrap();
op.await
pub fn writev_at<T: BoundedBuf>(&self, bufs: Vec<T>, pos: u64) -> UnsubmittedWritev<T> {
UnsubmittedOneshot::writev_at(&self.fd, bufs, pos)
}

/// Like `writev_at` but will call the `io_uring` `writev` operation multiple times if
Expand Down Expand Up @@ -417,7 +408,7 @@ impl File {
}

while buf.bytes_total() != 0 {
let (res, slice) = self.read_at(buf, pos).await;
let (res, slice) = self.read_at(buf, pos).submit().await;
match res {
Ok(0) => {
return (
Expand Down Expand Up @@ -520,6 +511,7 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -769,6 +761,7 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -806,6 +799,7 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down
6 changes: 3 additions & 3 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ pub(crate) use noop::NoOp;

mod open;

mod read;
pub(crate) mod read;

mod read_fixed;

mod readv;
pub(crate) mod readv;

mod recv_from;

Expand Down Expand Up @@ -52,7 +52,7 @@ pub(crate) mod write;

mod write_fixed;

mod writev;
pub(crate) mod writev;

mod writev_all;
pub(crate) use writev_all::writev_at_all;
91 changes: 48 additions & 43 deletions src/io/read.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,69 @@
use io_uring::cqueue::Entry;

use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::BufResult;
use crate::{BufResult, OneshotOutputTransform, UnsubmittedOneshot};

use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::io;
use std::marker::PhantomData;

/// An unsubmitted read operation.
pub type UnsubmittedRead<T> = UnsubmittedOneshot<ReadData<T>, ReadTransform<T>>;

pub(crate) struct Read<T> {
#[allow(missing_docs)]
pub struct ReadData<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,
_fd: SharedFd,

/// Reference to the in-flight buffer.
pub(crate) buf: T,
buf: T,
}

impl<T: BoundedBufMut> Op<Read<T>> {
pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result<Op<Read<T>>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
Read {
fd: fd.clone(),
buf,
},
|read| {
// Get raw buffer info
let ptr = read.buf.stable_mut_ptr();
let len = read.buf.bytes_total();
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
},
)
})
}
#[allow(missing_docs)]
pub struct ReadTransform<T> {
_phantom: PhantomData<T>,
}

impl<T> Completable for Read<T>
impl<T> OneshotOutputTransform for ReadTransform<T>
where
T: BoundedBufMut,
{
type Output = BufResult<usize, T>;
type StoredData = ReadData<T>;

fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let mut buf = self.buf;

// If the operation was successful, advance the initialized cursor.
if let Ok(n) = res {
fn transform_oneshot_output(self, mut data: Self::StoredData, cqe: Entry) -> Self::Output {
let n = cqe.result();
let res = if n >= 0 {
// Safety: the kernel wrote `n` bytes to the buffer.
unsafe {
buf.set_init(n);
}
}
unsafe { data.buf.set_init(n as usize) };
Ok(n as usize)
} else {
Err(io::Error::from_raw_os_error(-n))
};

(res, data.buf)
}
}

impl<T: BoundedBufMut> UnsubmittedRead<T> {
pub(crate) fn read_at(fd: &SharedFd, mut buf: T, offset: u64) -> Self {
use io_uring::{opcode, types};

// Get raw buffer info
let ptr = buf.stable_mut_ptr();
let len = buf.bytes_total();

(res, buf)
Self::new(
ReadData {
_fd: fd.clone(),
buf,
},
ReadTransform {
_phantom: PhantomData,
},
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build(),
)
}
}
Loading