From 285724b4f168bb47b979e47adc8b1823b71347ff Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Wed, 11 May 2022 21:45:15 +0200 Subject: [PATCH] xwayland: use io_uring to prevent lockups See https://gitlab.freedesktop.org/wayland/wayland/-/issues/296 --- src/compositor.rs | 5 + src/io_uring.rs | 443 +++++++++++++++++++++++++++++++ src/io_uring/ops.rs | 2 + src/io_uring/ops/async_cancel.rs | 52 ++++ src/io_uring/ops/write.rs | 80 ++++++ src/io_uring/pending_result.rs | 123 +++++++++ src/io_uring/sys.rs | 397 +++++++++++++++++++++++++++ src/main.rs | 1 + src/state.rs | 2 + src/utils.rs | 1 + src/utils/asyncevent.rs | 1 + src/utils/mmap.rs | 34 +++ src/xwayland/xwm.rs | 66 +++-- 13 files changed, 1173 insertions(+), 34 deletions(-) create mode 100644 src/io_uring.rs create mode 100644 src/io_uring/ops.rs create mode 100644 src/io_uring/ops/async_cancel.rs create mode 100644 src/io_uring/ops/write.rs create mode 100644 src/io_uring/pending_result.rs create mode 100644 src/io_uring/sys.rs create mode 100644 src/utils/mmap.rs diff --git a/src/compositor.rs b/src/compositor.rs index 92c10d89..8d7f48ba 100644 --- a/src/compositor.rs +++ b/src/compositor.rs @@ -18,6 +18,7 @@ use { forker, globals::Globals, ifs::{wl_output::WlOutputGlobal, wl_surface::NoneSurfaceExt}, + io_uring::{IoUring, IoUringError}, leaks, logger::Logger, render::{self, RenderError}, @@ -87,6 +88,8 @@ pub enum CompositorError { AsyncError(#[from] AsyncError), #[error("The render backend caused an error")] RenderError(#[from] RenderError), + #[error("Could not create an io-uring")] + IoUringError(#[from] IoUringError), } pub const WAYLAND_DISPLAY: &str = "WAYLAND_DISPLAY"; @@ -117,6 +120,7 @@ fn start_compositor2( let xkb_keymap = xkb_ctx.keymap_from_str(include_str!("keymap.xkb")).unwrap(); let wheel = Wheel::install(&el)?; let engine = AsyncEngine::install(&el, &wheel)?; + let io_uring = IoUring::new(&engine, 32)?; let (_run_toplevel_future, run_toplevel) = RunToplevel::install(&engine); let node_ids = 
NodeIds::default(); let state = Rc::new(State { @@ -182,6 +186,7 @@ fn start_compositor2( tracker: Default::default(), data_offer_ids: Default::default(), drm_dev_ids: Default::default(), + io_uring, }); state.tracker.register(ClientId::from_raw(0)); create_dummy_output(&state); diff --git a/src/io_uring.rs b/src/io_uring.rs new file mode 100644 index 00000000..c8e63dec --- /dev/null +++ b/src/io_uring.rs @@ -0,0 +1,443 @@ +use { + crate::{ + async_engine::{AsyncEngine, AsyncError, AsyncFd, FdStatus, Phase, SpawnedFuture}, + io_uring::{ + ops::{async_cancel::AsyncCancelTask, write::WriteTask}, + pending_result::PendingResults, + sys::{ + io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, + IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQES, + IORING_OFF_SQ_RING, + }, + }, + utils::{ + asyncevent::AsyncEvent, + bitflags::BitflagsExt, + copyhashmap::CopyHashMap, + errorfmt::ErrorFmt, + mmap::{mmap, Mmapped}, + numcell::NumCell, + oserror::OsError, + ptr_ext::{MutPtrExt, PtrExt}, + queue::AsyncQueue, + stack::Stack, + }, + }, + std::{ + cell::{Cell, UnsafeCell}, + mem::{self}, + rc::Rc, + sync::atomic::{ + AtomicU32, + Ordering::{Acquire, Relaxed, Release}, + }, + }, + thiserror::Error, + uapi::c::{self}, +}; + +macro_rules! 
map_err { + ($n:expr) => {{ + let n = $n; + if n < 0 { + Err(crate::utils::oserror::OsError::from(-n as uapi::c::c_int)) + } else { + Ok(n) + } + }}; +} + +mod ops; +mod pending_result; +mod sys; + +#[derive(Debug, Error)] +pub enum IoUringError { + #[error(transparent)] + OsError(OsError), + #[error("The async engine returned an error")] + AsyncError(#[from] AsyncError), + #[error("Could not create an io-uring")] + CreateUring(#[source] OsError), + #[error("The kernel does not support the IORING_FEAT_NODROP feature")] + NoDrop, + #[error("Could not map the submission queue ring")] + MapSqRing(#[source] OsError), + #[error("Could not map the submission queue entries")] + MapSqEntries(#[source] OsError), + #[error("Could not map the completion queue ring")] + MapCqRing(#[source] OsError), + #[error("The io-uring has already been destroyed")] + Destroyed, +} + +pub struct IoUring { + ring: Rc, +} + +impl Drop for IoUring { + fn drop(&mut self) { + self.ring.kill(); + } +} + +impl IoUring { + pub fn new(eng: &Rc, entries: u32) -> Result { + let mut params = io_uring_params::default(); + let fd = match io_uring_setup(entries, &mut params) { + Ok(f) => Rc::new(f), + Err(e) => return Err(IoUringError::CreateUring(e.into())), + }; + if !params.features.contains(IORING_FEAT_NODROP) { + return Err(IoUringError::NoDrop); + } + let sqmap_map = mmap( + (params.sq_off.array + params.sq_entries * 4) as _, + c::PROT_READ | c::PROT_WRITE, + c::MAP_SHARED | c::MAP_POPULATE, + fd.raw(), + IORING_OFF_SQ_RING as _, + ); + let sqmap_map = match sqmap_map { + Ok(map) => map, + Err(e) => return Err(IoUringError::MapSqRing(e)), + }; + let sqesmap_map = mmap( + params.sq_entries as usize * mem::size_of::(), + c::PROT_READ | c::PROT_WRITE, + c::MAP_SHARED | c::MAP_POPULATE, + fd.raw(), + IORING_OFF_SQES as _, + ); + let sqesmap_map = match sqesmap_map { + Ok(map) => map, + Err(e) => return Err(IoUringError::MapSqEntries(e)), + }; + let cqmap_map = mmap( + params.cq_off.cqes as usize + + 
params.cq_entries as usize * mem::size_of::<io_uring_cqe>(), + c::PROT_READ | c::PROT_WRITE, + c::MAP_SHARED | c::MAP_POPULATE, + fd.raw(), + IORING_OFF_CQ_RING as _, + ); + /* The mapping requested above must cover cq_entries completion entries: the kernel reports cq_entries separately from sq_entries, and the cqmap slice built below is cq_entries long. */ + let cqmap_map = match cqmap_map { + Ok(map) => map, + Err(e) => return Err(IoUringError::MapCqRing(e)), + }; + /* The blocks below derive the ring pointers by adding the byte offsets that io_uring_setup wrote into params to the base of the matching mapping. NOTE(review): this relies on the kernel-provided offsets lying inside the mapped lengths. */ + let sqmask = unsafe { + *(sqmap_map.ptr as *const u8) + .add(params.sq_off.ring_mask as _) + .cast() + }; + let sqhead = unsafe { + (sqmap_map.ptr as *const u8) + .add(params.sq_off.head as _) + .cast() + }; + let sqtail = unsafe { + (sqmap_map.ptr as *const u8) + .add(params.sq_off.tail as _) + .cast() + }; + let sqmap = unsafe { + let base = (sqmap_map.ptr as *const u8) + .add(params.sq_off.array as _) + .cast(); + std::slice::from_raw_parts(base, params.sq_entries as _) + }; + let sqesmap = unsafe { + let base = (sqesmap_map.ptr as *const u8).cast(); + std::slice::from_raw_parts(base, params.sq_entries as _) + }; + let cqmask = unsafe { + *(cqmap_map.ptr as *const u8) + .add(params.cq_off.ring_mask as _) + .cast() + }; + let cqhead = unsafe { + (cqmap_map.ptr as *const u8) + .add(params.cq_off.head as _) + .cast() + }; + let cqtail = unsafe { + (cqmap_map.ptr as *const u8) + .add(params.cq_off.tail as _) + .cast() + }; + let cqmap = unsafe { + let base = (cqmap_map.ptr as *const u8) + .add(params.cq_off.cqes as _) + .cast(); + std::slice::from_raw_parts(base, params.cq_entries as _) + }; + let fd = eng.fd(&fd)?; + let data = Rc::new(IoUringData { + destroyed: Cell::new(false), + fd, + _sqesmap_map: sqesmap_map, + _sqmap_map: sqmap_map, + sqmask, + sqlen: params.sq_entries, + sqhead, + sqtail, + sqmap, + sqesmap, + _cqmap_map: cqmap_map, + cqmask, + cqhead, + cqtail, + cqmap, + cqes_consumed: Default::default(), + next: Default::default(), + to_encode: Default::default(), + pending_in_kernel: Default::default(), + tasks: Default::default(), + pending_results: Default::default(), + cached_writes: Default::default(), + cached_cancels: Default::default(), + reader: Cell::new(None), + submitter: 
Cell::new(None), + }); + let submitter = eng.spawn2(Phase::Present, data.clone().submit()); + let reader = eng.spawn(data.clone().reader()); + data.reader.set(Some(reader)); + data.submitter.set(Some(submitter)); + Ok(Self { ring: data }) + } +} + +struct IoUringData { + destroyed: Cell, + + fd: AsyncFd, + + _sqesmap_map: Mmapped, + _sqmap_map: Mmapped, + sqmask: u32, + sqlen: u32, + sqhead: *const AtomicU32, + sqtail: *const AtomicU32, + sqmap: *const [Cell], + sqesmap: *const [UnsafeCell], + + _cqmap_map: Mmapped, + cqmask: u32, + cqhead: *const AtomicU32, + cqtail: *const AtomicU32, + cqmap: *const [Cell], + + cqes_consumed: AsyncEvent, + + next: NumCell, + to_encode: AsyncQueue, + pending_in_kernel: CopyHashMap, + tasks: CopyHashMap>, + + pending_results: PendingResults, + cached_writes: Stack>, + cached_cancels: Stack>, + + reader: Cell>>, + submitter: Cell>>, +} + +unsafe trait Task { + fn id(&self) -> u64; + fn complete(self: Box, ring: &IoUringData, res: i32); + fn encode(&self, sqe: &mut io_uring_sqe); + + fn is_cancel(&self) -> bool { + false + } +} + +impl IoUringData { + async fn reader(self: Rc) { + loop { + if !self.dispatch_completions() { + match self.fd.readable().await { + Err(e) => { + log::error!("Could not wait for the fd to become readable: {}", e); + } + Ok(FdStatus::Err) => { + log::error!("Fd is in an error state"); + } + _ => continue, + } + self.kill(); + return; + } + } + } + + fn dispatch_completions(&self) -> bool { + unsafe { + let mut head = self.cqhead.deref().load(Relaxed); + let tail = self.cqtail.deref().load(Acquire); + if head == tail { + return false; + } + while head != tail { + let idx = (head & self.cqmask) as usize; + let entry = self.cqmap.deref()[idx].get(); + head = head.wrapping_add(1); + self.cqhead.deref().store(head, Release); + if let Some(pending) = self.tasks.remove(&entry.user_data) { + self.pending_in_kernel.remove(&entry.user_data); + pending.complete(&self, entry.res); + } + } + 
self.cqhead.deref().store(head, Release); + self.cqes_consumed.trigger(); + true + } + } + + /* Moves queued task ids from to_encode into free submission queue slots, publishes the new tail and returns how many entries were written. */ + fn encode(&self) -> usize { + let tasks = self.tasks.lock(); + let mut encoded = 0; + unsafe { + let mut tail = self.sqtail.deref().load(Relaxed); + let head = self.sqhead.deref().load(Acquire); + while tail.wrapping_sub(head) < self.sqlen { + let id = match self.to_encode.try_pop() { + Some(t) => t, + _ => break, + }; + /* The id is no longer in the task map (cancel_task removes tasks that never reached the kernel), so there is nothing to submit for it. */ + let task = match tasks.get(&id) { + Some(t) => t, + _ => continue, + }; + self.pending_in_kernel.set(id, ()); + let idx = (tail & self.sqmask) as usize; + let sqe = self.sqesmap.deref()[idx].get().deref_mut(); + self.sqmap.deref()[idx].set(idx as _); + /* Slots are reused once the ring wraps and each Task::encode only writes the fields its opcode needs, so clear the entry first; otherwise a cancel request would inherit the off and len of an earlier write in the same slot. */ + *sqe = mem::zeroed(); + sqe.user_data = id; + task.encode(sqe); + tail = tail.wrapping_add(1); + encoded += 1; + } + self.sqtail.deref().store(tail, Release); + } + encoded + } + + /* Submission loop: waits for queued tasks, encodes them and hands them to the kernel with io_uring_enter. Kills the ring and returns on unrecoverable errors. */ + async fn submit(self: Rc<Self>) { + let mut to_submit = 0; + loop { + self.to_encode.non_empty().await; + to_submit += self.encode(); + /* Nothing could be encoded even though tasks are queued: wait on the ring fd before trying again. */ + if to_submit == 0 { + match self.fd.writable().await { + Err(e) => { + log::error!("Could not wait for fd to become writable: {}", ErrorFmt(e)); + } + Ok(FdStatus::Err) => { + log::error!("Fd is in an error state"); + } + _ => continue, + } + self.kill(); + return; + } + while to_submit > 0 { + let res = io_uring_enter(self.fd.raw(), to_submit as _, 0, 0); + match res { + Ok(0) => { + panic!("io_uring_enter returned 0"); + } + Ok(n) => to_submit -= n, + Err(e) => match e.0 { + /* Retry once dispatch_completions has consumed completion entries and triggered cqes_consumed. */ + c::EAGAIN | c::EBUSY => { + log::debug!("waiting for completion events"); + self.cqes_consumed.clear(); + self.cqes_consumed.triggered().await; + } + _ => { + log::error!("io_uring_enter returned an error: {}", ErrorFmt(e)); + self.kill(); + return; + } + }, + } + } + } + } + + /* Allocates a fresh task id wrapped in a guard that cancels the task when dropped. */ + fn id(&self) -> Cancellable { + Cancellable { + id: self.id_raw(), + data: self, + } + } + + /* Allocates a fresh task id without a cancellation guard. */ + fn id_raw(&self) -> u64 { + self.next.fetch_add(1) + } + + fn cancel_task(&self, id: u64) { + if !self.tasks.contains(&id) { + return; + } + if !self.pending_in_kernel.contains(&id) { 
+ self.tasks + .remove(&id) + .unwrap() + .complete(self, -c::ECANCELED); + return; + } + self.cancel_task_in_kernel(id); + } + + fn schedule(&self, t: Box) { + assert!(!self.destroyed.get()); + self.to_encode.push(t.id()); + self.tasks.set(t.id(), t); + } + + fn check_destroyed(&self) -> Result<(), IoUringError> { + if self.destroyed.get() { + Err(IoUringError::Destroyed) + } else { + Ok(()) + } + } + + fn kill(&self) { + self.reader.take(); + self.submitter.take(); + let mut to_cancel = vec![]; + for task in self.tasks.lock().values() { + if !task.is_cancel() { + to_cancel.push(task.id()); + } + } + for task in to_cancel { + self.cancel_task(task); + } + self.destroyed.set(true); + while !self.tasks.is_empty() { + self.encode(); + let _ = io_uring_enter(self.fd.raw(), u32::MAX, 0, 0); + let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS); + if let Err(e) = res { + panic!("Could not wait for io_uring to drain: {}", ErrorFmt(e)); + } + while self.dispatch_completions() { + // nothing + } + } + } +} + +struct Cancellable<'a> { + id: u64, + data: &'a IoUringData, +} + +impl<'a> Drop for Cancellable<'a> { + fn drop(&mut self) { + self.data.cancel_task(self.id); + } +} diff --git a/src/io_uring/ops.rs b/src/io_uring/ops.rs new file mode 100644 index 00000000..2c3d89f0 --- /dev/null +++ b/src/io_uring/ops.rs @@ -0,0 +1,2 @@ +pub mod async_cancel; +pub mod write; diff --git a/src/io_uring/ops/async_cancel.rs b/src/io_uring/ops/async_cancel.rs new file mode 100644 index 00000000..1fb514b3 --- /dev/null +++ b/src/io_uring/ops/async_cancel.rs @@ -0,0 +1,52 @@ +use { + crate::{ + io_uring::{ + sys::{io_uring_sqe, IORING_OP_ASYNC_CANCEL}, + IoUringData, Task, + }, + utils::errorfmt::ErrorFmt, + }, + std::cell::Cell, +}; + +pub struct AsyncCancelTask { + id: Cell, + target: Cell, +} + +impl IoUringData { + pub fn cancel_task_in_kernel(&self, target: u64) { + let task = self.cached_cancels.pop().unwrap_or_else(|| { + Box::new(AsyncCancelTask { + id: 
Cell::new(0), + target: Cell::new(0), + }) + }); + let id = self.id_raw(); + task.id.set(id); + task.target.set(target); + self.schedule(task); + } +} + +unsafe impl Task for AsyncCancelTask { + fn id(&self) -> u64 { + self.id.get() + } + + fn complete(self: Box, ring: &IoUringData, res: i32) { + if let Err(e) = map_err!(res) { + log::debug!("Could not cancel task: {}", ErrorFmt(e)); + } + ring.cached_cancels.push(self); + } + + fn encode(&self, sqe: &mut io_uring_sqe) { + sqe.opcode = IORING_OP_ASYNC_CANCEL; + sqe.u2.addr = self.target.get(); + } + + fn is_cancel(&self) -> bool { + true + } +} diff --git a/src/io_uring/ops/write.rs b/src/io_uring/ops/write.rs new file mode 100644 index 00000000..6165cacf --- /dev/null +++ b/src/io_uring/ops/write.rs @@ -0,0 +1,80 @@ +use { + crate::io_uring::{ + pending_result::PendingResult, + sys::{io_uring_sqe, IORING_OP_WRITE}, + IoUring, IoUringData, IoUringError, Task, + }, + std::{ + cell::{Cell, RefCell}, + ops::Range, + rc::Rc, + }, + uapi::OwnedFd, +}; + +impl IoUring { + pub async fn write( + &self, + fd: &Rc, + buf: &Rc>, + offset: usize, + n: usize, + ) -> Result { + self.ring.check_destroyed()?; + let id = self.ring.id(); + let pr = self.ring.pending_results.acquire(); + { + let pw = self.ring.cached_writes.pop().unwrap_or_else(|| { + Box::new(WriteTask { + id: Cell::new(0), + data: Default::default(), + }) + }); + pw.id.set(id.id); + *pw.data.borrow_mut() = Some(WriteTaskData { + fd: fd.clone(), + buf: buf.clone(), + range: offset..offset + n, + res: pr.clone(), + }); + self.ring.schedule(pw); + } + Ok(pr.await? 
as usize) + } +} + +struct WriteTaskData { + fd: Rc, + buf: Rc>, + range: Range, + res: PendingResult, +} + +pub struct WriteTask { + id: Cell, + data: RefCell>, +} + +unsafe impl Task for WriteTask { + fn id(&self) -> u64 { + self.id.get() + } + + fn complete(self: Box, ring: &IoUringData, res: i32) { + if let Some(data) = self.data.borrow_mut().take() { + data.res.complete(res); + } + ring.clone().cached_writes.push(self); + } + + fn encode(&self, sqe: &mut io_uring_sqe) { + let data = self.data.borrow_mut(); + let data = data.as_ref().unwrap(); + sqe.opcode = IORING_OP_WRITE; + sqe.fd = data.fd.raw(); + sqe.u1.off = !0; + sqe.u2.addr = data.buf[data.range.clone()].as_ptr() as _; + sqe.u3.rw_flags = 0; + sqe.len = data.range.len() as _; + } +} diff --git a/src/io_uring/pending_result.rs b/src/io_uring/pending_result.rs new file mode 100644 index 00000000..a323aad3 --- /dev/null +++ b/src/io_uring/pending_result.rs @@ -0,0 +1,123 @@ +use { + crate::{ + io_uring::IoUringError, + utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack}, + }, + std::{ + cell::Cell, + future::Future, + pin::Pin, + rc::{Rc, Weak}, + task::{Context, Poll, Waker}, + }, + uapi::c, +}; + +#[derive(Default)] +pub struct PendingResults { + data: Rc, +} + +impl PendingResults { + pub fn acquire(&self) -> PendingResult { + let pr = self.data.unused.pop().unwrap_or_else(|| { + Box::into_raw(Box::new(PendingResultData { + rc: NumCell::new(0), + base: Rc::downgrade(&self.data), + waker: Cell::new(None), + res: Cell::new(None), + })) + }); + unsafe { + let prr = pr.deref(); + debug_assert_eq!(prr.rc.get(), 0); + prr.rc.fetch_add(1); + PendingResult { pr } + } + } +} + +#[derive(Default)] +struct PendingResultsData { + unused: Stack<*mut PendingResultData>, +} + +impl Drop for PendingResultsData { + fn drop(&mut self) { + while let Some(pr) = self.unused.pop() { + unsafe { + drop(Box::from_raw(pr)); + } + } + } +} + +struct PendingResultData { + rc: NumCell, + base: Weak, + waker: 
Cell>, + res: Cell>, +} + +pub struct PendingResult { + pr: *mut PendingResultData, +} + +impl PendingResult { + pub fn complete(&self, res: i32) { + unsafe { + let pr = self.pr.deref(); + pr.res.set(Some(res)); + if let Some(waker) = pr.waker.take() { + waker.wake(); + } + } + } +} + +impl Drop for PendingResult { + fn drop(&mut self) { + { + let pr = unsafe { self.pr.deref() }; + if pr.rc.fetch_sub(1) != 1 { + return; + } + if let Some(base) = pr.base.upgrade() { + pr.waker.set(None); + pr.res.set(None); + base.unused.push(self.pr); + return; + } + } + unsafe { + drop(Box::from_raw(self.pr)); + } + } +} + +impl Clone for PendingResult { + fn clone(&self) -> Self { + let pr = unsafe { self.pr.deref() }; + pr.rc.fetch_add(1); + Self { pr: self.pr } + } +} + +impl Future for PendingResult { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pr = unsafe { self.pr.deref() }; + if let Some(res) = pr.res.take() { + let res = if res < 0 { + Err(IoUringError::OsError(OsError::from(-res as c::c_int))) + } else { + Ok(res) + }; + Poll::Ready(res) + } else { + pr.waker.set(Some(cx.waker().clone())); + Poll::Pending + } + } +} diff --git a/src/io_uring/sys.rs b/src/io_uring/sys.rs new file mode 100644 index 00000000..cf97e087 --- /dev/null +++ b/src/io_uring/sys.rs @@ -0,0 +1,397 @@ +#![allow(non_camel_case_types, dead_code)] + +use { + crate::utils::oserror::OsError, + uapi::{c, OwnedFd}, +}; + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct io_uring_sqe { + pub opcode: u8, + pub flags: u8, + pub ioprio: u16, + pub fd: i32, + pub u1: io_uring_sqe_union1, + pub u2: io_uring_sqe_union2, + pub len: u32, + pub u3: io_uring_sqe_union3, + pub user_data: u64, + pub u4: io_uring_sqe_union4, + pub personality: u16, + pub u5: io_uring_sqe_union5, + pub __pad2: [u64; 2], +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub union io_uring_sqe_union1 { + pub off: u64, + pub addr2: u64, +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub union 
io_uring_sqe_union2 { + pub addr: u64, + pub splice_off_in: u64, +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub union io_uring_sqe_union3 { + pub rw_flags: c::c_int, + pub fsync_flags: u32, + pub poll_events: u16, + pub poll32_events: u32, + pub sync_range_flags: u32, + pub msg_flags: u32, + pub timeout_flags: u32, + pub accept_flags: u32, + pub cancel_flags: u32, + pub open_flags: u32, + pub statx_flags: u32, + pub fadvise_advice: u32, + pub splice_flags: u32, + pub rename_flags: u32, + pub unlink_flags: u32, + pub hardlink_flags: u32, +} + +#[repr(C, packed)] +#[derive(Copy, Clone)] +pub union io_uring_sqe_union4 { + pub buf_index: u16, + pub buf_group: u16, +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub union io_uring_sqe_union5 { + pub splice_fd_in: i32, + pub file_index: u32, +} + +pub const IOSQE_FIXED_FILE_BIT: u8 = 0; +pub const IOSQE_IO_DRAIN_BIT: u8 = 1; +pub const IOSQE_IO_LINK_BIT: u8 = 2; +pub const IOSQE_IO_HARDLINK_BIT: u8 = 3; +pub const IOSQE_ASYNC_BIT: u8 = 4; +pub const IOSQE_BUFFER_SELECT_BIT: u8 = 5; +pub const IOSQE_CQE_SKIP_SUCCESS_BIT: u8 = 6; + +pub const IOSQE_FIXED_FILE: u8 = 1 << IOSQE_FIXED_FILE_BIT; +pub const IOSQE_IO_DRAIN: u8 = 1 << IOSQE_IO_DRAIN_BIT; +pub const IOSQE_IO_LINK: u8 = 1 << IOSQE_IO_LINK_BIT; +pub const IOSQE_IO_HARDLINK: u8 = 1 << IOSQE_IO_HARDLINK_BIT; +pub const IOSQE_ASYNC: u8 = 1 << IOSQE_ASYNC_BIT; +pub const IOSQE_BUFFER_SELECT: u8 = 1 << IOSQE_BUFFER_SELECT_BIT; +pub const IOSQE_CQE_SKIP_SUCCESS: u8 = 1 << IOSQE_CQE_SKIP_SUCCESS_BIT; + +pub const IORING_SETUP_IOPOLL: u32 = 1 << 0; +pub const IORING_SETUP_SQPOLL: u32 = 1 << 1; +pub const IORING_SETUP_SQ_AFF: u32 = 1 << 2; +pub const IORING_SETUP_CQSIZE: u32 = 1 << 3; +pub const IORING_SETUP_CLAMP: u32 = 1 << 4; +pub const IORING_SETUP_ATTACH_WQ: u32 = 1 << 5; +pub const IORING_SETUP_R_DISABLED: u32 = 1 << 6; + +pub const IORING_OP_NOP: u8 = 0; +pub const IORING_OP_READV: u8 = 1; +pub const IORING_OP_WRITEV: u8 = 2; +pub const IORING_OP_FSYNC: u8 = 3; +pub const 
IORING_OP_READ_FIXED: u8 = 4; +pub const IORING_OP_WRITE_FIXED: u8 = 5; +pub const IORING_OP_POLL_ADD: u8 = 6; +pub const IORING_OP_POLL_REMOVE: u8 = 7; +pub const IORING_OP_SYNC_FILE_RANGE: u8 = 8; +pub const IORING_OP_SENDMSG: u8 = 9; +pub const IORING_OP_RECVMSG: u8 = 10; +pub const IORING_OP_TIMEOUT: u8 = 11; +pub const IORING_OP_TIMEOUT_REMOVE: u8 = 12; +pub const IORING_OP_ACCEPT: u8 = 13; +pub const IORING_OP_ASYNC_CANCEL: u8 = 14; +pub const IORING_OP_LINK_TIMEOUT: u8 = 15; +pub const IORING_OP_CONNECT: u8 = 16; +pub const IORING_OP_FALLOCATE: u8 = 17; +pub const IORING_OP_OPENAT: u8 = 18; +pub const IORING_OP_CLOSE: u8 = 19; +pub const IORING_OP_FILES_UPDATE: u8 = 20; +pub const IORING_OP_STATX: u8 = 21; +pub const IORING_OP_READ: u8 = 22; +pub const IORING_OP_WRITE: u8 = 23; +pub const IORING_OP_FADVISE: u8 = 24; +pub const IORING_OP_MADVISE: u8 = 25; +pub const IORING_OP_SEND: u8 = 26; +pub const IORING_OP_RECV: u8 = 27; +pub const IORING_OP_OPENAT2: u8 = 28; +pub const IORING_OP_EPOLL_CTL: u8 = 29; +pub const IORING_OP_SPLICE: u8 = 30; +pub const IORING_OP_PROVIDE_BUFFERS: u8 = 31; +pub const IORING_OP_REMOVE_BUFFERS: u8 = 32; +pub const IORING_OP_TEE: u8 = 33; +pub const IORING_OP_SHUTDOWN: u8 = 34; +pub const IORING_OP_RENAMEAT: u8 = 35; +pub const IORING_OP_UNLINKAT: u8 = 36; +pub const IORING_OP_MKDIRAT: u8 = 37; +pub const IORING_OP_SYMLINKAT: u8 = 38; +pub const IORING_OP_LINKAT: u8 = 39; +pub const IORING_OP_LAST: u8 = 40; + +pub const IORING_FSYNC_DATASYNC: u32 = 1 << 0; + +pub const IORING_TIMEOUT_ABS: u32 = 1 << 0; +pub const IORING_TIMEOUT_UPDATE: u32 = 1 << 1; +pub const IORING_TIMEOUT_BOOTTIME: u32 = 1 << 2; +pub const IORING_TIMEOUT_REALTIME: u32 = 1 << 3; +pub const IORING_LINK_TIMEOUT_UPDATE: u32 = 1 << 4; +pub const IORING_TIMEOUT_ETIME_SUCCESS: u32 = 1 << 5; +pub const IORING_TIMEOUT_CLOCK_MASK: u32 = IORING_TIMEOUT_BOOTTIME | IORING_TIMEOUT_REALTIME; +pub const IORING_TIMEOUT_UPDATE_MASK: u32 = IORING_TIMEOUT_UPDATE | 
IORING_LINK_TIMEOUT_UPDATE; + +pub const SPLICE_F_FD_IN_FIXED: u32 = 1 << 31; + +pub const IORING_POLL_ADD_MULTI: u32 = 1 << 0; +pub const IORING_POLL_UPDATE_EVENTS: u32 = 1 << 1; +pub const IORING_POLL_UPDATE_USER_DATA: u32 = 1 << 2; + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct io_uring_cqe { + pub user_data: u64, + pub res: i32, + pub flags: u32, +} + +pub const IORING_CQE_F_BUFFER: u32 = 1 << 0; +pub const IORING_CQE_F_MORE: u32 = 1 << 1; + +pub const IORING_CQE_BUFFER_SHIFT: u32 = 16; + +pub const IORING_OFF_SQ_RING: u64 = 0; +pub const IORING_OFF_CQ_RING: u64 = 0x8000000; +pub const IORING_OFF_SQES: u64 = 0x10000000; + +#[repr(C)] +#[derive(Debug, Copy, Clone, Default)] +pub struct io_sqring_offsets { + pub head: u32, + pub tail: u32, + pub ring_mask: u32, + pub ring_entries: u32, + pub flags: u32, + pub dropped: u32, + pub array: u32, + pub resv1: u32, + pub resv2: u64, +} + +#[repr(C)] +#[derive(Debug, Copy, Clone, Default)] +pub struct io_cqring_offsets { + pub head: u32, + pub tail: u32, + pub ring_mask: u32, + pub ring_entries: u32, + pub overflow: u32, + pub cqes: u32, + pub flags: u32, + pub resv1: u32, + pub resv2: u64, +} + +#[repr(C)] +#[derive(Debug, Copy, Clone, Default)] +pub struct io_uring_params { + pub sq_entries: u32, + pub cq_entries: u32, + pub flags: u32, + pub sq_thread_cpu: u32, + pub sq_thread_idle: u32, + pub features: u32, + pub wq_fd: u32, + pub resv: [u32; 3], + pub sq_off: io_sqring_offsets, + pub cq_off: io_cqring_offsets, +} + +pub const IORING_SQ_NEED_WAKEUP: u32 = 1 << 0; +pub const IORING_SQ_CQ_OVERFLOW: u32 = 1 << 1; + +pub const IORING_CQ_EVENTFD_DISABLED: u32 = 1 << 0; + +pub const IORING_ENTER_GETEVENTS: c::c_uint = 1 << 0; +pub const IORING_ENTER_SQ_WAKEUP: c::c_uint = 1 << 1; +pub const IORING_ENTER_SQ_WAIT: c::c_uint = 1 << 2; +pub const IORING_ENTER_EXT_ARG: c::c_uint = 1 << 3; + +pub const IORING_FEAT_SINGLE_MMAP: u32 = 1 << 0; +pub const IORING_FEAT_NODROP: u32 = 1 << 1; +pub const 
IORING_FEAT_SUBMIT_STABLE: u32 = 1 << 2; +pub const IORING_FEAT_RW_CUR_POS: u32 = 1 << 3; +pub const IORING_FEAT_CUR_PERSONALITY: u32 = 1 << 4; +pub const IORING_FEAT_FAST_POLL: u32 = 1 << 5; +pub const IORING_FEAT_POLL_32BITS: u32 = 1 << 6; +pub const IORING_FEAT_SQPOLL_NONFIXED: u32 = 1 << 7; +pub const IORING_FEAT_EXT_ARG: u32 = 1 << 8; +pub const IORING_FEAT_NATIVE_WORKERS: u32 = 1 << 9; +pub const IORING_FEAT_RSRC_TAGS: u32 = 1 << 10; +pub const IORING_FEAT_CQE_SKIP: u32 = 1 << 11; + +pub const IORING_REGISTER_BUFFERS: c::c_uint = 0; +pub const IORING_UNREGISTER_BUFFERS: c::c_uint = 1; +pub const IORING_REGISTER_FILES: c::c_uint = 2; +pub const IORING_UNREGISTER_FILES: c::c_uint = 3; +pub const IORING_REGISTER_EVENTFD: c::c_uint = 4; +pub const IORING_UNREGISTER_EVENTFD: c::c_uint = 5; +pub const IORING_REGISTER_FILES_UPDATE: c::c_uint = 6; +pub const IORING_REGISTER_EVENTFD_ASYNC: c::c_uint = 7; +pub const IORING_REGISTER_PROBE: c::c_uint = 8; +pub const IORING_REGISTER_PERSONALITY: c::c_uint = 9; +pub const IORING_UNREGISTER_PERSONALITY: c::c_uint = 10; +pub const IORING_REGISTER_RESTRICTIONS: c::c_uint = 11; +pub const IORING_REGISTER_ENABLE_RINGS: c::c_uint = 12; +pub const IORING_REGISTER_FILES2: c::c_uint = 13; +pub const IORING_REGISTER_FILES_UPDATE2: c::c_uint = 14; +pub const IORING_REGISTER_BUFFERS2: c::c_uint = 15; +pub const IORING_REGISTER_BUFFERS_UPDATE: c::c_uint = 16; +pub const IORING_REGISTER_IOWQ_AFF: c::c_uint = 17; +pub const IORING_UNREGISTER_IOWQ_AFF: c::c_uint = 18; +pub const IORING_REGISTER_IOWQ_MAX_WORKERS: c::c_uint = 19; + +pub const IO_WQ_BOUND: u32 = 0; +pub const IO_WQ_UNBOUND: u32 = 1; + +#[repr(C, align(8))] +#[derive(Debug, Copy, Clone)] +pub struct AlignedU64(pub u64); + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct io_uring_files_update { + pub offset: u32, + pub resv: u32, + pub fds: AlignedU64, +} + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct io_uring_rsrc_register { + pub nr: u32, + pub resv: u32, + 
pub resv2: u64, + pub data: AlignedU64, + pub tags: AlignedU64, +} + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct io_uring_rsrc_update { + pub offset: u32, + pub resv: u32, + pub data: AlignedU64, +} + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct io_uring_rsrc_update2 { + pub offset: u32, + pub resv: u32, + pub data: AlignedU64, + pub tags: AlignedU64, + pub nr: u32, + pub resv2: u32, +} + +pub const IORING_REGISTER_FILES_SKIP: i32 = -2; + +pub const IO_URING_OP_SUPPORTED: u32 = 1 << 0; + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct io_uring_probe_op { + pub op: u8, + pub resv: u8, + pub flags: u16, + pub resv2: u32, +} + +#[repr(C)] +#[derive(Debug)] +pub struct io_uring_probe { + pub last_op: u8, + pub ops_len: u8, + pub resv: u16, + pub resv2: [u32; 3], + pub ops: [io_uring_probe_op; 0], +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct io_uring_restriction { + pub opcode: u16, + pub u1: io_uring_restriction_union1, + pub resv: u8, + pub resv2: [u32; 3], +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub union io_uring_restriction_union1 { + pub register_op: u8, + pub sqe_op: u8, + pub sqe_flags: u8, +} + +pub const IORING_RESTRICTION_REGISTER_OP: u16 = 0; +pub const IORING_RESTRICTION_SQE_OP: u16 = 1; +pub const IORING_RESTRICTION_SQE_FLAGS_ALLOWED: u16 = 2; +pub const IORING_RESTRICTION_SQE_FLAGS_REQUIRED: u16 = 3; + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct io_uring_getevents_arg { + sigmask: u64, + sigmask_sz: u32, + pad: u32, + ts: u64, +} + +pub fn io_uring_setup(entries: u32, params: &mut io_uring_params) -> Result { + let res = unsafe { + c::syscall( + c::SYS_io_uring_setup, + entries as usize, + params as *mut _ as usize, + ) + }; + if res < 0 { + Err(OsError::default()) + } else { + Ok(OwnedFd::new(res as _)) + } +} + +pub fn io_uring_enter( + fd: c::c_int, + to_submit: c::c_uint, + min_complete: c::c_uint, + flags: c::c_uint, +) -> Result { + let res = unsafe { + c::syscall( + c::SYS_io_uring_enter, + fd as 
usize, + to_submit as usize, + min_complete as usize, + flags as usize, + 0usize, + 0usize, + ) + }; + if res < 0 { + Err(OsError::default()) + } else { + Ok(res as usize) + } +} diff --git a/src/main.rs b/src/main.rs index 49c64f20..f9d85d5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,7 @@ mod forker; mod format; mod globals; mod ifs; +mod io_uring; #[cfg(feature = "it")] mod it; mod libinput; diff --git a/src/state.rs b/src/state.rs index cabe89d1..61d2a1f5 100644 --- a/src/state.rs +++ b/src/state.rs @@ -24,6 +24,7 @@ use { }, zwp_linux_dmabuf_v1::ZwpLinuxDmabufV1Global, }, + io_uring::IoUring, leaks::Tracker, logger::Logger, rect::Rect, @@ -108,6 +109,7 @@ pub struct State { pub config_file_id: NumCell, pub tracker: Tracker, pub data_offer_ids: NumCell, + pub io_uring: IoUring, } // impl Drop for State { diff --git a/src/utils.rs b/src/utils.rs index 83a8bd72..284803d1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -12,6 +12,7 @@ pub mod fdcloser; pub mod hex; pub mod linkedlist; pub mod log_on_drop; +pub mod mmap; pub mod nonblock; pub mod num_cpus; pub mod numcell; diff --git a/src/utils/asyncevent.rs b/src/utils/asyncevent.rs index 1242ecd9..7b814f87 100644 --- a/src/utils/asyncevent.rs +++ b/src/utils/asyncevent.rs @@ -25,6 +25,7 @@ impl Debug for AsyncEvent { impl AsyncEvent { pub fn clear(&self) { + self.triggers.set(0); self.waker.take(); } diff --git a/src/utils/mmap.rs b/src/utils/mmap.rs new file mode 100644 index 00000000..de4db910 --- /dev/null +++ b/src/utils/mmap.rs @@ -0,0 +1,34 @@ +use { + crate::utils::{oserror::OsError, ptr_ext::PtrExt}, + std::ptr, + uapi::c, +}; + +pub struct Mmapped { + pub ptr: *const [u8], +} + +pub fn mmap( + len: usize, + prot: c::c_int, + flags: c::c_int, + fd: c::c_int, + offset: c::off_t, +) -> Result { + let res = unsafe { c::mmap(ptr::null_mut(), len, prot, flags, fd, offset) }; + if res == c::MAP_FAILED { + Err(OsError::default()) + } else { + Ok(Mmapped { + ptr: unsafe { 
std::slice::from_raw_parts(res.cast(), len) }, + }) + } +} + +impl Drop for Mmapped { + fn drop(&mut self) { + unsafe { + c::munmap(self.ptr.deref().as_ptr() as _, self.ptr.deref().len()); + } + } +} diff --git a/src/xwayland/xwm.rs b/src/xwayland/xwm.rs index 2790ad44..94a17e38 100644 --- a/src/xwayland/xwm.rs +++ b/src/xwayland/xwm.rs @@ -58,7 +58,10 @@ use { }, ahash::{AHashMap, AHashSet}, bstr::ByteSlice, - futures_util::{select, FutureExt}, + futures_util::{ + future::{self, Either}, + pin_mut, select, FutureExt, + }, smallvec::SmallVec, std::{ borrow::Cow, @@ -250,7 +253,7 @@ pub struct Wm { struct PendingTransfer { mime_type: u32, - fd: AsyncFd, + fd: Rc, } const TEXT_PLAIN_UTF_8: &str = "text/plain;charset=utf-8"; @@ -649,7 +652,6 @@ impl Wm { } Some(r) => r, }; - let name = mt.clone(); let mt = match self.mime_type_to_atom(mt).await { Ok(mt) => mt, Err(e) => { @@ -802,24 +804,6 @@ impl Wm { return; } }; - { - let res = OsError::tri(|| { - let fl = uapi::fcntl_getfl(fd.raw())?; - uapi::fcntl_setfl(fd.raw(), fl | c::O_NONBLOCK)?; - Ok(()) - }); - if let Err(e) = res { - log::error!("Could not set file description flags: {}", ErrorFmt(e)); - return; - } - } - let fd = match self.state.eng.fd(&fd) { - Ok(afd) => afd, - Err(e) => { - log::error!("Could not create async fd: {}", ErrorFmt(e)); - return; - } - }; let cs = ConvertSelection { requestor: sd.win.get(), selection: sd.selection.get(), @@ -1709,8 +1693,8 @@ impl Wm { let transfer = XToWaylandTransfer { id, data: data.clone(), - pos: 0, fd: transfer.fd, + state: self.state.clone(), shared: self.shared.clone(), }; self.shared @@ -2394,24 +2378,38 @@ impl Wm { struct XToWaylandTransfer { id: u64, data: Rc>, - pos: usize, - fd: AsyncFd, + fd: Rc, + state: Rc, shared: Rc, } impl XToWaylandTransfer { - async fn run(mut self) { - while self.pos < self.data.len() { - match uapi::write(self.fd.raw(), &self.data[self.pos..]) { - Ok(n) => self.pos += n, - Err(Errno(c::EAGAIN)) => { - if let Err(e) = 
self.fd.writable().await { - log::error!("Could not wait for fd to become writable: {}", ErrorFmt(e)); + async fn run(self) { + let timeout = match self.state.eng.timeout(5000) { + Ok(to) => to, + Err(e) => { + log::error!("Could not create a timeout: {}", ErrorFmt(e)); + return; + } + }; + pin_mut!(timeout); + let mut pos = 0; + while pos < self.data.len() { + let f1 = self + .state + .io_uring + .write(&self.fd, &self.data, pos, self.data.len() - pos); + pin_mut!(f1); + match future::select(f1, &mut timeout).await { + Either::Left((res, _)) => match res { + Ok(n) => pos += n, + Err(e) => { + log::error!("Could not write to wayland client: {}", ErrorFmt(e)); break; } - } - Err(e) => { - log::error!("Could not write to wayland client: {}", ErrorFmt(e)); + }, + Either::Right(_) => { + log::error!("Transfer timed out"); break; } }