1
0
Fork 0
forked from wry/wry

io-uring: ensure that timeouts are encoded in the same submission

This commit is contained in:
Julian Orth 2022-05-13 22:31:56 +02:00
parent 7cc6c945d3
commit 4780315f50
5 changed files with 35 additions and 18 deletions

View file

@ -11,7 +11,7 @@ use {
sys::{ sys::{
io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe,
IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQES, IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQES,
IORING_OFF_SQ_RING, IORING_OFF_SQ_RING, IOSQE_IO_LINK,
}, },
}, },
utils::{ utils::{
@ -24,8 +24,8 @@ use {
numcell::NumCell, numcell::NumCell,
oserror::OsError, oserror::OsError,
ptr_ext::{MutPtrExt, PtrExt}, ptr_ext::{MutPtrExt, PtrExt},
queue::AsyncQueue,
stack::Stack, stack::Stack,
syncqueue::SyncQueue,
}, },
}, },
std::{ std::{
@ -251,7 +251,7 @@ struct IoUringData {
cqes_consumed: AsyncEvent, cqes_consumed: AsyncEvent,
next: NumCell<u64>, next: NumCell<u64>,
to_encode: AsyncQueue<u64>, to_encode: SyncQueue<u64>,
pending_in_kernel: CopyHashMap<u64, ()>, pending_in_kernel: CopyHashMap<u64, ()>,
tasks: CopyHashMap<u64, Box<dyn Task>>, tasks: CopyHashMap<u64, Box<dyn Task>>,
@ -276,6 +276,10 @@ unsafe trait Task {
fn is_cancel(&self) -> bool { fn is_cancel(&self) -> bool {
false false
} }
fn has_timeout(&self) -> bool {
false
}
} }
impl IoUringData { impl IoUringData {
@ -353,8 +357,9 @@ impl IoUringData {
unsafe { unsafe {
let mut tail = self.sqtail.deref().load(Relaxed); let mut tail = self.sqtail.deref().load(Relaxed);
let head = self.sqhead.deref().load(Acquire); let head = self.sqhead.deref().load(Acquire);
while tail.wrapping_sub(head) < self.sqlen { let available = self.sqlen - tail.wrapping_sub(head);
let id = match self.to_encode.try_pop() { while encoded < available {
let id = match self.to_encode.pop() {
Some(t) => t, Some(t) => t,
_ => break, _ => break,
}; };
@ -362,6 +367,11 @@ impl IoUringData {
Some(t) => t, Some(t) => t,
_ => continue, _ => continue,
}; };
let has_timeout = task.has_timeout();
if has_timeout && (available - encoded) < 2 {
self.to_encode.push_front(id);
break;
}
self.pending_in_kernel.set(id, ()); self.pending_in_kernel.set(id, ());
let idx = (tail & self.sqmask) as usize; let idx = (tail & self.sqmask) as usize;
let mut sqe = self.sqesmap.deref()[idx].get().deref_mut(); let mut sqe = self.sqesmap.deref()[idx].get().deref_mut();
@ -369,12 +379,15 @@ impl IoUringData {
*sqe = Default::default(); *sqe = Default::default();
sqe.user_data = id; sqe.user_data = id;
task.encode(sqe); task.encode(sqe);
if has_timeout {
sqe.flags |= IOSQE_IO_LINK;
}
tail = tail.wrapping_add(1); tail = tail.wrapping_add(1);
encoded += 1; encoded += 1;
} }
self.sqtail.deref().store(tail, Release); self.sqtail.deref().store(tail, Release);
} }
encoded encoded as usize
} }
fn id(&self) -> Cancellable { fn id(&self) -> Cancellable {

View file

@ -2,7 +2,7 @@ use {
crate::{ crate::{
io_uring::{ io_uring::{
pending_result::PendingResult, pending_result::PendingResult,
sys::{io_uring_sqe, IORING_OP_SENDMSG, IOSQE_IO_LINK}, sys::{io_uring_sqe, IORING_OP_SENDMSG},
IoUring, IoUringData, IoUringError, Task, IoUring, IoUringData, IoUringError, Task,
}, },
time::Time, time::Time,
@ -123,8 +123,9 @@ unsafe impl Task for SendmsgTask {
sqe.fd = self.fd; sqe.fd = self.fd;
sqe.u2.addr = &self.msghdr as *const _ as _; sqe.u2.addr = &self.msghdr as *const _ as _;
sqe.u3.msg_flags = c::MSG_NOSIGNAL as _; sqe.u3.msg_flags = c::MSG_NOSIGNAL as _;
if self.has_timeout { }
sqe.flags = IOSQE_IO_LINK;
} fn has_timeout(&self) -> bool {
self.has_timeout
} }
} }

View file

@ -3,7 +3,7 @@ use {
io_uring::{ io_uring::{
ops::TaskResult, ops::TaskResult,
pending_result::PendingResult, pending_result::PendingResult,
sys::{io_uring_sqe, IORING_OP_WRITE, IOSQE_IO_LINK}, sys::{io_uring_sqe, IORING_OP_WRITE},
IoUring, IoUringData, Task, IoUring, IoUringData, Task,
}, },
time::Time, time::Time,
@ -74,8 +74,9 @@ unsafe impl Task for WriteTask {
sqe.u2.addr = data.buf.as_ptr() as _; sqe.u2.addr = data.buf.as_ptr() as _;
sqe.u3.rw_flags = 0; sqe.u3.rw_flags = 0;
sqe.len = data.buf.len() as _; sqe.len = data.buf.len() as _;
if self.has_timeout { }
sqe.flags = IOSQE_IO_LINK;
} fn has_timeout(&self) -> bool {
self.has_timeout
} }
} }

View file

@ -56,10 +56,6 @@ impl<T> AsyncQueue<T> {
} }
self.waiter.take(); self.waiter.take();
} }
pub fn is_empty(&self) -> bool {
unsafe { self.data.get().deref().is_empty() }
}
} }
pub struct AsyncQueuePop<'a, T> { pub struct AsyncQueuePop<'a, T> {

View file

@ -23,6 +23,12 @@ impl<T> SyncQueue<T> {
} }
} }
pub fn push_front(&self, t: T) {
unsafe {
self.el.get().deref_mut().push_front(t);
}
}
pub fn pop(&self) -> Option<T> { pub fn pop(&self) -> Option<T> {
unsafe { self.el.get().deref_mut().pop_front() } unsafe { self.el.get().deref_mut().pop_front() }
} }