eventfd: move cache into workspace crate
This commit is contained in:
parent
89dc6c91cf
commit
aa70204881
6 changed files with 191 additions and 162 deletions
13
Cargo.lock
generated
13
Cargo.lock
generated
|
|
@ -698,6 +698,7 @@ dependencies = [
|
||||||
"jay-criteria",
|
"jay-criteria",
|
||||||
"jay-dbus-core",
|
"jay-dbus-core",
|
||||||
"jay-edid",
|
"jay-edid",
|
||||||
|
"jay-eventfd-cache",
|
||||||
"jay-formats",
|
"jay-formats",
|
||||||
"jay-geometry",
|
"jay-geometry",
|
||||||
"jay-io-uring",
|
"jay-io-uring",
|
||||||
|
|
@ -787,6 +788,18 @@ dependencies = [
|
||||||
"thiserror",
|
"thiserror",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "jay-eventfd-cache"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"jay-async-engine",
|
||||||
|
"jay-io-uring",
|
||||||
|
"jay-utils",
|
||||||
|
"log",
|
||||||
|
"thiserror",
|
||||||
|
"uapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "jay-formats"
|
name = "jay-formats"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ members = [
|
||||||
"wire-types",
|
"wire-types",
|
||||||
"wire-buf",
|
"wire-buf",
|
||||||
"tree-types",
|
"tree-types",
|
||||||
|
"eventfd-cache",
|
||||||
"toml-config",
|
"toml-config",
|
||||||
"algorithms",
|
"algorithms",
|
||||||
"toml-spec",
|
"toml-spec",
|
||||||
|
|
@ -71,6 +72,7 @@ jay-xcon = { version = "0.1.0", path = "xcon" }
|
||||||
jay-wire-types = { version = "0.1.0", path = "wire-types" }
|
jay-wire-types = { version = "0.1.0", path = "wire-types" }
|
||||||
jay-wire-buf = { version = "0.1.0", path = "wire-buf" }
|
jay-wire-buf = { version = "0.1.0", path = "wire-buf" }
|
||||||
jay-tree-types = { version = "0.1.0", path = "tree-types" }
|
jay-tree-types = { version = "0.1.0", path = "tree-types" }
|
||||||
|
jay-eventfd-cache = { version = "0.1.0", path = "eventfd-cache" }
|
||||||
|
|
||||||
uapi = "0.2.13"
|
uapi = "0.2.13"
|
||||||
thiserror = "2.0.11"
|
thiserror = "2.0.11"
|
||||||
|
|
|
||||||
14
eventfd-cache/Cargo.toml
Normal file
14
eventfd-cache/Cargo.toml
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
[package]
|
||||||
|
name = "jay-eventfd-cache"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
license = "GPL-3.0-only"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
jay-async-engine = { version = "0.1.0", path = "../async-engine" }
|
||||||
|
jay-io-uring = { version = "0.1.0", path = "../io-uring" }
|
||||||
|
jay-utils = { version = "0.1.0", path = "../utils" }
|
||||||
|
|
||||||
|
log = { version = "0.4.20", features = ["std"] }
|
||||||
|
thiserror = "2.0.11"
|
||||||
|
uapi = "0.2.13"
|
||||||
157
eventfd-cache/src/lib.rs
Normal file
157
eventfd-cache/src/lib.rs
Normal file
|
|
@ -0,0 +1,157 @@
|
||||||
|
use {
|
||||||
|
jay_async_engine::{AsyncEngine, SpawnedFuture},
|
||||||
|
jay_io_uring::{IoUring, IoUringError},
|
||||||
|
jay_utils::{
|
||||||
|
buf::Buf,
|
||||||
|
errorfmt::ErrorFmt,
|
||||||
|
oserror::{OsError, OsErrorExt, OsErrorExt2},
|
||||||
|
queue::AsyncQueue,
|
||||||
|
stack::Stack,
|
||||||
|
},
|
||||||
|
std::{cell::Cell, future::poll_fn, pin::Pin, rc::Rc, slice, task::Poll},
|
||||||
|
thiserror::Error,
|
||||||
|
uapi::{OwnedFd, c},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum EventfdError {
|
||||||
|
#[error("Could not create an eventfd")]
|
||||||
|
CreateEventfd(#[source] OsError),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EventfdCache {
|
||||||
|
inner: Rc<Inner>,
|
||||||
|
_task: SpawnedFuture<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Inner {
|
||||||
|
ring: Rc<IoUring>,
|
||||||
|
fds: Stack<Rc<OwnedFd>>,
|
||||||
|
recycle: AsyncQueue<Rc<OwnedFd>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Eventfd {
|
||||||
|
cache: Rc<Inner>,
|
||||||
|
pub fd: Rc<OwnedFd>,
|
||||||
|
signaled: Cell<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventfdCache {
|
||||||
|
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Rc<Self> {
|
||||||
|
let inner = Rc::new(Inner {
|
||||||
|
ring: ring.clone(),
|
||||||
|
fds: Default::default(),
|
||||||
|
recycle: Default::default(),
|
||||||
|
});
|
||||||
|
let task = eng.spawn("eventfd-cache", inner.clone().recycle());
|
||||||
|
Rc::new(Self { inner, _task: task })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn acquire(&self) -> Result<Eventfd, EventfdError> {
|
||||||
|
let fd = match self.inner.fds.pop() {
|
||||||
|
Some(fd) => fd,
|
||||||
|
_ => uapi::eventfd(0, c::EFD_CLOEXEC)
|
||||||
|
.map(Rc::new)
|
||||||
|
.map_os_err(EventfdError::CreateEventfd)?,
|
||||||
|
};
|
||||||
|
Ok(Eventfd {
|
||||||
|
cache: self.inner.clone(),
|
||||||
|
fd,
|
||||||
|
signaled: Default::default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eventfd {
|
||||||
|
pub fn is_signaled(&self) -> bool {
|
||||||
|
self.signaled.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn signaled(&self) -> Result<(), IoUringError> {
|
||||||
|
if self.signaled.get() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.cache.ring.readable(&self.fd).await?;
|
||||||
|
self.signaled.set(true);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn signaled_blocking(&self) -> Result<(), OsError> {
|
||||||
|
if self.signaled.get() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let mut pollfd = c::pollfd {
|
||||||
|
fd: self.fd.raw(),
|
||||||
|
events: c::POLLIN,
|
||||||
|
revents: 0,
|
||||||
|
};
|
||||||
|
uapi::poll(slice::from_mut(&mut pollfd), -1).to_os_error()?;
|
||||||
|
self.signaled.set(true);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Inner {
|
||||||
|
async fn recycle(self: Rc<Self>) {
|
||||||
|
let slf = &*self;
|
||||||
|
let mut fds = vec![];
|
||||||
|
let mut bufs = vec![];
|
||||||
|
let mut tasks = vec![];
|
||||||
|
let mut todo = vec![];
|
||||||
|
loop {
|
||||||
|
fds.clear();
|
||||||
|
tasks.clear();
|
||||||
|
todo.clear();
|
||||||
|
slf.recycle.non_empty().await;
|
||||||
|
while let Some(fd) = slf.recycle.try_pop() {
|
||||||
|
fds.push(fd);
|
||||||
|
}
|
||||||
|
for (idx, fd) in fds.iter().enumerate() {
|
||||||
|
if idx >= bufs.len() {
|
||||||
|
bufs.push(Buf::new(size_of::<u64>()));
|
||||||
|
}
|
||||||
|
let fd = fd.clone();
|
||||||
|
let buf = bufs[idx].clone();
|
||||||
|
tasks.push(async move { slf.ring.read(&fd, buf).await });
|
||||||
|
todo.push(idx);
|
||||||
|
}
|
||||||
|
poll_fn(|ctx| {
|
||||||
|
let mut i = 0;
|
||||||
|
while i < todo.len() {
|
||||||
|
let idx = todo[i];
|
||||||
|
let task = unsafe { Pin::new_unchecked(&mut tasks[idx]) };
|
||||||
|
if let Poll::Ready(res) = task.poll(ctx) {
|
||||||
|
todo.swap_remove(i);
|
||||||
|
match res {
|
||||||
|
Ok(_) => {
|
||||||
|
self.fds.push(fds[idx].clone());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Could not read from eventfd: {}", ErrorFmt(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if todo.is_empty() {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Eventfd {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if self.signaled.get() {
|
||||||
|
self.cache.recycle.push(self.fd.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
use {
|
use {
|
||||||
crate::{
|
crate::EventfdCache,
|
||||||
async_engine::AsyncEngine, eventfd_cache::EventfdCache, io_uring::IoUring, utils::array,
|
jay_async_engine::AsyncEngine,
|
||||||
},
|
jay_io_uring::IoUring,
|
||||||
|
jay_utils::array,
|
||||||
std::{rc::Rc, slice},
|
std::{rc::Rc, slice},
|
||||||
uapi::c,
|
uapi::c,
|
||||||
};
|
};
|
||||||
|
|
@ -1,159 +1 @@
|
||||||
use {
|
pub use jay_eventfd_cache::*;
|
||||||
crate::{
|
|
||||||
async_engine::{AsyncEngine, SpawnedFuture},
|
|
||||||
io_uring::{IoUring, IoUringError},
|
|
||||||
utils::{
|
|
||||||
buf::Buf,
|
|
||||||
errorfmt::ErrorFmt,
|
|
||||||
oserror::{OsError, OsErrorExt, OsErrorExt2},
|
|
||||||
queue::AsyncQueue,
|
|
||||||
stack::Stack,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
std::{cell::Cell, future::poll_fn, pin::Pin, rc::Rc, slice, task::Poll},
|
|
||||||
thiserror::Error,
|
|
||||||
uapi::{OwnedFd, c},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests;
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum EventfdError {
|
|
||||||
#[error("Could not create an eventfd")]
|
|
||||||
CreateEventfd(#[source] OsError),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct EventfdCache {
|
|
||||||
inner: Rc<Inner>,
|
|
||||||
_task: SpawnedFuture<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Inner {
|
|
||||||
ring: Rc<IoUring>,
|
|
||||||
fds: Stack<Rc<OwnedFd>>,
|
|
||||||
recycle: AsyncQueue<Rc<OwnedFd>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Eventfd {
|
|
||||||
cache: Rc<Inner>,
|
|
||||||
pub fd: Rc<OwnedFd>,
|
|
||||||
signaled: Cell<bool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EventfdCache {
|
|
||||||
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Rc<Self> {
|
|
||||||
let inner = Rc::new(Inner {
|
|
||||||
ring: ring.clone(),
|
|
||||||
fds: Default::default(),
|
|
||||||
recycle: Default::default(),
|
|
||||||
});
|
|
||||||
let task = eng.spawn("eventfd-cache", inner.clone().recycle());
|
|
||||||
Rc::new(Self { inner, _task: task })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn acquire(&self) -> Result<Eventfd, EventfdError> {
|
|
||||||
let fd = match self.inner.fds.pop() {
|
|
||||||
Some(fd) => fd,
|
|
||||||
_ => uapi::eventfd(0, c::EFD_CLOEXEC)
|
|
||||||
.map(Rc::new)
|
|
||||||
.map_os_err(EventfdError::CreateEventfd)?,
|
|
||||||
};
|
|
||||||
Ok(Eventfd {
|
|
||||||
cache: self.inner.clone(),
|
|
||||||
fd,
|
|
||||||
signaled: Default::default(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Eventfd {
|
|
||||||
pub fn is_signaled(&self) -> bool {
|
|
||||||
self.signaled.get()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn signaled(&self) -> Result<(), IoUringError> {
|
|
||||||
if self.signaled.get() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
self.cache.ring.readable(&self.fd).await?;
|
|
||||||
self.signaled.set(true);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn signaled_blocking(&self) -> Result<(), OsError> {
|
|
||||||
if self.signaled.get() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
let mut pollfd = c::pollfd {
|
|
||||||
fd: self.fd.raw(),
|
|
||||||
events: c::POLLIN,
|
|
||||||
revents: 0,
|
|
||||||
};
|
|
||||||
uapi::poll(slice::from_mut(&mut pollfd), -1).to_os_error()?;
|
|
||||||
self.signaled.set(true);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Inner {
|
|
||||||
async fn recycle(self: Rc<Self>) {
|
|
||||||
let slf = &*self;
|
|
||||||
let mut fds = vec![];
|
|
||||||
let mut bufs = vec![];
|
|
||||||
let mut tasks = vec![];
|
|
||||||
let mut todo = vec![];
|
|
||||||
loop {
|
|
||||||
fds.clear();
|
|
||||||
tasks.clear();
|
|
||||||
todo.clear();
|
|
||||||
slf.recycle.non_empty().await;
|
|
||||||
while let Some(fd) = slf.recycle.try_pop() {
|
|
||||||
fds.push(fd);
|
|
||||||
}
|
|
||||||
for (idx, fd) in fds.iter().enumerate() {
|
|
||||||
if idx >= bufs.len() {
|
|
||||||
bufs.push(Buf::new(size_of::<u64>()));
|
|
||||||
}
|
|
||||||
let fd = fd.clone();
|
|
||||||
let buf = bufs[idx].clone();
|
|
||||||
tasks.push(async move { slf.ring.read(&fd, buf).await });
|
|
||||||
todo.push(idx);
|
|
||||||
}
|
|
||||||
poll_fn(|ctx| {
|
|
||||||
let mut i = 0;
|
|
||||||
while i < todo.len() {
|
|
||||||
let idx = todo[i];
|
|
||||||
let task = unsafe { Pin::new_unchecked(&mut tasks[idx]) };
|
|
||||||
if let Poll::Ready(res) = task.poll(ctx) {
|
|
||||||
todo.swap_remove(i);
|
|
||||||
match res {
|
|
||||||
Ok(_) => {
|
|
||||||
self.fds.push(fds[idx].clone());
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
log::error!("Could not read from eventfd: {}", ErrorFmt(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
i += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if todo.is_empty() {
|
|
||||||
Poll::Ready(())
|
|
||||||
} else {
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for Eventfd {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if self.signaled.get() {
|
|
||||||
self.cache.recycle.push(self.fd.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue