io_uring: add debounce future
This commit is contained in:
parent
9977f9dfdf
commit
597636fba6
3 changed files with 68 additions and 7 deletions
|
|
@ -7,6 +7,7 @@ use {
|
||||||
crate::{
|
crate::{
|
||||||
async_engine::AsyncEngine,
|
async_engine::AsyncEngine,
|
||||||
io_uring::{
|
io_uring::{
|
||||||
|
debounce::Debouncer,
|
||||||
ops::{
|
ops::{
|
||||||
accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask,
|
accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask,
|
||||||
poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask,
|
poll::PollTask, poll_external::PollExternalTask, read_write::ReadWriteTask,
|
||||||
|
|
@ -29,6 +30,7 @@ use {
|
||||||
copyhashmap::CopyHashMap,
|
copyhashmap::CopyHashMap,
|
||||||
errorfmt::ErrorFmt,
|
errorfmt::ErrorFmt,
|
||||||
mmap::{Mmapped, mmap},
|
mmap::{Mmapped, mmap},
|
||||||
|
numcell::NumCell,
|
||||||
oserror::OsError,
|
oserror::OsError,
|
||||||
ptr_ext::{MutPtrExt, PtrExt},
|
ptr_ext::{MutPtrExt, PtrExt},
|
||||||
stack::Stack,
|
stack::Stack,
|
||||||
|
|
@ -42,6 +44,7 @@ use {
|
||||||
AtomicU32,
|
AtomicU32,
|
||||||
Ordering::{Acquire, Relaxed, Release},
|
Ordering::{Acquire, Relaxed, Release},
|
||||||
},
|
},
|
||||||
|
task::Waker,
|
||||||
},
|
},
|
||||||
thiserror::Error,
|
thiserror::Error,
|
||||||
uapi::{
|
uapi::{
|
||||||
|
|
@ -61,6 +64,7 @@ macro_rules! map_err {
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mod debounce;
|
||||||
mod ops;
|
mod ops;
|
||||||
mod pending_result;
|
mod pending_result;
|
||||||
mod sys;
|
mod sys;
|
||||||
|
|
@ -242,6 +246,8 @@ impl IoUring {
|
||||||
cached_connects: Default::default(),
|
cached_connects: Default::default(),
|
||||||
cached_accepts: Default::default(),
|
cached_accepts: Default::default(),
|
||||||
fd_ids_scratch: Default::default(),
|
fd_ids_scratch: Default::default(),
|
||||||
|
iteration: Default::default(),
|
||||||
|
yields: Default::default(),
|
||||||
});
|
});
|
||||||
Ok(Rc::new(Self { ring: data }))
|
Ok(Rc::new(Self { ring: data }))
|
||||||
}
|
}
|
||||||
|
|
@ -259,6 +265,16 @@ impl IoUring {
|
||||||
pub fn cancel(&self, id: IoUringTaskId) {
|
pub fn cancel(&self, id: IoUringTaskId) {
|
||||||
self.ring.cancel_task(id);
|
self.ring.cancel_task(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[expect(dead_code)]
|
||||||
|
pub fn debouncer(&self, max: u64) -> Debouncer {
|
||||||
|
Debouncer {
|
||||||
|
cur: Default::default(),
|
||||||
|
max,
|
||||||
|
iteration: Cell::new(self.ring.iteration.get()),
|
||||||
|
ring: self.ring.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct IoUringData {
|
struct IoUringData {
|
||||||
|
|
@ -306,6 +322,9 @@ struct IoUringData {
|
||||||
cached_accepts: Stack<Box<AcceptTask>>,
|
cached_accepts: Stack<Box<AcceptTask>>,
|
||||||
|
|
||||||
fd_ids_scratch: RefCell<Vec<c::c_int>>,
|
fd_ids_scratch: RefCell<Vec<c::c_int>>,
|
||||||
|
|
||||||
|
iteration: NumCell<u64>,
|
||||||
|
yields: SyncQueue<Waker>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe trait Task {
|
unsafe trait Task {
|
||||||
|
|
@ -326,6 +345,10 @@ impl IoUringData {
|
||||||
fn run(&self) -> Result<(), IoUringError> {
|
fn run(&self) -> Result<(), IoUringError> {
|
||||||
let mut to_submit = 0;
|
let mut to_submit = 0;
|
||||||
loop {
|
loop {
|
||||||
|
self.iteration.fetch_add(1);
|
||||||
|
while let Some(ev) = self.yields.pop() {
|
||||||
|
ev.wake();
|
||||||
|
}
|
||||||
loop {
|
loop {
|
||||||
self.eng.dispatch();
|
self.eng.dispatch();
|
||||||
if self.destroyed.get() {
|
if self.destroyed.get() {
|
||||||
|
|
@ -336,12 +359,18 @@ impl IoUringData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
to_submit += self.encode();
|
to_submit += self.encode();
|
||||||
let res = if to_submit == 0 {
|
let res = {
|
||||||
io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS)
|
let (to_submit, mut min_complete, flags) = if to_submit == 0 {
|
||||||
} else if self.to_encode.is_empty() {
|
(0, 1, IORING_ENTER_GETEVENTS)
|
||||||
io_uring_enter(self.fd.raw(), to_submit as _, 1, IORING_ENTER_GETEVENTS)
|
} else if self.to_encode.is_empty() {
|
||||||
} else {
|
(to_submit as _, 1, IORING_ENTER_GETEVENTS)
|
||||||
io_uring_enter(self.fd.raw(), !0, 0, 0)
|
} else {
|
||||||
|
(!0, 0, 0)
|
||||||
|
};
|
||||||
|
if self.yields.is_not_empty() {
|
||||||
|
min_complete = 0;
|
||||||
|
}
|
||||||
|
io_uring_enter(self.fd.raw(), to_submit, min_complete, flags)
|
||||||
};
|
};
|
||||||
let mut submitted_any = false;
|
let mut submitted_any = false;
|
||||||
match res {
|
match res {
|
||||||
|
|
|
||||||
33
src/io_uring/debounce.rs
Normal file
33
src/io_uring/debounce.rs
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
use {
|
||||||
|
crate::{io_uring::IoUringData, utils::numcell::NumCell},
|
||||||
|
std::{cell::Cell, future::poll_fn, rc::Rc, task::Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct Debouncer {
|
||||||
|
pub(super) cur: NumCell<u64>,
|
||||||
|
pub(super) max: u64,
|
||||||
|
pub(super) iteration: Cell<u64>,
|
||||||
|
pub(super) ring: Rc<IoUringData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debouncer {
|
||||||
|
#[expect(dead_code)]
|
||||||
|
pub async fn debounce(&self) {
|
||||||
|
let iteration = self.ring.iteration.get();
|
||||||
|
if self.iteration.replace(iteration) != iteration {
|
||||||
|
self.cur.set(0);
|
||||||
|
}
|
||||||
|
if self.cur.fetch_add(1) > self.max {
|
||||||
|
poll_fn(|ctx| {
|
||||||
|
if self.ring.iteration.get() > iteration {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
self.ring.yields.push(ctx.waker().clone());
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
self.cur.set(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -44,7 +44,6 @@ impl<T> SyncQueue<T> {
|
||||||
unsafe { self.el.get().deref_mut().is_empty() }
|
unsafe { self.el.get().deref_mut().is_empty() }
|
||||||
}
|
}
|
||||||
|
|
||||||
#[expect(dead_code)]
|
|
||||||
pub fn is_not_empty(&self) -> bool {
|
pub fn is_not_empty(&self) -> bool {
|
||||||
!self.is_empty()
|
!self.is_empty()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue