io_uring: add timeout_external
This commit is contained in:
parent
fe7175fab2
commit
fa46527a37
3 changed files with 103 additions and 1 deletions
|
|
@ -10,7 +10,8 @@ use {
|
||||||
accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask,
|
accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask,
|
||||||
poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask,
|
poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask,
|
||||||
read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask,
|
read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask,
|
||||||
sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_link::TimeoutLinkTask,
|
sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_external::TimeoutExternalTask,
|
||||||
|
timeout_link::TimeoutLinkTask,
|
||||||
},
|
},
|
||||||
pending_result::PendingResults,
|
pending_result::PendingResults,
|
||||||
sys::{
|
sys::{
|
||||||
|
|
@ -216,6 +217,7 @@ impl IoUring {
|
||||||
cached_sendmsg: Default::default(),
|
cached_sendmsg: Default::default(),
|
||||||
cached_recvmsg: Default::default(),
|
cached_recvmsg: Default::default(),
|
||||||
cached_timeouts: Default::default(),
|
cached_timeouts: Default::default(),
|
||||||
|
cached_timeouts_external: Default::default(),
|
||||||
cached_timeout_links: Default::default(),
|
cached_timeout_links: Default::default(),
|
||||||
cached_cmsg_bufs: Default::default(),
|
cached_cmsg_bufs: Default::default(),
|
||||||
cached_connects: Default::default(),
|
cached_connects: Default::default(),
|
||||||
|
|
@ -278,6 +280,7 @@ struct IoUringData {
|
||||||
cached_sendmsg: Stack<Box<SendmsgTask>>,
|
cached_sendmsg: Stack<Box<SendmsgTask>>,
|
||||||
cached_recvmsg: Stack<Box<RecvmsgTask>>,
|
cached_recvmsg: Stack<Box<RecvmsgTask>>,
|
||||||
cached_timeouts: Stack<Box<TimeoutTask>>,
|
cached_timeouts: Stack<Box<TimeoutTask>>,
|
||||||
|
cached_timeouts_external: Stack<Box<TimeoutExternalTask>>,
|
||||||
cached_timeout_links: Stack<Box<TimeoutLinkTask>>,
|
cached_timeout_links: Stack<Box<TimeoutLinkTask>>,
|
||||||
cached_cmsg_bufs: Stack<Buf>,
|
cached_cmsg_bufs: Stack<Buf>,
|
||||||
cached_connects: Stack<Box<ConnectTask>>,
|
cached_connects: Stack<Box<ConnectTask>>,
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ pub mod read_write_no_cancel;
|
||||||
pub mod recvmsg;
|
pub mod recvmsg;
|
||||||
pub mod sendmsg;
|
pub mod sendmsg;
|
||||||
pub mod timeout;
|
pub mod timeout;
|
||||||
|
pub mod timeout_external;
|
||||||
pub mod timeout_link;
|
pub mod timeout_link;
|
||||||
|
|
||||||
pub type TaskResult<T> = Result<Result<T, OsError>, IoUringError>;
|
pub type TaskResult<T> = Result<Result<T, OsError>, IoUringError>;
|
||||||
|
|
|
||||||
98
src/io_uring/ops/timeout_external.rs
Normal file
98
src/io_uring/ops/timeout_external.rs
Normal file
|
|
@ -0,0 +1,98 @@
|
||||||
|
use {
|
||||||
|
crate::{
|
||||||
|
io_uring::{
|
||||||
|
ops::timeout::timespec64,
|
||||||
|
sys::{io_uring_sqe, IORING_OP_TIMEOUT, IORING_TIMEOUT_ABS},
|
||||||
|
IoUring, IoUringData, IoUringError, IoUringTaskId, Task,
|
||||||
|
},
|
||||||
|
utils::oserror::OsError,
|
||||||
|
},
|
||||||
|
std::{cell::Cell, rc::Rc},
|
||||||
|
uapi::c,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub trait TimeoutCallback {
|
||||||
|
fn completed(self: Rc<Self>, res: Result<(), OsError>, data: u64);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PendingTimeout {
|
||||||
|
data: Rc<IoUringData>,
|
||||||
|
shared: Rc<TimeoutExternalTaskShared>,
|
||||||
|
id: IoUringTaskId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for PendingTimeout {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if self.shared.id.get() != self.id {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.shared.callback.take();
|
||||||
|
self.data.cancel_task(self.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct TimeoutExternalTaskShared {
|
||||||
|
id: Cell<IoUringTaskId>,
|
||||||
|
callback: Cell<Option<Rc<dyn TimeoutCallback>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct TimeoutExternalTask {
|
||||||
|
timespec: timespec64,
|
||||||
|
shared: Rc<TimeoutExternalTaskShared>,
|
||||||
|
data: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IoUring {
|
||||||
|
#[expect(dead_code)]
|
||||||
|
pub fn timeout_external(
|
||||||
|
&self,
|
||||||
|
timeout_nsec: u64,
|
||||||
|
callback: Rc<dyn TimeoutCallback>,
|
||||||
|
data: u64,
|
||||||
|
) -> Result<PendingTimeout, IoUringError> {
|
||||||
|
self.ring.check_destroyed()?;
|
||||||
|
let mut pw = self.ring.cached_timeouts_external.pop().unwrap_or_default();
|
||||||
|
pw.shared.id.set(self.ring.id_raw());
|
||||||
|
pw.shared.callback.set(Some(callback));
|
||||||
|
pw.timespec = timespec64 {
|
||||||
|
tv_sec: (timeout_nsec / 1_000_000_000) as _,
|
||||||
|
tv_nsec: (timeout_nsec % 1_000_000_000) as _,
|
||||||
|
};
|
||||||
|
pw.data = data;
|
||||||
|
let pending = PendingTimeout {
|
||||||
|
data: self.ring.clone(),
|
||||||
|
shared: pw.shared.clone(),
|
||||||
|
id: pw.shared.id.get(),
|
||||||
|
};
|
||||||
|
self.ring.schedule(pw);
|
||||||
|
Ok(pending)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Task for TimeoutExternalTask {
|
||||||
|
fn id(&self) -> IoUringTaskId {
|
||||||
|
self.shared.id.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn complete(self: Box<Self>, ring: &IoUringData, res: i32) {
|
||||||
|
if let Some(pr) = self.shared.callback.take() {
|
||||||
|
let res = if res == -c::ETIME {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
map_err!(res).map(drop)
|
||||||
|
};
|
||||||
|
pr.completed(res, self.data);
|
||||||
|
}
|
||||||
|
ring.cached_timeouts_external.push(self);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode(&self, sqe: &mut io_uring_sqe) {
|
||||||
|
sqe.opcode = IORING_OP_TIMEOUT;
|
||||||
|
sqe.u2.addr = &self.timespec as *const _ as _;
|
||||||
|
sqe.len = 1;
|
||||||
|
sqe.u3.timeout_flags = IORING_TIMEOUT_ABS;
|
||||||
|
sqe.u1.off = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue