diff --git a/src/io_uring.rs b/src/io_uring.rs index 4f9f9267..9d19c786 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -10,7 +10,8 @@ use { accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask, poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask, read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask, - sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_link::TimeoutLinkTask, + sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_external::TimeoutExternalTask, + timeout_link::TimeoutLinkTask, }, pending_result::PendingResults, sys::{ @@ -216,6 +217,7 @@ impl IoUring { cached_sendmsg: Default::default(), cached_recvmsg: Default::default(), cached_timeouts: Default::default(), + cached_timeouts_external: Default::default(), cached_timeout_links: Default::default(), cached_cmsg_bufs: Default::default(), cached_connects: Default::default(), @@ -278,6 +280,7 @@ struct IoUringData { cached_sendmsg: Stack>, cached_recvmsg: Stack>, cached_timeouts: Stack>, + cached_timeouts_external: Stack>, cached_timeout_links: Stack>, cached_cmsg_bufs: Stack, cached_connects: Stack>, diff --git a/src/io_uring/ops.rs b/src/io_uring/ops.rs index e5f4bf0a..e6abdbee 100644 --- a/src/io_uring/ops.rs +++ b/src/io_uring/ops.rs @@ -10,6 +10,7 @@ pub mod read_write_no_cancel; pub mod recvmsg; pub mod sendmsg; pub mod timeout; +pub mod timeout_external; pub mod timeout_link; pub type TaskResult = Result, IoUringError>; diff --git a/src/io_uring/ops/timeout_external.rs b/src/io_uring/ops/timeout_external.rs new file mode 100644 index 00000000..131d4c4b --- /dev/null +++ b/src/io_uring/ops/timeout_external.rs @@ -0,0 +1,98 @@ +use { + crate::{ + io_uring::{ + ops::timeout::timespec64, + sys::{io_uring_sqe, IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS}, + IoUring, IoUringData, IoUringError, IoUringTaskId, Task, + }, + utils::oserror::OsError, + }, + std::{cell::Cell, rc::Rc}, + uapi::c, +}; + +pub trait TimeoutCallback { + fn completed(self: Rc, res: Result<(), OsError>, data: u64); +} + +pub struct PendingTimeout { + data: Rc, + shared: Rc, + id: IoUringTaskId, +} + +impl Drop for PendingTimeout { + fn drop(&mut self) { + if self.shared.id.get() != self.id { + return; + } + self.shared.callback.take(); + self.data.cancel_task(self.id); + } +} + +#[derive(Default)] +struct TimeoutExternalTaskShared { + id: Cell, + callback: Cell>>, +} + +#[derive(Default)] +pub struct TimeoutExternalTask { + timespec: timespec64, + shared: Rc, + data: u64, +} + +impl IoUring { + #[expect(dead_code)] + pub fn timeout_external( + &self, + timeout_nsec: u64, + callback: Rc, + data: u64, + ) -> Result { + self.ring.check_destroyed()?; + let mut pw = self.ring.cached_timeouts_external.pop().unwrap_or_default(); + pw.shared.id.set(self.ring.id_raw()); + pw.shared.callback.set(Some(callback)); + pw.timespec = timespec64 { + tv_sec: (timeout_nsec / 1_000_000_000) as _, + tv_nsec: (timeout_nsec % 1_000_000_000) as _, + }; + pw.data = data; + let pending = PendingTimeout { + data: self.ring.clone(), + shared: pw.shared.clone(), + id: pw.shared.id.get(), + }; + self.ring.schedule(pw); + Ok(pending) + } +} + +unsafe impl Task for TimeoutExternalTask { + fn id(&self) -> IoUringTaskId { + self.shared.id.get() + } + + fn complete(self: Box, ring: &IoUringData, res: i32) { + if let Some(pr) = self.shared.callback.take() { + let res = if res == -c::ETIME { + Ok(()) + } else { + map_err!(res).map(drop) + }; + pr.completed(res, self.data); + } + ring.cached_timeouts_external.push(self); + } + + fn encode(&self, sqe: &mut io_uring_sqe) { + sqe.opcode = IORING_OP_TIMEOUT; + sqe.u2.addr = &self.timespec as *const _ as _; + sqe.len = 1; + sqe.u3.timeout_flags = IORING_TIMEOUT_ABS; + sqe.u1.off = 0; + } +}