io_uring: move runtime into workspace crate
This commit is contained in:
parent
03d3876888
commit
c3b17db151
22 changed files with 662 additions and 617 deletions
14
Cargo.lock
generated
14
Cargo.lock
generated
|
|
@ -688,6 +688,7 @@ dependencies = [
|
|||
"jay-edid",
|
||||
"jay-formats",
|
||||
"jay-geometry",
|
||||
"jay-io-uring",
|
||||
"jay-layout-animation",
|
||||
"jay-time",
|
||||
"jay-toml-config",
|
||||
|
|
@ -776,6 +777,19 @@ dependencies = [
|
|||
"smallvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jay-io-uring"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"jay-async-engine",
|
||||
"jay-time",
|
||||
"jay-utils",
|
||||
"log",
|
||||
"run-on-drop",
|
||||
"thiserror",
|
||||
"uapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jay-layout-animation"
|
||||
version = "0.1.0"
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ members = [
|
|||
"time",
|
||||
"tracy",
|
||||
"async-engine",
|
||||
"io-uring",
|
||||
"toml-config",
|
||||
"algorithms",
|
||||
"toml-spec",
|
||||
|
|
@ -57,6 +58,7 @@ jay-cmm = { version = "0.1.0", path = "cmm" }
|
|||
jay-time = { version = "0.1.0", path = "time" }
|
||||
jay-tracy = { version = "0.1.0", path = "tracy" }
|
||||
jay-async-engine = { version = "0.1.0", path = "async-engine" }
|
||||
jay-io-uring = { version = "0.1.0", path = "io-uring" }
|
||||
|
||||
uapi = "0.2.13"
|
||||
thiserror = "2.0.11"
|
||||
|
|
|
|||
15
io-uring/Cargo.toml
Normal file
15
io-uring/Cargo.toml
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
[package]
|
||||
name = "jay-io-uring"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license = "GPL-3.0-only"
|
||||
|
||||
[dependencies]
|
||||
jay-async-engine = { version = "0.1.0", path = "../async-engine" }
|
||||
jay-time = { version = "0.1.0", path = "../time" }
|
||||
jay-utils = { version = "0.1.0", path = "../utils" }
|
||||
|
||||
log = { version = "0.4.20", features = ["std"] }
|
||||
run-on-drop = "1.0.0"
|
||||
thiserror = "2.0.11"
|
||||
uapi = "0.2.13"
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
use {
|
||||
crate::{io_uring::IoUringData, utils::numcell::NumCell},
|
||||
crate::IoUringData,
|
||||
jay_utils::numcell::NumCell,
|
||||
std::{cell::Cell, future::poll_fn, rc::Rc, task::Poll},
|
||||
};
|
||||
|
||||
587
io-uring/src/lib.rs
Normal file
587
io-uring/src/lib.rs
Normal file
|
|
@ -0,0 +1,587 @@
|
|||
pub use ops::{
|
||||
TaskResultExt,
|
||||
poll_external::{PendingPoll, PollCallback},
|
||||
timeout_external::{PendingTimeout, TimeoutCallback},
|
||||
};
|
||||
use {
|
||||
crate::{
|
||||
debounce::Debouncer,
|
||||
ops::{
|
||||
accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask,
|
||||
poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask,
|
||||
read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask,
|
||||
sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_external::TimeoutExternalTask,
|
||||
timeout_link::TimeoutLinkTask,
|
||||
},
|
||||
pending_result::PendingResults,
|
||||
sys::{
|
||||
IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING,
|
||||
IORING_OFF_SQES, IORING_SETUP_COOP_TASKRUN, IORING_SETUP_DEFER_TASKRUN,
|
||||
IORING_SETUP_SINGLE_ISSUER, IORING_SETUP_SUBMIT_ALL, IOSQE_IO_LINK, io_uring_cqe,
|
||||
io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe,
|
||||
},
|
||||
},
|
||||
jay_async_engine::AsyncEngine,
|
||||
jay_utils::{
|
||||
asyncevent::AsyncEvent,
|
||||
bitflags::BitflagsExt,
|
||||
buf::Buf,
|
||||
copyhashmap::CopyHashMap,
|
||||
errorfmt::ErrorFmt,
|
||||
mmap::{Mmapped, mmap},
|
||||
numcell::NumCell,
|
||||
oserror::OsError,
|
||||
ptr_ext::{MutPtrExt, PtrExt},
|
||||
stack::Stack,
|
||||
syncqueue::SyncQueue,
|
||||
},
|
||||
std::{
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
rc::Rc,
|
||||
sync::atomic::{
|
||||
AtomicU32,
|
||||
Ordering::{Acquire, Relaxed, Release},
|
||||
},
|
||||
task::Waker,
|
||||
},
|
||||
thiserror::Error,
|
||||
uapi::{
|
||||
OwnedFd,
|
||||
c::{self},
|
||||
},
|
||||
};
|
||||
|
||||
macro_rules! map_err {
|
||||
($n:expr) => {{
|
||||
let n = $n;
|
||||
if n < 0 {
|
||||
Err(jay_utils::oserror::OsError::from(-n as uapi::c::c_int))
|
||||
} else {
|
||||
Ok(n)
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
mod debounce;
|
||||
mod ops;
|
||||
mod pending_result;
|
||||
mod sys;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum IoUringError {
|
||||
#[error(transparent)]
|
||||
OsError(#[from] OsError),
|
||||
#[error("Could not create an io-uring")]
|
||||
CreateUring(#[source] OsError),
|
||||
#[error("The kernel does not support the IORING_FEAT_NODROP feature")]
|
||||
NoDrop,
|
||||
#[error("Could not map the submission queue ring")]
|
||||
MapSqRing(#[source] OsError),
|
||||
#[error("Could not map the submission queue entries")]
|
||||
MapSqEntries(#[source] OsError),
|
||||
#[error("Could not map the completion queue ring")]
|
||||
MapCqRing(#[source] OsError),
|
||||
#[error("The io-uring has already been destroyed")]
|
||||
Destroyed,
|
||||
#[error("io_uring_enter failed")]
|
||||
Enter(#[source] OsError),
|
||||
#[error("Kernel sent invalid cmsg data")]
|
||||
InvalidCmsgData,
|
||||
}
|
||||
|
||||
pub struct IoUring {
|
||||
ring: Rc<IoUringData>,
|
||||
}
|
||||
|
||||
impl Drop for IoUring {
|
||||
fn drop(&mut self) {
|
||||
self.ring.kill();
|
||||
}
|
||||
}
|
||||
|
||||
impl IoUring {
|
||||
pub fn new(eng: &Rc<AsyncEngine>, entries: u32) -> Result<Rc<Self>, IoUringError> {
|
||||
let feature_levels = [
|
||||
IORING_SETUP_SUBMIT_ALL, // 5.18
|
||||
IORING_SETUP_COOP_TASKRUN, // 5.19
|
||||
IORING_SETUP_SINGLE_ISSUER, // 6.0
|
||||
IORING_SETUP_DEFER_TASKRUN, // 6.1
|
||||
];
|
||||
let mut feature_levels = &feature_levels[..];
|
||||
let mut params;
|
||||
let fd = loop {
|
||||
params = io_uring_params::default();
|
||||
for &flags in feature_levels {
|
||||
params.flags |= flags;
|
||||
}
|
||||
match io_uring_setup(entries, &mut params) {
|
||||
Ok(f) => break f,
|
||||
Err(e) => {
|
||||
if let Some((_, levels)) = feature_levels.split_last() {
|
||||
feature_levels = levels;
|
||||
} else {
|
||||
return Err(IoUringError::CreateUring(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
if !params.features.contains(IORING_FEAT_NODROP) {
|
||||
return Err(IoUringError::NoDrop);
|
||||
}
|
||||
let sqmap_map = mmap(
|
||||
(params.sq_off.array + params.sq_entries * 4) as _,
|
||||
c::PROT_READ | c::PROT_WRITE,
|
||||
c::MAP_SHARED | c::MAP_POPULATE,
|
||||
fd.raw(),
|
||||
IORING_OFF_SQ_RING as _,
|
||||
);
|
||||
let sqmap_map = match sqmap_map {
|
||||
Ok(map) => map,
|
||||
Err(e) => return Err(IoUringError::MapSqRing(e)),
|
||||
};
|
||||
let sqesmap_map = mmap(
|
||||
params.sq_entries as usize * size_of::<io_uring_sqe>(),
|
||||
c::PROT_READ | c::PROT_WRITE,
|
||||
c::MAP_SHARED | c::MAP_POPULATE,
|
||||
fd.raw(),
|
||||
IORING_OFF_SQES as _,
|
||||
);
|
||||
let sqesmap_map = match sqesmap_map {
|
||||
Ok(map) => map,
|
||||
Err(e) => return Err(IoUringError::MapSqEntries(e)),
|
||||
};
|
||||
let cqmap_map = mmap(
|
||||
params.cq_off.cqes as usize + params.cq_entries as usize * size_of::<io_uring_cqe>(),
|
||||
c::PROT_READ | c::PROT_WRITE,
|
||||
c::MAP_SHARED | c::MAP_POPULATE,
|
||||
fd.raw(),
|
||||
IORING_OFF_CQ_RING as _,
|
||||
);
|
||||
let cqmap_map = match cqmap_map {
|
||||
Ok(map) => map,
|
||||
Err(e) => return Err(IoUringError::MapCqRing(e)),
|
||||
};
|
||||
let sqmask = unsafe {
|
||||
*(sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.ring_mask as _)
|
||||
.cast()
|
||||
};
|
||||
let sqhead = unsafe {
|
||||
(sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.head as _)
|
||||
.cast()
|
||||
};
|
||||
let sqtail = unsafe {
|
||||
(sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.tail as _)
|
||||
.cast()
|
||||
};
|
||||
let sqmap = unsafe {
|
||||
let base = (sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.array as _)
|
||||
.cast();
|
||||
std::slice::from_raw_parts(base, params.sq_entries as _)
|
||||
};
|
||||
let sqesmap = unsafe {
|
||||
let base = (sqesmap_map.ptr as *const u8).cast();
|
||||
std::slice::from_raw_parts(base, params.sq_entries as _)
|
||||
};
|
||||
let cqmask = unsafe {
|
||||
*(cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.ring_mask as _)
|
||||
.cast()
|
||||
};
|
||||
let cqhead = unsafe {
|
||||
(cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.head as _)
|
||||
.cast()
|
||||
};
|
||||
let cqtail = unsafe {
|
||||
(cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.tail as _)
|
||||
.cast()
|
||||
};
|
||||
let cqmap = unsafe {
|
||||
let base = (cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.cqes as _)
|
||||
.cast();
|
||||
std::slice::from_raw_parts(base, params.cq_entries as _)
|
||||
};
|
||||
let data = Rc::new(IoUringData {
|
||||
destroyed: Cell::new(false),
|
||||
fd,
|
||||
eng: eng.clone(),
|
||||
_sqesmap_map: sqesmap_map,
|
||||
_sqmap_map: sqmap_map,
|
||||
sqmask,
|
||||
sqlen: params.sq_entries,
|
||||
sqhead,
|
||||
sqtail,
|
||||
sqmap,
|
||||
sqesmap,
|
||||
_cqmap_map: cqmap_map,
|
||||
cqmask,
|
||||
cqhead,
|
||||
cqtail,
|
||||
cqmap,
|
||||
cqes_consumed: Default::default(),
|
||||
next: Default::default(),
|
||||
to_encode: Default::default(),
|
||||
pending_in_kernel: Default::default(),
|
||||
tasks: Default::default(),
|
||||
pending_results: Default::default(),
|
||||
cached_read_writes: Default::default(),
|
||||
cached_read_writes_no_cancel: Default::default(),
|
||||
cached_cancels: Default::default(),
|
||||
cached_polls: Default::default(),
|
||||
cached_polls_external: Default::default(),
|
||||
cached_sendmsg: Default::default(),
|
||||
cached_recvmsg: Default::default(),
|
||||
cached_timeouts: Default::default(),
|
||||
cached_timeouts_external: Default::default(),
|
||||
cached_timeout_links: Default::default(),
|
||||
cached_cmsg_bufs: Default::default(),
|
||||
cached_connects: Default::default(),
|
||||
cached_accepts: Default::default(),
|
||||
fd_ids_scratch: Default::default(),
|
||||
iteration: Default::default(),
|
||||
yields: Default::default(),
|
||||
});
|
||||
Ok(Rc::new(Self { ring: data }))
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.ring.kill();
|
||||
}
|
||||
|
||||
pub fn run(&self) -> Result<(), IoUringError> {
|
||||
let res = self.ring.run();
|
||||
self.ring.kill();
|
||||
res
|
||||
}
|
||||
|
||||
pub fn cancel(&self, id: IoUringTaskId) {
|
||||
self.ring.cancel_task(id);
|
||||
}
|
||||
|
||||
pub fn debouncer(&self, max: u64) -> Debouncer {
|
||||
Debouncer {
|
||||
cur: Default::default(),
|
||||
max,
|
||||
iteration: Cell::new(self.ring.iteration.get()),
|
||||
ring: self.ring.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct IoUringData {
|
||||
destroyed: Cell<bool>,
|
||||
|
||||
fd: OwnedFd,
|
||||
eng: Rc<AsyncEngine>,
|
||||
|
||||
_sqesmap_map: Mmapped,
|
||||
_sqmap_map: Mmapped,
|
||||
sqmask: u32,
|
||||
sqlen: u32,
|
||||
sqhead: *const AtomicU32,
|
||||
sqtail: *const AtomicU32,
|
||||
sqmap: *const [Cell<c::c_uint>],
|
||||
sqesmap: *const [UnsafeCell<io_uring_sqe>],
|
||||
|
||||
_cqmap_map: Mmapped,
|
||||
cqmask: u32,
|
||||
cqhead: *const AtomicU32,
|
||||
cqtail: *const AtomicU32,
|
||||
cqmap: *const [Cell<io_uring_cqe>],
|
||||
|
||||
cqes_consumed: AsyncEvent,
|
||||
|
||||
next: IoUringTaskIds,
|
||||
to_encode: SyncQueue<IoUringTaskId>,
|
||||
pending_in_kernel: CopyHashMap<IoUringTaskId, ()>,
|
||||
tasks: CopyHashMap<IoUringTaskId, Box<dyn Task>>,
|
||||
|
||||
pending_results: PendingResults,
|
||||
|
||||
cached_read_writes: Stack<Box<ReadWriteTask>>,
|
||||
cached_read_writes_no_cancel: Stack<Box<ReadWriteNoCancelTask>>,
|
||||
cached_cancels: Stack<Box<AsyncCancelTask>>,
|
||||
cached_polls: Stack<Box<PollTask>>,
|
||||
cached_polls_external: Stack<Box<PollExternalTask>>,
|
||||
cached_sendmsg: Stack<Box<SendmsgTask>>,
|
||||
cached_recvmsg: Stack<Box<RecvmsgTask>>,
|
||||
cached_timeouts: Stack<Box<TimeoutTask>>,
|
||||
cached_timeouts_external: Stack<Box<TimeoutExternalTask>>,
|
||||
cached_timeout_links: Stack<Box<TimeoutLinkTask>>,
|
||||
cached_cmsg_bufs: Stack<Buf>,
|
||||
cached_connects: Stack<Box<ConnectTask>>,
|
||||
cached_accepts: Stack<Box<AcceptTask>>,
|
||||
|
||||
fd_ids_scratch: RefCell<Vec<c::c_int>>,
|
||||
|
||||
iteration: NumCell<u64>,
|
||||
yields: SyncQueue<Waker>,
|
||||
}
|
||||
|
||||
unsafe trait Task {
|
||||
fn id(&self) -> IoUringTaskId;
|
||||
fn complete(self: Box<Self>, ring: &IoUringData, res: i32);
|
||||
fn encode(&self, sqe: &mut io_uring_sqe);
|
||||
|
||||
fn is_cancel(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn has_timeout(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl IoUringData {
|
||||
fn run(&self) -> Result<(), IoUringError> {
|
||||
let mut to_submit = 0;
|
||||
loop {
|
||||
self.iteration.fetch_add(1);
|
||||
while let Some(ev) = self.yields.pop() {
|
||||
ev.wake();
|
||||
}
|
||||
loop {
|
||||
self.eng.dispatch();
|
||||
if self.destroyed.get() {
|
||||
return Ok(());
|
||||
}
|
||||
if !self.dispatch_completions() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
to_submit += self.encode();
|
||||
let res = {
|
||||
let (to_submit, mut min_complete, flags) = if to_submit == 0 {
|
||||
(0, 1, IORING_ENTER_GETEVENTS)
|
||||
} else if self.to_encode.is_empty() {
|
||||
(to_submit as _, 1, IORING_ENTER_GETEVENTS)
|
||||
} else {
|
||||
(!0, 0, 0)
|
||||
};
|
||||
if self.yields.is_not_empty() {
|
||||
min_complete = 0;
|
||||
}
|
||||
io_uring_enter(self.fd.raw(), to_submit, min_complete, flags)
|
||||
};
|
||||
let mut submitted_any = false;
|
||||
match res {
|
||||
Ok(n) => {
|
||||
if n > 0 {
|
||||
submitted_any = true;
|
||||
}
|
||||
to_submit -= n;
|
||||
}
|
||||
Err(e) => {
|
||||
if !matches!(e.0, c::EAGAIN | c::EBUSY | c::EINTR) {
|
||||
return Err(IoUringError::Enter(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
if to_submit > 0 && !submitted_any {
|
||||
let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS);
|
||||
if let Err(e) = res {
|
||||
if e.0 != c::EINTR {
|
||||
return Err(IoUringError::Enter(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dispatch_completions(&self) -> bool {
|
||||
unsafe {
|
||||
let mut head = self.cqhead.deref().load(Relaxed);
|
||||
let tail = self.cqtail.deref().load(Acquire);
|
||||
if head == tail {
|
||||
return false;
|
||||
}
|
||||
while head != tail {
|
||||
let idx = (head & self.cqmask) as usize;
|
||||
let entry = self.cqmap.deref()[idx].get();
|
||||
head = head.wrapping_add(1);
|
||||
self.cqhead.deref().store(head, Release);
|
||||
let id = IoUringTaskId(entry.user_data);
|
||||
if let Some(pending) = self.tasks.remove(&id) {
|
||||
self.pending_in_kernel.remove(&id);
|
||||
pending.complete(self, entry.res);
|
||||
}
|
||||
}
|
||||
self.cqhead.deref().store(head, Release);
|
||||
self.cqes_consumed.trigger();
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn encode(&self) -> usize {
|
||||
let tasks = self.tasks.lock();
|
||||
let mut encoded = 0;
|
||||
unsafe {
|
||||
let mut tail = self.sqtail.deref().load(Relaxed);
|
||||
let head = self.sqhead.deref().load(Acquire);
|
||||
let available = self.sqlen - tail.wrapping_sub(head);
|
||||
while encoded < available {
|
||||
let id = match self.to_encode.pop() {
|
||||
Some(t) => t,
|
||||
_ => break,
|
||||
};
|
||||
let task = match tasks.get(&id) {
|
||||
Some(t) => t,
|
||||
_ => continue,
|
||||
};
|
||||
let has_timeout = task.has_timeout();
|
||||
if has_timeout && (available - encoded) < 2 {
|
||||
self.to_encode.push_front(id);
|
||||
break;
|
||||
}
|
||||
self.pending_in_kernel.set(id, ());
|
||||
let idx = (tail & self.sqmask) as usize;
|
||||
let sqe = self.sqesmap.deref()[idx].get().deref_mut();
|
||||
self.sqmap.deref()[idx].set(idx as _);
|
||||
*sqe = Default::default();
|
||||
sqe.user_data = id.raw();
|
||||
task.encode(sqe);
|
||||
if has_timeout {
|
||||
sqe.flags |= IOSQE_IO_LINK;
|
||||
}
|
||||
tail = tail.wrapping_add(1);
|
||||
encoded += 1;
|
||||
}
|
||||
self.sqtail.deref().store(tail, Release);
|
||||
}
|
||||
encoded as usize
|
||||
}
|
||||
|
||||
fn id(&self) -> Cancellable<'_> {
|
||||
Cancellable {
|
||||
id: self.id_raw(),
|
||||
data: self,
|
||||
}
|
||||
}
|
||||
|
||||
fn id_raw(&self) -> IoUringTaskId {
|
||||
self.next.next()
|
||||
}
|
||||
|
||||
fn cancel_task(&self, id: IoUringTaskId) {
|
||||
if !self.tasks.contains(&id) {
|
||||
return;
|
||||
}
|
||||
if !self.pending_in_kernel.contains(&id) {
|
||||
self.tasks
|
||||
.remove(&id)
|
||||
.unwrap()
|
||||
.complete(self, -c::ECANCELED);
|
||||
return;
|
||||
}
|
||||
self.cancel_task_in_kernel(id);
|
||||
}
|
||||
|
||||
fn schedule(&self, t: Box<dyn Task>) {
|
||||
assert!(!self.destroyed.get());
|
||||
self.to_encode.push(t.id());
|
||||
self.tasks.set(t.id(), t);
|
||||
}
|
||||
|
||||
fn check_destroyed(&self) -> Result<(), IoUringError> {
|
||||
if self.destroyed.get() {
|
||||
Err(IoUringError::Destroyed)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn kill(&self) {
|
||||
self.eng.stop();
|
||||
let mut to_cancel = vec![];
|
||||
for task in self.tasks.lock().values() {
|
||||
if !task.is_cancel() {
|
||||
to_cancel.push(task.id());
|
||||
}
|
||||
}
|
||||
for task in to_cancel {
|
||||
self.cancel_task(task);
|
||||
}
|
||||
self.destroyed.set(true);
|
||||
while !self.tasks.is_empty() {
|
||||
self.encode();
|
||||
let _ = io_uring_enter(self.fd.raw(), u32::MAX, 0, 0);
|
||||
let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS);
|
||||
if let Err(e) = res {
|
||||
panic!("Could not wait for io_uring to drain: {}", ErrorFmt(e));
|
||||
}
|
||||
while self.dispatch_completions() {
|
||||
// nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cmsg_buf(&self) -> Buf {
|
||||
self.cached_cmsg_bufs
|
||||
.pop()
|
||||
.unwrap_or_else(|| Buf::new(1024))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct IoUringTaskIds {
|
||||
next: NumCell<u64>,
|
||||
}
|
||||
|
||||
impl Default for IoUringTaskIds {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
next: NumCell::new(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IoUringTaskIds {
|
||||
fn next(&self) -> IoUringTaskId {
|
||||
IoUringTaskId(self.next.fetch_add(1))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
|
||||
pub struct IoUringTaskId(u64);
|
||||
|
||||
impl IoUringTaskId {
|
||||
#[allow(clippy::allow_attributes, dead_code)]
|
||||
pub fn raw(&self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
|
||||
#[allow(clippy::allow_attributes, dead_code)]
|
||||
pub fn from_raw(id: u64) -> Self {
|
||||
Self(id)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for IoUringTaskId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
std::fmt::Display::fmt(&self.0, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::derivable_impls)]
|
||||
impl Default for IoUringTaskId {
|
||||
fn default() -> Self {
|
||||
Self(0)
|
||||
}
|
||||
}
|
||||
|
||||
struct Cancellable<'a> {
|
||||
id: IoUringTaskId,
|
||||
data: &'a IoUringData,
|
||||
}
|
||||
|
||||
impl<'a> Drop for Cancellable<'a> {
|
||||
fn drop(&mut self) {
|
||||
self.data.cancel_task(self.id);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
use crate::{io_uring::IoUringError, utils::oserror::OsError};
|
||||
use {crate::IoUringError, jay_utils::oserror::OsError};
|
||||
|
||||
pub mod accept;
|
||||
pub mod async_cancel;
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
use {
|
||||
crate::io_uring::{
|
||||
crate::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_ACCEPT, io_uring_sqe},
|
||||
|
|
@ -1,11 +1,9 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{
|
||||
IoUringData, IoUringTaskId, Task,
|
||||
sys::{IORING_OP_ASYNC_CANCEL, io_uring_sqe},
|
||||
},
|
||||
utils::errorfmt::ErrorFmt,
|
||||
IoUringData, IoUringTaskId, Task,
|
||||
sys::{IORING_OP_ASYNC_CANCEL, io_uring_sqe},
|
||||
},
|
||||
jay_utils::errorfmt::ErrorFmt,
|
||||
uapi::c,
|
||||
};
|
||||
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
use {
|
||||
crate::io_uring::{
|
||||
crate::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_CONNECT, io_uring_sqe},
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
use {
|
||||
crate::io_uring::{
|
||||
crate::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
|
||||
ops::TaskResult,
|
||||
pending_result::PendingResult,
|
||||
|
|
@ -32,7 +32,6 @@ impl IoUring {
|
|||
self.poll(fd, c::POLLIN).await.merge()
|
||||
}
|
||||
|
||||
#[expect(dead_code)]
|
||||
pub async fn writable(&self, fd: &Rc<OwnedFd>) -> Result<c::c_short, IoUringError> {
|
||||
self.poll(fd, c::POLLOUT).await.merge()
|
||||
}
|
||||
|
|
@ -1,11 +1,9 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
sys::{IORING_OP_POLL_ADD, io_uring_sqe},
|
||||
},
|
||||
utils::oserror::OsError,
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
sys::{IORING_OP_POLL_ADD, io_uring_sqe},
|
||||
},
|
||||
jay_utils::oserror::OsError,
|
||||
std::{cell::Cell, rc::Rc},
|
||||
uapi::{OwnedFd, c},
|
||||
};
|
||||
|
|
@ -61,7 +59,6 @@ impl IoUring {
|
|||
self.poll_external(fd, c::POLLIN, callback)
|
||||
}
|
||||
|
||||
#[expect(dead_code)]
|
||||
pub fn writable_external(
|
||||
&self,
|
||||
fd: &Rc<OwnedFd>,
|
||||
|
|
@ -1,13 +1,11 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe},
|
||||
},
|
||||
time::Time,
|
||||
utils::buf::Buf,
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe},
|
||||
},
|
||||
jay_time::Time,
|
||||
jay_utils::buf::Buf,
|
||||
std::rc::Rc,
|
||||
uapi::{OwnedFd, c},
|
||||
};
|
||||
|
|
@ -3,13 +3,11 @@ mod tests;
|
|||
|
||||
use {
|
||||
crate::{
|
||||
io_uring::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe},
|
||||
},
|
||||
time::Time,
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_READ, IORING_OP_WRITE, io_uring_sqe},
|
||||
},
|
||||
jay_time::Time,
|
||||
run_on_drop::on_drop,
|
||||
uapi::{Fd, c},
|
||||
};
|
||||
|
|
@ -1,10 +1,7 @@
|
|||
use {
|
||||
crate::{
|
||||
async_engine::AsyncEngine,
|
||||
io_uring::{IoUring, IoUringError},
|
||||
utils::{oserror::OsError, queue::AsyncQueue},
|
||||
wheel::Wheel,
|
||||
},
|
||||
crate::{IoUring, IoUringError},
|
||||
jay_async_engine::AsyncEngine,
|
||||
jay_utils::{oserror::OsError, queue::AsyncQueue},
|
||||
std::rc::Rc,
|
||||
uapi::c::ECANCELED,
|
||||
};
|
||||
|
|
@ -14,7 +11,6 @@ fn cancel(timeout: bool) {
|
|||
let ring = IoUring::new(&eng, 32).unwrap();
|
||||
let ring2 = ring.clone();
|
||||
let ring3 = ring.clone();
|
||||
let wheel = Wheel::new(&eng, &ring).unwrap();
|
||||
let queue = Rc::new(AsyncQueue::new());
|
||||
let queue2 = queue.clone();
|
||||
let _fut1 = eng.spawn("", async move {
|
||||
|
|
@ -32,7 +28,7 @@ fn cancel(timeout: bool) {
|
|||
let _fut2 = eng.spawn("", async move {
|
||||
let id = queue2.pop().await;
|
||||
if timeout {
|
||||
wheel.timeout(1).await.unwrap();
|
||||
ring2.timeout(1).await.unwrap();
|
||||
}
|
||||
ring2.cancel(id);
|
||||
});
|
||||
|
|
@ -1,12 +1,10 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_RECVMSG, io_uring_sqe},
|
||||
},
|
||||
utils::buf::Buf,
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_RECVMSG, io_uring_sqe},
|
||||
},
|
||||
jay_utils::buf::Buf,
|
||||
std::{cell::Cell, collections::VecDeque, mem::MaybeUninit, rc::Rc},
|
||||
uapi::{OwnedFd, c},
|
||||
};
|
||||
|
|
@ -1,13 +1,11 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_SENDMSG, io_uring_sqe},
|
||||
},
|
||||
time::Time,
|
||||
utils::{buf::Buf, compat::IovLength, vec_ext::UninitVecExt},
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_SENDMSG, io_uring_sqe},
|
||||
},
|
||||
jay_time::Time,
|
||||
jay_utils::{buf::Buf, compat::IovLength, vec_ext::UninitVecExt},
|
||||
std::{mem::MaybeUninit, ptr, rc::Rc},
|
||||
uapi::{OwnedFd, c},
|
||||
};
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
use {
|
||||
crate::io_uring::{
|
||||
crate::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
pending_result::PendingResult,
|
||||
sys::{IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe},
|
||||
|
|
@ -1,12 +1,9 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||
ops::timeout::timespec64,
|
||||
sys::{IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe},
|
||||
},
|
||||
utils::oserror::OsError,
|
||||
IoUring, IoUringData, IoUringError, IoUringTaskId, Task, ops::timeout::timespec64,
|
||||
sys::{IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe},
|
||||
},
|
||||
jay_utils::oserror::OsError,
|
||||
std::{cell::Cell, rc::Rc},
|
||||
uapi::c,
|
||||
};
|
||||
|
|
@ -1,11 +1,8 @@
|
|||
use crate::{
|
||||
io_uring::{
|
||||
IoUring, IoUringData, IoUringTaskId, Task,
|
||||
ops::timeout::timespec64,
|
||||
sys::{IORING_OP_LINK_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe},
|
||||
},
|
||||
time::Time,
|
||||
IoUring, IoUringData, IoUringTaskId, Task, ops::timeout::timespec64,
|
||||
sys::{IORING_OP_LINK_TIMEOUT, IORING_TIMEOUT_ABS, io_uring_sqe},
|
||||
};
|
||||
use jay_time::Time;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TimeoutLinkTask {
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
use {
|
||||
crate::utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack},
|
||||
jay_utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack},
|
||||
std::{
|
||||
cell::Cell,
|
||||
future::Future,
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
#![allow(non_camel_case_types, dead_code)]
|
||||
|
||||
use {
|
||||
crate::utils::oserror::OsError,
|
||||
jay_utils::oserror::OsError,
|
||||
std::mem::MaybeUninit,
|
||||
uapi::{OwnedFd, c},
|
||||
};
|
||||
552
src/io_uring.rs
552
src/io_uring.rs
|
|
@ -1,551 +1 @@
|
|||
pub use ops::{
|
||||
TaskResultExt,
|
||||
poll_external::{PendingPoll, PollCallback},
|
||||
timeout_external::{PendingTimeout, TimeoutCallback},
|
||||
};
|
||||
use {
|
||||
crate::{
|
||||
async_engine::AsyncEngine,
|
||||
io_uring::{
|
||||
debounce::Debouncer,
|
||||
ops::{
|
||||
accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask,
|
||||
poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask,
|
||||
read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask,
|
||||
sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_external::TimeoutExternalTask,
|
||||
timeout_link::TimeoutLinkTask,
|
||||
},
|
||||
pending_result::PendingResults,
|
||||
sys::{
|
||||
IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING,
|
||||
IORING_OFF_SQES, IORING_SETUP_COOP_TASKRUN, IORING_SETUP_DEFER_TASKRUN,
|
||||
IORING_SETUP_SINGLE_ISSUER, IORING_SETUP_SUBMIT_ALL, IOSQE_IO_LINK, io_uring_cqe,
|
||||
io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe,
|
||||
},
|
||||
},
|
||||
utils::{
|
||||
asyncevent::AsyncEvent,
|
||||
bitflags::BitflagsExt,
|
||||
buf::Buf,
|
||||
copyhashmap::CopyHashMap,
|
||||
errorfmt::ErrorFmt,
|
||||
mmap::{Mmapped, mmap},
|
||||
numcell::NumCell,
|
||||
oserror::OsError,
|
||||
ptr_ext::{MutPtrExt, PtrExt},
|
||||
stack::Stack,
|
||||
syncqueue::SyncQueue,
|
||||
},
|
||||
},
|
||||
std::{
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
rc::Rc,
|
||||
sync::atomic::{
|
||||
AtomicU32,
|
||||
Ordering::{Acquire, Relaxed, Release},
|
||||
},
|
||||
task::Waker,
|
||||
},
|
||||
thiserror::Error,
|
||||
uapi::{
|
||||
OwnedFd,
|
||||
c::{self},
|
||||
},
|
||||
};
|
||||
|
||||
macro_rules! map_err {
|
||||
($n:expr) => {{
|
||||
let n = $n;
|
||||
if n < 0 {
|
||||
Err(crate::utils::oserror::OsError::from(-n as uapi::c::c_int))
|
||||
} else {
|
||||
Ok(n)
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
mod debounce;
|
||||
mod ops;
|
||||
mod pending_result;
|
||||
mod sys;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum IoUringError {
|
||||
#[error(transparent)]
|
||||
OsError(#[from] OsError),
|
||||
#[error("Could not create an io-uring")]
|
||||
CreateUring(#[source] OsError),
|
||||
#[error("The kernel does not support the IORING_FEAT_NODROP feature")]
|
||||
NoDrop,
|
||||
#[error("Could not map the submission queue ring")]
|
||||
MapSqRing(#[source] OsError),
|
||||
#[error("Could not map the submission queue entries")]
|
||||
MapSqEntries(#[source] OsError),
|
||||
#[error("Could not map the completion queue ring")]
|
||||
MapCqRing(#[source] OsError),
|
||||
#[error("The io-uring has already been destroyed")]
|
||||
Destroyed,
|
||||
#[error("io_uring_enter failed")]
|
||||
Enter(#[source] OsError),
|
||||
#[error("Kernel sent invalid cmsg data")]
|
||||
InvalidCmsgData,
|
||||
}
|
||||
|
||||
pub struct IoUring {
|
||||
ring: Rc<IoUringData>,
|
||||
}
|
||||
|
||||
impl Drop for IoUring {
|
||||
fn drop(&mut self) {
|
||||
self.ring.kill();
|
||||
}
|
||||
}
|
||||
|
||||
impl IoUring {
|
||||
pub fn new(eng: &Rc<AsyncEngine>, entries: u32) -> Result<Rc<Self>, IoUringError> {
|
||||
let feature_levels = [
|
||||
IORING_SETUP_SUBMIT_ALL, // 5.18
|
||||
IORING_SETUP_COOP_TASKRUN, // 5.19
|
||||
IORING_SETUP_SINGLE_ISSUER, // 6.0
|
||||
IORING_SETUP_DEFER_TASKRUN, // 6.1
|
||||
];
|
||||
let mut feature_levels = &feature_levels[..];
|
||||
let mut params;
|
||||
let fd = loop {
|
||||
params = io_uring_params::default();
|
||||
for &flags in feature_levels {
|
||||
params.flags |= flags;
|
||||
}
|
||||
match io_uring_setup(entries, &mut params) {
|
||||
Ok(f) => break f,
|
||||
Err(e) => {
|
||||
if let Some((_, levels)) = feature_levels.split_last() {
|
||||
feature_levels = levels;
|
||||
} else {
|
||||
return Err(IoUringError::CreateUring(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
if !params.features.contains(IORING_FEAT_NODROP) {
|
||||
return Err(IoUringError::NoDrop);
|
||||
}
|
||||
let sqmap_map = mmap(
|
||||
(params.sq_off.array + params.sq_entries * 4) as _,
|
||||
c::PROT_READ | c::PROT_WRITE,
|
||||
c::MAP_SHARED | c::MAP_POPULATE,
|
||||
fd.raw(),
|
||||
IORING_OFF_SQ_RING as _,
|
||||
);
|
||||
let sqmap_map = match sqmap_map {
|
||||
Ok(map) => map,
|
||||
Err(e) => return Err(IoUringError::MapSqRing(e)),
|
||||
};
|
||||
let sqesmap_map = mmap(
|
||||
params.sq_entries as usize * size_of::<io_uring_sqe>(),
|
||||
c::PROT_READ | c::PROT_WRITE,
|
||||
c::MAP_SHARED | c::MAP_POPULATE,
|
||||
fd.raw(),
|
||||
IORING_OFF_SQES as _,
|
||||
);
|
||||
let sqesmap_map = match sqesmap_map {
|
||||
Ok(map) => map,
|
||||
Err(e) => return Err(IoUringError::MapSqEntries(e)),
|
||||
};
|
||||
let cqmap_map = mmap(
|
||||
params.cq_off.cqes as usize + params.cq_entries as usize * size_of::<io_uring_cqe>(),
|
||||
c::PROT_READ | c::PROT_WRITE,
|
||||
c::MAP_SHARED | c::MAP_POPULATE,
|
||||
fd.raw(),
|
||||
IORING_OFF_CQ_RING as _,
|
||||
);
|
||||
let cqmap_map = match cqmap_map {
|
||||
Ok(map) => map,
|
||||
Err(e) => return Err(IoUringError::MapCqRing(e)),
|
||||
};
|
||||
let sqmask = unsafe {
|
||||
*(sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.ring_mask as _)
|
||||
.cast()
|
||||
};
|
||||
let sqhead = unsafe {
|
||||
(sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.head as _)
|
||||
.cast()
|
||||
};
|
||||
let sqtail = unsafe {
|
||||
(sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.tail as _)
|
||||
.cast()
|
||||
};
|
||||
let sqmap = unsafe {
|
||||
let base = (sqmap_map.ptr as *const u8)
|
||||
.add(params.sq_off.array as _)
|
||||
.cast();
|
||||
std::slice::from_raw_parts(base, params.sq_entries as _)
|
||||
};
|
||||
let sqesmap = unsafe {
|
||||
let base = (sqesmap_map.ptr as *const u8).cast();
|
||||
std::slice::from_raw_parts(base, params.sq_entries as _)
|
||||
};
|
||||
let cqmask = unsafe {
|
||||
*(cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.ring_mask as _)
|
||||
.cast()
|
||||
};
|
||||
let cqhead = unsafe {
|
||||
(cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.head as _)
|
||||
.cast()
|
||||
};
|
||||
let cqtail = unsafe {
|
||||
(cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.tail as _)
|
||||
.cast()
|
||||
};
|
||||
let cqmap = unsafe {
|
||||
let base = (cqmap_map.ptr as *const u8)
|
||||
.add(params.cq_off.cqes as _)
|
||||
.cast();
|
||||
std::slice::from_raw_parts(base, params.cq_entries as _)
|
||||
};
|
||||
let data = Rc::new(IoUringData {
|
||||
destroyed: Cell::new(false),
|
||||
fd,
|
||||
eng: eng.clone(),
|
||||
_sqesmap_map: sqesmap_map,
|
||||
_sqmap_map: sqmap_map,
|
||||
sqmask,
|
||||
sqlen: params.sq_entries,
|
||||
sqhead,
|
||||
sqtail,
|
||||
sqmap,
|
||||
sqesmap,
|
||||
_cqmap_map: cqmap_map,
|
||||
cqmask,
|
||||
cqhead,
|
||||
cqtail,
|
||||
cqmap,
|
||||
cqes_consumed: Default::default(),
|
||||
next: Default::default(),
|
||||
to_encode: Default::default(),
|
||||
pending_in_kernel: Default::default(),
|
||||
tasks: Default::default(),
|
||||
pending_results: Default::default(),
|
||||
cached_read_writes: Default::default(),
|
||||
cached_read_writes_no_cancel: Default::default(),
|
||||
cached_cancels: Default::default(),
|
||||
cached_polls: Default::default(),
|
||||
cached_polls_external: Default::default(),
|
||||
cached_sendmsg: Default::default(),
|
||||
cached_recvmsg: Default::default(),
|
||||
cached_timeouts: Default::default(),
|
||||
cached_timeouts_external: Default::default(),
|
||||
cached_timeout_links: Default::default(),
|
||||
cached_cmsg_bufs: Default::default(),
|
||||
cached_connects: Default::default(),
|
||||
cached_accepts: Default::default(),
|
||||
fd_ids_scratch: Default::default(),
|
||||
iteration: Default::default(),
|
||||
yields: Default::default(),
|
||||
});
|
||||
Ok(Rc::new(Self { ring: data }))
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.ring.kill();
|
||||
}
|
||||
|
||||
pub fn run(&self) -> Result<(), IoUringError> {
|
||||
let res = self.ring.run();
|
||||
self.ring.kill();
|
||||
res
|
||||
}
|
||||
|
||||
pub fn cancel(&self, id: IoUringTaskId) {
|
||||
self.ring.cancel_task(id);
|
||||
}
|
||||
|
||||
pub fn debouncer(&self, max: u64) -> Debouncer {
|
||||
Debouncer {
|
||||
cur: Default::default(),
|
||||
max,
|
||||
iteration: Cell::new(self.ring.iteration.get()),
|
||||
ring: self.ring.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct IoUringData {
|
||||
destroyed: Cell<bool>,
|
||||
|
||||
fd: OwnedFd,
|
||||
eng: Rc<AsyncEngine>,
|
||||
|
||||
_sqesmap_map: Mmapped,
|
||||
_sqmap_map: Mmapped,
|
||||
sqmask: u32,
|
||||
sqlen: u32,
|
||||
sqhead: *const AtomicU32,
|
||||
sqtail: *const AtomicU32,
|
||||
sqmap: *const [Cell<c::c_uint>],
|
||||
sqesmap: *const [UnsafeCell<io_uring_sqe>],
|
||||
|
||||
_cqmap_map: Mmapped,
|
||||
cqmask: u32,
|
||||
cqhead: *const AtomicU32,
|
||||
cqtail: *const AtomicU32,
|
||||
cqmap: *const [Cell<io_uring_cqe>],
|
||||
|
||||
cqes_consumed: AsyncEvent,
|
||||
|
||||
next: IoUringTaskIds,
|
||||
to_encode: SyncQueue<IoUringTaskId>,
|
||||
pending_in_kernel: CopyHashMap<IoUringTaskId, ()>,
|
||||
tasks: CopyHashMap<IoUringTaskId, Box<dyn Task>>,
|
||||
|
||||
pending_results: PendingResults,
|
||||
|
||||
cached_read_writes: Stack<Box<ReadWriteTask>>,
|
||||
cached_read_writes_no_cancel: Stack<Box<ReadWriteNoCancelTask>>,
|
||||
cached_cancels: Stack<Box<AsyncCancelTask>>,
|
||||
cached_polls: Stack<Box<PollTask>>,
|
||||
cached_polls_external: Stack<Box<PollExternalTask>>,
|
||||
cached_sendmsg: Stack<Box<SendmsgTask>>,
|
||||
cached_recvmsg: Stack<Box<RecvmsgTask>>,
|
||||
cached_timeouts: Stack<Box<TimeoutTask>>,
|
||||
cached_timeouts_external: Stack<Box<TimeoutExternalTask>>,
|
||||
cached_timeout_links: Stack<Box<TimeoutLinkTask>>,
|
||||
cached_cmsg_bufs: Stack<Buf>,
|
||||
cached_connects: Stack<Box<ConnectTask>>,
|
||||
cached_accepts: Stack<Box<AcceptTask>>,
|
||||
|
||||
fd_ids_scratch: RefCell<Vec<c::c_int>>,
|
||||
|
||||
iteration: NumCell<u64>,
|
||||
yields: SyncQueue<Waker>,
|
||||
}
|
||||
|
||||
unsafe trait Task {
|
||||
fn id(&self) -> IoUringTaskId;
|
||||
fn complete(self: Box<Self>, ring: &IoUringData, res: i32);
|
||||
fn encode(&self, sqe: &mut io_uring_sqe);
|
||||
|
||||
fn is_cancel(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn has_timeout(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl IoUringData {
|
||||
fn run(&self) -> Result<(), IoUringError> {
|
||||
let mut to_submit = 0;
|
||||
loop {
|
||||
self.iteration.fetch_add(1);
|
||||
while let Some(ev) = self.yields.pop() {
|
||||
ev.wake();
|
||||
}
|
||||
loop {
|
||||
self.eng.dispatch();
|
||||
if self.destroyed.get() {
|
||||
return Ok(());
|
||||
}
|
||||
if !self.dispatch_completions() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
to_submit += self.encode();
|
||||
let res = {
|
||||
let (to_submit, mut min_complete, flags) = if to_submit == 0 {
|
||||
(0, 1, IORING_ENTER_GETEVENTS)
|
||||
} else if self.to_encode.is_empty() {
|
||||
(to_submit as _, 1, IORING_ENTER_GETEVENTS)
|
||||
} else {
|
||||
(!0, 0, 0)
|
||||
};
|
||||
if self.yields.is_not_empty() {
|
||||
min_complete = 0;
|
||||
}
|
||||
io_uring_enter(self.fd.raw(), to_submit, min_complete, flags)
|
||||
};
|
||||
let mut submitted_any = false;
|
||||
match res {
|
||||
Ok(n) => {
|
||||
if n > 0 {
|
||||
submitted_any = true;
|
||||
}
|
||||
to_submit -= n;
|
||||
}
|
||||
Err(e) => {
|
||||
if not_matches!(e.0, c::EAGAIN | c::EBUSY | c::EINTR) {
|
||||
return Err(IoUringError::Enter(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
if to_submit > 0 && !submitted_any {
|
||||
let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS);
|
||||
if let Err(e) = res {
|
||||
if e.0 != c::EINTR {
|
||||
return Err(IoUringError::Enter(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dispatch_completions(&self) -> bool {
|
||||
unsafe {
|
||||
let mut head = self.cqhead.deref().load(Relaxed);
|
||||
let tail = self.cqtail.deref().load(Acquire);
|
||||
if head == tail {
|
||||
return false;
|
||||
}
|
||||
while head != tail {
|
||||
let idx = (head & self.cqmask) as usize;
|
||||
let entry = self.cqmap.deref()[idx].get();
|
||||
head = head.wrapping_add(1);
|
||||
self.cqhead.deref().store(head, Release);
|
||||
let id = IoUringTaskId(entry.user_data);
|
||||
if let Some(pending) = self.tasks.remove(&id) {
|
||||
self.pending_in_kernel.remove(&id);
|
||||
pending.complete(self, entry.res);
|
||||
}
|
||||
}
|
||||
self.cqhead.deref().store(head, Release);
|
||||
self.cqes_consumed.trigger();
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn encode(&self) -> usize {
|
||||
let tasks = self.tasks.lock();
|
||||
let mut encoded = 0;
|
||||
unsafe {
|
||||
let mut tail = self.sqtail.deref().load(Relaxed);
|
||||
let head = self.sqhead.deref().load(Acquire);
|
||||
let available = self.sqlen - tail.wrapping_sub(head);
|
||||
while encoded < available {
|
||||
let id = match self.to_encode.pop() {
|
||||
Some(t) => t,
|
||||
_ => break,
|
||||
};
|
||||
let task = match tasks.get(&id) {
|
||||
Some(t) => t,
|
||||
_ => continue,
|
||||
};
|
||||
let has_timeout = task.has_timeout();
|
||||
if has_timeout && (available - encoded) < 2 {
|
||||
self.to_encode.push_front(id);
|
||||
break;
|
||||
}
|
||||
self.pending_in_kernel.set(id, ());
|
||||
let idx = (tail & self.sqmask) as usize;
|
||||
let sqe = self.sqesmap.deref()[idx].get().deref_mut();
|
||||
self.sqmap.deref()[idx].set(idx as _);
|
||||
*sqe = Default::default();
|
||||
sqe.user_data = id.raw();
|
||||
task.encode(sqe);
|
||||
if has_timeout {
|
||||
sqe.flags |= IOSQE_IO_LINK;
|
||||
}
|
||||
tail = tail.wrapping_add(1);
|
||||
encoded += 1;
|
||||
}
|
||||
self.sqtail.deref().store(tail, Release);
|
||||
}
|
||||
encoded as usize
|
||||
}
|
||||
|
||||
fn id(&self) -> Cancellable<'_> {
|
||||
Cancellable {
|
||||
id: self.id_raw(),
|
||||
data: self,
|
||||
}
|
||||
}
|
||||
|
||||
fn id_raw(&self) -> IoUringTaskId {
|
||||
self.next.next()
|
||||
}
|
||||
|
||||
fn cancel_task(&self, id: IoUringTaskId) {
|
||||
if !self.tasks.contains(&id) {
|
||||
return;
|
||||
}
|
||||
if !self.pending_in_kernel.contains(&id) {
|
||||
self.tasks
|
||||
.remove(&id)
|
||||
.unwrap()
|
||||
.complete(self, -c::ECANCELED);
|
||||
return;
|
||||
}
|
||||
self.cancel_task_in_kernel(id);
|
||||
}
|
||||
|
||||
fn schedule(&self, t: Box<dyn Task>) {
|
||||
assert!(!self.destroyed.get());
|
||||
self.to_encode.push(t.id());
|
||||
self.tasks.set(t.id(), t);
|
||||
}
|
||||
|
||||
fn check_destroyed(&self) -> Result<(), IoUringError> {
|
||||
if self.destroyed.get() {
|
||||
Err(IoUringError::Destroyed)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn kill(&self) {
|
||||
self.eng.stop();
|
||||
let mut to_cancel = vec![];
|
||||
for task in self.tasks.lock().values() {
|
||||
if !task.is_cancel() {
|
||||
to_cancel.push(task.id());
|
||||
}
|
||||
}
|
||||
for task in to_cancel {
|
||||
self.cancel_task(task);
|
||||
}
|
||||
self.destroyed.set(true);
|
||||
while !self.tasks.is_empty() {
|
||||
self.encode();
|
||||
let _ = io_uring_enter(self.fd.raw(), u32::MAX, 0, 0);
|
||||
let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS);
|
||||
if let Err(e) = res {
|
||||
panic!("Could not wait for io_uring to drain: {}", ErrorFmt(e));
|
||||
}
|
||||
while self.dispatch_completions() {
|
||||
// nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cmsg_buf(&self) -> Buf {
|
||||
self.cached_cmsg_bufs
|
||||
.pop()
|
||||
.unwrap_or_else(|| Buf::new(1024))
|
||||
}
|
||||
}
|
||||
|
||||
linear_ids!(IoUringTaskIds, IoUringTaskId, u64);
|
||||
|
||||
#[expect(clippy::derivable_impls)]
|
||||
impl Default for IoUringTaskId {
|
||||
fn default() -> Self {
|
||||
Self(0)
|
||||
}
|
||||
}
|
||||
|
||||
struct Cancellable<'a> {
|
||||
id: IoUringTaskId,
|
||||
data: &'a IoUringData,
|
||||
}
|
||||
|
||||
impl<'a> Drop for Cancellable<'a> {
|
||||
fn drop(&mut self) {
|
||||
self.data.cancel_task(self.id);
|
||||
}
|
||||
}
|
||||
pub use jay_io_uring::*;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue