From 9812a02f87c5ec6f7dfdbeb4d7cc6c5d80a8df2a Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Sat, 31 Dec 2022 17:55:58 +0100 Subject: [PATCH] io: use io_uring for all io There should no longer be any - read - write - connect - sendmsg - recvmsg - accept calls in the codebase. Previously we were using a mix of io_uring and these calls which had some negative effects: Since we were using the old system calls, we had to set the file descriptors to non-blocking. But our io_uring code did not handle EAGAIN. This lead to programs sometimes being killed when the wayland IO was actually blocking. Now all file descriptors are set to blocking, but io_uring makes it non-blocking from our perspective. The one exception are evdev files because they are read via libinput and libinput uses the old system calls. --- src/acceptor.rs | 38 ++-- src/backends/metal.rs | 2 +- src/backends/metal/monitor.rs | 6 +- src/backends/metal/video.rs | 18 +- src/cli/idle.rs | 9 +- src/cli/log.rs | 15 +- src/cli/quit.rs | 11 +- src/cli/screenshot.rs | 13 +- src/cli/seat_test.rs | 15 +- src/cli/set_log_level.rs | 13 +- src/cli/unlock.rs | 13 +- src/client.rs | 6 +- src/compositor.rs | 4 +- src/dbus.rs | 35 ++-- src/dbus/auth.rs | 77 ++++---- src/dbus/dynamic_type.rs | 7 +- src/dbus/formatter.rs | 7 +- src/dbus/holder.rs | 20 +-- src/dbus/incoming.rs | 5 +- src/dbus/socket.rs | 10 +- src/dbus/types.rs | 13 +- src/forker.rs | 29 ++- src/forker/io.rs | 14 +- src/io_uring.rs | 13 +- src/io_uring/ops.rs | 4 +- src/io_uring/ops/accept.rs | 67 +++++++ src/io_uring/ops/connect.rs | 77 ++++++++ src/io_uring/ops/poll.rs | 1 + src/io_uring/ops/{write.rs => read_write.rs} | 41 +++-- src/io_uring/ops/recvmsg.rs | 4 +- src/io_uring/ops/sendmsg.rs | 34 +++- src/it/testrun.rs | 17 +- src/pipewire/pw_con.rs | 22 ++- src/pipewire/pw_formatter.rs | 27 ++- src/pipewire/pw_ifs/pw_client_node.rs | 14 +- src/portal.rs | 43 +++-- src/portal/ptl_display.rs | 10 +- src/sighand.rs | 39 ++-- src/tools/tool_client.rs | 81 +++++---- src/user_session.rs | 12 +- src/utils/buf.rs | 177 ++++++++++++++++++- src/utils/buffd.rs | 8 +- src/utils/buffd/buf_in.rs | 4 +- src/utils/buffd/buf_out.rs | 73 ++------ src/utils/buffd/parser.rs | 2 +- src/utils/bufio.rs | 145 +++++---------- src/utils/timer.rs | 24 +-- src/video/drm.rs | 27 +-- src/wheel.rs | 33 ++-- src/wl_usr.rs | 16 +- src/xcon.rs | 59 ++++--- src/xcon/formatter.rs | 6 +- src/xcon/incoming.rs | 11 +- src/xwayland.rs | 66 +++---- src/xwayland/xwm.rs | 35 ++-- 55 files changed, 900 insertions(+), 672 deletions(-) create mode 100644 src/io_uring/ops/accept.rs create mode 100644 src/io_uring/ops/connect.rs rename src/io_uring/ops/{write.rs => read_write.rs} (60%) diff --git a/src/acceptor.rs b/src/acceptor.rs index dcb23281..f4ae3b21 100644 --- a/src/acceptor.rs +++ b/src/acceptor.rs @@ -113,11 +113,7 @@ fn allocate_socket() -> Result { }; let mut fds = [None, None]; for fd in &mut fds { - let socket = match uapi::socket( - c::AF_UNIX, - c::SOCK_STREAM | c::SOCK_NONBLOCK | c::SOCK_CLOEXEC, - 0, - ) { + let socket = match uapi::socket(c::AF_UNIX, c::SOCK_STREAM | c::SOCK_CLOEXEC, 0) { Ok(f) => Rc::new(f), Err(e) => return Err(AcceptorError::SocketFailed(e.into())), }; @@ -172,31 +168,17 @@ impl Acceptor { async fn accept(fd: Rc, state: Rc, secure: bool) { loop { - if let Err(e) = state.ring.readable(&fd).await { - log::error!( - "Could not wait for the acceptor to become readable: {}", - ErrorFmt(e) - ); - break; - } - loop { - let fd = match uapi::accept4( - fd.raw(), - uapi::sockaddr_none_mut(), - c::SOCK_NONBLOCK | c::SOCK_CLOEXEC, - ) { - Ok((fd, _)) => fd, - Err(Errno(c::EAGAIN)) => break, - Err(e) => { - log::error!("Could not accept a client: {}", ErrorFmt(OsError::from(e))); - break; - } - }; - let id = state.clients.id(); - if let Err(e) = state.clients.spawn(id, &state, fd, secure) { - log::error!("Could not spawn a client: {}", ErrorFmt(e)); + let fd = match state.ring.accept(&fd, c::SOCK_CLOEXEC).await { + Ok(fd) => fd, + Err(e) => { + log::error!("Could not accept a client: {}", ErrorFmt(e)); break; } + }; + let id = state.clients.id(); + if let Err(e) = state.clients.spawn(id, &state, fd, secure) { + log::error!("Could not spawn a client: {}", ErrorFmt(e)); + break; } } state.ring.stop(); diff --git a/src/backends/metal.rs b/src/backends/metal.rs index 6b0d666c..a1ebe46d 100644 --- a/src/backends/metal.rs +++ b/src/backends/metal.rs @@ -211,7 +211,7 @@ fn dup_fd(fd: c::c_int) -> Result, MetalError> { } pub async fn create(state: &Rc) -> Result, MetalError> { - let socket = match state.dbus.system() { + let socket = match state.dbus.system().await { Ok(s) => s, Err(e) => return Err(MetalError::DbusSystemSocket(e)), }; diff --git a/src/backends/metal/monitor.rs b/src/backends/metal/monitor.rs index 603f73c8..126ef85d 100644 --- a/src/backends/metal/monitor.rs +++ b/src/backends/metal/monitor.rs @@ -213,11 +213,7 @@ impl MetalBackend { return; } }; - if let Err(e) = set_nonblock(res.fd.raw()) { - log::error!("Could set drm fd to non-blocking: {}", ErrorFmt(e)); - return; - } - let master = Rc::new(DrmMaster::new(res.fd.clone())); + let master = Rc::new(DrmMaster::new(&slf.state.ring, res.fd.clone())); let dev = match slf.create_drm_device(dev, &master) { Ok(d) => d, Err(e) => { diff --git a/src/backends/metal/video.rs b/src/backends/metal/video.rs index 4a6f1391..d7ce00f8 100644 --- a/src/backends/metal/video.rs +++ b/src/backends/metal/video.rs @@ -1214,18 +1214,12 @@ impl MetalBackend { async fn handle_drm_events(self: Rc, dev: Rc) { loop { - if let Err(e) = self.state.ring.readable(dev.dev.master.fd()).await { - log::error!("Could not register the DRM fd for reading: {}", ErrorFmt(e)); - break; - } - loop { - match dev.dev.master.event() { - Ok(Some(e)) => self.handle_drm_event(e, &dev), - Ok(None) => break, - Err(e) => { - log::error!("Could not read DRM event: {}", ErrorFmt(e)); - return; - } + match dev.dev.master.event().await { + Ok(Some(e)) => self.handle_drm_event(e, &dev), + Ok(None) => break, + Err(e) => { + log::error!("Could not read DRM event: {}", ErrorFmt(e)); + return; } } } diff --git a/src/cli/idle.rs b/src/cli/idle.rs index 730d3cd4..cbd388e9 100644 --- a/src/cli/idle.rs +++ b/src/cli/idle.rs @@ -1,7 +1,7 @@ use { crate::{ cli::{GlobalArgs, IdleArgs, IdleCmd, IdleSetArgs}, - tools::tool_client::{Handle, ToolClient}, + tools::tool_client::{with_tool_client, Handle, ToolClient}, utils::{errorfmt::ErrorFmt, stack::Stack}, wire::{jay_compositor, jay_idle, JayIdleId, WlSurfaceId}, }, @@ -9,9 +9,10 @@ use { }; pub fn main(global: GlobalArgs, args: IdleArgs) { - let tc = ToolClient::new(global.log_level.into()); - let idle = Idle { tc: tc.clone() }; - tc.run(idle.run(args)); + with_tool_client(global.log_level.into(), |tc| async move { + let idle = Idle { tc: tc.clone() }; + idle.run(args).await; + }); } struct Idle { diff --git a/src/cli/log.rs b/src/cli/log.rs index 47bc807b..20a98a79 100644 --- a/src/cli/log.rs +++ b/src/cli/log.rs @@ -1,7 +1,7 @@ use { crate::{ cli::{GlobalArgs, LogArgs}, - tools::tool_client::{Handle, ToolClient}, + tools::tool_client::{with_tool_client, Handle, ToolClient}, utils::errorfmt::ErrorFmt, wire::{jay_compositor, jay_log_file}, }, @@ -18,13 +18,14 @@ use { }; pub fn main(global: GlobalArgs, args: LogArgs) { - let tc = ToolClient::new(global.log_level.into()); - let logger = Rc::new(Log { - tc: tc.clone(), - path: RefCell::new(None), - args, + with_tool_client(global.log_level.into(), |tc| async move { + let logger = Rc::new(Log { + tc: tc.clone(), + path: RefCell::new(None), + args, + }); + run(logger).await; }); - tc.run(run(logger)); } struct Log { diff --git a/src/cli/quit.rs b/src/cli/quit.rs index a5d55b84..053f2736 100644 --- a/src/cli/quit.rs +++ b/src/cli/quit.rs @@ -1,11 +1,16 @@ use { - crate::{cli::GlobalArgs, tools::tool_client::ToolClient, wire::jay_compositor::Quit}, + crate::{ + cli::GlobalArgs, + tools::tool_client::{with_tool_client, ToolClient}, + wire::jay_compositor::Quit, + }, std::rc::Rc, }; pub fn main(global: GlobalArgs) { - let tc = ToolClient::new(global.log_level.into()); - tc.run(run(tc.clone())); + with_tool_client(global.log_level.into(), |tc| async move { + run(tc).await; + }); } async fn run(tc: Rc) { diff --git a/src/cli/screenshot.rs b/src/cli/screenshot.rs index 0794ca70..b94b0da7 100644 --- a/src/cli/screenshot.rs +++ b/src/cli/screenshot.rs @@ -2,7 +2,7 @@ use { crate::{ cli::{GlobalArgs, ScreenshotArgs}, format::XRGB8888, - tools::tool_client::{Handle, ToolClient}, + tools::tool_client::{with_tool_client, Handle, ToolClient}, utils::{errorfmt::ErrorFmt, queue::AsyncQueue}, video::{ dmabuf::{DmaBuf, DmaBufPlane}, @@ -21,12 +21,13 @@ use { }; pub fn main(global: GlobalArgs, args: ScreenshotArgs) { - let tc = ToolClient::new(global.log_level.into()); - let screenshot = Rc::new(Screenshot { - tc: tc.clone(), - args, + with_tool_client(global.log_level.into(), |tc| async move { + let screenshot = Rc::new(Screenshot { + tc: tc.clone(), + args, + }); + run(screenshot).await; }); - tc.run(run(screenshot)); } struct Screenshot { diff --git a/src/cli/seat_test.rs b/src/cli/seat_test.rs index b391d48c..a58567f2 100644 --- a/src/cli/seat_test.rs +++ b/src/cli/seat_test.rs @@ -2,7 +2,7 @@ use { crate::{ cli::{GlobalArgs, SeatTestArgs}, ifs::wl_seat::wl_pointer::{PendingScroll, CONTINUOUS, FINGER, WHEEL}, - tools::tool_client::{Handle, ToolClient}, + tools::tool_client::{with_tool_client, Handle, ToolClient}, wire::{ jay_compositor::{GetSeats, Seat, SeatEvents}, jay_seat_events::{ @@ -16,13 +16,14 @@ use { }; pub fn main(global: GlobalArgs, args: SeatTestArgs) { - let tc = ToolClient::new(global.log_level.into()); - let screenshot = Rc::new(SeatTest { - tc: tc.clone(), - args, - names: Default::default(), + with_tool_client(global.log_level.into(), |tc| async move { + let screenshot = Rc::new(SeatTest { + tc: tc.clone(), + args, + names: Default::default(), + }); + run(screenshot).await; }); - tc.run(run(screenshot)); } struct SeatTest { diff --git a/src/cli/set_log_level.rs b/src/cli/set_log_level.rs index 3bc17aad..abbde8c4 100644 --- a/src/cli/set_log_level.rs +++ b/src/cli/set_log_level.rs @@ -1,19 +1,20 @@ use { crate::{ cli::{GlobalArgs, SetLogArgs}, - tools::tool_client::ToolClient, + tools::tool_client::{with_tool_client, ToolClient}, wire::jay_compositor::SetLogLevel, }, std::rc::Rc, }; pub fn main(global: GlobalArgs, args: SetLogArgs) { - let tc = ToolClient::new(global.log_level.into()); - let logger = Rc::new(Log { - tc: tc.clone(), - args, + with_tool_client(global.log_level.into(), |tc| async move { + let logger = Rc::new(Log { + tc: tc.clone(), + args, + }); + run(logger).await; }); - tc.run(run(logger)); } struct Log { diff --git a/src/cli/unlock.rs b/src/cli/unlock.rs index 9e5d2f59..aaec531d 100644 --- a/src/cli/unlock.rs +++ b/src/cli/unlock.rs @@ -1,12 +1,17 @@ use { - crate::{cli::GlobalArgs, tools::tool_client::ToolClient, wire::jay_compositor::Unlock}, + crate::{ + cli::GlobalArgs, + tools::tool_client::{with_tool_client, ToolClient}, + wire::jay_compositor::Unlock, + }, std::rc::Rc, }; pub fn main(global: GlobalArgs) { - let tc = ToolClient::new(global.log_level.into()); - let logger = Rc::new(Unlocker { tc: tc.clone() }); - tc.run(run(logger)); + with_tool_client(global.log_level.into(), |tc| async move { + let logger = Rc::new(Unlocker { tc: tc.clone() }); + run(logger).await; + }); } struct Unlocker { diff --git a/src/client.rs b/src/client.rs index a329c2a7..8999f11d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -95,7 +95,7 @@ impl Clients { &self, id: ClientId, global: &Rc, - socket: OwnedFd, + socket: Rc, secure: bool, ) -> Result<(), ClientError> { let (uid, pid) = { @@ -123,7 +123,7 @@ impl Clients { &self, id: ClientId, global: &Rc, - socket: OwnedFd, + socket: Rc, uid: c::uid_t, pid: c::pid_t, secure: bool, @@ -133,7 +133,7 @@ impl Clients { id, state: global.clone(), checking_queue_size: Cell::new(false), - socket: Rc::new(socket), + socket, objects: Objects::new(), swapchain: Default::default(), flush_request: Default::default(), diff --git a/src/compositor.rs b/src/compositor.rs index fd0a4da8..f0f7e7f5 100644 --- a/src/compositor.rs +++ b/src/compositor.rs @@ -234,10 +234,10 @@ async fn start_compositor3(state: Rc, test_future: Option) { if backend.import_environment() { if let Some(acc) = state.acceptor.get() { - import_environment(&state, WAYLAND_DISPLAY, acc.socket_name()); + import_environment(&state, WAYLAND_DISPLAY, acc.socket_name()).await; } for (key, val) in STATIC_VARS { - import_environment(&state, key, val); + import_environment(&state, key, val).await; } } diff --git a/src/dbus.rs b/src/dbus.rs index 65be51f2..9a40ff07 100644 --- a/src/dbus.rs +++ b/src/dbus.rs @@ -8,12 +8,14 @@ use { }, io_uring::{IoUring, IoUringError}, utils::{ + buf::DynamicBuf, bufio::{BufIo, BufIoError}, clonecell::CloneCell, copyhashmap::CopyHashMap, numcell::NumCell, oserror::OsError, run_toplevel::RunToplevel, + stack::Stack, vecstorage::VecStorage, xrd::{xrd, XRD}, }, @@ -89,11 +91,11 @@ pub enum DbusError { #[error("Could not create a socket")] Socket(#[source] OsError), #[error("Could not connect")] - Connect(#[source] OsError), + Connect(#[source] IoUringError), #[error("Could not write to the dbus socket")] - WriteError(#[source] OsError), + WriteError(#[source] IoUringError), #[error("Could not read from the dbus socket")] - ReadError(#[source] OsError), + ReadError(#[source] IoUringError), #[error("timeout")] IoUringError(#[source] Box), #[error("Server did not accept our authentication")] @@ -169,21 +171,25 @@ impl Dbus { self.session.clear(); } - pub fn system(&self) -> Result, DbusError> { - self.system.get( - &self.eng, - &self.ring, - "/var/run/dbus/system_bus_socket", - "System bus", - ) + pub async fn system(&self) -> Result, DbusError> { + self.system + .get( + &self.eng, + &self.ring, + "/var/run/dbus/system_bus_socket", + "System bus", + ) + .await } - pub fn session(&self) -> Result, DbusError> { + pub async fn session(&self) -> Result, DbusError> { let sba = match self.user_path.as_deref() { None => return Err(DbusError::SessionBusAddressNotSet), Some(sba) => sba, }; - self.session.get(&self.eng, &self.ring, sba, "Session bus") + self.session + .get(&self.eng, &self.ring, sba, "Session bus") + .await } } @@ -203,6 +209,7 @@ pub struct DbusSocket { bus_name: &'static str, fd: Rc, ring: Rc, + in_bufs: Stack>, bufio: Rc, eng: Rc, next_serial: NumCell, @@ -361,7 +368,7 @@ pub struct Parser<'a> { pub struct Formatter<'a> { fds: &'a mut Vec>, - buf: &'a mut Vec, + buf: &'a mut DynamicBuf, } pub unsafe trait Message<'a>: Sized + 'a { @@ -469,7 +476,7 @@ impl> Reply { impl> Drop for Reply { fn drop(&mut self) { - self.socket.bufio.add_buf(mem::take(&mut self.buf)); + self.socket.in_bufs.push(mem::take(&mut self.buf)); } } diff --git a/src/dbus/auth.rs b/src/dbus/auth.rs index e55397eb..56bdfc46 100644 --- a/src/dbus/auth.rs +++ b/src/dbus/auth.rs @@ -1,16 +1,15 @@ use { crate::{ dbus::{incoming::handle_incoming, outgoing::handle_outgoing, DbusError, DbusSocket}, - utils::{errorfmt::ErrorFmt, hex}, + utils::{buf::Buf, errorfmt::ErrorFmt, hex}, }, - std::{io::Write, rc::Rc}, - uapi::{c, Errno}, + std::{ops::Deref, rc::Rc}, }; pub(super) async fn handle_auth(socket: Rc) { let mut auth = Auth { socket: socket.clone(), - buf: Box::new([0; BUF_SIZE]), + buf: Buf::new(BUF_SIZE), buf_start: 0, buf_stop: 0, }; @@ -22,7 +21,7 @@ const BUF_SIZE: usize = 128; struct Auth { socket: Rc, - buf: Box<[u8; BUF_SIZE]>, + buf: Buf, buf_start: usize, buf_stop: usize, } @@ -50,61 +49,77 @@ impl Auth { async fn handle_auth(&mut self) -> Result<(), DbusError> { let uid = hex::to_hex(&uapi::getuid().to_string()); - let mut out_buf = Vec::new(); - let _ = write!(out_buf, "\0AUTH EXTERNAL {}\r\n", uid); - self.write_buf(&mut out_buf).await?; + let mut out_buf = Buf::new(128); + { + let buf = out_buf + .write_fmt(format_args!("\0AUTH EXTERNAL {}\r\n", uid)) + .unwrap(); + self.write_buf(buf).await?; + } let line = self.readline().await?; let (cmd, _) = line_to_cmd(&line); if cmd != "OK" { return Err(DbusError::Auth); } - let _ = write!(out_buf, "NEGOTIATE_UNIX_FD\r\n"); - self.write_buf(&mut out_buf).await?; + { + let buf = out_buf + .write_fmt(format_args!("NEGOTIATE_UNIX_FD\r\n")) + .unwrap(); + self.write_buf(buf).await?; + } let line = self.readline().await?; let (cmd, _) = line_to_cmd(&line); if cmd != "AGREE_UNIX_FD" { return Err(DbusError::UnixFd); } - let _ = write!(out_buf, "BEGIN\r\n"); - self.write_buf(&mut out_buf).await?; + { + let buf = out_buf.write_fmt(format_args!("BEGIN\r\n")).unwrap(); + self.write_buf(buf).await?; + } Ok(()) } async fn readline(&mut self) -> Result { let mut s = String::new(); loop { - for i in self.buf_start..self.buf_stop { - let c = self.buf[i % BUF_SIZE] as char; - s.push(c); - if c == '\n' { - self.buf_start = i + 1; - return Ok(s); + { + let buf = self.buf.deref(); + for i in self.buf_start..self.buf_stop { + let c = buf[i % BUF_SIZE] as char; + s.push(c); + if c == '\n' { + self.buf_start = i + 1; + return Ok(s); + } } } self.buf_start = 0; self.buf_stop = 0; - match uapi::read(self.socket.fd.raw(), &mut self.buf[..]) { - Ok(n) => self.buf_stop = n.len(), - Err(Errno(c::EAGAIN)) => { - self.socket.ring.readable(&self.socket.fd).await?; - } - Err(e) => return Err(DbusError::ReadError(e.into())), + let res = self + .socket + .ring + .read(&self.socket.fd, self.buf.clone()) + .await; + match res { + Ok(n) => self.buf_stop = n, + Err(e) => return Err(DbusError::ReadError(e)), } } } - async fn write_buf(&mut self, buf: &mut Vec) -> Result<(), DbusError> { + async fn write_buf(&mut self, mut buf: Buf) -> Result<(), DbusError> { let mut start = 0; while start < buf.len() { - match uapi::write(self.socket.fd.raw(), &buf[start..]) { + let res = self + .socket + .ring + .write(&self.socket.fd, buf.slice(start..), None) + .await; + match res { Ok(n) => start += n, - Err(Errno(c::EAGAIN)) => { - self.socket.ring.writable(&self.socket.fd).await?; - } - Err(e) => return Err(DbusError::WriteError(e.into())), + Err(e) => return Err(DbusError::WriteError(e)), } } - buf.clear(); Ok(()) } } diff --git a/src/dbus/dynamic_type.rs b/src/dbus/dynamic_type.rs index 1ef06cca..074586e2 100644 --- a/src/dbus/dynamic_type.rs +++ b/src/dbus/dynamic_type.rs @@ -3,7 +3,10 @@ use { TY_ARRAY, TY_BOOLEAN, TY_BYTE, TY_DOUBLE, TY_INT16, TY_INT32, TY_INT64, TY_OBJECT_PATH, TY_SIGNATURE, TY_STRING, TY_UINT16, TY_UINT32, TY_UINT64, TY_UNIX_FD, TY_VARIANT, }, - crate::dbus::{types::Variant, DbusError, DynamicType, Parser}, + crate::{ + dbus::{types::Variant, DbusError, DynamicType, Parser}, + utils::buf::DynamicBuf, + }, std::ops::Deref, }; @@ -88,7 +91,7 @@ impl DynamicType { } } - pub fn write_signature(&self, w: &mut Vec) { + pub fn write_signature(&self, w: &mut DynamicBuf) { let c = match self { DynamicType::U8 => TY_BYTE, DynamicType::Bool => TY_BOOLEAN, diff --git a/src/dbus/formatter.rs b/src/dbus/formatter.rs index e793f389..fe0789b6 100644 --- a/src/dbus/formatter.rs +++ b/src/dbus/formatter.rs @@ -1,11 +1,14 @@ use { - crate::dbus::{types::Variant, DbusType, Formatter}, + crate::{ + dbus::{types::Variant, DbusType, Formatter}, + utils::buf::DynamicBuf, + }, std::rc::Rc, uapi::{OwnedFd, Packed}, }; impl<'a> Formatter<'a> { - pub fn new(fds: &'a mut Vec>, buf: &'a mut Vec) -> Self { + pub fn new(fds: &'a mut Vec>, buf: &'a mut DynamicBuf) -> Self { Self { fds, buf } } diff --git a/src/dbus/holder.rs b/src/dbus/holder.rs index 36c5dc96..41d066a3 100644 --- a/src/dbus/holder.rs +++ b/src/dbus/holder.rs @@ -11,7 +11,7 @@ use { }; impl DbusHolder { - pub(super) fn get( + pub(super) async fn get( self: &Rc, eng: &Rc, ring: &Rc, @@ -25,39 +25,35 @@ impl DbusHolder { return Ok(c); } } - let socket = connect(eng, ring, addr, name, &self.run_toplevel)?; + let socket = connect(eng, ring, addr, name, &self.run_toplevel).await?; self.socket.set(Some(socket.clone())); Ok(socket) } } -fn connect( +async fn connect( eng: &Rc, ring: &Rc, addr: &str, name: &'static str, run_toplevel: &Rc, ) -> Result, DbusError> { - let socket = match uapi::socket( - c::AF_UNIX, - c::SOCK_STREAM | c::SOCK_NONBLOCK | c::SOCK_CLOEXEC, - 0, - ) { - Ok(s) => s, + let fd = match uapi::socket(c::AF_UNIX, c::SOCK_STREAM | c::SOCK_CLOEXEC, 0) { + Ok(s) => Rc::new(s), Err(e) => return Err(DbusError::Socket(e.into())), }; let mut sadr: c::sockaddr_un = uapi::pod_zeroed(); sadr.sun_family = c::AF_UNIX as _; let sun_path = uapi::as_bytes_mut(&mut sadr.sun_path[..]); sun_path[..addr.len()].copy_from_slice(addr.as_bytes()); - if let Err(e) = uapi::connect(socket.raw(), &sadr) { - return Err(DbusError::Connect(e.into())); + if let Err(e) = ring.connect(&fd, &sadr).await { + return Err(DbusError::Connect(e)); } - let fd = Rc::new(socket); let socket = Rc::new(DbusSocket { bus_name: name, fd: fd.clone(), ring: ring.clone(), + in_bufs: Default::default(), bufio: Rc::new(BufIo::new(&fd, ring)), eng: eng.clone(), next_serial: NumCell::new(1), diff --git a/src/dbus/incoming.rs b/src/dbus/incoming.rs index 4563ab5d..d04bff71 100644 --- a/src/dbus/incoming.rs +++ b/src/dbus/incoming.rs @@ -52,8 +52,9 @@ impl Incoming { } async fn handle_msg(&mut self) -> Result<(), DbusError> { - let msg_buf_data = UnsafeCell::new(self.socket.bufio.buf()); + let msg_buf_data = UnsafeCell::new(self.socket.in_bufs.pop().unwrap_or_default()); let msg_buf = unsafe { msg_buf_data.get().deref_mut() }; + msg_buf.clear(); const FIXED_HEADER_SIZE: usize = 16; self.incoming .fill_msg_buf(FIXED_HEADER_SIZE, msg_buf) @@ -235,7 +236,7 @@ impl Incoming { } let msg_buf = msg_buf_data.into_inner(); if msg_buf.capacity() > 0 { - self.socket.bufio.add_buf(msg_buf); + self.socket.in_bufs.push(msg_buf); } Ok(()) } diff --git a/src/dbus/socket.rs b/src/dbus/socket.rs index 8f7b430a..2867d3d1 100644 --- a/src/dbus/socket.rs +++ b/src/dbus/socket.rs @@ -404,7 +404,13 @@ impl DbusSocket { msg.marshal(&mut fmt); let body_len = (buf.len() - body_start) as u32; buf[4..8].copy_from_slice(uapi::as_bytes(&body_len)); - (BufIoMessage { fds, buf }, serial) + ( + BufIoMessage { + fds, + buf: buf.unwrap(), + }, + serial, + ) } fn format_header( @@ -489,7 +495,7 @@ where ) -> Result<(), DbusError> { let msg = as Message>::unmarshal(parser)?; (self.0)(Ok(&msg)); - socket.bufio.add_buf(buf); + socket.in_bufs.push(buf); Ok(()) } } diff --git a/src/dbus/types.rs b/src/dbus/types.rs index 73ef6bda..ed043e13 100644 --- a/src/dbus/types.rs +++ b/src/dbus/types.rs @@ -1,8 +1,11 @@ use { - crate::dbus::{ - DbusError, DbusType, DynamicType, Formatter, Parser, TY_ARRAY, TY_BOOLEAN, TY_BYTE, - TY_DOUBLE, TY_INT16, TY_INT32, TY_INT64, TY_OBJECT_PATH, TY_SIGNATURE, TY_STRING, - TY_UINT16, TY_UINT32, TY_UINT64, TY_UNIX_FD, TY_VARIANT, + crate::{ + dbus::{ + DbusError, DbusType, DynamicType, Formatter, Parser, TY_ARRAY, TY_BOOLEAN, TY_BYTE, + TY_DOUBLE, TY_INT16, TY_INT32, TY_INT64, TY_OBJECT_PATH, TY_SIGNATURE, TY_STRING, + TY_UINT16, TY_UINT32, TY_UINT64, TY_UNIX_FD, TY_VARIANT, + }, + utils::buf::DynamicBuf, }, std::{borrow::Cow, ops::Deref, rc::Rc}, uapi::{OwnedFd, Packed, Pod}, @@ -458,7 +461,7 @@ impl<'a> Variant<'a> { } } - pub fn write_signature(&self, w: &mut Vec) { + pub fn write_signature(&self, w: &mut DynamicBuf) { let c = match self { Variant::U8(..) => TY_BYTE, Variant::Bool(..) => TY_BOOLEAN, diff --git a/src/forker.rs b/src/forker.rs index 1433c3f1..82f505b9 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -50,7 +50,7 @@ pub struct ForkerProxy { } struct PidfdHandoff { - pidfd: Cell>>, + pidfd: Cell, c::pid_t), ForkerError>>>, waiter: Cell>, } @@ -81,14 +81,11 @@ impl ForkerProxy { } pub fn create() -> Result { - let (parent, child) = match uapi::socketpair( - c::AF_UNIX, - c::SOCK_STREAM | c::SOCK_CLOEXEC | c::SOCK_NONBLOCK, - 0, - ) { - Ok(o) => o, - Err(e) => return Err(ForkerError::Socketpair(e.into())), - }; + let (parent, child) = + match uapi::socketpair(c::AF_UNIX, c::SOCK_STREAM | c::SOCK_CLOEXEC, 0) { + Ok(o) => o, + Err(e) => return Err(ForkerError::Socketpair(e.into())), + }; let pid = uapi::getpid(); match fork_with_pidfd(false)? { Forked::Parent { pid, pidfd } => Ok(ForkerProxy { @@ -135,7 +132,7 @@ impl ForkerProxy { }) } - async fn pidfd(&self, id: u32) -> Result<(OwnedFd, c::pid_t), ForkerError> { + async fn pidfd(&self, id: u32) -> Result<(Rc, c::pid_t), ForkerError> { let handoff = Rc::new(PidfdHandoff { pidfd: Cell::new(None), waiter: Cell::new(None), @@ -159,7 +156,7 @@ impl ForkerProxy { listenfd: Rc, wmfd: Rc, waylandfd: Rc, - ) -> Result<(OwnedFd, c::pid_t), ForkerError> { + ) -> Result<(Rc, c::pid_t), ForkerError> { self.fds .borrow_mut() .extend([stderr, dfd, listenfd, wmfd, waylandfd]); @@ -394,10 +391,10 @@ impl Forker { fn handle_xwayland(self: &Rc, io: &mut IoIn, id: u32) { let stderr = io.pop_fd(); let fds = vec![ - io.pop_fd().unwrap(), - io.pop_fd().unwrap(), - io.pop_fd().unwrap(), - io.pop_fd().unwrap(), + Rc::try_unwrap(io.pop_fd().unwrap()).unwrap(), + Rc::try_unwrap(io.pop_fd().unwrap()).unwrap(), + Rc::try_unwrap(io.pop_fd().unwrap()).unwrap(), + Rc::try_unwrap(io.pop_fd().unwrap()).unwrap(), ]; let (prog, args) = xwayland::build_args(&fds); let env = vec![("WAYLAND_SOCKET".to_string(), fds[3].raw().to_string())]; @@ -424,7 +421,7 @@ impl Forker { prog: String, args: Vec, env: Vec<(String, String)>, - stderr: Option, + stderr: Option>, fds: Vec, pidfd_id: Option, ) { diff --git a/src/forker/io.rs b/src/forker/io.rs index ffa6f142..c4692dab 100644 --- a/src/forker/io.rs +++ b/src/forker/io.rs @@ -8,6 +8,7 @@ use { forker::ForkerError, io_uring::IoUring, utils::{ + buf::DynamicBuf, buffd::{BufFdIn, BufFdOut}, vec_ext::VecExt, }, @@ -29,7 +30,7 @@ impl IoIn { } } - pub fn pop_fd(&mut self) -> Option { + pub fn pop_fd(&mut self) -> Option> { self.incoming.get_fd().ok() } @@ -57,7 +58,7 @@ impl IoIn { pub struct IoOut { outgoing: BufFdOut, - scratch: Vec, + scratch: DynamicBuf, fds: Vec>, } @@ -65,7 +66,7 @@ impl IoOut { pub fn new(fd: &Rc, ring: &Rc) -> Self { Self { outgoing: BufFdOut::new(fd, ring), - scratch: vec![], + scratch: DynamicBuf::new(), fds: vec![], } } @@ -83,7 +84,12 @@ impl IoOut { Err(e) => return Err(ForkerError::EncodeFailed(e)), }; self.scratch[..mem::size_of_val(&len)].copy_from_slice(uapi::as_bytes(&len)); - match self.outgoing.flush2(&self.scratch, &mut self.fds).await { + let mut buf = self.scratch.borrow(); + match self + .outgoing + .flush2(buf.buf.clone(), mem::take(&mut self.fds)) + .await + { Ok(()) => Ok(()), Err(e) => Err(ForkerError::WriteFailed(e)), } diff --git a/src/io_uring.rs b/src/io_uring.rs index 991e8925..550f4f71 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -4,8 +4,9 @@ use { async_engine::AsyncEngine, io_uring::{ ops::{ - async_cancel::AsyncCancelTask, poll::PollTask, recvmsg::RecvmsgTask, - sendmsg::SendmsgTask, timeout::TimeoutTask, write::WriteTask, + accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask, + poll::PollTask, read_write::ReadWriteTask, recvmsg::RecvmsgTask, + sendmsg::SendmsgTask, timeout::TimeoutTask, }, pending_result::PendingResults, sys::{ @@ -204,13 +205,15 @@ impl IoUring { pending_in_kernel: Default::default(), tasks: Default::default(), pending_results: Default::default(), - cached_writes: Default::default(), + cached_read_writes: Default::default(), cached_cancels: Default::default(), cached_polls: Default::default(), cached_sendmsg: Default::default(), cached_recvmsg: Default::default(), cached_timeouts: Default::default(), cached_cmsg_bufs: Default::default(), + cached_connects: Default::default(), + cached_accepts: Default::default(), fd_ids_scratch: Default::default(), }); Ok(Rc::new(Self { ring: data })) @@ -257,13 +260,15 @@ struct IoUringData { pending_results: PendingResults, - cached_writes: Stack>, + cached_read_writes: Stack>, cached_cancels: Stack>, cached_polls: Stack>, cached_sendmsg: Stack>, cached_recvmsg: Stack>, cached_timeouts: Stack>, cached_cmsg_bufs: Stack, + cached_connects: Stack>, + cached_accepts: Stack>, fd_ids_scratch: RefCell>, } diff --git a/src/io_uring/ops.rs b/src/io_uring/ops.rs index a3607664..ba6f79f1 100644 --- a/src/io_uring/ops.rs +++ b/src/io_uring/ops.rs @@ -1,11 +1,13 @@ use crate::{io_uring::IoUringError, utils::oserror::OsError}; +pub mod accept; pub mod async_cancel; +pub mod connect; pub mod poll; +pub mod read_write; pub mod recvmsg; pub mod sendmsg; pub mod timeout; -pub mod write; pub type TaskResult = Result, IoUringError>; diff --git a/src/io_uring/ops/accept.rs b/src/io_uring/ops/accept.rs new file mode 100644 index 00000000..6f751dcf --- /dev/null +++ b/src/io_uring/ops/accept.rs @@ -0,0 +1,67 @@ +use { + crate::io_uring::{ + pending_result::PendingResult, + sys::{io_uring_sqe, IORING_OP_ACCEPT}, + IoUring, IoUringData, IoUringError, Task, TaskResultExt, + }, + std::rc::Rc, + uapi::{c, OwnedFd}, +}; + +impl IoUring { + pub async fn accept( + &self, + fd: &Rc, + flags: c::c_int, + ) -> Result, IoUringError> { + self.ring.check_destroyed()?; + let id = self.ring.id(); + let pr = self.ring.pending_results.acquire(); + { + let mut pw = self.ring.cached_accepts.pop().unwrap_or_default(); + pw.id = id.id; + pw.fd = fd.raw() as _; + pw.flags = flags as _; + pw.data = Some(Data { + pr: pr.clone(), + _fd: fd.clone(), + }); + self.ring.schedule(pw); + } + Ok(pr.await.map(OwnedFd::new).map(Rc::new)).merge() + } +} + +struct Data { + pr: PendingResult, + _fd: Rc, +} + +#[derive(Default)] +pub struct AcceptTask { + id: u64, + fd: i32, + flags: u32, + data: Option, +} + +unsafe impl Task for AcceptTask { + fn id(&self) -> u64 { + self.id + } + + fn complete(mut self: Box, ring: &IoUringData, res: i32) { + if let Some(data) = self.data.take() { + data.pr.complete(res); + } + ring.cached_accepts.push(self); + } + + fn encode(&self, sqe: &mut io_uring_sqe) { + sqe.opcode = IORING_OP_ACCEPT; + sqe.fd = self.fd; + sqe.u2.addr = 0; + sqe.u1.addr2 = 0; + sqe.u3.accept_flags = self.flags; + } +} diff --git a/src/io_uring/ops/connect.rs b/src/io_uring/ops/connect.rs new file mode 100644 index 00000000..0aca6829 --- /dev/null +++ b/src/io_uring/ops/connect.rs @@ -0,0 +1,77 @@ +use { + crate::io_uring::{ + pending_result::PendingResult, + sys::{io_uring_sqe, IORING_OP_CONNECT}, + IoUring, IoUringData, IoUringError, Task, TaskResultExt, + }, + std::{mem, ptr, rc::Rc}, + uapi::{c, OwnedFd, SockAddr}, +}; + +impl IoUring { + pub async fn connect(&self, fd: &Rc, t: &T) -> Result<(), IoUringError> { + self.ring.check_destroyed()?; + let id = self.ring.id(); + let pr = self.ring.pending_results.acquire(); + { + let mut pw = self.ring.cached_connects.pop().unwrap_or_default(); + pw.id = id.id; + pw.fd = fd.raw() as _; + unsafe { + ptr::copy_nonoverlapping(t, &mut pw.sockaddr as *mut _ as *mut _, 1); + } + pw.addrlen = mem::size_of::() as _; + pw.data = Some(Data { + pr: pr.clone(), + _fd: fd.clone(), + }); + self.ring.schedule(pw); + } + Ok(pr.await.map(drop)).merge() + } +} + +struct Data { + pr: PendingResult, + _fd: Rc, +} + +pub struct ConnectTask { + id: u64, + fd: i32, + sockaddr: c::sockaddr_storage, + addrlen: u64, + data: Option, +} + +impl Default for ConnectTask { + fn default() -> Self { + Self { + id: 0, + fd: 0, + sockaddr: uapi::pod_zeroed(), + addrlen: 0, + data: None, + } + } +} + +unsafe impl Task for ConnectTask { + fn id(&self) -> u64 { + self.id + } + + fn complete(mut self: Box, ring: &IoUringData, res: i32) { + if let Some(data) = self.data.take() { + data.pr.complete(res); + } + ring.cached_connects.push(self); + } + + fn encode(&self, sqe: &mut io_uring_sqe) { + sqe.opcode = IORING_OP_CONNECT; + sqe.fd = self.fd; + sqe.u2.addr = &self.sockaddr as *const _ as _; + sqe.u1.off = self.addrlen; + } +} diff --git a/src/io_uring/ops/poll.rs b/src/io_uring/ops/poll.rs index c10f0893..d8b9ce40 100644 --- a/src/io_uring/ops/poll.rs +++ b/src/io_uring/ops/poll.rs @@ -32,6 +32,7 @@ impl IoUring { self.poll(fd, c::POLLIN).await.merge() } + #[allow(dead_code)] pub async fn writable(&self, fd: &Rc) -> Result { self.poll(fd, c::POLLOUT).await.merge() } diff --git a/src/io_uring/ops/write.rs b/src/io_uring/ops/read_write.rs similarity index 60% rename from src/io_uring/ops/write.rs rename to src/io_uring/ops/read_write.rs index 6453dc25..d8b8ba9f 100644 --- a/src/io_uring/ops/write.rs +++ b/src/io_uring/ops/read_write.rs @@ -1,10 +1,9 @@ use { crate::{ io_uring::{ - ops::TaskResult, pending_result::PendingResult, - sys::{io_uring_sqe, IORING_OP_WRITE}, - IoUring, IoUringData, Task, + sys::{io_uring_sqe, IORING_OP_READ, IORING_OP_WRITE}, + IoUring, IoUringData, IoUringError, Task, TaskResultExt, }, time::Time, utils::buf::Buf, @@ -14,23 +13,38 @@ use { }; impl IoUring { + pub async fn read(&self, fd: &Rc, buf: Buf) -> Result { + self.perform(fd, buf, None, IORING_OP_READ).await + } + pub async fn write( &self, fd: &Rc, buf: Buf, timeout: Option