diff --git a/src/compositor.rs b/src/compositor.rs index 312ccbba..3a2957ff 100644 --- a/src/compositor.rs +++ b/src/compositor.rs @@ -26,6 +26,7 @@ use { damage::{DamageVisualizer, visualize_damage}, dbus::Dbus, ei::ei_client::EiClients, + eventfd_cache::EventfdCache, forker, format::XRGB8888, gfx_api::GfxApi, @@ -374,6 +375,7 @@ fn start_compositor2( gfx_ctx_changed: Default::default(), copy_device_registry: Rc::new(CopyDeviceRegistry::new(&ring, &engine)), supports_presentation_feedback: Default::default(), + eventfd_cache: EventfdCache::new(&ring, &engine), }); state.tracker.register(ClientId::from_raw(0)); create_dummy_output(&state); diff --git a/src/eventfd_cache.rs b/src/eventfd_cache.rs new file mode 100644 index 00000000..b282ea7c --- /dev/null +++ b/src/eventfd_cache.rs @@ -0,0 +1,158 @@ +use { + crate::{ + async_engine::{AsyncEngine, SpawnedFuture}, + io_uring::{IoUring, IoUringError}, + utils::{buf::Buf, errorfmt::ErrorFmt, oserror::OsError, queue::AsyncQueue, stack::Stack}, + }, + std::{cell::Cell, future::poll_fn, pin::Pin, rc::Rc, slice, task::Poll}, + thiserror::Error, + uapi::{OwnedFd, c}, +}; + +#[cfg(test)] +mod tests; + +#[derive(Debug, Error)] +pub enum EventfdError { + #[error("Could not create an eventfd")] + CreateEventfd(#[source] OsError), +} + +pub struct EventfdCache { + inner: Rc, + _task: SpawnedFuture<()>, +} + +struct Inner { + ring: Rc, + fds: Stack>, + recycle: AsyncQueue>, +} + +pub struct Eventfd { + cache: Rc, + pub fd: Rc, + signaled: Cell, +} + +impl EventfdCache { + pub fn new(ring: &Rc, eng: &Rc) -> Rc { + let inner = Rc::new(Inner { + ring: ring.clone(), + fds: Default::default(), + recycle: Default::default(), + }); + let task = eng.spawn("eventfd-cache", inner.clone().recycle()); + Rc::new(Self { inner, _task: task }) + } + + #[cfg_attr(not(test), expect(dead_code))] + pub fn acquire(&self) -> Result { + let fd = match self.inner.fds.pop() { + Some(fd) => fd, + _ => uapi::eventfd(0, c::EFD_CLOEXEC) + .map(Rc::new) + .map_err(Into::into) + .map_err(EventfdError::CreateEventfd)?, + }; + Ok(Eventfd { + cache: self.inner.clone(), + fd, + signaled: Default::default(), + }) + } +} + +impl Eventfd { + #[expect(dead_code)] + pub fn is_signaled(&self) -> bool { + self.signaled.get() + } + + #[cfg_attr(not(test), expect(dead_code))] + pub async fn signaled(&self) -> Result<(), IoUringError> { + if self.signaled.get() { + return Ok(()); + } + self.cache.ring.readable(&self.fd).await?; + self.signaled.set(true); + Ok(()) + } + + #[expect(dead_code)] + pub fn signaled_blocking(&self) -> Result<(), OsError> { + if self.signaled.get() { + return Ok(()); + } + let mut pollfd = c::pollfd { + fd: self.fd.raw(), + events: c::POLLIN, + revents: 0, + }; + uapi::poll(slice::from_mut(&mut pollfd), -1)?; + self.signaled.set(true); + Ok(()) + } +} + +impl Inner { + async fn recycle(self: Rc) { + let slf = &*self; + let mut fds = vec![]; + let mut bufs = vec![]; + let mut tasks = vec![]; + let mut todo = vec![]; + loop { + fds.clear(); + tasks.clear(); + todo.clear(); + slf.recycle.non_empty().await; + while let Some(fd) = slf.recycle.try_pop() { + fds.push(fd); + } + for (idx, fd) in fds.iter().enumerate() { + if idx >= bufs.len() { + bufs.push(Buf::new(size_of::())); + } + let fd = fd.clone(); + let buf = bufs[idx].clone(); + tasks.push(async move { slf.ring.read(&fd, buf).await }); + todo.push(idx); + } + poll_fn(|ctx| { + let mut i = 0; + while i < todo.len() { + let idx = todo[i]; + let task = unsafe { Pin::new_unchecked(&mut tasks[idx]) }; + if let Poll::Ready(res) = task.poll(ctx) { + todo.swap_remove(i); + match res { + Ok(_) => { + self.fds.push(fds[idx].clone()); + } + Err(e) => { + log::error!("Could not read from eventfd: {}", ErrorFmt(e)); + } + } + } else { + i += 1; + } + } + if todo.is_empty() { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + } + } +} + +impl Drop for Eventfd { + fn drop(&mut self) { + if self.signaled.get() { + self.cache.recycle.push(self.fd.clone()); + } + } +} diff --git a/src/eventfd_cache/tests.rs b/src/eventfd_cache/tests.rs new file mode 100644 index 00000000..2c536f6d --- /dev/null +++ b/src/eventfd_cache/tests.rs @@ -0,0 +1,66 @@ +use { + crate::{ + async_engine::AsyncEngine, eventfd_cache::EventfdCache, io_uring::IoUring, utils::array, + }, + std::{rc::Rc, slice}, + uapi::c, +}; + +#[test] +fn test() { + let eng = AsyncEngine::new(); + let ring = IoUring::new(&eng, 32).unwrap(); + let cache = Rc::new(EventfdCache::new(&ring, &eng)); + const TOTAL: usize = 5; + let signaled = 3; + let fd1: [_; TOTAL] = array::from_fn(|_| cache.acquire().unwrap()); + let fd2: [_; TOTAL] = array::from_fn(|_| cache.acquire().unwrap()); + for fd in fd1.iter().chain(fd2.iter()) { + uapi::eventfd_write(fd.fd.raw(), 1).unwrap(); + let mut poll = c::pollfd { + fd: fd.fd.raw(), + events: c::POLLIN, + revents: 0, + }; + uapi::poll(slice::from_mut(&mut poll), 0).unwrap(); + assert_eq!(poll.revents, c::POLLIN); + } + assert_eq!(cache.inner.fds.len(), 0); + let ring2 = ring.clone(); + let cache2 = cache.clone(); + let _fut1 = eng.spawn("", async move { + for i in 0..signaled { + fd1[i].signaled().await.unwrap(); + } + drop(fd1); + let debouncer = ring2.debouncer(0); + while cache2.inner.fds.len() != signaled { + debouncer.debounce().await; + } + for i in 0..signaled { + fd2[i].signaled().await.unwrap(); + } + drop(fd2); + while cache2.inner.fds.len() != 2 * signaled { + debouncer.debounce().await; + } + ring2.stop(); + }); + let now_nsec = eng.now().nsec(); + let ring2 = ring.clone(); + let _fut2 = eng.spawn("", async move { + ring2.timeout(now_nsec + 1_000_000_000).await.unwrap(); + ring2.stop(); + }); + ring.run().unwrap(); + assert_eq!(cache.inner.fds.len(), 2 * signaled); + for fd in cache.inner.fds.take() { + let mut poll = c::pollfd { + fd: fd.raw(), + events: c::POLLIN, + revents: 0, + }; + uapi::poll(slice::from_mut(&mut poll), 0).unwrap(); + assert_eq!(poll.revents, 0); + } +} diff --git a/src/main.rs b/src/main.rs index ad8c8db2..129978a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -68,6 +68,7 @@ mod dbus; mod drm_feedback; mod edid; mod ei; +mod eventfd_cache; mod fixed; mod forker; mod format; diff --git a/src/portal.rs b/src/portal.rs index f3e57a5a..6ba3ceae 100644 --- a/src/portal.rs +++ b/src/portal.rs @@ -15,6 +15,7 @@ use { BUS_DEST, BUS_PATH, DBUS_NAME_FLAG_DO_NOT_QUEUE, DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER, Dbus, DbusSocket, }, + eventfd_cache::EventfdCache, forker::ForkerError, io_uring::IoUring, logger::Logger, @@ -242,9 +243,11 @@ async fn run_async( None } }; + let eventfd_cache = EventfdCache::new(&ring, &eng); let state = Rc::new(PortalState { xrd, ring, + eventfd_cache, eng, wheel, displays: Default::default(), @@ -324,6 +327,8 @@ async fn init_dbus_session(dbus: &Dbus, logger: Arc, path_sink: OwnedFd) struct PortalState { xrd: String, ring: Rc, + #[expect(dead_code)] + eventfd_cache: Rc, eng: Rc, wheel: Rc, displays: CopyHashMap>, diff --git a/src/state.rs b/src/state.rs index eeaf2290..a063cef7 100644 --- a/src/state.rs +++ b/src/state.rs @@ -27,6 +27,7 @@ use { ei_acceptor::EiAcceptor, ei_client::{EiClient, EiClients}, }, + eventfd_cache::EventfdCache, fixed::Fixed, forker::ForkerProxy, format::Format, @@ -288,6 +289,8 @@ pub struct State { pub gfx_ctx_changed: EventSource, pub copy_device_registry: Rc, pub supports_presentation_feedback: Cell, + #[expect(dead_code)] + pub eventfd_cache: Rc, } // impl Drop for State { diff --git a/src/utils/stack.rs b/src/utils/stack.rs index f415a8c8..0ff7ee6f 100644 --- a/src/utils/stack.rs +++ b/src/utils/stack.rs @@ -42,4 +42,9 @@ impl Stack { pub fn take(&self) -> Vec { unsafe { mem::take(self.vec.get().deref_mut()) } } + + #[cfg_attr(not(test), expect(dead_code))] + pub fn len(&self) -> usize { + unsafe { self.vec.get().deref().len() } + } }