eventfd-cache: add new util
This commit is contained in:
parent
19edb3ea95
commit
80a69ba7ef
7 changed files with 240 additions and 0 deletions
|
|
@ -26,6 +26,7 @@ use {
|
||||||
damage::{DamageVisualizer, visualize_damage},
|
damage::{DamageVisualizer, visualize_damage},
|
||||||
dbus::Dbus,
|
dbus::Dbus,
|
||||||
ei::ei_client::EiClients,
|
ei::ei_client::EiClients,
|
||||||
|
eventfd_cache::EventfdCache,
|
||||||
forker,
|
forker,
|
||||||
format::XRGB8888,
|
format::XRGB8888,
|
||||||
gfx_api::GfxApi,
|
gfx_api::GfxApi,
|
||||||
|
|
@ -374,6 +375,7 @@ fn start_compositor2(
|
||||||
gfx_ctx_changed: Default::default(),
|
gfx_ctx_changed: Default::default(),
|
||||||
copy_device_registry: Rc::new(CopyDeviceRegistry::new(&ring, &engine)),
|
copy_device_registry: Rc::new(CopyDeviceRegistry::new(&ring, &engine)),
|
||||||
supports_presentation_feedback: Default::default(),
|
supports_presentation_feedback: Default::default(),
|
||||||
|
eventfd_cache: EventfdCache::new(&ring, &engine),
|
||||||
});
|
});
|
||||||
state.tracker.register(ClientId::from_raw(0));
|
state.tracker.register(ClientId::from_raw(0));
|
||||||
create_dummy_output(&state);
|
create_dummy_output(&state);
|
||||||
|
|
|
||||||
158
src/eventfd_cache.rs
Normal file
158
src/eventfd_cache.rs
Normal file
|
|
@ -0,0 +1,158 @@
|
||||||
|
use {
|
||||||
|
crate::{
|
||||||
|
async_engine::{AsyncEngine, SpawnedFuture},
|
||||||
|
io_uring::{IoUring, IoUringError},
|
||||||
|
utils::{buf::Buf, errorfmt::ErrorFmt, oserror::OsError, queue::AsyncQueue, stack::Stack},
|
||||||
|
},
|
||||||
|
std::{cell::Cell, future::poll_fn, pin::Pin, rc::Rc, slice, task::Poll},
|
||||||
|
thiserror::Error,
|
||||||
|
uapi::{OwnedFd, c},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum EventfdError {
|
||||||
|
#[error("Could not create an eventfd")]
|
||||||
|
CreateEventfd(#[source] OsError),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EventfdCache {
|
||||||
|
inner: Rc<Inner>,
|
||||||
|
_task: SpawnedFuture<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Inner {
|
||||||
|
ring: Rc<IoUring>,
|
||||||
|
fds: Stack<Rc<OwnedFd>>,
|
||||||
|
recycle: AsyncQueue<Rc<OwnedFd>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Eventfd {
|
||||||
|
cache: Rc<Inner>,
|
||||||
|
pub fd: Rc<OwnedFd>,
|
||||||
|
signaled: Cell<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventfdCache {
|
||||||
|
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Rc<Self> {
|
||||||
|
let inner = Rc::new(Inner {
|
||||||
|
ring: ring.clone(),
|
||||||
|
fds: Default::default(),
|
||||||
|
recycle: Default::default(),
|
||||||
|
});
|
||||||
|
let task = eng.spawn("eventfd-cache", inner.clone().recycle());
|
||||||
|
Rc::new(Self { inner, _task: task })
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(not(test), expect(dead_code))]
|
||||||
|
pub fn acquire(&self) -> Result<Eventfd, EventfdError> {
|
||||||
|
let fd = match self.inner.fds.pop() {
|
||||||
|
Some(fd) => fd,
|
||||||
|
_ => uapi::eventfd(0, c::EFD_CLOEXEC)
|
||||||
|
.map(Rc::new)
|
||||||
|
.map_err(Into::into)
|
||||||
|
.map_err(EventfdError::CreateEventfd)?,
|
||||||
|
};
|
||||||
|
Ok(Eventfd {
|
||||||
|
cache: self.inner.clone(),
|
||||||
|
fd,
|
||||||
|
signaled: Default::default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eventfd {
|
||||||
|
#[expect(dead_code)]
|
||||||
|
pub fn is_signaled(&self) -> bool {
|
||||||
|
self.signaled.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(not(test), expect(dead_code))]
|
||||||
|
pub async fn signaled(&self) -> Result<(), IoUringError> {
|
||||||
|
if self.signaled.get() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.cache.ring.readable(&self.fd).await?;
|
||||||
|
self.signaled.set(true);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[expect(dead_code)]
|
||||||
|
pub fn signaled_blocking(&self) -> Result<(), OsError> {
|
||||||
|
if self.signaled.get() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let mut pollfd = c::pollfd {
|
||||||
|
fd: self.fd.raw(),
|
||||||
|
events: c::POLLIN,
|
||||||
|
revents: 0,
|
||||||
|
};
|
||||||
|
uapi::poll(slice::from_mut(&mut pollfd), -1)?;
|
||||||
|
self.signaled.set(true);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Inner {
|
||||||
|
async fn recycle(self: Rc<Self>) {
|
||||||
|
let slf = &*self;
|
||||||
|
let mut fds = vec![];
|
||||||
|
let mut bufs = vec![];
|
||||||
|
let mut tasks = vec![];
|
||||||
|
let mut todo = vec![];
|
||||||
|
loop {
|
||||||
|
fds.clear();
|
||||||
|
tasks.clear();
|
||||||
|
todo.clear();
|
||||||
|
slf.recycle.non_empty().await;
|
||||||
|
while let Some(fd) = slf.recycle.try_pop() {
|
||||||
|
fds.push(fd);
|
||||||
|
}
|
||||||
|
for (idx, fd) in fds.iter().enumerate() {
|
||||||
|
if idx >= bufs.len() {
|
||||||
|
bufs.push(Buf::new(size_of::<u64>()));
|
||||||
|
}
|
||||||
|
let fd = fd.clone();
|
||||||
|
let buf = bufs[idx].clone();
|
||||||
|
tasks.push(async move { slf.ring.read(&fd, buf).await });
|
||||||
|
todo.push(idx);
|
||||||
|
}
|
||||||
|
poll_fn(|ctx| {
|
||||||
|
let mut i = 0;
|
||||||
|
while i < todo.len() {
|
||||||
|
let idx = todo[i];
|
||||||
|
let task = unsafe { Pin::new_unchecked(&mut tasks[idx]) };
|
||||||
|
if let Poll::Ready(res) = task.poll(ctx) {
|
||||||
|
todo.swap_remove(i);
|
||||||
|
match res {
|
||||||
|
Ok(_) => {
|
||||||
|
self.fds.push(fds[idx].clone());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Could not read from eventfd: {}", ErrorFmt(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if todo.is_empty() {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Eventfd {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if self.signaled.get() {
|
||||||
|
self.cache.recycle.push(self.fd.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
66
src/eventfd_cache/tests.rs
Normal file
66
src/eventfd_cache/tests.rs
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
use {
|
||||||
|
crate::{
|
||||||
|
async_engine::AsyncEngine, eventfd_cache::EventfdCache, io_uring::IoUring, utils::array,
|
||||||
|
},
|
||||||
|
std::{rc::Rc, slice},
|
||||||
|
uapi::c,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test() {
|
||||||
|
let eng = AsyncEngine::new();
|
||||||
|
let ring = IoUring::new(&eng, 32).unwrap();
|
||||||
|
let cache = Rc::new(EventfdCache::new(&ring, &eng));
|
||||||
|
const TOTAL: usize = 5;
|
||||||
|
let signaled = 3;
|
||||||
|
let fd1: [_; TOTAL] = array::from_fn(|_| cache.acquire().unwrap());
|
||||||
|
let fd2: [_; TOTAL] = array::from_fn(|_| cache.acquire().unwrap());
|
||||||
|
for fd in fd1.iter().chain(fd2.iter()) {
|
||||||
|
uapi::eventfd_write(fd.fd.raw(), 1).unwrap();
|
||||||
|
let mut poll = c::pollfd {
|
||||||
|
fd: fd.fd.raw(),
|
||||||
|
events: c::POLLIN,
|
||||||
|
revents: 0,
|
||||||
|
};
|
||||||
|
uapi::poll(slice::from_mut(&mut poll), 0).unwrap();
|
||||||
|
assert_eq!(poll.revents, c::POLLIN);
|
||||||
|
}
|
||||||
|
assert_eq!(cache.inner.fds.len(), 0);
|
||||||
|
let ring2 = ring.clone();
|
||||||
|
let cache2 = cache.clone();
|
||||||
|
let _fut1 = eng.spawn("", async move {
|
||||||
|
for i in 0..signaled {
|
||||||
|
fd1[i].signaled().await.unwrap();
|
||||||
|
}
|
||||||
|
drop(fd1);
|
||||||
|
let debouncer = ring2.debouncer(0);
|
||||||
|
while cache2.inner.fds.len() != signaled {
|
||||||
|
debouncer.debounce().await;
|
||||||
|
}
|
||||||
|
for i in 0..signaled {
|
||||||
|
fd2[i].signaled().await.unwrap();
|
||||||
|
}
|
||||||
|
drop(fd2);
|
||||||
|
while cache2.inner.fds.len() != 2 * signaled {
|
||||||
|
debouncer.debounce().await;
|
||||||
|
}
|
||||||
|
ring2.stop();
|
||||||
|
});
|
||||||
|
let now_nsec = eng.now().nsec();
|
||||||
|
let ring2 = ring.clone();
|
||||||
|
let _fut2 = eng.spawn("", async move {
|
||||||
|
ring2.timeout(now_nsec + 1_000_000_000).await.unwrap();
|
||||||
|
ring2.stop();
|
||||||
|
});
|
||||||
|
ring.run().unwrap();
|
||||||
|
assert_eq!(cache.inner.fds.len(), 2 * signaled);
|
||||||
|
for fd in cache.inner.fds.take() {
|
||||||
|
let mut poll = c::pollfd {
|
||||||
|
fd: fd.raw(),
|
||||||
|
events: c::POLLIN,
|
||||||
|
revents: 0,
|
||||||
|
};
|
||||||
|
uapi::poll(slice::from_mut(&mut poll), 0).unwrap();
|
||||||
|
assert_eq!(poll.revents, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -68,6 +68,7 @@ mod dbus;
|
||||||
mod drm_feedback;
|
mod drm_feedback;
|
||||||
mod edid;
|
mod edid;
|
||||||
mod ei;
|
mod ei;
|
||||||
|
mod eventfd_cache;
|
||||||
mod fixed;
|
mod fixed;
|
||||||
mod forker;
|
mod forker;
|
||||||
mod format;
|
mod format;
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ use {
|
||||||
BUS_DEST, BUS_PATH, DBUS_NAME_FLAG_DO_NOT_QUEUE, DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER,
|
BUS_DEST, BUS_PATH, DBUS_NAME_FLAG_DO_NOT_QUEUE, DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER,
|
||||||
Dbus, DbusSocket,
|
Dbus, DbusSocket,
|
||||||
},
|
},
|
||||||
|
eventfd_cache::EventfdCache,
|
||||||
forker::ForkerError,
|
forker::ForkerError,
|
||||||
io_uring::IoUring,
|
io_uring::IoUring,
|
||||||
logger::Logger,
|
logger::Logger,
|
||||||
|
|
@ -242,9 +243,11 @@ async fn run_async(
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let eventfd_cache = EventfdCache::new(&ring, &eng);
|
||||||
let state = Rc::new(PortalState {
|
let state = Rc::new(PortalState {
|
||||||
xrd,
|
xrd,
|
||||||
ring,
|
ring,
|
||||||
|
eventfd_cache,
|
||||||
eng,
|
eng,
|
||||||
wheel,
|
wheel,
|
||||||
displays: Default::default(),
|
displays: Default::default(),
|
||||||
|
|
@ -324,6 +327,8 @@ async fn init_dbus_session(dbus: &Dbus, logger: Arc<Logger>, path_sink: OwnedFd)
|
||||||
struct PortalState {
|
struct PortalState {
|
||||||
xrd: String,
|
xrd: String,
|
||||||
ring: Rc<IoUring>,
|
ring: Rc<IoUring>,
|
||||||
|
#[expect(dead_code)]
|
||||||
|
eventfd_cache: Rc<EventfdCache>,
|
||||||
eng: Rc<AsyncEngine>,
|
eng: Rc<AsyncEngine>,
|
||||||
wheel: Rc<Wheel>,
|
wheel: Rc<Wheel>,
|
||||||
displays: CopyHashMap<PortalDisplayId, Rc<PortalDisplay>>,
|
displays: CopyHashMap<PortalDisplayId, Rc<PortalDisplay>>,
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ use {
|
||||||
ei_acceptor::EiAcceptor,
|
ei_acceptor::EiAcceptor,
|
||||||
ei_client::{EiClient, EiClients},
|
ei_client::{EiClient, EiClients},
|
||||||
},
|
},
|
||||||
|
eventfd_cache::EventfdCache,
|
||||||
fixed::Fixed,
|
fixed::Fixed,
|
||||||
forker::ForkerProxy,
|
forker::ForkerProxy,
|
||||||
format::Format,
|
format::Format,
|
||||||
|
|
@ -288,6 +289,8 @@ pub struct State {
|
||||||
pub gfx_ctx_changed: EventSource<WlBuffer>,
|
pub gfx_ctx_changed: EventSource<WlBuffer>,
|
||||||
pub copy_device_registry: Rc<CopyDeviceRegistry>,
|
pub copy_device_registry: Rc<CopyDeviceRegistry>,
|
||||||
pub supports_presentation_feedback: Cell<bool>,
|
pub supports_presentation_feedback: Cell<bool>,
|
||||||
|
#[expect(dead_code)]
|
||||||
|
pub eventfd_cache: Rc<EventfdCache>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// impl Drop for State {
|
// impl Drop for State {
|
||||||
|
|
|
||||||
|
|
@ -42,4 +42,9 @@ impl<T> Stack<T> {
|
||||||
pub fn take(&self) -> Vec<T> {
|
pub fn take(&self) -> Vec<T> {
|
||||||
unsafe { mem::take(self.vec.get().deref_mut()) }
|
unsafe { mem::take(self.vec.get().deref_mut()) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(not(test), expect(dead_code))]
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
unsafe { self.vec.get().deref().len() }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue