From 4780315f5037892cb92d0ce48d0c471af292d2d8 Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Fri, 13 May 2022 22:31:56 +0200 Subject: [PATCH] io-uring: ensure that timeouts are encoded in the same submission --- src/io_uring.rs | 25 +++++++++++++++++++------ src/io_uring/ops/sendmsg.rs | 9 +++++---- src/io_uring/ops/write.rs | 9 +++++---- src/utils/queue.rs | 4 ---- src/utils/syncqueue.rs | 6 ++++++ 5 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/io_uring.rs b/src/io_uring.rs index 1d508b1e..e02266a4 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -11,7 +11,7 @@ use { sys::{ io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, IORING_ENTER_GETEVENTS, IORING_FEAT_NODROP, IORING_OFF_CQ_RING, IORING_OFF_SQES, - IORING_OFF_SQ_RING, + IORING_OFF_SQ_RING, IOSQE_IO_LINK, }, }, utils::{ @@ -24,8 +24,8 @@ use { numcell::NumCell, oserror::OsError, ptr_ext::{MutPtrExt, PtrExt}, - queue::AsyncQueue, stack::Stack, + syncqueue::SyncQueue, }, }, std::{ @@ -251,7 +251,7 @@ struct IoUringData { cqes_consumed: AsyncEvent, next: NumCell, - to_encode: AsyncQueue, + to_encode: SyncQueue, pending_in_kernel: CopyHashMap, tasks: CopyHashMap>, @@ -276,6 +276,10 @@ unsafe trait Task { fn is_cancel(&self) -> bool { false } + + fn has_timeout(&self) -> bool { + false + } } impl IoUringData { @@ -353,8 +357,9 @@ impl IoUringData { unsafe { let mut tail = self.sqtail.deref().load(Relaxed); let head = self.sqhead.deref().load(Acquire); - while tail.wrapping_sub(head) < self.sqlen { - let id = match self.to_encode.try_pop() { + let available = self.sqlen - tail.wrapping_sub(head); + while encoded < available { + let id = match self.to_encode.pop() { Some(t) => t, _ => break, }; @@ -362,6 +367,11 @@ impl IoUringData { Some(t) => t, _ => continue, }; + let has_timeout = task.has_timeout(); + if has_timeout && (available - encoded) < 2 { + self.to_encode.push_front(id); + break; + } self.pending_in_kernel.set(id, ()); let idx = (tail & self.sqmask) as usize; let mut sqe = self.sqesmap.deref()[idx].get().deref_mut(); @@ -369,12 +379,15 @@ impl IoUringData { *sqe = Default::default(); sqe.user_data = id; task.encode(sqe); + if has_timeout { + sqe.flags |= IOSQE_IO_LINK; + } tail = tail.wrapping_add(1); encoded += 1; } self.sqtail.deref().store(tail, Release); } - encoded + encoded as usize } fn id(&self) -> Cancellable { diff --git a/src/io_uring/ops/sendmsg.rs b/src/io_uring/ops/sendmsg.rs index 0091752d..4381071d 100644 --- a/src/io_uring/ops/sendmsg.rs +++ b/src/io_uring/ops/sendmsg.rs @@ -2,7 +2,7 @@ use { crate::{ io_uring::{ pending_result::PendingResult, - sys::{io_uring_sqe, IORING_OP_SENDMSG, IOSQE_IO_LINK}, + sys::{io_uring_sqe, IORING_OP_SENDMSG}, IoUring, IoUringData, IoUringError, Task, }, time::Time, @@ -123,8 +123,9 @@ unsafe impl Task for SendmsgTask { sqe.fd = self.fd; sqe.u2.addr = &self.msghdr as *const _ as _; sqe.u3.msg_flags = c::MSG_NOSIGNAL as _; - if self.has_timeout { - sqe.flags = IOSQE_IO_LINK; - } + } + + fn has_timeout(&self) -> bool { + self.has_timeout } } diff --git a/src/io_uring/ops/write.rs b/src/io_uring/ops/write.rs index 13869fc4..23e1b82d 100644 --- a/src/io_uring/ops/write.rs +++ b/src/io_uring/ops/write.rs @@ -3,7 +3,7 @@ use { io_uring::{ ops::TaskResult, pending_result::PendingResult, - sys::{io_uring_sqe, IORING_OP_WRITE, IOSQE_IO_LINK}, + sys::{io_uring_sqe, IORING_OP_WRITE}, IoUring, IoUringData, Task, }, time::Time, @@ -74,8 +74,9 @@ unsafe impl Task for WriteTask { sqe.u2.addr = data.buf.as_ptr() as _; sqe.u3.rw_flags = 0; sqe.len = data.buf.len() as _; - if self.has_timeout { - sqe.flags = IOSQE_IO_LINK; - } + } + + fn has_timeout(&self) -> bool { + self.has_timeout } } diff --git a/src/utils/queue.rs b/src/utils/queue.rs index ae7c6d85..21b0a6ba 100644 --- a/src/utils/queue.rs +++ b/src/utils/queue.rs @@ -56,10 +56,6 @@ impl AsyncQueue { } self.waiter.take(); } - - pub fn is_empty(&self) -> bool { - unsafe { self.data.get().deref().is_empty() } - } } pub struct AsyncQueuePop<'a, T> { diff --git a/src/utils/syncqueue.rs b/src/utils/syncqueue.rs index 87dea9d3..2cb2c936 100644 --- a/src/utils/syncqueue.rs +++ b/src/utils/syncqueue.rs @@ -23,6 +23,12 @@ impl SyncQueue { } } + pub fn push_front(&self, t: T) { + unsafe { + self.el.get().deref_mut().push_front(t); + } + } + pub fn pop(&self) -> Option { unsafe { self.el.get().deref_mut().pop_front() } }