diff --git a/Cargo.lock b/Cargo.lock index cdbdf9ac..31602b57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,6 +698,7 @@ dependencies = [ "jay-criteria", "jay-dbus-core", "jay-edid", + "jay-eventfd-cache", "jay-formats", "jay-geometry", "jay-io-uring", @@ -787,6 +788,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jay-eventfd-cache" +version = "0.1.0" +dependencies = [ + "jay-async-engine", + "jay-io-uring", + "jay-utils", + "log", + "thiserror", + "uapi", +] + [[package]] name = "jay-formats" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index e36312cf..e7a10432 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "wire-types", "wire-buf", "tree-types", + "eventfd-cache", "toml-config", "algorithms", "toml-spec", @@ -71,6 +72,7 @@ jay-xcon = { version = "0.1.0", path = "xcon" } jay-wire-types = { version = "0.1.0", path = "wire-types" } jay-wire-buf = { version = "0.1.0", path = "wire-buf" } jay-tree-types = { version = "0.1.0", path = "tree-types" } +jay-eventfd-cache = { version = "0.1.0", path = "eventfd-cache" } uapi = "0.2.13" thiserror = "2.0.11" diff --git a/eventfd-cache/Cargo.toml b/eventfd-cache/Cargo.toml new file mode 100644 index 00000000..8bc6e166 --- /dev/null +++ b/eventfd-cache/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "jay-eventfd-cache" +version = "0.1.0" +edition = "2024" +license = "GPL-3.0-only" + +[dependencies] +jay-async-engine = { version = "0.1.0", path = "../async-engine" } +jay-io-uring = { version = "0.1.0", path = "../io-uring" } +jay-utils = { version = "0.1.0", path = "../utils" } + +log = { version = "0.4.20", features = ["std"] } +thiserror = "2.0.11" +uapi = "0.2.13" diff --git a/eventfd-cache/src/lib.rs b/eventfd-cache/src/lib.rs new file mode 100644 index 00000000..336b2fde --- /dev/null +++ b/eventfd-cache/src/lib.rs @@ -0,0 +1,157 @@ +use { + jay_async_engine::{AsyncEngine, SpawnedFuture}, + jay_io_uring::{IoUring, IoUringError}, + jay_utils::{ + buf::Buf, + errorfmt::ErrorFmt, + oserror::{OsError, OsErrorExt, OsErrorExt2}, + queue::AsyncQueue, + stack::Stack, + }, + std::{cell::Cell, future::poll_fn, pin::Pin, rc::Rc, slice, task::Poll}, + thiserror::Error, + uapi::{OwnedFd, c}, +}; + +#[cfg(test)] +mod tests; + +#[derive(Debug, Error)] +pub enum EventfdError { + #[error("Could not create an eventfd")] + CreateEventfd(#[source] OsError), +} + +pub struct EventfdCache { + inner: Rc, + _task: SpawnedFuture<()>, +} + +struct Inner { + ring: Rc, + fds: Stack>, + recycle: AsyncQueue>, +} + +pub struct Eventfd { + cache: Rc, + pub fd: Rc, + signaled: Cell, +} + +impl EventfdCache { + pub fn new(ring: &Rc, eng: &Rc) -> Rc { + let inner = Rc::new(Inner { + ring: ring.clone(), + fds: Default::default(), + recycle: Default::default(), + }); + let task = eng.spawn("eventfd-cache", inner.clone().recycle()); + Rc::new(Self { inner, _task: task }) + } + + pub fn acquire(&self) -> Result { + let fd = match self.inner.fds.pop() { + Some(fd) => fd, + _ => uapi::eventfd(0, c::EFD_CLOEXEC) + .map(Rc::new) + .map_os_err(EventfdError::CreateEventfd)?, + }; + Ok(Eventfd { + cache: self.inner.clone(), + fd, + signaled: Default::default(), + }) + } +} + +impl Eventfd { + pub fn is_signaled(&self) -> bool { + self.signaled.get() + } + + pub async fn signaled(&self) -> Result<(), IoUringError> { + if self.signaled.get() { + return Ok(()); + } + self.cache.ring.readable(&self.fd).await?; + self.signaled.set(true); + Ok(()) + } + + pub fn signaled_blocking(&self) -> Result<(), OsError> { + if self.signaled.get() { + return Ok(()); + } + let mut pollfd = c::pollfd { + fd: self.fd.raw(), + events: c::POLLIN, + revents: 0, + }; + uapi::poll(slice::from_mut(&mut pollfd), -1).to_os_error()?; + self.signaled.set(true); + Ok(()) + } +} + +impl Inner { + async fn recycle(self: Rc) { + let slf = &*self; + let mut fds = vec![]; + let mut bufs = vec![]; + let mut tasks = vec![]; + let mut todo = vec![]; + loop { + fds.clear(); + tasks.clear(); + todo.clear(); + slf.recycle.non_empty().await; + while let Some(fd) = slf.recycle.try_pop() { + fds.push(fd); + } + for (idx, fd) in fds.iter().enumerate() { + if idx >= bufs.len() { + bufs.push(Buf::new(size_of::())); + } + let fd = fd.clone(); + let buf = bufs[idx].clone(); + tasks.push(async move { slf.ring.read(&fd, buf).await }); + todo.push(idx); + } + poll_fn(|ctx| { + let mut i = 0; + while i < todo.len() { + let idx = todo[i]; + let task = unsafe { Pin::new_unchecked(&mut tasks[idx]) }; + if let Poll::Ready(res) = task.poll(ctx) { + todo.swap_remove(i); + match res { + Ok(_) => { + self.fds.push(fds[idx].clone()); + } + Err(e) => { + log::error!("Could not read from eventfd: {}", ErrorFmt(e)); + } + } + } else { + i += 1; + } + } + if todo.is_empty() { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + } + } +} + +impl Drop for Eventfd { + fn drop(&mut self) { + if self.signaled.get() { + self.cache.recycle.push(self.fd.clone()); + } + } +} diff --git a/src/eventfd_cache/tests.rs b/eventfd-cache/src/tests.rs similarity index 94% rename from src/eventfd_cache/tests.rs rename to eventfd-cache/src/tests.rs index 2c536f6d..2e6ad069 100644 --- a/src/eventfd_cache/tests.rs +++ b/eventfd-cache/src/tests.rs @@ -1,7 +1,8 @@ use { - crate::{ - async_engine::AsyncEngine, eventfd_cache::EventfdCache, io_uring::IoUring, utils::array, - }, + crate::EventfdCache, + jay_async_engine::AsyncEngine, + jay_io_uring::IoUring, + jay_utils::array, std::{rc::Rc, slice}, uapi::c, }; diff --git a/src/eventfd_cache.rs b/src/eventfd_cache.rs index d2e8b55a..4eee87e1 100644 --- a/src/eventfd_cache.rs +++ b/src/eventfd_cache.rs @@ -1,159 +1 @@ -use { - crate::{ - async_engine::{AsyncEngine, SpawnedFuture}, - io_uring::{IoUring, IoUringError}, - utils::{ - buf::Buf, - errorfmt::ErrorFmt, - oserror::{OsError, OsErrorExt, OsErrorExt2}, - queue::AsyncQueue, - stack::Stack, - }, - }, - std::{cell::Cell, future::poll_fn, pin::Pin, rc::Rc, slice, task::Poll}, - thiserror::Error, - uapi::{OwnedFd, c}, -}; - -#[cfg(test)] -mod tests; - -#[derive(Debug, Error)] -pub enum EventfdError { - #[error("Could not create an eventfd")] - CreateEventfd(#[source] OsError), -} - -pub struct EventfdCache { - inner: Rc, - _task: SpawnedFuture<()>, -} - -struct Inner { - ring: Rc, - fds: Stack>, - recycle: AsyncQueue>, -} - -pub struct Eventfd { - cache: Rc, - pub fd: Rc, - signaled: Cell, -} - -impl EventfdCache { - pub fn new(ring: &Rc, eng: &Rc) -> Rc { - let inner = Rc::new(Inner { - ring: ring.clone(), - fds: Default::default(), - recycle: Default::default(), - }); - let task = eng.spawn("eventfd-cache", inner.clone().recycle()); - Rc::new(Self { inner, _task: task }) - } - - pub fn acquire(&self) -> Result { - let fd = match self.inner.fds.pop() { - Some(fd) => fd, - _ => uapi::eventfd(0, c::EFD_CLOEXEC) - .map(Rc::new) - .map_os_err(EventfdError::CreateEventfd)?, - }; - Ok(Eventfd { - cache: self.inner.clone(), - fd, - signaled: Default::default(), - }) - } -} - -impl Eventfd { - pub fn is_signaled(&self) -> bool { - self.signaled.get() - } - - pub async fn signaled(&self) -> Result<(), IoUringError> { - if self.signaled.get() { - return Ok(()); - } - self.cache.ring.readable(&self.fd).await?; - self.signaled.set(true); - Ok(()) - } - - pub fn signaled_blocking(&self) -> Result<(), OsError> { - if self.signaled.get() { - return Ok(()); - } - let mut pollfd = c::pollfd { - fd: self.fd.raw(), - events: c::POLLIN, - revents: 0, - }; - uapi::poll(slice::from_mut(&mut pollfd), -1).to_os_error()?; - self.signaled.set(true); - Ok(()) - } -} - -impl Inner { - async fn recycle(self: Rc) { - let slf = &*self; - let mut fds = vec![]; - let mut bufs = vec![]; - let mut tasks = vec![]; - let mut todo = vec![]; - loop { - fds.clear(); - tasks.clear(); - todo.clear(); - slf.recycle.non_empty().await; - while let Some(fd) = slf.recycle.try_pop() { - fds.push(fd); - } - for (idx, fd) in fds.iter().enumerate() { - if idx >= bufs.len() { - bufs.push(Buf::new(size_of::())); - } - let fd = fd.clone(); - let buf = bufs[idx].clone(); - tasks.push(async move { slf.ring.read(&fd, buf).await }); - todo.push(idx); - } - poll_fn(|ctx| { - let mut i = 0; - while i < todo.len() { - let idx = todo[i]; - let task = unsafe { Pin::new_unchecked(&mut tasks[idx]) }; - if let Poll::Ready(res) = task.poll(ctx) { - todo.swap_remove(i); - match res { - Ok(_) => { - self.fds.push(fds[idx].clone()); - } - Err(e) => { - log::error!("Could not read from eventfd: {}", ErrorFmt(e)); - } - } - } else { - i += 1; - } - } - if todo.is_empty() { - Poll::Ready(()) - } else { - Poll::Pending - } - }) - .await; - } - } -} - -impl Drop for Eventfd { - fn drop(&mut self) { - if self.signaled.get() { - self.cache.recycle.push(self.fd.clone()); - } - } -} +pub use jay_eventfd_cache::*;