From e4f97287bcb32d8be5dd445ff42468dfc902a700 Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Fri, 13 May 2022 17:37:20 +0200 Subject: [PATCH] io-uring: add sendmsg --- src/client/tasks.rs | 11 +-- src/forker.rs | 8 +- src/forker/io.rs | 5 +- src/io_uring.rs | 18 +++- src/io_uring/ops.rs | 2 + src/io_uring/ops/sendmsg.rs | 130 ++++++++++++++++++++++++++++ src/io_uring/ops/timeout.rs | 54 ++++++++++++ src/io_uring/ops/write.rs | 32 +++---- src/it/test_transport.rs | 2 +- src/time.rs | 5 ++ src/tools/tool_client.rs | 2 +- src/utils.rs | 1 + src/utils/buf.rs | 155 +++++++++++++++++++++++++++++++++ src/utils/buffd/buf_out.rs | 160 +++++++++++++---------------------- src/utils/buffd/formatter.rs | 65 +++++++------- src/wheel.rs | 19 ++--- src/xwayland/xwm.rs | 15 ++-- 17 files changed, 493 insertions(+), 191 deletions(-) create mode 100644 src/io_uring/ops/sendmsg.rs create mode 100644 src/io_uring/ops/timeout.rs create mode 100644 src/utils/buf.rs diff --git a/src/client/tasks.rs b/src/client/tasks.rs index 7de23369..64e9f143 100644 --- a/src/client/tasks.rs +++ b/src/client/tasks.rs @@ -3,6 +3,7 @@ use { async_engine::Phase, client::{Client, ClientError}, object::ObjectId, + time::Time, utils::{ buffd::{BufFdIn, BufFdOut, MsgParser}, errorfmt::ErrorFmt, @@ -100,7 +101,7 @@ async fn receive(data: Rc) { async fn send(data: Rc) { let send = async { - let mut out = BufFdOut::new(&data.socket, &data.state.ring, &data.state.wheel); + let mut out = BufFdOut::new(&data.socket, &data.state.ring); let mut buffers = VecDeque::new(); loop { data.flush_request.triggered().await; @@ -109,9 +110,9 @@ async fn send(data: Rc) { swapchain.commit(); mem::swap(&mut swapchain.pending, &mut buffers); } - let mut timeout = None; + let timeout = Time::in_ms(5000).unwrap(); while let Some(mut cur) = buffers.pop_front() { - out.flush(&mut cur, &mut timeout).await?; + out.flush(&mut cur, timeout).await?; data.swapchain.borrow_mut().free.push(cur); } } @@ -122,9 +123,9 @@ async fn send(data: Rc) { log::info!("Client {} terminated the connection", data.id.0); } else { log::error!( - "An error occurred while sending data to client {}: {:#}", + "An error occurred while sending data to client {}: {}", data.id.0, - e + ErrorFmt(e) ); } } diff --git a/src/forker.rs b/src/forker.rs index b7c4455a..cb3cb4a9 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -15,7 +15,6 @@ use { buffd::BufFdError, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, queue::AsyncQueue, }, - wheel::Wheel, xwayland, }, bincode::{ @@ -237,7 +236,7 @@ impl ForkerProxy { } async fn outgoing(self: Rc, state: Rc) { - let mut io = IoOut::new(&self.socket, &state.ring, &state.wheel); + let mut io = IoOut::new(&self.socket, &state.ring); loop { let msg = self.outgoing.pop().await; for fd in self.fds.borrow_mut().drain(..) { @@ -301,7 +300,6 @@ struct Forker { socket: Rc, ae: Rc, ring: Rc, - wheel: Rc, fds: RefCell>>, outgoing: AsyncQueue, pending_spawns: CopyHashMap>, @@ -329,12 +327,10 @@ impl Forker { }); let ae = AsyncEngine::new(); let ring = IoUring::new(&ae, 32).unwrap(); - let wheel = Wheel::new(&ae, &ring).unwrap(); let forker = Rc::new(Forker { socket, ae: ae.clone(), ring: ring.clone(), - wheel, fds: RefCell::new(vec![]), outgoing: Default::default(), pending_spawns: Default::default(), @@ -346,7 +342,7 @@ impl Forker { } async fn outgoing(self: Rc) { - let mut io = IoOut::new(&self.socket, &self.ring, &self.wheel); + let mut io = IoOut::new(&self.socket, &self.ring); loop { let msg = self.outgoing.pop().await; for fd in self.fds.borrow_mut().drain(..) { diff --git a/src/forker/io.rs b/src/forker/io.rs index a95fd54a..ffa6f142 100644 --- a/src/forker/io.rs +++ b/src/forker/io.rs @@ -11,7 +11,6 @@ use { buffd::{BufFdIn, BufFdOut}, vec_ext::VecExt, }, - wheel::Wheel, }, jay_config::_private::bincode_ops, uapi::OwnedFd, @@ -63,9 +62,9 @@ pub struct IoOut { } impl IoOut { - pub fn new(fd: &Rc, ring: &Rc, wheel: &Rc) -> Self { + pub fn new(fd: &Rc, ring: &Rc) -> Self { Self { - outgoing: BufFdOut::new(fd, ring, wheel), + outgoing: BufFdOut::new(fd, ring), scratch: vec![], fds: vec![], } diff --git a/src/io_uring.rs b/src/io_uring.rs index a13ef4ae..595211f6 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -1,8 +1,12 @@ +pub use ops::TaskResultExt; use { crate::{ async_engine::AsyncEngine, io_uring::{ - ops::{async_cancel::AsyncCancelTask, poll::PollTask, write::WriteTask}, + ops::{ + async_cancel::AsyncCancelTask, poll::PollTask, sendmsg::SendmsgTask, + timeout::TimeoutTask, write::WriteTask, + }, pending_result::PendingResults, sys::{ io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, @@ -24,7 +28,7 @@ use { }, }, std::{ - cell::{Cell, UnsafeCell}, + cell::{Cell, RefCell, UnsafeCell}, mem::{self}, rc::Rc, sync::atomic::{ @@ -49,7 +53,6 @@ macro_rules! map_err { } }}; } -pub use ops::TaskResultExt; mod ops; mod pending_result; @@ -58,7 +61,7 @@ mod sys; #[derive(Debug, Error)] pub enum IoUringError { #[error(transparent)] - OsError(OsError), + OsError(#[from] OsError), #[error("Could not create an io-uring")] CreateUring(#[source] OsError), #[error("The kernel does not support the IORING_FEAT_NODROP feature")] @@ -201,6 +204,9 @@ impl IoUring { cached_writes: Default::default(), cached_cancels: Default::default(), cached_polls: Default::default(), + cached_sendmsg: Default::default(), + cached_timeouts: Default::default(), + fd_ids_scratch: Default::default(), }); Ok(Rc::new(Self { ring: data })) } @@ -248,6 +254,10 @@ struct IoUringData { cached_writes: Stack>, cached_cancels: Stack>, cached_polls: Stack>, + cached_sendmsg: Stack>, + cached_timeouts: Stack>, + + fd_ids_scratch: RefCell>, } unsafe trait Task { diff --git a/src/io_uring/ops.rs b/src/io_uring/ops.rs index fa36b56a..f0786deb 100644 --- a/src/io_uring/ops.rs +++ b/src/io_uring/ops.rs @@ -2,6 +2,8 @@ use crate::{io_uring::IoUringError, utils::oserror::OsError}; pub mod async_cancel; pub mod poll; +pub mod sendmsg; +pub mod timeout; pub mod write; pub type TaskResult = Result, IoUringError>; diff --git a/src/io_uring/ops/sendmsg.rs b/src/io_uring/ops/sendmsg.rs new file mode 100644 index 00000000..0091752d --- /dev/null +++ b/src/io_uring/ops/sendmsg.rs @@ -0,0 +1,130 @@ +use { + crate::{ + io_uring::{ + pending_result::PendingResult, + sys::{io_uring_sqe, IORING_OP_SENDMSG, IOSQE_IO_LINK}, + IoUring, IoUringData, IoUringError, Task, + }, + time::Time, + utils::{buf::Buf, vec_ext::UninitVecExt}, + }, + std::{ + mem::{self, MaybeUninit}, + ptr, + rc::Rc, + }, + uapi::{c, OwnedFd}, +}; + +impl IoUring { + pub async fn sendmsg( + &self, + fd: &Rc, + buf: Buf, + fds: Vec>, + timeout: Option