diff --git a/src/backends/metal.rs b/src/backends/metal.rs index 154f396a..0d59e19e 100644 --- a/src/backends/metal.rs +++ b/src/backends/metal.rs @@ -4,7 +4,7 @@ mod video; use { crate::{ - async_engine::{AsyncError, AsyncFd, SpawnedFuture}, + async_engine::SpawnedFuture, backend::{ Backend, InputDevice, InputDeviceAccelProfile, InputDeviceCapability, InputDeviceId, InputEvent, KeyState, TransformMatrix, @@ -103,8 +103,6 @@ pub enum MetalError { CreateEncoder(#[source] DrmError), #[error(transparent)] DrmError(#[from] DrmError), - #[error("Could not create an async fd")] - CreateAsyncFd(#[source] AsyncError), #[error("Could not create device-paused signal handler")] DevicePauseSignalHandler(#[source] DbusError), #[error("Could not create device-resumed signal handler")] @@ -115,9 +113,9 @@ pub struct MetalBackend { state: Rc, udev: Rc, monitor: Rc, - monitor_fd: AsyncFd, + monitor_fd: Rc, libinput: Rc, - libinput_fd: AsyncFd, + libinput_fd: Rc, device_holder: Rc, session: Session, pause_handler: Cell>, @@ -204,12 +202,9 @@ impl Backend for MetalBackend { } } -fn dup_async_fd(state: &Rc, fd: c::c_int) -> Result { +fn dup_fd(fd: c::c_int) -> Result, MetalError> { match uapi::fcntl_dupfd_cloexec(fd, 0) { - Ok(m) => match state.eng.fd(&Rc::new(m)) { - Ok(fd) => Ok(fd), - Err(e) => Err(MetalError::CreateAsyncFd(e)), - }, + Ok(m) => Ok(Rc::new(m)), Err(e) => Err(MetalError::Dup(e.into())), } } @@ -238,8 +233,8 @@ pub async fn create(state: &Rc) -> Result, MetalError> { monitor.add_match_subsystem_devtype(Some("drm"), None)?; monitor.enable_receiving()?; let libinput = Rc::new(LibInput::new(device_holder.clone())?); - let monitor_fd = dup_async_fd(&state, monitor.fd())?; - let libinput_fd = dup_async_fd(&state, libinput.fd())?; + let monitor_fd = dup_fd(monitor.fd())?; + let libinput_fd = dup_fd(libinput.fd())?; let metal = Rc::new(MetalBackend { state: state.clone(), udev, diff --git a/src/backends/metal/input.rs b/src/backends/metal/input.rs index 9843e740..95014105 100644 --- a/src/backends/metal/input.rs +++ b/src/backends/metal/input.rs @@ -1,6 +1,5 @@ use { crate::{ - async_engine::FdStatus, backend::{AxisSource, InputEvent, KeyState, ScrollAxis}, backends::metal::MetalBackend, fixed::Fixed, @@ -12,9 +11,10 @@ use { }, event::LibInputEvent, }, - utils::errorfmt::ErrorFmt, + utils::{bitflags::BitflagsExt, errorfmt::ErrorFmt}, }, std::rc::Rc, + uapi::c, }; macro_rules! unpack { @@ -49,7 +49,7 @@ macro_rules! unpack { impl MetalBackend { pub async fn handle_libinput_events(self: Rc) { loop { - match self.libinput_fd.readable().await { + match self.state.ring.readable(&self.libinput_fd).await { Err(e) => { log::error!( "Cannot wait for libinput fd to become readable: {}", @@ -57,7 +57,7 @@ impl MetalBackend { ); break; } - Ok(FdStatus::Err) => { + Ok(n) if n.intersects(c::POLLERR | c::POLLHUP) => { log::error!("libinput fd fd is in an error state"); break; } diff --git a/src/backends/metal/monitor.rs b/src/backends/metal/monitor.rs index 598b644f..47d0dde8 100644 --- a/src/backends/metal/monitor.rs +++ b/src/backends/metal/monitor.rs @@ -1,6 +1,5 @@ use { crate::{ - async_engine::FdStatus, backend::BackendEvent, backends::metal::{ video::{MetalDrmDevice, PendingDrmDevice}, @@ -8,7 +7,7 @@ use { }, dbus::TRUE, udev::UdevDevice, - utils::{errorfmt::ErrorFmt, nonblock::set_nonblock}, + utils::{bitflags::BitflagsExt, errorfmt::ErrorFmt, nonblock::set_nonblock}, video::drm::DrmMaster, wire_dbus::org::freedesktop::login1::session::{PauseDevice, ResumeDevice}, }, @@ -33,7 +32,7 @@ fn is_primary_node(n: &[u8]) -> bool { impl MetalBackend { pub async fn monitor_devices(self: Rc) { loop { - match self.monitor_fd.readable().await { + match self.state.ring.readable(&self.monitor_fd).await { Err(e) => { log::error!( "Cannot wait for udev_monitor to become readable: {}", @@ -41,7 +40,7 @@ impl MetalBackend { ); break; } - Ok(FdStatus::Err) => { + Ok(n) if n.intersects(c::POLLERR | c::POLLHUP) => { log::error!("udev_monitor fd is in an error state"); break; } diff --git a/src/backends/metal/video.rs b/src/backends/metal/video.rs index 98394040..49a3a5c9 100644 --- a/src/backends/metal/video.rs +++ b/src/backends/metal/video.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::{AsyncFd, Phase, SpawnedFuture}, + async_engine::{Phase, SpawnedFuture}, backend::{ BackendDrmDevice, BackendEvent, Connector, ConnectorEvent, ConnectorId, ConnectorKernelId, DrmDeviceId, MonitorInfo, @@ -64,7 +64,6 @@ pub struct MetalDrmDeviceStatic { pub min_height: u32, pub max_height: u32, pub gbm: GbmDevice, - pub async_fd: AsyncFd, pub handle_events: HandleEvents, } @@ -749,10 +748,6 @@ impl MetalBackend { Ok(g) => g, Err(e) => return Err(MetalError::GbmDevice(e)), }; - let async_fd = match self.state.eng.fd(master.fd()) { - Ok(f) => f, - Err(e) => return Err(MetalError::CreateAsyncFd(e)), - }; let dev = Rc::new(MetalDrmDeviceStatic { id: pending.id, @@ -767,7 +762,6 @@ impl MetalBackend { min_height: resources.min_height, max_height: resources.max_height, gbm, - async_fd, handle_events: HandleEvents { handle_events: Cell::new(None), }, @@ -883,7 +877,7 @@ impl MetalBackend { async fn handle_drm_events(self: Rc, dev: Rc) { loop { - if let Err(e) = dev.dev.async_fd.readable().await { + if let Err(e) = self.state.ring.readable(dev.dev.master.fd()).await { log::error!("Could not register the DRM fd for reading: {}", ErrorFmt(e)); break; } diff --git a/src/backends/x.rs b/src/backends/x.rs index e6e433fa..3f2300e8 100644 --- a/src/backends/x.rs +++ b/src/backends/x.rs @@ -116,7 +116,7 @@ pub enum XBackendError { } pub async fn create(state: &Rc) -> Result, XBackendError> { - let c = match Xcon::connect(state.eng.clone()).await { + let c = match Xcon::connect(state).await { Ok(c) => c, Err(e) => return Err(XBackendError::CannotConnect(e)), }; diff --git a/src/client.rs b/src/client.rs index 5ecd2550..d2ae6ce5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ pub use error::{ClientError, MethodError, ObjectError}; use { crate::{ - async_engine::{AsyncFd, SpawnedFuture}, + async_engine::SpawnedFuture, client::{error::LookupError, objects::Objects}, ifs::{wl_display::WlDisplay, wl_registry::WlRegistry}, leaks::Tracker, @@ -130,7 +130,7 @@ impl Clients { id, state: global.clone(), checking_queue_size: Cell::new(false), - socket: global.eng.fd(&Rc::new(socket))?, + socket: Rc::new(socket), objects: Objects::new(), swapchain: Default::default(), flush_request: Default::default(), @@ -236,7 +236,7 @@ pub struct Client { pub id: ClientId, pub state: Rc, checking_queue_size: Cell, - socket: AsyncFd, + socket: Rc, pub objects: Objects, swapchain: Rc>, flush_request: AsyncEvent, diff --git a/src/client/tasks.rs b/src/client/tasks.rs index 70481bf6..7de23369 100644 --- a/src/client/tasks.rs +++ b/src/client/tasks.rs @@ -37,7 +37,7 @@ pub async fn client(data: Rc) { async fn receive(data: Rc) { let display = data.display().unwrap(); let recv = async { - let mut buf = BufFdIn::new(data.socket.clone()); + let mut buf = BufFdIn::new(&data.socket, &data.state.ring); let mut data_buf = Vec::::new(); loop { let mut hdr = [0u32, 0]; @@ -100,7 +100,7 @@ async fn receive(data: Rc) { async fn send(data: Rc) { let send = async { - let mut out = BufFdOut::new(data.socket.clone(), &data.state.wheel); + let mut out = BufFdOut::new(&data.socket, &data.state.ring, &data.state.wheel); let mut buffers = VecDeque::new(); loop { data.flush_request.triggered().await; diff --git a/src/compositor.rs b/src/compositor.rs index 784bb3f7..51f76d3a 100644 --- a/src/compositor.rs +++ b/src/compositor.rs @@ -119,8 +119,8 @@ fn start_compositor2( let xkb_ctx = XkbContext::new().unwrap(); let xkb_keymap = xkb_ctx.keymap_from_str(include_str!("keymap.xkb")).unwrap(); let engine = AsyncEngine::install(&el)?; - let wheel = Wheel::new(&engine)?; - let io_uring = IoUring::new(&engine, 32)?; + let ring = IoUring::new(&engine, 32)?; + let wheel = Wheel::new(&engine, &ring)?; let (_run_toplevel_future, run_toplevel) = RunToplevel::install(&engine); let node_ids = NodeIds::default(); let state = Rc::new(State { @@ -156,7 +156,7 @@ fn start_compositor2( pending_container_render_data: Default::default(), pending_float_layout: Default::default(), pending_float_titles: Default::default(), - dbus: Dbus::new(&engine, &run_toplevel), + dbus: Dbus::new(&engine, &ring, &run_toplevel), fdcloser: FdCloser::new(), logger, connectors: Default::default(), @@ -186,7 +186,7 @@ fn start_compositor2( tracker: Default::default(), data_offer_ids: Default::default(), drm_dev_ids: Default::default(), - io_uring, + ring, }); state.tracker.register(ClientId::from_raw(0)); create_dummy_output(&state); diff --git a/src/dbus.rs b/src/dbus.rs index c0bd0eb4..f32c14b6 100644 --- a/src/dbus.rs +++ b/src/dbus.rs @@ -1,16 +1,18 @@ pub use types::*; use { crate::{ - async_engine::{AsyncEngine, AsyncError, AsyncFd, SpawnedFuture}, + async_engine::{AsyncEngine, SpawnedFuture}, dbus::{ property::GetReply, types::{ObjectPath, Signature, Variant}, }, + io_uring::{IoUring, IoUringError}, utils::{ bufio::{BufIo, BufIoError}, clonecell::CloneCell, copyhashmap::CopyHashMap, numcell::NumCell, + oserror::OsError, run_toplevel::RunToplevel, vecstorage::VecStorage, xrd::{xrd, XRD}, @@ -78,15 +80,15 @@ pub enum DbusError { #[error("Variant has an invalid type")] InvalidVariantType, #[error("Could not create a socket")] - Socket(#[source] crate::utils::oserror::OsError), + Socket(#[source] OsError), #[error("Could not connect")] - Connect(#[source] crate::utils::oserror::OsError), + Connect(#[source] OsError), #[error("Could not write to the dbus socket")] - WriteError(#[source] crate::utils::oserror::OsError), + WriteError(#[source] OsError), #[error("Could not read from the dbus socket")] - ReadError(#[source] crate::utils::oserror::OsError), + ReadError(#[source] OsError), #[error("timeout")] - AsyncError(#[source] Box), + IoUringError(#[source] Box), #[error("Server did not accept our authentication")] Auth, #[error("Array length is not a multiple of the element size")] @@ -126,17 +128,18 @@ pub enum DbusError { #[error(transparent)] DbusError(Rc), } -efrom!(DbusError, AsyncError); +efrom!(DbusError, IoUringError); pub struct Dbus { eng: Rc, + ring: Rc, system: Rc, session: Rc, user_path: Option, } impl Dbus { - pub fn new(eng: &Rc, run_toplevel: &Rc) -> Self { + pub fn new(eng: &Rc, ring: &Rc, run_toplevel: &Rc) -> Self { let user_path = match xrd() { Some(path) => Some(format!("{}/bus", path)), _ => { @@ -147,6 +150,7 @@ impl Dbus { log::info!("dbus path = {:?}", user_path); Self { eng: eng.clone(), + ring: ring.clone(), system: Rc::new(DbusHolder::new(run_toplevel)), session: Rc::new(DbusHolder::new(run_toplevel)), user_path, @@ -159,8 +163,12 @@ impl Dbus { } pub fn system(&self) -> Result, DbusError> { - self.system - .get(&self.eng, "/var/run/dbus/system_bus_socket", "System bus") + self.system.get( + &self.eng, + &self.ring, + "/var/run/dbus/system_bus_socket", + "System bus", + ) } pub fn session(&self) -> Result, DbusError> { @@ -168,7 +176,7 @@ impl Dbus { None => return Err(DbusError::SessionBusAddressNotSet), Some(sba) => sba, }; - self.session.get(&self.eng, sba, "Session bus") + self.session.get(&self.eng, &self.ring, sba, "Session bus") } } @@ -186,7 +194,8 @@ unsafe trait ReplyHandler { pub struct DbusSocket { bus_name: &'static str, - fd: AsyncFd, + fd: Rc, + ring: Rc, bufio: Rc, eng: Rc, next_serial: NumCell, diff --git a/src/dbus/auth.rs b/src/dbus/auth.rs index 9eb41b2e..e55397eb 100644 --- a/src/dbus/auth.rs +++ b/src/dbus/auth.rs @@ -86,7 +86,7 @@ impl Auth { match uapi::read(self.socket.fd.raw(), &mut self.buf[..]) { Ok(n) => self.buf_stop = n.len(), Err(Errno(c::EAGAIN)) => { - self.socket.fd.readable().await?; + self.socket.ring.readable(&self.socket.fd).await?; } Err(e) => return Err(DbusError::ReadError(e.into())), } @@ -99,7 +99,7 @@ impl Auth { match uapi::write(self.socket.fd.raw(), &buf[start..]) { Ok(n) => start += n, Err(Errno(c::EAGAIN)) => { - self.socket.fd.writable().await?; + self.socket.ring.writable(&self.socket.fd).await?; } Err(e) => return Err(DbusError::WriteError(e.into())), } diff --git a/src/dbus/holder.rs b/src/dbus/holder.rs index ae1060cb..6297edf7 100644 --- a/src/dbus/holder.rs +++ b/src/dbus/holder.rs @@ -2,6 +2,7 @@ use { crate::{ async_engine::AsyncEngine, dbus::{auth::handle_auth, DbusError, DbusHolder, DbusSocket}, + io_uring::IoUring, utils::{bufio::BufIo, errorfmt::ErrorFmt, numcell::NumCell, run_toplevel::RunToplevel}, wire_dbus::org, }, @@ -13,6 +14,7 @@ impl DbusHolder { pub(super) fn get( self: &Rc, eng: &Rc, + ring: &Rc, addr: &str, name: &'static str, ) -> Result, DbusError> { @@ -23,7 +25,7 @@ impl DbusHolder { return Ok(c); } } - let socket = connect(eng, addr, name, &self.run_toplevel)?; + let socket = connect(eng, ring, addr, name, &self.run_toplevel)?; self.socket.set(Some(socket.clone())); Ok(socket) } @@ -31,6 +33,7 @@ impl DbusHolder { fn connect( eng: &Rc, + ring: &Rc, addr: &str, name: &'static str, run_toplevel: &Rc, @@ -50,11 +53,12 @@ fn connect( if let Err(e) = uapi::connect(socket.raw(), &sadr) { return Err(DbusError::Connect(e.into())); } - let fd = eng.fd(&Rc::new(socket))?; + let fd = Rc::new(socket); let socket = Rc::new(DbusSocket { bus_name: name, fd: fd.clone(), - bufio: Rc::new(BufIo::new(fd)), + ring: ring.clone(), + bufio: Rc::new(BufIo::new(&fd, ring)), eng: eng.clone(), next_serial: NumCell::new(1), unique_name: Default::default(), diff --git a/src/forker.rs b/src/forker.rs index 7611ae45..abf38a7f 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -3,13 +3,14 @@ mod io; use { crate::{ - async_engine::{AsyncEngine, AsyncFd, SpawnedFuture}, + async_engine::{AsyncEngine, SpawnedFuture}, compositor::{DISPLAY, WAYLAND_DISPLAY}, event_loop::EventLoop, forker::{ clone3::{fork_with_pidfd, Forked}, io::{IoIn, IoOut}, }, + io_uring::IoUring, state::State, utils::{ buffd::BufFdError, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, @@ -112,17 +113,13 @@ impl ForkerProxy { pub fn install(self: &Rc, state: &Rc) { state.forker.set(Some(self.clone())); - let socket = state.eng.fd(&self.socket).unwrap(); self.task_proc.set(Some( state.eng.spawn(self.clone().check_process(state.clone())), )); self.task_in - .set(Some(state.eng.spawn(self.clone().incoming(socket.clone())))); - self.task_out.set(Some( - state - .eng - .spawn(self.clone().outgoing(state.clone(), socket.clone())), - )); + .set(Some(state.eng.spawn(self.clone().incoming(state.clone())))); + self.task_out + .set(Some(state.eng.spawn(self.clone().outgoing(state.clone())))); } pub fn setenv(&self, key: &[u8], val: &[u8]) { @@ -191,8 +188,8 @@ impl ForkerProxy { }) } - async fn incoming(self: Rc, socket: AsyncFd) { - let mut io = IoIn::new(socket); + async fn incoming(self: Rc, state: Rc) { + let mut io = IoIn::new(&self.socket, &state.ring); loop { let msg = match io.read_msg().await { Ok(msg) => msg, @@ -240,8 +237,8 @@ impl ForkerProxy { log::log!(level, "{}", msg); } - async fn outgoing(self: Rc, state: Rc, socket: AsyncFd) { - let mut io = IoOut::new(socket, &state.wheel); + async fn outgoing(self: Rc, state: Rc) { + let mut io = IoOut::new(&self.socket, &state.ring, &state.wheel); loop { let msg = self.outgoing.pop().await; for fd in self.fds.borrow_mut().drain(..) { @@ -257,8 +254,7 @@ impl ForkerProxy { } async fn check_process(self: Rc, state: Rc) { - let pidfd = state.eng.fd(&self.pidfd).unwrap(); - if let Err(e) = pidfd.readable().await { + if let Err(e) = state.ring.readable(&self.pidfd).await { log::error!( "Cannot wait for the forker pidfd to become readable: {}", ErrorFmt(e) @@ -303,8 +299,9 @@ enum ForkerMessage { } struct Forker { - socket: AsyncFd, + socket: Rc, ae: Rc, + ring: Rc, wheel: Rc, fds: RefCell>>, outgoing: AsyncQueue, @@ -333,10 +330,12 @@ impl Forker { }); let el = EventLoop::new().unwrap(); let ae = AsyncEngine::install(&el).unwrap(); - let wheel = Wheel::new(&ae).unwrap(); + let ring = IoUring::new(&ae, 32).unwrap(); + let wheel = Wheel::new(&ae, &ring).unwrap(); let forker = Rc::new(Forker { - socket: ae.fd(&socket).unwrap(), + socket, ae: ae.clone(), + ring, wheel, fds: RefCell::new(vec![]), outgoing: Default::default(), @@ -349,7 +348,7 @@ impl Forker { } async fn outgoing(self: Rc) { - let mut io = IoOut::new(self.socket.clone(), &self.wheel); + let mut io = IoOut::new(&self.socket, &self.ring, &self.wheel); loop { let msg = self.outgoing.pop().await; for fd in self.fds.borrow_mut().drain(..) { @@ -360,7 +359,7 @@ impl Forker { } async fn incoming(self: Rc) { - let mut io = IoIn::new(self.socket.clone()); + let mut io = IoIn::new(&self.socket, &self.ring); loop { let msg = io.read_msg().await.unwrap(); self.handle_msg(msg, &mut io); @@ -456,8 +455,8 @@ impl Forker { drop(write); let slf = self.clone(); let spawn = self.ae.spawn(async move { - let read = slf.ae.fd(&Rc::new(read)).unwrap(); - if let Err(e) = read.readable().await { + let read = Rc::new(read); + if let Err(e) = slf.ring.readable(&read).await { log::error!( "Cannot wait for the child fd to become readable: {}", ErrorFmt(e) diff --git a/src/forker/io.rs b/src/forker/io.rs index 531ff93b..a95fd54a 100644 --- a/src/forker/io.rs +++ b/src/forker/io.rs @@ -5,8 +5,8 @@ use { use { crate::{ - async_engine::AsyncFd, forker::ForkerError, + io_uring::IoUring, utils::{ buffd::{BufFdIn, BufFdOut}, vec_ext::VecExt, @@ -23,9 +23,9 @@ pub struct IoIn { } impl IoIn { - pub fn new(fd: AsyncFd) -> Self { + pub fn new(fd: &Rc, ring: &Rc) -> Self { Self { - incoming: BufFdIn::new(fd), + incoming: BufFdIn::new(fd, ring), scratch: vec![], } } @@ -63,9 +63,9 @@ pub struct IoOut { } impl IoOut { - pub fn new(fd: AsyncFd, wheel: &Rc) -> Self { + pub fn new(fd: &Rc, ring: &Rc, wheel: &Rc) -> Self { Self { - outgoing: BufFdOut::new(fd, wheel), + outgoing: BufFdOut::new(fd, ring, wheel), scratch: vec![], fds: vec![], } diff --git a/src/io_uring.rs b/src/io_uring.rs index c8e63dec..f1c07957 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -2,7 +2,7 @@ use { crate::{ async_engine::{AsyncEngine, AsyncError, AsyncFd, FdStatus, Phase, SpawnedFuture}, io_uring::{ - ops::{async_cancel::AsyncCancelTask, write::WriteTask}, + ops::{async_cancel::AsyncCancelTask, poll::PollTask, write::WriteTask}, pending_result::PendingResults, sys::{ io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, @@ -46,6 +46,7 @@ macro_rules! map_err { } }}; } +pub use ops::TaskResultExt; mod ops; mod pending_result; @@ -82,7 +83,7 @@ impl Drop for IoUring { } impl IoUring { - pub fn new(eng: &Rc, entries: u32) -> Result { + pub fn new(eng: &Rc, entries: u32) -> Result, IoUringError> { let mut params = io_uring_params::default(); let fd = match io_uring_setup(entries, &mut params) { Ok(f) => Rc::new(f), @@ -196,6 +197,7 @@ impl IoUring { pending_results: Default::default(), cached_writes: Default::default(), cached_cancels: Default::default(), + cached_polls: Default::default(), reader: Cell::new(None), submitter: Cell::new(None), }); @@ -203,7 +205,7 @@ impl IoUring { let reader = eng.spawn(data.clone().reader()); data.reader.set(Some(reader)); data.submitter.set(Some(submitter)); - Ok(Self { ring: data }) + Ok(Rc::new(Self { ring: data })) } } @@ -237,6 +239,7 @@ struct IoUringData { pending_results: PendingResults, cached_writes: Stack>, cached_cancels: Stack>, + cached_polls: Stack>, reader: Cell>>, submitter: Cell>>, @@ -313,6 +316,7 @@ impl IoUringData { let idx = (tail & self.sqmask) as usize; let mut sqe = self.sqesmap.deref()[idx].get().deref_mut(); self.sqmap.deref()[idx].set(idx as _); + *sqe = Default::default(); sqe.user_data = id; task.encode(sqe); tail = tail.wrapping_add(1); diff --git a/src/io_uring/ops.rs b/src/io_uring/ops.rs index 2c3d89f0..fa36b56a 100644 --- a/src/io_uring/ops.rs +++ b/src/io_uring/ops.rs @@ -1,2 +1,21 @@ +use crate::{io_uring::IoUringError, utils::oserror::OsError}; + pub mod async_cancel; +pub mod poll; pub mod write; + +pub type TaskResult = Result, IoUringError>; + +pub trait TaskResultExt { + fn merge(self) -> Result; +} + +impl TaskResultExt for TaskResult { + fn merge(self) -> Result { + match self { + Ok(Ok(t)) => Ok(t), + Ok(Err(e)) => Err(IoUringError::OsError(e)), + Err(e) => Err(e), + } + } +} diff --git a/src/io_uring/ops/poll.rs b/src/io_uring/ops/poll.rs new file mode 100644 index 00000000..e6f266a3 --- /dev/null +++ b/src/io_uring/ops/poll.rs @@ -0,0 +1,74 @@ +use { + crate::io_uring::{ + ops::TaskResult, + pending_result::PendingResult, + sys::{io_uring_sqe, IORING_OP_POLL_ADD}, + IoUring, IoUringData, IoUringError, Task, TaskResultExt, + }, + std::{ + cell::{Cell, RefCell}, + rc::Rc, + }, + uapi::{c, OwnedFd}, +}; + +impl IoUring { + pub async fn poll(&self, fd: &Rc, events: c::c_short) -> TaskResult { + self.ring.check_destroyed()?; + let id = self.ring.id(); + let pr = self.ring.pending_results.acquire(); + { + let pw = self.ring.cached_polls.pop().unwrap_or_default(); + pw.id.set(id.id); + *pw.data.borrow_mut() = Some(Data { + pr: pr.clone(), + fd: fd.clone(), + events: events as _, + }); + self.ring.schedule(pw); + } + Ok(pr.await.map(|v| v as c::c_short)) + } + + pub async fn readable(&self, fd: &Rc) -> Result { + self.poll(fd, c::POLLIN).await.merge() + } + + pub async fn writable(&self, fd: &Rc) -> Result { + self.poll(fd, c::POLLOUT).await.merge() + } +} + +struct Data { + pr: PendingResult, + fd: Rc, + events: u16, +} + +#[derive(Default)] +pub struct PollTask { + id: Cell, + data: RefCell>, +} + +unsafe impl Task for PollTask { + fn id(&self) -> u64 { + self.id.get() + } + + fn complete(self: Box, ring: &IoUringData, res: i32) { + let data = self.data.borrow_mut().take(); + if let Some(data) = data { + data.pr.complete(res); + } + ring.cached_polls.push(self); + } + + fn encode(&self, sqe: &mut io_uring_sqe) { + let data = self.data.borrow_mut(); + let data = data.as_ref().unwrap(); + sqe.opcode = IORING_OP_POLL_ADD; + sqe.fd = data.fd.raw(); + sqe.u3.poll_events = data.events; + } +} diff --git a/src/io_uring/ops/write.rs b/src/io_uring/ops/write.rs index 6165cacf..98625d48 100644 --- a/src/io_uring/ops/write.rs +++ b/src/io_uring/ops/write.rs @@ -1,8 +1,9 @@ use { crate::io_uring::{ + ops::TaskResult, pending_result::PendingResult, sys::{io_uring_sqe, IORING_OP_WRITE}, - IoUring, IoUringData, IoUringError, Task, + IoUring, IoUringData, Task, }, std::{ cell::{Cell, RefCell}, @@ -19,7 +20,7 @@ impl IoUring { buf: &Rc>, offset: usize, n: usize, - ) -> Result { + ) -> TaskResult { self.ring.check_destroyed()?; let id = self.ring.id(); let pr = self.ring.pending_results.acquire(); @@ -39,7 +40,7 @@ impl IoUring { }); self.ring.schedule(pw); } - Ok(pr.await? as usize) + Ok(pr.await.map(|v| v as usize)) } } diff --git a/src/io_uring/pending_result.rs b/src/io_uring/pending_result.rs index a323aad3..544c182e 100644 --- a/src/io_uring/pending_result.rs +++ b/src/io_uring/pending_result.rs @@ -1,8 +1,5 @@ use { - crate::{ - io_uring::IoUringError, - utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack}, - }, + crate::utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack}, std::{ cell::Cell, future::Future, @@ -104,13 +101,13 @@ impl Clone for PendingResult { } impl Future for PendingResult { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let pr = unsafe { self.pr.deref() }; if let Some(res) = pr.res.take() { let res = if res < 0 { - Err(IoUringError::OsError(OsError::from(-res as c::c_int))) + Err(OsError::from(-res as c::c_int)) } else { Ok(res) }; diff --git a/src/io_uring/sys.rs b/src/io_uring/sys.rs index cf97e087..f1a184ba 100644 --- a/src/io_uring/sys.rs +++ b/src/io_uring/sys.rs @@ -2,6 +2,7 @@ use { crate::utils::oserror::OsError, + std::mem::MaybeUninit, uapi::{c, OwnedFd}, }; @@ -23,6 +24,12 @@ pub struct io_uring_sqe { pub __pad2: [u64; 2], } +impl Default for io_uring_sqe { + fn default() -> Self { + unsafe { MaybeUninit::zeroed().assume_init() } + } +} + #[repr(C)] #[derive(Copy, Clone)] pub union io_uring_sqe_union1 { diff --git a/src/it/test_transport.rs b/src/it/test_transport.rs index e90429d3..77883677 100644 --- a/src/it/test_transport.rs +++ b/src/it/test_transport.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::{AsyncFd, SpawnedFuture}, + async_engine::SpawnedFuture, client::{Client, ClientId, EventFormatter}, it::{ test_error::{StdError, TestError}, @@ -28,11 +28,12 @@ use { rc::Rc, task::Poll, }, + uapi::OwnedFd, }; pub struct TestTransport { pub run: Rc, - pub fd: AsyncFd, + pub socket: Rc, pub client_id: Cell, pub bufs: Stack>, pub swapchain: Rc>, @@ -132,7 +133,7 @@ impl TestTransport { self.run.state.eng.spawn( Incoming { tc: self.clone(), - buf: BufFdIn::new(self.fd.clone()), + buf: BufFdIn::new(&self.socket, &self.run.state.ring), } .run(), ), @@ -141,7 +142,7 @@ impl TestTransport { self.run.state.eng.spawn( Outgoing { tc: self.clone(), - buf: BufFdOut::new(self.fd.clone(), &self.run.state.wheel), + buf: BufFdOut::new(&self.socket, &self.run.state.ring, &self.run.state.wheel), buffers: Default::default(), } .run(), diff --git a/src/it/testrun.rs b/src/it/testrun.rs index 4d33ede3..6118cfa0 100644 --- a/src/it/testrun.rs +++ b/src/it/testrun.rs @@ -51,17 +51,12 @@ impl TestRun { uapi::connect(socket.raw(), &self.server_addr) .to_os_error() .with_context(|| "Could not connect to the compositor")?; - let fd = self - .state - .eng - .fd(&socket) - .with_context(|| "Could not create an async fd")?; let mut obj_ids = Bitfield::default(); obj_ids.take(0); obj_ids.take(1); let tran = Rc::new(TestTransport { run: self.clone(), - fd, + socket, client_id: Cell::new(ClientId::from_raw(0)), bufs: Default::default(), swapchain: Default::default(), diff --git a/src/state.rs b/src/state.rs index 16f360b3..d7eb5026 100644 --- a/src/state.rs +++ b/src/state.rs @@ -109,7 +109,7 @@ pub struct State { pub config_file_id: NumCell, pub tracker: Tracker, pub data_offer_ids: NumCell, - pub io_uring: IoUring, + pub ring: Rc, } // impl Drop for State { diff --git a/src/tools/tool_client.rs b/src/tools/tool_client.rs index 09a779d7..68883332 100644 --- a/src/tools/tool_client.rs +++ b/src/tools/tool_client.rs @@ -4,6 +4,7 @@ use { client::{EventFormatter, RequestParser}, compositor::WAYLAND_DISPLAY, event_loop::{EventLoop, EventLoopError}, + io_uring::{IoUring, IoUringError}, logger::Logger, object::{ObjectId, WL_DISPLAY_ID}, utils::{ @@ -49,6 +50,8 @@ pub enum ToolClientError { CreateWheel(#[source] WheelError), #[error("Could not create an async engine")] CreateEngine(#[source] AsyncError), + #[error("Could not create an io-uring")] + CreateRing(#[source] IoUringError), #[error("XDG_RUNTIME_DIR is not set")] XrdNotSet, #[error("WAYLAND_DISPLAY is not set")] @@ -59,8 +62,6 @@ pub enum ToolClientError { SocketPathTooLong, #[error("Could not connect to the compositor")] Connect(#[source] OsError), - #[error("Could not create an async fd")] - AsyncFd(#[source] AsyncError), #[error("The message length is smaller than 8 bytes")] MsgLenTooSmall, #[error("The size of the message is not a multiple of 4")] @@ -78,6 +79,7 @@ pub enum ToolClientError { pub struct ToolClient { pub logger: Arc, pub el: Rc, + pub ring: Rc, pub wheel: Rc, pub eng: Rc, obj_ids: RefCell, @@ -131,7 +133,11 @@ impl ToolClient { Ok(e) => e, Err(e) => return Err(ToolClientError::CreateEngine(e)), }; - let wheel = match Wheel::new(&eng) { + let ring = match IoUring::new(&eng, 32) { + Ok(e) => e, + Err(e) => return Err(ToolClientError::CreateRing(e)), + }; + let wheel = match Wheel::new(&eng, &ring) { Ok(w) => w, Err(e) => return Err(ToolClientError::CreateWheel(e)), }; @@ -163,16 +169,13 @@ impl ToolClient { if let Err(e) = uapi::connect(socket.raw(), &addr) { return Err(ToolClientError::Connect(e.into())); } - let fd = match eng.fd(&socket) { - Ok(fd) => fd, - Err(e) => return Err(ToolClientError::AsyncFd(e)), - }; let mut obj_ids = Bitfield::default(); obj_ids.take(0); obj_ids.take(1); let slf = Rc::new(Self { logger, el, + ring, wheel, eng, obj_ids: RefCell::new(obj_ids), @@ -197,7 +200,7 @@ impl ToolClient { slf.eng.spawn( Incoming { tc: slf.clone(), - buf: BufFdIn::new(fd.clone()), + buf: BufFdIn::new(&socket, &slf.ring), } .run(), ), @@ -206,7 +209,7 @@ impl ToolClient { slf.eng.spawn( Outgoing { tc: slf.clone(), - buf: BufFdOut::new(fd.clone(), &slf.wheel), + buf: BufFdOut::new(&socket, &slf.ring, &slf.wheel), buffers: Default::default(), } .run(), diff --git a/src/utils/buffd.rs b/src/utils/buffd.rs index d5b9aa96..9af64454 100644 --- a/src/utils/buffd.rs +++ b/src/utils/buffd.rs @@ -1,4 +1,7 @@ -use {crate::async_engine::AsyncError, thiserror::Error}; +use { + crate::{io_uring::IoUringError, utils::oserror::OsError}, + thiserror::Error, +}; pub use { buf_in::BufFdIn, buf_out::{BufFdOut, OutBuffer, OutBufferSwapchain}, @@ -14,9 +17,9 @@ mod parser; #[derive(Debug, Error)] pub enum BufFdError { #[error("An IO error occurred")] - Io(#[source] crate::utils::oserror::OsError), - #[error("An async error occurred")] - Async(#[from] AsyncError), + Io(#[source] OsError), + #[error("An io-uring error occurred")] + Ring(#[from] IoUringError), #[error("The peer did not send a file descriptor")] NoFd, #[error("The peer sent too many file descriptors")] diff --git a/src/utils/buffd/buf_in.rs b/src/utils/buffd/buf_in.rs index 726547e0..811ab1ae 100644 --- a/src/utils/buffd/buf_in.rs +++ b/src/utils/buffd/buf_in.rs @@ -1,14 +1,15 @@ use { crate::{ - async_engine::AsyncFd, + io_uring::IoUring, utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE, MAX_IN_FD}, }, - std::{collections::VecDeque, mem::MaybeUninit}, + std::{collections::VecDeque, mem::MaybeUninit, rc::Rc}, uapi::{c, Errno, OwnedFd, Pod}, }; pub struct BufFdIn { - fd: AsyncFd, + fd: Rc, + ring: Rc, in_fd: VecDeque, @@ -19,9 +20,10 @@ pub struct BufFdIn { } impl BufFdIn { - pub fn new(fd: AsyncFd) -> Self { + pub fn new(fd: &Rc, ring: &Rc) -> Self { Self { - fd, + fd: fd.clone(), + ring: ring.clone(), in_fd: Default::default(), in_buf: Box::new([MaybeUninit::uninit(); BUF_SIZE]), in_cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]), @@ -35,7 +37,7 @@ impl BufFdIn { let mut offset = 0; while offset < bytes.len() { if self.read_full_(bytes, &mut offset)? { - self.fd.readable().await?; + self.ring.readable(&self.fd).await?; } } Ok(()) diff --git a/src/utils/buffd/buf_out.rs b/src/utils/buffd/buf_out.rs index 3bbd2a42..71bc690e 100644 --- a/src/utils/buffd/buf_out.rs +++ b/src/utils/buffd/buf_out.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::AsyncFd, + io_uring::IoUring, utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE}, wheel::{Wheel, WheelTimeoutFuture}, }, @@ -79,16 +79,18 @@ impl OutBufferSwapchain { } pub struct BufFdOut { - fd: AsyncFd, + fd: Rc, + ring: Rc, wheel: Rc, cmsg_buf: Box<[MaybeUninit; CMSG_BUF_SIZE]>, fd_ids: Vec, } impl BufFdOut { - pub fn new(fd: AsyncFd, wheel: &Rc) -> Self { + pub fn new(fd: &Rc, ring: &Rc, wheel: &Rc) -> Self { Self { - fd, + fd: fd.clone(), + ring: ring.clone(), wheel: wheel.clone(), cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]), fd_ids: vec![], @@ -109,7 +111,7 @@ impl BufFdOut { _ = timeout.as_mut().unwrap() => { return Err(BufFdError::Timeout); }, - res = self.fd.writable().fuse() => { + res = self.ring.writable(&self.fd).fuse() => { res?; }, } @@ -123,7 +125,7 @@ impl BufFdOut { pub async fn flush_no_timeout(&mut self, buf: &mut OutBuffer) -> Result<(), BufFdError> { while buf.read_pos < buf.write_pos { if self.flush_sync(buf)? { - self.fd.writable().await?; + let _ = self.ring.writable(&self.fd).await?; } } buf.read_pos = 0; @@ -186,7 +188,7 @@ impl BufFdOut { let mut read_pos = 0; while read_pos < buf.len() { if self.flush_sync2(&mut read_pos, buf, fds)? { - self.fd.writable().await?; + self.ring.writable(&self.fd).await?; } } Ok(()) diff --git a/src/utils/bufio.rs b/src/utils/bufio.rs index f3793da8..1dfc98e1 100644 --- a/src/utils/bufio.rs +++ b/src/utils/bufio.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::{AsyncError, AsyncFd}, + io_uring::{IoUring, IoUringError}, utils::{ oserror::OsError, queue::AsyncQueue, @@ -26,9 +26,9 @@ pub enum BufIoError { #[error("Could not read from the socket")] ReadError(#[source] OsError), #[error("Cannot wait for fd to become writable")] - Writable(#[source] AsyncError), + Writable(#[source] IoUringError), #[error("Cannot wait for fd to become readable")] - Readable(#[source] AsyncError), + Readable(#[source] IoUringError), #[error("The socket is closed")] Closed, } @@ -44,7 +44,8 @@ struct MessageOffset { } pub struct BufIo { - fd: AsyncFd, + fd: Rc, + ring: Rc, bufs: Stack>, outgoing: AsyncQueue, } @@ -69,9 +70,10 @@ struct Outgoing { } impl BufIo { - pub fn new(fd: AsyncFd) -> Self { + pub fn new(fd: &Rc, ring: &Rc) -> Self { Self { - fd, + fd: fd.clone(), + ring: ring.clone(), bufs: Default::default(), outgoing: Default::default(), } @@ -130,7 +132,7 @@ impl BufIoIncoming { if e.0 != c::EAGAIN { return Err(BufIoError::ReadError(e.into())); } - if let Err(e) = self.bufio.fd.readable().await { + if let Err(e) = self.bufio.ring.readable(&self.bufio.fd).await { return Err(BufIoError::Readable(e)); } } @@ -184,7 +186,7 @@ impl Outgoing { if e != Errno(c::EAGAIN) { return Err(BufIoError::FlushError(e.into())); } - if let Err(e) = self.bufio.fd.writable().await { + if let Err(e) = self.bufio.ring.writable(&self.bufio.fd).await { return Err(BufIoError::Writable(e)); } } diff --git a/src/wheel.rs b/src/wheel.rs index b51a0689..591b2f3e 100644 --- a/src/wheel.rs +++ b/src/wheel.rs @@ -1,6 +1,7 @@ use { crate::{ - async_engine::{AsyncEngine, AsyncError, AsyncFd, SpawnedFuture}, + async_engine::{AsyncEngine, AsyncError, SpawnedFuture}, + io_uring::IoUring, time::{Time, TimeError}, utils::{ copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, oserror::OsError, @@ -18,7 +19,7 @@ use { time::Duration, }, thiserror::Error, - uapi::c, + uapi::{c, OwnedFd}, }; #[derive(Debug, Error)] @@ -99,7 +100,8 @@ impl Future for WheelTimeoutFuture { pub struct WheelData { destroyed: Cell, - fd: AsyncFd, + ring: Rc, + fd: Rc, next_id: NumCell, start: Time, current_expiration: Cell>, @@ -110,14 +112,14 @@ pub struct WheelData { } impl Wheel { - pub fn new(eng: &Rc) -> Result, WheelError> { + pub fn new(eng: &Rc, ring: &Rc) -> Result, WheelError> { let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) { Ok(fd) => Rc::new(fd), Err(e) => return Err(WheelError::CreateFailed(e.into())), }; - let fd = eng.fd(&fd)?; let data = Rc::new(WheelData { destroyed: Cell::new(false), + ring: ring.clone(), fd, next_id: NumCell::new(1), start: Time::now()?, @@ -207,7 +209,7 @@ impl WheelData { async fn dispatch(self: Rc) { loop { - if let Err(e) = self.fd.readable().await { + if let Err(e) = self.ring.readable(&self.fd).await { log::error!( "Could not wait for the timerfd to become readable: {}", ErrorFmt(e) diff --git a/src/xcon.rs b/src/xcon.rs index a64a463c..0d8dfb3b 100644 --- a/src/xcon.rs +++ b/src/xcon.rs @@ -5,8 +5,9 @@ pub use crate::xcon::{ }; use { crate::{ - async_engine::{AsyncEngine, AsyncError, Phase, SpawnedFuture}, + async_engine::{AsyncError, Phase, SpawnedFuture}, compositor::DISPLAY, + state::State, utils::{ bufio::{BufIo, BufIoError, BufIoMessage}, clonecell::CloneCell, @@ -385,7 +386,7 @@ impl Xcon { Ok(id) } - pub async fn connect(eng: Rc) -> Result, XconError> { + pub async fn connect(state: &Rc) -> Result, XconError> { let authority = match XAuthority::load() { Ok(a) => a, Err(e) => { @@ -433,18 +434,17 @@ impl Xcon { } (&[], &[]) }; - Self::connect_to_fd(&eng, &fd, auth_method, auth_value).await + Self::connect_to_fd(state, &fd, auth_method, auth_value).await } pub async fn connect_to_fd( - eng: &Rc, + state: &Rc, fd: &Rc, auth_method: &[u8], auth_value: &[u8], ) -> Result, XconError> { - let fd = eng.fd(fd)?; let data = Rc::new(XconData { - bufio: Rc::new(BufIo::new(fd)), + bufio: Rc::new(BufIo::new(fd, &state.ring)), next_serial: NumCell::new(1), last_recv_serial: Cell::new(0), reply_handlers: Default::default(), @@ -454,7 +454,9 @@ impl Xcon { xorg: CloneCell::new(Weak::new()), events: Default::default(), }); - let outgoing = eng.spawn2(Phase::PostLayout, handle_outgoing(data.clone())); + let outgoing = state + .eng + .spawn2(Phase::PostLayout, handle_outgoing(data.clone())); let mut buf = data.bufio.buf(); let mut fds = vec![]; { @@ -497,7 +499,7 @@ impl Xcon { return Err(XconError::Authenticate(reason.to_owned())); } let setup = Setup::deserialize(&mut parser)?; - let incoming = eng.spawn(handle_incoming(data.clone(), incoming)); + let incoming = state.eng.spawn(handle_incoming(data.clone(), incoming)); let slf = Rc::new(Self { extensions: data.fetch_extension_data().await?, outgoing: Cell::new(Some(outgoing)), diff --git a/src/xwayland.rs b/src/xwayland.rs index 823ff0f0..54a19bbf 100644 --- a/src/xwayland.rs +++ b/src/xwayland.rs @@ -3,7 +3,6 @@ mod xwm; use { crate::{ - async_engine::AsyncError, client::ClientError, compositor::DISPLAY, forker::{ForkerError, ForkerProxy}, @@ -19,6 +18,7 @@ use { WlSurface, }, }, + io_uring::IoUringError, state::State, user_session::import_environment, utils::{errorfmt::ErrorFmt, oserror::OsError, tri::Try}, @@ -59,8 +59,8 @@ enum XWaylandError { BindFailed(#[source] OsError), #[error("All X displays in the range 0..1000 are already in use")] AddressesInUse, - #[error("The async engine returned an error")] - AsyncError(#[from] AsyncError), + #[error("The io-uring returned an error")] + RingError(#[from] IoUringError), #[error("pipe(2) failed")] Pipe(#[source] OsError), #[error("socketpair(2) failed")] @@ -120,12 +120,7 @@ pub async fn manage(state: Rc) { if state.backend.get().import_environment() { import_environment(&state, DISPLAY, &display); } - let res = XWaylandError::tria(async { - state.eng.fd(&socket)?.readable().await?; - Ok(()) - }) - .await; - if let Err(e) = res { + if let Err(e) = state.ring.readable(&socket).await { log::error!("{}", ErrorFmt(e)); return; } @@ -192,7 +187,7 @@ async fn run( Ok(c) => c, Err(e) => return Err(XWaylandError::SpawnClient(e)), }; - state.eng.fd(&Rc::new(dfdread))?.readable().await?; + state.ring.readable(&Rc::new(dfdread)).await?; state.xwayland.queue.clear(); { let shared = Rc::new(XwmShared::default()); @@ -201,7 +196,7 @@ async fn run( Err(e) => return Err(XWaylandError::CreateWm(Box::new(e))), }; let _wm = state.eng.spawn(wm.run()); - state.eng.fd(&Rc::new(pidfd))?.readable().await?; + state.ring.readable(&Rc::new(pidfd)).await?; } state.xwayland.queue.clear(); stderr_read.await; @@ -226,6 +221,7 @@ pub fn build_args(fds: &[OwnedFd]) -> (String, Vec) { } async fn log_xwayland(state: Rc, stderr: OwnedFd) { + let stderr = Rc::new(stderr); let res = Errno::tri(|| { uapi::fcntl_setfl( stderr.raw(), @@ -237,21 +233,11 @@ async fn log_xwayland(state: Rc, stderr: OwnedFd) { log::error!("Could not set stderr fd to nonblock: {}", ErrorFmt(e)); return; } - let afd = match state.eng.fd(&Rc::new(stderr)) { - Ok(f) => f, - Err(e) => { - log::error!( - "Could not turn the stderr fd into an async fd: {}", - ErrorFmt(e) - ); - return; - } - }; let mut buf = vec![]; let mut buf2 = [0; 128]; let mut done = false; while !done { - if let Err(e) = afd.readable().await { + if let Err(e) = state.ring.readable(&stderr).await { log::error!( "Cannot wait for the xwayland stderr to become readable: {}", ErrorFmt(e) @@ -259,7 +245,7 @@ async fn log_xwayland(state: Rc, stderr: OwnedFd) { return; } loop { - match uapi::read(afd.raw(), &mut buf2[..]) { + match uapi::read(stderr.raw(), &mut buf2[..]) { Ok(buf2) if buf2.len() > 0 => { buf.extend_from_slice(buf2); } diff --git a/src/xwayland/xwm.rs b/src/xwayland/xwm.rs index e9ec3622..bbe1c162 100644 --- a/src/xwayland/xwm.rs +++ b/src/xwayland/xwm.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::{AsyncFd, SpawnedFuture}, + async_engine::SpawnedFuture, client::Client, ifs::{ ipc::{ @@ -18,6 +18,7 @@ use { WlSurface, }, }, + io_uring::{IoUring, TaskResultExt}, rect::Rect, state::State, tree::ToplevelNode, @@ -282,7 +283,7 @@ impl Wm { socket: OwnedFd, shared: &Rc, ) -> Result { - let c = match Xcon::connect_to_fd(&state.eng, &Rc::new(socket), &[], &[]).await { + let c = match Xcon::connect_to_fd(&state, &Rc::new(socket), &[], &[]).await { Ok(c) => c, Err(e) => return Err(XWaylandError::Connect(e)), }; @@ -1576,19 +1577,13 @@ impl Wm { log::error!("Could not make pipe nonblocking: {}", e); break 'convert; } - let fd = match self.state.eng.fd(&Rc::new(rx)) { - Ok(afd) => afd, - Err(e) => { - log::error!("Could not create an async fd: {}", ErrorFmt(e)); - break 'convert; - } - }; success = None; receive_data_offer::(&offer.offer, &mt, Rc::new(tx)); let id = self.transfer_ids.fetch_add(1); let wtx = WaylandToXTransfer { id, - fd, + fd: Rc::new(rx), + ring: self.state.ring.clone(), c: self.c.clone(), window: event.requestor, time: event.time, @@ -2391,11 +2386,11 @@ impl XToWaylandTransfer { while pos < self.data.len() { let f1 = self .state - .io_uring + .ring .write(&self.fd, &self.data, pos, self.data.len() - pos); pin_mut!(f1); match future::select(f1, &mut timeout).await { - Either::Left((res, _)) => match res { + Either::Left((res, _)) => match res.merge() { Ok(n) => pos += n, Err(e) => { log::error!("Could not write to wayland client: {}", ErrorFmt(e)); @@ -2414,7 +2409,8 @@ impl XToWaylandTransfer { struct WaylandToXTransfer { id: u64, - fd: AsyncFd, + fd: Rc, + ring: Rc, c: Rc, window: u32, time: u32, @@ -2449,7 +2445,7 @@ impl WaylandToXTransfer { } } Err(Errno(c::EAGAIN)) => { - if let Err(e) = self.fd.readable().await { + if let Err(e) = self.ring.readable(&self.fd).await { log::error!("Could not wait for fd to become readable: {}", ErrorFmt(e)); break; }