diff --git a/Cargo.lock b/Cargo.lock index 1fceef16..827cce6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,6 +688,7 @@ dependencies = [ "jay-edid", "jay-formats", "jay-geometry", + "jay-io-uring", "jay-layout-animation", "jay-time", "jay-toml-config", @@ -776,6 +777,19 @@ dependencies = [ "smallvec", ] +[[package]] +name = "jay-io-uring" +version = "0.1.0" +dependencies = [ + "jay-async-engine", + "jay-time", + "jay-utils", + "log", + "run-on-drop", + "thiserror", + "uapi", +] + [[package]] name = "jay-layout-animation" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 310e76df..ca160a6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ members = [ "time", "tracy", "async-engine", + "io-uring", "toml-config", "algorithms", "toml-spec", @@ -57,6 +58,7 @@ jay-cmm = { version = "0.1.0", path = "cmm" } jay-time = { version = "0.1.0", path = "time" } jay-tracy = { version = "0.1.0", path = "tracy" } jay-async-engine = { version = "0.1.0", path = "async-engine" } +jay-io-uring = { version = "0.1.0", path = "io-uring" } uapi = "0.2.13" thiserror = "2.0.11" diff --git a/io-uring/Cargo.toml b/io-uring/Cargo.toml new file mode 100644 index 00000000..eb8edc6a --- /dev/null +++ b/io-uring/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "jay-io-uring" +version = "0.1.0" +edition = "2024" +license = "GPL-3.0-only" + +[dependencies] +jay-async-engine = { version = "0.1.0", path = "../async-engine" } +jay-time = { version = "0.1.0", path = "../time" } +jay-utils = { version = "0.1.0", path = "../utils" } + +log = { version = "0.4.20", features = ["std"] } +run-on-drop = "1.0.0" +thiserror = "2.0.11" +uapi = "0.2.13" diff --git a/src/io_uring/debounce.rs b/io-uring/src/debounce.rs similarity index 93% rename from src/io_uring/debounce.rs rename to io-uring/src/debounce.rs index f5b65f40..816a9d38 100644 --- a/src/io_uring/debounce.rs +++ b/io-uring/src/debounce.rs @@ -1,5 +1,6 @@ use { - crate::{io_uring::IoUringData, utils::numcell::NumCell}, + crate::IoUringData, + jay_utils::numcell::NumCell, std::{cell::Cell, future::poll_fn, rc::Rc, task::Poll}, }; diff --git a/io-uring/src/lib.rs b/io-uring/src/lib.rs new file mode 100644 index 00000000..d81313f2 --- /dev/null +++ b/io-uring/src/lib.rs @@ -0,0 +1,587 @@ +pub use ops::{ + TaskResultExt, + poll_external::{PendingPoll, PollCallback}, + timeout_external::{PendingTimeout, TimeoutCallback}, +}; +use { + crate::{ + debounce::Debouncer, + ops::{ + accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask, + poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask, + read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask, + sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_external::TimeoutExternalTask, + timeout_link::TimeoutLinkTask, + }, + pending_result::PendingResults, + sys::{ + IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, + IORING_OFF_SQES, IORING_SETUP_COOP_TASKRUN, IORING_SETUP_DEFER_TASKRUN, + IORING_SETUP_SINGLE_ISSUER, IORING_SETUP_SUBMIT_ALL, IOSQE_IO_LINK, io_uring_cqe, + io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, + }, + }, + jay_async_engine::AsyncEngine, + jay_utils::{ + asyncevent::AsyncEvent, + bitflags::BitflagsExt, + buf::Buf, + copyhashmap::CopyHashMap, + errorfmt::ErrorFmt, + mmap::{Mmapped, mmap}, + numcell::NumCell, + oserror::OsError, + ptr_ext::{MutPtrExt, PtrExt}, + stack::Stack, + syncqueue::SyncQueue, + }, + std::{ + cell::{Cell, RefCell, UnsafeCell}, + rc::Rc, + sync::atomic::{ + AtomicU32, + Ordering::{Acquire, Relaxed, Release}, + }, + task::Waker, + }, + thiserror::Error, + uapi::{ + OwnedFd, + c::{self}, + }, +}; + +macro_rules! map_err { + ($n:expr) => {{ + let n = $n; + if n < 0 { + Err(jay_utils::oserror::OsError::from(-n as uapi::c::c_int)) + } else { + Ok(n) + } + }}; +} + +mod debounce; +mod ops; +mod pending_result; +mod sys; + +#[derive(Debug, Error)] +pub enum IoUringError { + #[error(transparent)] + OsError(#[from] OsError), + #[error("Could not create an io-uring")] + CreateUring(#[source] OsError), + #[error("The kernel does not support the IORING_FEAT_NODROP feature")] + NoDrop, + #[error("Could not map the submission queue ring")] + MapSqRing(#[source] OsError), + #[error("Could not map the submission queue entries")] + MapSqEntries(#[source] OsError), + #[error("Could not map the completion queue ring")] + MapCqRing(#[source] OsError), + #[error("The io-uring has already been destroyed")] + Destroyed, + #[error("io_uring_enter failed")] + Enter(#[source] OsError), + #[error("Kernel sent invalid cmsg data")] + InvalidCmsgData, +} + +pub struct IoUring { + ring: Rc, +} + +impl Drop for IoUring { + fn drop(&mut self) { + self.ring.kill(); + } +} + +impl IoUring { + pub fn new(eng: &Rc, entries: u32) -> Result, IoUringError> { + let feature_levels = [ + IORING_SETUP_SUBMIT_ALL, // 5.18 + IORING_SETUP_COOP_TASKRUN, // 5.19 + IORING_SETUP_SINGLE_ISSUER, // 6.0 + IORING_SETUP_DEFER_TASKRUN, // 6.1 + ]; + let mut feature_levels = &feature_levels[..]; + let mut params; + let fd = loop { + params = io_uring_params::default(); + for &flags in feature_levels { + params.flags |= flags; + } + match io_uring_setup(entries, &mut params) { + Ok(f) => break f, + Err(e) => { + if let Some((_, levels)) = feature_levels.split_last() { + feature_levels = levels; + } else { + return Err(IoUringError::CreateUring(e)); + } + } + } + }; + if !params.features.contains(IORING_FEAT_NODROP) { + return Err(IoUringError::NoDrop); + } + let sqmap_map = mmap( + (params.sq_off.array + params.sq_entries * 4) as _, + c::PROT_READ | c::PROT_WRITE, + c::MAP_SHARED | c::MAP_POPULATE, + fd.raw(), + IORING_OFF_SQ_RING as _, + ); + let sqmap_map = match sqmap_map { + Ok(map) => map, + Err(e) => return Err(IoUringError::MapSqRing(e)), + }; + let sqesmap_map = mmap( + params.sq_entries as usize * size_of::(), + c::PROT_READ | c::PROT_WRITE, + c::MAP_SHARED | c::MAP_POPULATE, + fd.raw(), + IORING_OFF_SQES as _, + ); + let sqesmap_map = match sqesmap_map { + Ok(map) => map, + Err(e) => return Err(IoUringError::MapSqEntries(e)), + }; + let cqmap_map = mmap( + params.cq_off.cqes as usize + params.cq_entries as usize * size_of::(), + c::PROT_READ | c::PROT_WRITE, + c::MAP_SHARED | c::MAP_POPULATE, + fd.raw(), + IORING_OFF_CQ_RING as _, + ); + let cqmap_map = match cqmap_map { + Ok(map) => map, + Err(e) => return Err(IoUringError::MapCqRing(e)), + }; + let sqmask = unsafe { + *(sqmap_map.ptr as *const u8) + .add(params.sq_off.ring_mask as _) + .cast() + }; + let sqhead = unsafe { + (sqmap_map.ptr as *const u8) + .add(params.sq_off.head as _) + .cast() + }; + let sqtail = unsafe { + (sqmap_map.ptr as *const u8) + .add(params.sq_off.tail as _) + .cast() + }; + let sqmap = unsafe { + let base = (sqmap_map.ptr as *const u8) + .add(params.sq_off.array as _) + .cast(); + std::slice::from_raw_parts(base, params.sq_entries as _) + }; + let sqesmap = unsafe { + let base = (sqesmap_map.ptr as *const u8).cast(); + std::slice::from_raw_parts(base, params.sq_entries as _) + }; + let cqmask = unsafe { + *(cqmap_map.ptr as *const u8) + .add(params.cq_off.ring_mask as _) + .cast() + }; + let cqhead = unsafe { + (cqmap_map.ptr as *const u8) + .add(params.cq_off.head as _) + .cast() + }; + let cqtail = unsafe { + (cqmap_map.ptr as *const u8) + .add(params.cq_off.tail as _) + .cast() + }; + let cqmap = unsafe { + let base = (cqmap_map.ptr as *const u8) + .add(params.cq_off.cqes as _) + .cast(); + std::slice::from_raw_parts(base, params.cq_entries as _) + }; + let data = Rc::new(IoUringData { + destroyed: Cell::new(false), + fd, + eng: eng.clone(), + _sqesmap_map: sqesmap_map, + _sqmap_map: sqmap_map, + sqmask, + sqlen: params.sq_entries, + sqhead, + sqtail, + sqmap, + sqesmap, + _cqmap_map: cqmap_map, + cqmask, + cqhead, + cqtail, + cqmap, + cqes_consumed: Default::default(), + next: Default::default(), + to_encode: Default::default(), + pending_in_kernel: Default::default(), + tasks: Default::default(), + pending_results: Default::default(), + cached_read_writes: Default::default(), + cached_read_writes_no_cancel: Default::default(), + cached_cancels: Default::default(), + cached_polls: Default::default(), + cached_polls_external: Default::default(), + cached_sendmsg: Default::default(), + cached_recvmsg: Default::default(), + cached_timeouts: Default::default(), + cached_timeouts_external: Default::default(), + cached_timeout_links: Default::default(), + cached_cmsg_bufs: Default::default(), + cached_connects: Default::default(), + cached_accepts: Default::default(), + fd_ids_scratch: Default::default(), + iteration: Default::default(), + yields: Default::default(), + }); + Ok(Rc::new(Self { ring: data })) + } + + pub fn stop(&self) { + self.ring.kill(); + } + + pub fn run(&self) -> Result<(), IoUringError> { + let res = self.ring.run(); + self.ring.kill(); + res + } + + pub fn cancel(&self, id: IoUringTaskId) { + self.ring.cancel_task(id); + } + + pub fn debouncer(&self, max: u64) -> Debouncer { + Debouncer { + cur: Default::default(), + max, + iteration: Cell::new(self.ring.iteration.get()), + ring: self.ring.clone(), + } + } +} + +struct IoUringData { + destroyed: Cell, + + fd: OwnedFd, + eng: Rc, + + _sqesmap_map: Mmapped, + _sqmap_map: Mmapped, + sqmask: u32, + sqlen: u32, + sqhead: *const AtomicU32, + sqtail: *const AtomicU32, + sqmap: *const [Cell], + sqesmap: *const [UnsafeCell], + + _cqmap_map: Mmapped, + cqmask: u32, + cqhead: *const AtomicU32, + cqtail: *const AtomicU32, + cqmap: *const [Cell], + + cqes_consumed: AsyncEvent, + + next: IoUringTaskIds, + to_encode: SyncQueue, + pending_in_kernel: CopyHashMap, + tasks: CopyHashMap>, + + pending_results: PendingResults, + + cached_read_writes: Stack>, + cached_read_writes_no_cancel: Stack>, + cached_cancels: Stack>, + cached_polls: Stack>, + cached_polls_external: Stack>, + cached_sendmsg: Stack>, + cached_recvmsg: Stack>, + cached_timeouts: Stack>, + cached_timeouts_external: Stack>, + cached_timeout_links: Stack>, + cached_cmsg_bufs: Stack, + cached_connects: Stack>, + cached_accepts: Stack>, + + fd_ids_scratch: RefCell>, + + iteration: NumCell, + yields: SyncQueue, +} + +unsafe trait Task { + fn id(&self) -> IoUringTaskId; + fn complete(self: Box, ring: &IoUringData, res: i32); + fn encode(&self, sqe: &mut io_uring_sqe); + + fn is_cancel(&self) -> bool { + false + } + + fn has_timeout(&self) -> bool { + false + } +} + +impl IoUringData { + fn run(&self) -> Result<(), IoUringError> { + let mut to_submit = 0; + loop { + self.iteration.fetch_add(1); + while let Some(ev) = self.yields.pop() { + ev.wake(); + } + loop { + self.eng.dispatch(); + if self.destroyed.get() { + return Ok(()); + } + if !self.dispatch_completions() { + break; + } + } + to_submit += self.encode(); + let res = { + let (to_submit, mut min_complete, flags) = if to_submit == 0 { + (0, 1, IORING_ENTER_GETEVENTS) + } else if self.to_encode.is_empty() { + (to_submit as _, 1, IORING_ENTER_GETEVENTS) + } else { + (!0, 0, 0) + }; + if self.yields.is_not_empty() { + min_complete = 0; + } + io_uring_enter(self.fd.raw(), to_submit, min_complete, flags) + }; + let mut submitted_any = false; + match res { + Ok(n) => { + if n > 0 { + submitted_any = true; + } + to_submit -= n; + } + Err(e) => { + if !matches!(e.0, c::EAGAIN | c::EBUSY | c::EINTR) { + return Err(IoUringError::Enter(e)); + } + } + } + if to_submit > 0 && !submitted_any { + let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS); + if let Err(e) = res { + if e.0 != c::EINTR { + return Err(IoUringError::Enter(e)); + } + } + } + } + } + + fn dispatch_completions(&self) -> bool { + unsafe { + let mut head = self.cqhead.deref().load(Relaxed); + let tail = self.cqtail.deref().load(Acquire); + if head == tail { + return false; + } + while head != tail { + let idx = (head & self.cqmask) as usize; + let entry = self.cqmap.deref()[idx].get(); + head = head.wrapping_add(1); + self.cqhead.deref().store(head, Release); + let id = IoUringTaskId(entry.user_data); + if let Some(pending) = self.tasks.remove(&id) { + self.pending_in_kernel.remove(&id); + pending.complete(self, entry.res); + } + } + self.cqhead.deref().store(head, Release); + self.cqes_consumed.trigger(); + true + } + } + + fn encode(&self) -> usize { + let tasks = self.tasks.lock(); + let mut encoded = 0; + unsafe { + let mut tail = self.sqtail.deref().load(Relaxed); + let head = self.sqhead.deref().load(Acquire); + let available = self.sqlen - tail.wrapping_sub(head); + while encoded < available { + let id = match self.to_encode.pop() { + Some(t) => t, + _ => break, + }; + let task = match tasks.get(&id) { + Some(t) => t, + _ => continue, + }; + let has_timeout = task.has_timeout(); + if has_timeout && (available - encoded) < 2 { + self.to_encode.push_front(id); + break; + } + self.pending_in_kernel.set(id, ()); + let idx = (tail & self.sqmask) as usize; + let sqe = self.sqesmap.deref()[idx].get().deref_mut(); + self.sqmap.deref()[idx].set(idx as _); + *sqe = Default::default(); + sqe.user_data = id.raw(); + task.encode(sqe); + if has_timeout { + sqe.flags |= IOSQE_IO_LINK; + } + tail = tail.wrapping_add(1); + encoded += 1; + } + self.sqtail.deref().store(tail, Release); + } + encoded as usize + } + + fn id(&self) -> Cancellable<'_> { + Cancellable { + id: self.id_raw(), + data: self, + } + } + + fn id_raw(&self) -> IoUringTaskId { + self.next.next() + } + + fn cancel_task(&self, id: IoUringTaskId) { + if !self.tasks.contains(&id) { + return; + } + if !self.pending_in_kernel.contains(&id) { + self.tasks + .remove(&id) + .unwrap() + .complete(self, -c::ECANCELED); + return; + } + self.cancel_task_in_kernel(id); + } + + fn schedule(&self, t: Box) { + assert!(!self.destroyed.get()); + self.to_encode.push(t.id()); + self.tasks.set(t.id(), t); + } + + fn check_destroyed(&self) -> Result<(), IoUringError> { + if self.destroyed.get() { + Err(IoUringError::Destroyed) + } else { + Ok(()) + } + } + + fn kill(&self) { + self.eng.stop(); + let mut to_cancel = vec![]; + for task in self.tasks.lock().values() { + if !task.is_cancel() { + to_cancel.push(task.id()); + } + } + for task in to_cancel { + self.cancel_task(task); + } + self.destroyed.set(true); + while !self.tasks.is_empty() { + self.encode(); + let _ = io_uring_enter(self.fd.raw(), u32::MAX, 0, 0); + let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS); + if let Err(e) = res { + panic!("Could not wait for io_uring to drain: {}", ErrorFmt(e)); + } + while self.dispatch_completions() { + // nothing + } + } + } + + fn cmsg_buf(&self) -> Buf { + self.cached_cmsg_bufs + .pop() + .unwrap_or_else(|| Buf::new(1024)) + } +} + +#[derive(Debug)] +struct IoUringTaskIds { + next: NumCell, +} + +impl Default for IoUringTaskIds { + fn default() -> Self { + Self { + next: NumCell::new(1), + } + } +} + +impl IoUringTaskIds { + fn next(&self) -> IoUringTaskId { + IoUringTaskId(self.next.fetch_add(1)) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)] +pub struct IoUringTaskId(u64); + +impl IoUringTaskId { + #[allow(clippy::allow_attributes, dead_code)] + pub fn raw(&self) -> u64 { + self.0 + } + + #[allow(clippy::allow_attributes, dead_code)] + pub fn from_raw(id: u64) -> Self { + Self(id) + } +} + +impl std::fmt::Display for IoUringTaskId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +#[expect(clippy::derivable_impls)] +impl Default for IoUringTaskId { + fn default() -> Self { + Self(0) + } +} + +struct Cancellable<'a> { + id: IoUringTaskId, + data: &'a IoUringData, +} + +impl<'a> Drop for Cancellable<'a> { + fn drop(&mut self) { + self.data.cancel_task(self.id); + } +} diff --git a/src/io_uring/ops.rs b/io-uring/src/ops.rs similarity index 91% rename from src/io_uring/ops.rs rename to io-uring/src/ops.rs index e6abdbee..f77768d0 100644 --- a/src/io_uring/ops.rs +++ b/io-uring/src/ops.rs @@ -1,4 +1,4 @@ -use crate::{io_uring::IoUringError, utils::oserror::OsError}; +use {crate::IoUringError, jay_utils::oserror::OsError}; pub mod accept; pub mod async_cancel; diff --git a/src/io_uring/ops/accept.rs b/io-uring/src/ops/accept.rs similarity index 98% rename from src/io_uring/ops/accept.rs rename to io-uring/src/ops/accept.rs index 0acee0d5..5fd47152 100644 --- a/src/io_uring/ops/accept.rs +++ b/io-uring/src/ops/accept.rs @@ -1,5 +1,5 @@ use { - crate::io_uring::{ + crate::{ IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt, pending_result::PendingResult, sys::{IORING_OP_ACCEPT, io_uring_sqe}, diff --git a/src/io_uring/ops/async_cancel.rs b/io-uring/src/ops/async_cancel.rs similarity index 85% rename from src/io_uring/ops/async_cancel.rs rename to io-uring/src/ops/async_cancel.rs index f21e7024..4a74b43d 100644 --- a/src/io_uring/ops/async_cancel.rs +++ b/io-uring/src/ops/async_cancel.rs @@ -1,11 +1,9 @@ use { crate::{ - io_uring::{ - IoUringData, IoUringTaskId, Task, - sys::{IORING_OP_ASYNC_CANCEL, io_uring_sqe}, - }, - utils::errorfmt::ErrorFmt, + IoUringData, IoUringTaskId, Task, + sys::{IORING_OP_ASYNC_CANCEL, io_uring_sqe}, }, + jay_utils::errorfmt::ErrorFmt, uapi::c, }; diff --git a/src/io_uring/ops/connect.rs b/io-uring/src/ops/connect.rs similarity index 98% rename from src/io_uring/ops/connect.rs rename to io-uring/src/ops/connect.rs index cfb89039..4c0512f1 100644 --- a/src/io_uring/ops/connect.rs +++ b/io-uring/src/ops/connect.rs @@ -1,5 +1,5 @@ use { - crate::io_uring::{ + crate::{ IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt, pending_result::PendingResult, sys::{IORING_OP_CONNECT, io_uring_sqe}, diff --git a/src/io_uring/ops/poll.rs b/io-uring/src/ops/poll.rs similarity index 97% rename from src/io_uring/ops/poll.rs rename to io-uring/src/ops/poll.rs index 01e77679..dd7a88e4 100644 --- a/src/io_uring/ops/poll.rs +++ b/io-uring/src/ops/poll.rs @@ -1,5 +1,5 @@ use { - crate::io_uring::{ + crate::{ IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt, ops::TaskResult, pending_result::PendingResult, @@ -32,7 +32,6 @@ impl IoUring { self.poll(fd, c::POLLIN).await.merge() } - #[expect(dead_code)] pub async fn writable(&self, fd: &Rc) -> Result { self.poll(fd, c::POLLOUT).await.merge() } diff --git a/src/io_uring/ops/poll_external.rs b/io-uring/src/ops/poll_external.rs similarity index 92% rename from src/io_uring/ops/poll_external.rs rename to io-uring/src/ops/poll_external.rs index 6a86a125..be25e053 100644 --- a/src/io_uring/ops/poll_external.rs +++ b/io-uring/src/ops/poll_external.rs @@ -1,11 +1,9 @@ use { crate::{ - io_uring::{ - IoUring, IoUringData, IoUringError, IoUringTaskId, Task, - sys::{IORING_OP_POLL_ADD, io_uring_sqe}, - }, - utils::oserror::OsError, + IoUring, IoUringData, IoUringError, IoUringTaskId, Task, + sys::{IORING_OP_POLL_ADD, io_uring_sqe}, }, + jay_utils::oserror::OsError, std::{cell::Cell, rc::Rc}, uapi::{OwnedFd, c}, }; @@ -61,7 +59,6 @@ impl IoUring { self.poll_external(fd, c::POLLIN, callback) } - #[expect(dead_code)] pub fn writable_external( &self, fd: &Rc, diff --git a/src/io_uring/ops/read_write.rs b/io-uring/src/ops/read_write.rs similarity index 89% rename from src/io_uring/ops/read_write.rs rename to io-uring/src/ops/read_write.rs index ddece5a3..e7af42fc 100644 --- a/src/io_uring/ops/read_write.rs +++ b/io-uring/src/ops/read_write.rs @@ -1,13 +1,11 @@ use { crate::{ - io_uring::{ - IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt, - pending_result::PendingResult, - sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe}, - }, - time::Time, - utils::buf::Buf, + IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt, + pending_result::PendingResult, + sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe}, }, + jay_time::Time, + jay_utils::buf::Buf, std::rc::Rc, uapi::{OwnedFd, c}, }; diff --git a/src/io_uring/ops/read_write_no_cancel.rs b/io-uring/src/ops/read_write_no_cancel.rs similarity index 92% rename from src/io_uring/ops/read_write_no_cancel.rs rename to io-uring/src/ops/read_write_no_cancel.rs index a152d752..609aa464 100644 --- a/src/io_uring/ops/read_write_no_cancel.rs +++ b/io-uring/src/ops/read_write_no_cancel.rs @@ -3,13 +3,11 @@ mod tests; use { crate::{ - io_uring::{ - IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt, - pending_result::PendingResult, - sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe}, - }, - time::Time, + IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt, + pending_result::PendingResult, + sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe}, }, + jay_time::Time, run_on_drop::on_drop, uapi::{Fd, c}, }; diff --git a/src/io_uring/ops/read_write_no_cancel/tests.rs b/io-uring/src/ops/read_write_no_cancel/tests.rs similarity index 78% rename from src/io_uring/ops/read_write_no_cancel/tests.rs rename to io-uring/src/ops/read_write_no_cancel/tests.rs index ef50bba0..20094f96 100644 --- a/src/io_uring/ops/read_write_no_cancel/tests.rs +++ b/io-uring/src/ops/read_write_no_cancel/tests.rs @@ -1,10 +1,7 @@ use { - crate::{ - async_engine::AsyncEngine, - io_uring::{IoUring, IoUringError}, - utils::{oserror::OsError, queue::AsyncQueue}, - wheel::Wheel, - }, + crate::{IoUring, IoUringError}, + jay_async_engine::AsyncEngine, + jay_utils::{oserror::OsError, queue::AsyncQueue}, std::rc::Rc, uapi::c::ECANCELED, }; @@ -14,7 +11,6 @@ fn cancel(timeout: bool) { let ring = IoUring::new(&eng, 32).unwrap(); let ring2 = ring.clone(); let ring3 = ring.clone(); - let wheel = Wheel::new(&eng, &ring).unwrap(); let queue = Rc::new(AsyncQueue::new()); let queue2 = queue.clone(); let _fut1 = eng.spawn("", async move { @@ -32,7 +28,7 @@ fn cancel(timeout: bool) { let _fut2 = eng.spawn("", async move { let id = queue2.pop().await; if timeout { - wheel.timeout(1).await.unwrap(); + ring2.timeout(1).await.unwrap(); } ring2.cancel(id); }); diff --git a/src/io_uring/ops/recvmsg.rs b/io-uring/src/ops/recvmsg.rs similarity index 94% rename from src/io_uring/ops/recvmsg.rs rename to io-uring/src/ops/recvmsg.rs index 1c7e4f81..0aa2f883 100644 --- a/src/io_uring/ops/recvmsg.rs +++ b/io-uring/src/ops/recvmsg.rs @@ -1,12 +1,10 @@ use { crate::{ - io_uring::{ - IoUring, IoUringData, IoUringError, IoUringTaskId, Task, - pending_result::PendingResult, - sys::{IORING_OP_RECVMSG, io_uring_sqe}, - }, - utils::buf::Buf, + IoUring, IoUringData, IoUringError, IoUringTaskId, Task, + pending_result::PendingResult, + sys::{IORING_OP_RECVMSG, io_uring_sqe}, }, + jay_utils::buf::Buf, std::{cell::Cell, collections::VecDeque, mem::MaybeUninit, rc::Rc}, uapi::{OwnedFd, c}, }; diff --git a/src/io_uring/ops/sendmsg.rs b/io-uring/src/ops/sendmsg.rs similarity index 93% rename from src/io_uring/ops/sendmsg.rs rename to io-uring/src/ops/sendmsg.rs index c6231b39..8a245740 100644 --- a/src/io_uring/ops/sendmsg.rs +++ b/io-uring/src/ops/sendmsg.rs @@ -1,13 +1,11 @@ use { crate::{ - io_uring::{ - IoUring, IoUringData, IoUringError, IoUringTaskId, Task, - pending_result::PendingResult, - sys::{IORING_OP_SENDMSG, io_uring_sqe}, - }, - time::Time, - utils::{buf::Buf, compat::IovLength, vec_ext::UninitVecExt}, + IoUring, IoUringData, IoUringError, IoUringTaskId, Task, + pending_result::PendingResult, + sys::{IORING_OP_SENDMSG, io_uring_sqe}, }, + jay_time::Time, + jay_utils::{buf::Buf, compat::IovLength, vec_ext::UninitVecExt}, std::{mem::MaybeUninit, ptr, rc::Rc}, uapi::{OwnedFd, c}, }; diff --git a/src/io_uring/ops/timeout.rs b/io-uring/src/ops/timeout.rs similarity index 98% rename from src/io_uring/ops/timeout.rs rename to io-uring/src/ops/timeout.rs index d7e0c2fb..74e7be1c 100644 --- a/src/io_uring/ops/timeout.rs +++ b/io-uring/src/ops/timeout.rs @@ -1,5 +1,5 @@ use { - crate::io_uring::{ + crate::{ IoUring, IoUringData, IoUringError, IoUringTaskId, Task, pending_result::PendingResult, sys::{IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe}, diff --git a/src/io_uring/ops/timeout_external.rs b/io-uring/src/ops/timeout_external.rs similarity index 90% rename from src/io_uring/ops/timeout_external.rs rename to io-uring/src/ops/timeout_external.rs index 860811de..1034d703 100644 --- a/src/io_uring/ops/timeout_external.rs +++ b/io-uring/src/ops/timeout_external.rs @@ -1,12 +1,9 @@ use { crate::{ - io_uring::{ - IoUring, IoUringData, IoUringError, IoUringTaskId, Task, - ops::timeout::timespec64, - sys::{IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe}, - }, - utils::oserror::OsError, + IoUring, IoUringData, IoUringError, IoUringTaskId, Task, ops::timeout::timespec64, + sys::{IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe}, }, + jay_utils::oserror::OsError, std::{cell::Cell, rc::Rc}, uapi::c, }; diff --git a/src/io_uring/ops/timeout_link.rs b/io-uring/src/ops/timeout_link.rs similarity index 83% rename from src/io_uring/ops/timeout_link.rs rename to io-uring/src/ops/timeout_link.rs index edb9faa4..f349c879 100644 --- a/src/io_uring/ops/timeout_link.rs +++ b/io-uring/src/ops/timeout_link.rs @@ -1,11 +1,8 @@ use crate::{ - io_uring::{ - IoUring, IoUringData, IoUringTaskId, Task, - ops::timeout::timespec64, - sys::{IORING_OP_LINK_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe}, - }, - time::Time, + IoUring, IoUringData, IoUringTaskId, Task, ops::timeout::timespec64, + sys::{IORING_OP_LINK_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe}, }; +use jay_time::Time; #[derive(Default)] pub struct TimeoutLinkTask { diff --git a/src/io_uring/pending_result.rs b/io-uring/src/pending_result.rs similarity index 96% rename from src/io_uring/pending_result.rs rename to io-uring/src/pending_result.rs index 544c182e..4d7c6a87 100644 --- a/src/io_uring/pending_result.rs +++ b/io-uring/src/pending_result.rs @@ -1,5 +1,5 @@ use { - crate::utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack}, + jay_utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack}, std::{ cell::Cell, future::Future, diff --git a/src/io_uring/sys.rs b/io-uring/src/sys.rs similarity index 99% rename from src/io_uring/sys.rs rename to io-uring/src/sys.rs index d9ba4197..9a9b9665 100644 --- a/src/io_uring/sys.rs +++ b/io-uring/src/sys.rs @@ -1,7 +1,7 @@ #![allow(non_camel_case_types, dead_code)] use { - crate::utils::oserror::OsError, + jay_utils::oserror::OsError, std::mem::MaybeUninit, uapi::{OwnedFd, c}, }; diff --git a/src/io_uring.rs b/src/io_uring.rs index 49f5a2ea..c18791a6 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -1,551 +1 @@ -pub use ops::{ - TaskResultExt, - poll_external::{PendingPoll, PollCallback}, - timeout_external::{PendingTimeout, TimeoutCallback}, -}; -use { - crate::{ - async_engine::AsyncEngine, - io_uring::{ - debounce::Debouncer, - ops::{ - accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask, - poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask, - read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask, - sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_external::TimeoutExternalTask, - timeout_link::TimeoutLinkTask, - }, - pending_result::PendingResults, - sys::{ - IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, - IORING_OFF_SQES, IORING_SETUP_COOP_TASKRUN, IORING_SETUP_DEFER_TASKRUN, - IORING_SETUP_SINGLE_ISSUER, IORING_SETUP_SUBMIT_ALL, IOSQE_IO_LINK, io_uring_cqe, - io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, - }, - }, - utils::{ - asyncevent::AsyncEvent, - bitflags::BitflagsExt, - buf::Buf, - copyhashmap::CopyHashMap, - errorfmt::ErrorFmt, - mmap::{Mmapped, mmap}, - numcell::NumCell, - oserror::OsError, - ptr_ext::{MutPtrExt, PtrExt}, - stack::Stack, - syncqueue::SyncQueue, - }, - }, - std::{ - cell::{Cell, RefCell, UnsafeCell}, - rc::Rc, - sync::atomic::{ - AtomicU32, - Ordering::{Acquire, Relaxed, Release}, - }, - task::Waker, - }, - thiserror::Error, - uapi::{ - OwnedFd, - c::{self}, - }, -}; - -macro_rules! map_err { - ($n:expr) => {{ - let n = $n; - if n < 0 { - Err(crate::utils::oserror::OsError::from(-n as uapi::c::c_int)) - } else { - Ok(n) - } - }}; -} - -mod debounce; -mod ops; -mod pending_result; -mod sys; - -#[derive(Debug, Error)] -pub enum IoUringError { - #[error(transparent)] - OsError(#[from] OsError), - #[error("Could not create an io-uring")] - CreateUring(#[source] OsError), - #[error("The kernel does not support the IORING_FEAT_NODROP feature")] - NoDrop, - #[error("Could not map the submission queue ring")] - MapSqRing(#[source] OsError), - #[error("Could not map the submission queue entries")] - MapSqEntries(#[source] OsError), - #[error("Could not map the completion queue ring")] - MapCqRing(#[source] OsError), - #[error("The io-uring has already been destroyed")] - Destroyed, - #[error("io_uring_enter failed")] - Enter(#[source] OsError), - #[error("Kernel sent invalid cmsg data")] - InvalidCmsgData, -} - -pub struct IoUring { - ring: Rc, -} - -impl Drop for IoUring { - fn drop(&mut self) { - self.ring.kill(); - } -} - -impl IoUring { - pub fn new(eng: &Rc, entries: u32) -> Result, IoUringError> { - let feature_levels = [ - IORING_SETUP_SUBMIT_ALL, // 5.18 - IORING_SETUP_COOP_TASKRUN, // 5.19 - IORING_SETUP_SINGLE_ISSUER, // 6.0 - IORING_SETUP_DEFER_TASKRUN, // 6.1 - ]; - let mut feature_levels = &feature_levels[..]; - let mut params; - let fd = loop { - params = io_uring_params::default(); - for &flags in feature_levels { - params.flags |= flags; - } - match io_uring_setup(entries, &mut params) { - Ok(f) => break f, - Err(e) => { - if let Some((_, levels)) = feature_levels.split_last() { - feature_levels = levels; - } else { - return Err(IoUringError::CreateUring(e)); - } - } - } - }; - if !params.features.contains(IORING_FEAT_NODROP) { - return Err(IoUringError::NoDrop); - } - let sqmap_map = mmap( - (params.sq_off.array + params.sq_entries * 4) as _, - c::PROT_READ | c::PROT_WRITE, - c::MAP_SHARED | c::MAP_POPULATE, - fd.raw(), - IORING_OFF_SQ_RING as _, - ); - let sqmap_map = match sqmap_map { - Ok(map) => map, - Err(e) => return Err(IoUringError::MapSqRing(e)), - }; - let sqesmap_map = mmap( - params.sq_entries as usize * size_of::(), - c::PROT_READ | c::PROT_WRITE, - c::MAP_SHARED | c::MAP_POPULATE, - fd.raw(), - IORING_OFF_SQES as _, - ); - let sqesmap_map = match sqesmap_map { - Ok(map) => map, - Err(e) => return Err(IoUringError::MapSqEntries(e)), - }; - let cqmap_map = mmap( - params.cq_off.cqes as usize + params.cq_entries as usize * size_of::(), - c::PROT_READ | c::PROT_WRITE, - c::MAP_SHARED | c::MAP_POPULATE, - fd.raw(), - IORING_OFF_CQ_RING as _, - ); - let cqmap_map = match cqmap_map { - Ok(map) => map, - Err(e) => return Err(IoUringError::MapCqRing(e)), - }; - let sqmask = unsafe { - *(sqmap_map.ptr as *const u8) - .add(params.sq_off.ring_mask as _) - .cast() - }; - let sqhead = unsafe { - (sqmap_map.ptr as *const u8) - .add(params.sq_off.head as _) - .cast() - }; - let sqtail = unsafe { - (sqmap_map.ptr as *const u8) - .add(params.sq_off.tail as _) - .cast() - }; - let sqmap = unsafe { - let base = (sqmap_map.ptr as *const u8) - .add(params.sq_off.array as _) - .cast(); - std::slice::from_raw_parts(base, params.sq_entries as _) - }; - let sqesmap = unsafe { - let base = (sqesmap_map.ptr as *const u8).cast(); - std::slice::from_raw_parts(base, params.sq_entries as _) - }; - let cqmask = unsafe { - *(cqmap_map.ptr as *const u8) - .add(params.cq_off.ring_mask as _) - .cast() - }; - let cqhead = unsafe { - (cqmap_map.ptr as *const u8) - .add(params.cq_off.head as _) - .cast() - }; - let cqtail = unsafe { - (cqmap_map.ptr as *const u8) - .add(params.cq_off.tail as _) - .cast() - }; - let cqmap = unsafe { - let base = (cqmap_map.ptr as *const u8) - .add(params.cq_off.cqes as _) - .cast(); - std::slice::from_raw_parts(base, params.cq_entries as _) - }; - let data = Rc::new(IoUringData { - destroyed: Cell::new(false), - fd, - eng: eng.clone(), - _sqesmap_map: sqesmap_map, - _sqmap_map: sqmap_map, - sqmask, - sqlen: params.sq_entries, - sqhead, - sqtail, - sqmap, - sqesmap, - _cqmap_map: cqmap_map, - cqmask, - cqhead, - cqtail, - cqmap, - cqes_consumed: Default::default(), - next: Default::default(), - to_encode: Default::default(), - pending_in_kernel: Default::default(), - tasks: Default::default(), - pending_results: Default::default(), - cached_read_writes: Default::default(), - cached_read_writes_no_cancel: Default::default(), - cached_cancels: Default::default(), - cached_polls: Default::default(), - cached_polls_external: Default::default(), - cached_sendmsg: Default::default(), - cached_recvmsg: Default::default(), - cached_timeouts: Default::default(), - cached_timeouts_external: Default::default(), - cached_timeout_links: Default::default(), - cached_cmsg_bufs: Default::default(), - cached_connects: Default::default(), - cached_accepts: Default::default(), - fd_ids_scratch: Default::default(), - iteration: Default::default(), - yields: Default::default(), - }); - Ok(Rc::new(Self { ring: data })) - } - - pub fn stop(&self) { - self.ring.kill(); - } - - pub fn run(&self) -> Result<(), IoUringError> { - let res = self.ring.run(); - self.ring.kill(); - res - } - - pub fn cancel(&self, id: IoUringTaskId) { - self.ring.cancel_task(id); - } - - pub fn debouncer(&self, max: u64) -> Debouncer { - Debouncer { - cur: Default::default(), - max, - iteration: Cell::new(self.ring.iteration.get()), - ring: self.ring.clone(), - } - } -} - -struct IoUringData { - destroyed: Cell, - - fd: OwnedFd, - eng: Rc, - - _sqesmap_map: Mmapped, - _sqmap_map: Mmapped, - sqmask: u32, - sqlen: u32, - sqhead: *const AtomicU32, - sqtail: *const AtomicU32, - sqmap: *const [Cell], - sqesmap: *const [UnsafeCell], - - _cqmap_map: Mmapped, - cqmask: u32, - cqhead: *const AtomicU32, - cqtail: *const AtomicU32, - cqmap: *const [Cell], - - cqes_consumed: AsyncEvent, - - next: IoUringTaskIds, - to_encode: SyncQueue, - pending_in_kernel: CopyHashMap, - tasks: CopyHashMap>, - - pending_results: PendingResults, - - cached_read_writes: Stack>, - cached_read_writes_no_cancel: Stack>, - cached_cancels: Stack>, - cached_polls: Stack>, - cached_polls_external: Stack>, - cached_sendmsg: Stack>, - cached_recvmsg: Stack>, - cached_timeouts: Stack>, - cached_timeouts_external: Stack>, - cached_timeout_links: Stack>, - cached_cmsg_bufs: Stack, - cached_connects: Stack>, - cached_accepts: Stack>, - - fd_ids_scratch: RefCell>, - - iteration: NumCell, - yields: SyncQueue, -} - -unsafe trait Task { - fn id(&self) -> IoUringTaskId; - fn complete(self: Box, ring: &IoUringData, res: i32); - fn encode(&self, sqe: &mut io_uring_sqe); - - fn is_cancel(&self) -> bool { - false - } - - fn has_timeout(&self) -> bool { - false - } -} - -impl IoUringData { - fn run(&self) -> Result<(), IoUringError> { - let mut to_submit = 0; - loop { - self.iteration.fetch_add(1); - while let Some(ev) = self.yields.pop() { - ev.wake(); - } - loop { - self.eng.dispatch(); - if self.destroyed.get() { - return Ok(()); - } - if !self.dispatch_completions() { - break; - } - } - to_submit += self.encode(); - let res = { - let (to_submit, mut min_complete, flags) = if to_submit == 0 { - (0, 1, IORING_ENTER_GETEVENTS) - } else if self.to_encode.is_empty() { - (to_submit as _, 1, IORING_ENTER_GETEVENTS) - } else { - (!0, 0, 0) - }; - if self.yields.is_not_empty() { - min_complete = 0; - } - io_uring_enter(self.fd.raw(), to_submit, min_complete, flags) - }; - let mut submitted_any = false; - match res { - Ok(n) => { - if n > 0 { - submitted_any = true; - } - to_submit -= n; - } - Err(e) => { - if not_matches!(e.0, c::EAGAIN | c::EBUSY | c::EINTR) { - return Err(IoUringError::Enter(e)); - } - } - } - if to_submit > 0 && !submitted_any { - let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS); - if let Err(e) = res { - if e.0 != c::EINTR { - return Err(IoUringError::Enter(e)); - } - } - } - } - } - - fn dispatch_completions(&self) -> bool { - unsafe { - let mut head = self.cqhead.deref().load(Relaxed); - let tail = self.cqtail.deref().load(Acquire); - if head == tail { - return false; - } - while head != tail { - let idx = (head & self.cqmask) as usize; - let entry = self.cqmap.deref()[idx].get(); - head = head.wrapping_add(1); - self.cqhead.deref().store(head, Release); - let id = IoUringTaskId(entry.user_data); - if let Some(pending) = self.tasks.remove(&id) { - self.pending_in_kernel.remove(&id); - pending.complete(self, entry.res); - } - } - self.cqhead.deref().store(head, Release); - self.cqes_consumed.trigger(); - true - } - } - - fn encode(&self) -> usize { - let tasks = self.tasks.lock(); - let mut encoded = 0; - unsafe { - let mut tail = self.sqtail.deref().load(Relaxed); - let head = self.sqhead.deref().load(Acquire); - let available = self.sqlen - tail.wrapping_sub(head); - while encoded < available { - let id = match self.to_encode.pop() { - Some(t) => t, - _ => break, - }; - let task = match tasks.get(&id) { - Some(t) => t, - _ => continue, - }; - let has_timeout = task.has_timeout(); - if has_timeout && (available - encoded) < 2 { - self.to_encode.push_front(id); - break; - } - self.pending_in_kernel.set(id, ()); - let idx = (tail & self.sqmask) as usize; - let sqe = self.sqesmap.deref()[idx].get().deref_mut(); - self.sqmap.deref()[idx].set(idx as _); - *sqe = Default::default(); - sqe.user_data = id.raw(); - task.encode(sqe); - if has_timeout { - sqe.flags |= IOSQE_IO_LINK; - } - tail = tail.wrapping_add(1); - encoded += 1; - } - self.sqtail.deref().store(tail, Release); - } - encoded as usize - } - - fn id(&self) -> Cancellable<'_> { - Cancellable { - id: self.id_raw(), - data: self, - } - } - - fn id_raw(&self) -> IoUringTaskId { - self.next.next() - } - - fn cancel_task(&self, id: IoUringTaskId) { - if !self.tasks.contains(&id) { - return; - } - if !self.pending_in_kernel.contains(&id) { - self.tasks - .remove(&id) - .unwrap() - .complete(self, -c::ECANCELED); - return; - } - self.cancel_task_in_kernel(id); - } - - fn schedule(&self, t: Box) { - assert!(!self.destroyed.get()); - self.to_encode.push(t.id()); - self.tasks.set(t.id(), t); - } - - fn check_destroyed(&self) -> Result<(), IoUringError> { - if self.destroyed.get() { - Err(IoUringError::Destroyed) - } else { - Ok(()) - } - } - - fn kill(&self) { - self.eng.stop(); - let mut to_cancel = vec![]; - for task in self.tasks.lock().values() { - if !task.is_cancel() { - to_cancel.push(task.id()); - } - } - for task in to_cancel { - self.cancel_task(task); - } - self.destroyed.set(true); - while !self.tasks.is_empty() { - self.encode(); - let _ = io_uring_enter(self.fd.raw(), u32::MAX, 0, 0); - let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS); - if let Err(e) = res { - panic!("Could not wait for io_uring to drain: {}", ErrorFmt(e)); - } - while self.dispatch_completions() { - // nothing - } - } - } - - fn cmsg_buf(&self) -> Buf { - self.cached_cmsg_bufs - .pop() - .unwrap_or_else(|| Buf::new(1024)) - } -} - -linear_ids!(IoUringTaskIds, IoUringTaskId, u64); - -#[expect(clippy::derivable_impls)] -impl Default for IoUringTaskId { - fn default() -> Self { - Self(0) - } -} - -struct Cancellable<'a> { - id: IoUringTaskId, - data: &'a IoUringData, -} - -impl<'a> Drop for Cancellable<'a> { - fn drop(&mut self) { - self.data.cancel_task(self.id); - } -} +pub use jay_io_uring::*;