From 597636fba6a1c99ab72ec3b274385c1fcb50ba15 Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Sat, 3 May 2025 12:31:54 +0200 Subject: [PATCH] io_uring: add debounce future --- src/io_uring.rs | 41 ++++++++++++++++++++++++++++++++++------ src/io_uring/debounce.rs | 33 ++++++++++++++++++++++++++++++++ src/utils/syncqueue.rs | 1 - 3 files changed, 68 insertions(+), 7 deletions(-) create mode 100644 src/io_uring/debounce.rs diff --git a/src/io_uring.rs b/src/io_uring.rs index 7ed3d63e..d5e1a249 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -7,6 +7,7 @@ use { crate::{ async_engine::AsyncEngine, io_uring::{ + debounce::Debouncer, ops::{ accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask, poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask, @@ -29,6 +30,7 @@ use { copyhashmap::CopyHashMap, errorfmt::ErrorFmt, mmap::{Mmapped, mmap}, + numcell::NumCell, oserror::OsError, ptr_ext::{MutPtrExt, PtrExt}, stack::Stack, @@ -42,6 +44,7 @@ use { AtomicU32, Ordering::{Acquire, Relaxed, Release}, }, + task::Waker, }, thiserror::Error, uapi::{ @@ -61,6 +64,7 @@ macro_rules! map_err { }}; } +mod debounce; mod ops; mod pending_result; mod sys; @@ -242,6 +246,8 @@ impl IoUring { cached_connects: Default::default(), cached_accepts: Default::default(), fd_ids_scratch: Default::default(), + iteration: Default::default(), + yields: Default::default(), }); Ok(Rc::new(Self { ring: data })) } @@ -259,6 +265,16 @@ impl IoUring { pub fn cancel(&self, id: IoUringTaskId) { self.ring.cancel_task(id); } + + #[expect(dead_code)] + pub fn debouncer(&self, max: u64) -> Debouncer { + Debouncer { + cur: Default::default(), + max, + iteration: Cell::new(self.ring.iteration.get()), + ring: self.ring.clone(), + } + } } struct IoUringData { @@ -306,6 +322,9 @@ struct IoUringData { cached_accepts: Stack>, fd_ids_scratch: RefCell>, + + iteration: NumCell, + yields: SyncQueue, } unsafe trait Task { @@ -326,6 +345,10 @@ impl IoUringData { fn run(&self) -> Result<(), IoUringError> { let mut to_submit = 0; loop { + self.iteration.fetch_add(1); + while let Some(ev) = self.yields.pop() { + ev.wake(); + } loop { self.eng.dispatch(); if self.destroyed.get() { @@ -336,12 +359,18 @@ impl IoUringData { } } to_submit += self.encode(); - let res = if to_submit == 0 { - io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS) - } else if self.to_encode.is_empty() { - io_uring_enter(self.fd.raw(), to_submit as _, 1, IORING_ENTER_GETEVENTS) - } else { - io_uring_enter(self.fd.raw(), !0, 0, 0) + let res = { + let (to_submit, mut min_complete, flags) = if to_submit == 0 { + (0, 1, IORING_ENTER_GETEVENTS) + } else if self.to_encode.is_empty() { + (to_submit as _, 1, IORING_ENTER_GETEVENTS) + } else { + (!0, 0, 0) + }; + if self.yields.is_not_empty() { + min_complete = 0; + } + io_uring_enter(self.fd.raw(), to_submit, min_complete, flags) }; let mut submitted_any = false; match res { diff --git a/src/io_uring/debounce.rs b/src/io_uring/debounce.rs new file mode 100644 index 00000000..6c68693f --- /dev/null +++ b/src/io_uring/debounce.rs @@ -0,0 +1,33 @@ +use { + crate::{io_uring::IoUringData, utils::numcell::NumCell}, + std::{cell::Cell, future::poll_fn, rc::Rc, task::Poll}, +}; + +pub struct Debouncer { + pub(super) cur: NumCell, + pub(super) max: u64, + pub(super) iteration: Cell, + pub(super) ring: Rc, +} + +impl Debouncer { + #[expect(dead_code)] + pub async fn debounce(&self) { + let iteration = self.ring.iteration.get(); + if self.iteration.replace(iteration) != iteration { + self.cur.set(0); + } + if self.cur.fetch_add(1) > self.max { + poll_fn(|ctx| { + if self.ring.iteration.get() > iteration { + Poll::Ready(()) + } else { + self.ring.yields.push(ctx.waker().clone()); + Poll::Pending + } + }) + .await; + self.cur.set(0); + } + } +} diff --git a/src/utils/syncqueue.rs b/src/utils/syncqueue.rs index 1a0852e1..d8bc938f 100644 --- a/src/utils/syncqueue.rs +++ b/src/utils/syncqueue.rs @@ -44,7 +44,6 @@ impl SyncQueue { unsafe { self.el.get().deref_mut().is_empty() } } - #[expect(dead_code)] pub fn is_not_empty(&self) -> bool { !self.is_empty() }