From 9416efeabe97720827d947c9dfa7eda81c27cad8 Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Thu, 12 May 2022 22:50:33 +0200 Subject: [PATCH] runtime: replace epoll by io-uring --- src/acceptor.rs | 5 +- src/async_engine.rs | 162 ++++++++++++++------------- src/async_engine/ae_fd.rs | 196 --------------------------------- src/async_engine/ae_queue.rs | 115 ------------------- src/async_engine/ae_task.rs | 8 +- src/async_engine/ae_yield.rs | 4 +- src/backends/x.rs | 4 +- src/client/error.rs | 3 - src/compositor.rs | 21 ++-- src/config/handler.rs | 2 +- src/event_loop.rs | 208 ----------------------------------- src/forker.rs | 8 +- src/ifs/jay_compositor.rs | 2 +- src/io_uring.rs | 132 ++++++++++------------ src/main.rs | 1 - src/sighand.rs | 10 +- src/state.rs | 2 - src/tools/tool_client.rs | 20 +--- src/utils/queue.rs | 4 + src/wheel.rs | 4 +- src/xcon.rs | 4 +- 21 files changed, 173 insertions(+), 742 deletions(-) delete mode 100644 src/async_engine/ae_fd.rs delete mode 100644 src/async_engine/ae_queue.rs delete mode 100644 src/event_loop.rs diff --git a/src/acceptor.rs b/src/acceptor.rs index bc35c93f..77f82531 100644 --- a/src/acceptor.rs +++ b/src/acceptor.rs @@ -1,7 +1,6 @@ use { crate::{ async_engine::SpawnedFuture, - event_loop::EventLoopError, state::State, utils::{errorfmt::ErrorFmt, oserror::OsError, xrd::xrd}, }, @@ -30,8 +29,6 @@ pub enum AcceptorError { BindFailed(#[source] OsError), #[error("All wayland addresses in the range 0..1000 are already in use")] AddressesInUse, - #[error("The event loop caused an error")] - EventLoopError(#[from] EventLoopError), } pub struct Acceptor { @@ -205,5 +202,5 @@ async fn accept(fd: Rc, state: Rc, secure: bool) { } } } - state.el.stop(); + state.ring.stop(); } diff --git a/src/async_engine.rs b/src/async_engine.rs index bf7b50ac..f8faf271 100644 --- a/src/async_engine.rs +++ b/src/async_engine.rs @@ -1,35 +1,15 @@ -mod ae_fd; -mod ae_queue; mod ae_task; mod ae_yield; -pub use { - crate::async_engine::ae_yield::Yield, - ae_fd::{AsyncFd, FdStatus}, - ae_task::SpawnedFuture, -}; +pub use {crate::async_engine::ae_yield::Yield, ae_task::SpawnedFuture}; use { crate::{ - event_loop::{EventLoop, EventLoopError}, - utils::{copyhashmap::CopyHashMap, numcell::NumCell}, + async_engine::ae_task::Runnable, + utils::{array, numcell::NumCell, syncqueue::SyncQueue}, }, - ae_fd::AsyncFdData, - ae_queue::{DispatchQueue, Dispatcher}, - std::{ - cell::{Cell, RefCell}, - future::Future, - rc::Rc, - }, - thiserror::Error, - uapi::OwnedFd, + std::{cell::RefCell, collections::VecDeque, future::Future, rc::Rc, task::Waker}, }; -#[derive(Debug, Error)] -pub enum AsyncError { - #[error("The event loop caused an error")] - EventLoopError(#[from] EventLoopError), -} - #[derive(Copy, Clone, Eq, PartialEq)] pub enum Phase { EventHandling, @@ -40,70 +20,88 @@ pub enum Phase { const NUM_PHASES: usize = 4; pub struct AsyncEngine { - el: Rc, - queue: Rc, - fds: CopyHashMap>, + num_queued: NumCell, + queues: [SyncQueue; NUM_PHASES], + iteration: NumCell, + yields: SyncQueue, + stash: RefCell>, + yield_stash: RefCell>, } impl AsyncEngine { - pub fn install(el: &Rc) -> Result, AsyncError> { - let queue = Dispatcher::install(el)?; - Ok(Rc::new(Self { - el: el.clone(), - queue, - fds: CopyHashMap::new(), - })) - } - - pub fn clear(&self) { - for (_, fd) in self.fds.lock().drain() { - fd.readers.take(); - fd.writers.take(); - } - self.queue.clear(); - } - - pub fn spawn + 'static>(&self, f: F) -> SpawnedFuture { - self.queue.spawn(Phase::EventHandling, f) - } - - pub fn spawn2 + 'static>( - &self, - phase: Phase, - f: F, - ) -> SpawnedFuture { - self.queue.spawn(phase, f) - } - - pub fn fd(self: &Rc, fd: &Rc) -> Result { - let data = if let Some(afd) = self.fds.get(&fd.raw()) { - afd.ref_count.fetch_add(1); - afd - } else { - let id = self.el.id(); - let afd = Rc::new(AsyncFdData { - ref_count: NumCell::new(1), - fd: fd.clone(), - id, - el: self.el.clone(), - write_registered: Cell::new(false), - read_registered: Cell::new(false), - readers: RefCell::new(vec![]), - writers: RefCell::new(vec![]), - }); - self.el.insert(id, Some(fd.raw()), 0, afd.clone())?; - afd - }; - Ok(AsyncFd { - engine: self.clone(), - data, + pub fn new() -> Rc { + Rc::new(Self { + num_queued: Default::default(), + queues: array::from_fn(|_| Default::default()), + iteration: Default::default(), + yields: Default::default(), + stash: Default::default(), + yield_stash: Default::default(), }) } - pub fn yield_now(&self) -> Yield { - Yield { - iteration: self.queue.iteration(), - queue: self.queue.clone(), + pub fn clear(&self) { + self.stash.borrow_mut().clear(); + self.yield_stash.borrow_mut().clear(); + self.yields.take(); + for queue in &self.queues { + queue.take(); } } + + pub fn spawn + 'static>(self: &Rc, f: F) -> SpawnedFuture { + self.spawn_(Phase::EventHandling, f) + } + + pub fn spawn2 + 'static>( + self: &Rc, + phase: Phase, + f: F, + ) -> SpawnedFuture { + self.spawn_(phase, f) + } + + pub fn yield_now(self: &Rc) -> Yield { + Yield { + iteration: self.iteration(), + queue: self.clone(), + } + } + + pub fn dispatch(&self) { + let mut stash = self.stash.borrow_mut(); + let mut yield_stash = self.yield_stash.borrow_mut(); + while self.num_queued.get() > 0 { + self.iteration.fetch_add(1); + let mut phase = 0; + while phase < NUM_PHASES as usize { + self.queues[phase].swap(&mut *stash); + if stash.is_empty() { + phase += 1; + continue; + } + self.num_queued.fetch_sub(stash.len()); + for runnable in stash.drain(..) { + runnable.run(); + } + } + self.yields.swap(&mut *yield_stash); + for waker in yield_stash.drain(..) { + waker.wake(); + } + } + } + + fn push(&self, runnable: Runnable, phase: Phase) { + self.queues[phase as usize].push(runnable); + self.num_queued.fetch_add(1); + } + + fn push_yield(&self, waker: Waker) { + self.yields.push(waker); + } + + fn iteration(&self) -> u64 { + self.iteration.get() + } } diff --git a/src/async_engine/ae_fd.rs b/src/async_engine/ae_fd.rs deleted file mode 100644 index ad3e08ce..00000000 --- a/src/async_engine/ae_fd.rs +++ /dev/null @@ -1,196 +0,0 @@ -use { - crate::{ - async_engine::{AsyncEngine, AsyncError}, - event_loop::{EventLoop, EventLoopDispatcher, EventLoopError, EventLoopId}, - utils::numcell::NumCell, - }, - std::{ - cell::{Cell, RefCell}, - error::Error, - fmt::{Debug, Formatter}, - future::Future, - pin::Pin, - rc::Rc, - task::{Context, Poll, Waker}, - }, - uapi::{c, OwnedFd}, -}; - -type Queue = RefCell>>)>>; - -pub(super) struct AsyncFdData { - pub(super) ref_count: NumCell, - pub(super) fd: Rc, - pub(super) id: EventLoopId, - pub(super) el: Rc, - pub(super) write_registered: Cell, - pub(super) read_registered: Cell, - pub(super) readers: Queue, - pub(super) writers: Queue, -} - -impl AsyncFdData { - fn update_interests(&self) -> Result<(), EventLoopError> { - let mut events = 0; - if self.write_registered.get() { - events |= c::EPOLLOUT; - } - if self.read_registered.get() { - events |= c::EPOLLIN; - } - let res = self.el.modify(self.id, events); - if res.is_err() { - if let Err(e) = self.el.remove(self.id) { - log::error!( - "Fatal error: Cannot remove file descriptor from event loop: {:?}", - e - ); - self.el.stop(); - } - } - res - } - - fn poll( - &self, - woken: &Rc>>, - cx: &mut Context<'_>, - registered: impl Fn(&AsyncFdData) -> &Cell, - queue: impl Fn(&AsyncFdData) -> &Queue, - ) -> Poll> { - if let Some(status) = woken.get() { - return Poll::Ready(Ok(status)); - } - if !registered(self).get() { - registered(self).set(true); - if let Err(e) = self.update_interests() { - return Poll::Ready(Err(AsyncError::EventLoopError(e))); - } - } - queue(self) - .borrow_mut() - .push((cx.waker().clone(), woken.clone())); - Poll::Pending - } -} - -impl EventLoopDispatcher for AsyncFdData { - fn dispatch(self: Rc, _fd: Option, events: i32) -> Result<(), Box> { - let mut status = FdStatus::Ok; - if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { - status = FdStatus::Err; - if let Err(e) = self.el.remove(self.id) { - return Err(Box::new(e)); - } - } - let mut woke_any = false; - if events & c::EPOLLIN != 0 || status == FdStatus::Err { - let mut readers = self.readers.borrow_mut(); - woke_any |= !readers.is_empty(); - for (waker, woken) in readers.drain(..) { - woken.set(Some(status)); - waker.wake(); - } - } - if events & c::EPOLLOUT != 0 || status == FdStatus::Err { - let mut writers = self.writers.borrow_mut(); - woke_any |= !writers.is_empty(); - for (waker, woken) in writers.drain(..) { - woken.set(Some(status)); - waker.wake(); - } - } - if !woke_any && status == FdStatus::Ok { - self.read_registered.set(false); - self.write_registered.set(false); - if let Err(e) = self.update_interests() { - return Err(Box::new(e)); - } - } - Ok(()) - } -} - -pub struct AsyncFd { - pub(super) engine: Rc, - pub(super) data: Rc, -} - -impl Debug for AsyncFd { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AsyncFd").finish_non_exhaustive() - } -} - -impl Clone for AsyncFd { - fn clone(&self) -> Self { - self.data.ref_count.fetch_add(1); - Self { - engine: self.engine.clone(), - data: self.data.clone(), - } - } -} - -impl Drop for AsyncFd { - fn drop(&mut self) { - if self.data.ref_count.fetch_sub(1) == 1 { - self.engine.fds.remove(&self.data.fd.raw()); - let _ = self.data.el.remove(self.data.id); - } - } -} - -impl AsyncFd { - pub fn raw(&self) -> i32 { - self.data.fd.raw() - } - - pub fn readable(&self) -> AsyncFdReadable { - AsyncFdReadable { - fd: self, - woken: Rc::new(Cell::new(None)), - } - } - - pub fn writable(&self) -> AsyncFdWritable { - AsyncFdWritable { - fd: self, - woken: Rc::new(Cell::new(None)), - } - } -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum FdStatus { - Ok, - Err, -} - -pub struct AsyncFdReadable<'a> { - fd: &'a AsyncFd, - woken: Rc>>, -} - -impl<'a> Future for AsyncFdReadable<'a> { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let data = &self.fd.data; - data.poll(&self.woken, cx, |d| &d.read_registered, |d| &d.readers) - } -} - -pub struct AsyncFdWritable<'a> { - fd: &'a AsyncFd, - woken: Rc>>, -} - -impl<'a> Future for AsyncFdWritable<'a> { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let data = &self.fd.data; - data.poll(&self.woken, cx, |d| &d.write_registered, |d| &d.writers) - } -} diff --git a/src/async_engine/ae_queue.rs b/src/async_engine/ae_queue.rs deleted file mode 100644 index 58c4f9ea..00000000 --- a/src/async_engine/ae_queue.rs +++ /dev/null @@ -1,115 +0,0 @@ -use { - crate::{ - async_engine::{ae_task::Runnable, AsyncError, Phase, NUM_PHASES}, - event_loop::{EventLoop, EventLoopDispatcher, EventLoopId}, - utils::{array, numcell::NumCell, syncqueue::SyncQueue}, - }, - std::{ - cell::{Cell, RefCell}, - collections::VecDeque, - error::Error, - rc::Rc, - task::Waker, - }, -}; - -pub(super) struct Dispatcher { - queue: Rc, - stash: RefCell>, - yield_stash: RefCell>, -} - -impl Dispatcher { - pub fn install(el: &Rc) -> Result, AsyncError> { - let id = el.id(); - let queue = Rc::new(DispatchQueue { - id, - el: el.clone(), - dispatch_scheduled: Cell::new(false), - num_queued: Default::default(), - queues: array::from_fn(|_| Default::default()), - iteration: Default::default(), - yields: Default::default(), - }); - let slf = Rc::new(Dispatcher { - queue: queue.clone(), - stash: Default::default(), - yield_stash: Default::default(), - }); - el.insert(id, None, 0, slf)?; - Ok(queue) - } -} - -impl EventLoopDispatcher for Dispatcher { - fn dispatch(self: Rc, _fd: Option, _events: i32) -> Result<(), Box> { - let mut stash = self.stash.borrow_mut(); - let mut yield_stash = self.yield_stash.borrow_mut(); - while self.queue.num_queued.get() > 0 { - self.queue.iteration.fetch_add(1); - let mut phase = 0; - while phase < NUM_PHASES as usize { - self.queue.queues[phase].swap(&mut *stash); - if stash.is_empty() { - phase += 1; - continue; - } - self.queue.num_queued.fetch_sub(stash.len()); - for runnable in stash.drain(..) { - runnable.run(); - } - } - self.queue.yields.swap(&mut *yield_stash); - for waker in yield_stash.drain(..) { - waker.wake(); - } - } - self.queue.dispatch_scheduled.set(false); - Ok(()) - } -} - -impl Drop for Dispatcher { - fn drop(&mut self) { - let _ = self.queue.el.remove(self.queue.id); - for queue in &self.queue.queues { - queue.swap(&mut VecDeque::new()); - } - } -} - -pub(super) struct DispatchQueue { - dispatch_scheduled: Cell, - id: EventLoopId, - el: Rc, - num_queued: NumCell, - queues: [SyncQueue; NUM_PHASES], - iteration: NumCell, - yields: SyncQueue, -} - -impl DispatchQueue { - pub fn clear(&self) { - self.yields.take(); - for queue in &self.queues { - queue.take(); - } - } - - pub fn push(&self, runnable: Runnable, phase: Phase) { - self.queues[phase as usize].push(runnable); - self.num_queued.fetch_add(1); - if !self.dispatch_scheduled.get() { - let _ = self.el.schedule(self.id); - self.dispatch_scheduled.set(true); - } - } - - pub fn push_yield(&self, waker: Waker) { - self.yields.push(waker); - } - - pub fn iteration(&self) -> u64 { - self.iteration.get() - } -} diff --git a/src/async_engine/ae_task.rs b/src/async_engine/ae_task.rs index a147a99e..4dfee337 100644 --- a/src/async_engine/ae_task.rs +++ b/src/async_engine/ae_task.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::{ae_queue::DispatchQueue, Phase}, + async_engine::{AsyncEngine, Phase}, utils::{ numcell::NumCell, ptr_ext::{MutPtrExt, PtrExt}, @@ -94,7 +94,7 @@ struct Task> { state: NumCell, data: UnsafeCell>, waker: Cell>, - queue: Rc, + queue: Rc, } pub(super) struct Runnable { @@ -119,8 +119,8 @@ impl Drop for Runnable { } } -impl DispatchQueue { - pub(super) fn spawn>( +impl AsyncEngine { + pub(super) fn spawn_>( self: &Rc, phase: Phase, f: F, diff --git a/src/async_engine/ae_yield.rs b/src/async_engine/ae_yield.rs index 0e38affd..7ada5c81 100644 --- a/src/async_engine/ae_yield.rs +++ b/src/async_engine/ae_yield.rs @@ -1,5 +1,5 @@ use { - crate::async_engine::ae_queue::DispatchQueue, + crate::async_engine::AsyncEngine, std::{ future::Future, pin::Pin, @@ -10,7 +10,7 @@ use { pub struct Yield { pub(super) iteration: u64, - pub(super) queue: Rc, + pub(super) queue: Rc, } impl Future for Yield { diff --git a/src/backends/x.rs b/src/backends/x.rs index 3f2300e8..6acc86a7 100644 --- a/src/backends/x.rs +++ b/src/backends/x.rs @@ -289,7 +289,7 @@ impl XBackend { "Fatal error: Could not handle an event from the X server: {}", ErrorFmt(e) ); - self.state.el.stop(); + self.state.ring.stop(); return; } } @@ -851,7 +851,7 @@ impl XBackend { } fn handle_destroy(&self, event: &Event) -> Result<(), XBackendError> { - self.state.el.stop(); + self.state.ring.stop(); let event: DestroyNotify = event.parse()?; let output = match self.outputs.remove(&event.event) { Some(o) => o, diff --git a/src/client/error.rs b/src/client/error.rs index 54b4b9de..feaff0ac 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -1,6 +1,5 @@ use { crate::{ - async_engine::AsyncError, client::ClientId, object::{Interface, ObjectId}, utils::buffd::{BufFdError, MsgParserError}, @@ -12,8 +11,6 @@ use { #[derive(Debug, Error)] pub enum ClientError { - #[error("An error occurred in the async engine")] - Async(#[from] AsyncError), #[error("An error occurred reading from/writing to the client")] Io(#[from] BufFdError), #[error("An error occurred while processing a request")] diff --git a/src/compositor.rs b/src/compositor.rs index 0f0e6d6e..86188129 100644 --- a/src/compositor.rs +++ b/src/compositor.rs @@ -3,7 +3,7 @@ use crate::it::test_backend::TestBackend; use { crate::{ acceptor::{Acceptor, AcceptorError}, - async_engine::{AsyncEngine, AsyncError, Phase, SpawnedFuture}, + async_engine::{AsyncEngine, Phase, SpawnedFuture}, backend::{self, Backend}, backends::{ dummy::{DummyBackend, DummyOutput}, @@ -14,7 +14,6 @@ use { clientmem::{self, ClientMemError}, config::ConfigProxy, dbus::Dbus, - event_loop::{EventLoop, EventLoopError}, forker, globals::Globals, ifs::{wl_output::WlOutputGlobal, wl_surface::NoneSurfaceExt}, @@ -76,16 +75,12 @@ fn create_forker() -> Rc { pub enum CompositorError { #[error("The client acceptor caused an error")] AcceptorError(#[from] AcceptorError), - #[error("The event loop caused an error")] - EventLoopError(#[from] EventLoopError), #[error("The signal handler caused an error")] SighandError(#[from] SighandError), #[error("The clientmem subsystem caused an error")] ClientmemError(#[from] ClientMemError), #[error("The timer subsystem caused an error")] WheelError(#[from] WheelError), - #[error("The async subsystem caused an error")] - AsyncError(#[from] AsyncError), #[error("The render backend caused an error")] RenderError(#[from] RenderError), #[error("Could not create an io-uring")] @@ -114,12 +109,11 @@ fn start_compositor2( leaks::init(); render::init()?; clientmem::init()?; - let el = EventLoop::new()?; let xkb_ctx = XkbContext::new().unwrap(); let xkb_keymap = xkb_ctx.keymap_from_str(include_str!("keymap.xkb")).unwrap(); - let engine = AsyncEngine::install(&el)?; + let engine = AsyncEngine::new(); let ring = IoUring::new(&engine, 32)?; - let _signal_future = sighand::install(&el, &engine, &ring)?; + let _signal_future = sighand::install(&engine, &ring)?; let wheel = Wheel::new(&engine, &ring)?; let (_run_toplevel_future, run_toplevel) = RunToplevel::install(&engine); let node_ids = NodeIds::default(); @@ -129,7 +123,6 @@ fn start_compositor2( forker: Default::default(), default_keymap: xkb_keymap, eng: engine.clone(), - el: el.clone(), render_ctx: Default::default(), render_ctx_version: NumCell::new(1), render_ctx_ever_initialized: Cell::new(false), @@ -186,7 +179,7 @@ fn start_compositor2( tracker: Default::default(), data_offer_ids: Default::default(), drm_dev_ids: Default::default(), - ring, + ring: ring.clone(), }); state.tracker.register(ClientId::from_raw(0)); create_dummy_output(&state); @@ -200,7 +193,7 @@ fn start_compositor2( forker.setenv(key.as_bytes(), val.as_bytes()); } let compositor = engine.spawn(start_compositor3(state.clone(), test_future)); - el.run()?; + ring.run()?; drop(compositor); drop(acceptor_future); drop(acceptor); @@ -222,7 +215,7 @@ async fn start_compositor3(state: Rc, test_future: Option) { Some(b) => b, _ => { log::error!("Could not create a backend"); - state.el.stop(); + state.ring.stop(); return; } }; @@ -249,7 +242,7 @@ async fn start_compositor3(state: Rc, test_future: Option) { Err(e) => log::error!("Backend failed: {}", ErrorFmt(e.deref())), _ => log::error!("Backend stopped without an error"), } - state.el.stop(); + state.ring.stop(); } fn load_config(state: &Rc, #[allow(unused_variables)] for_test: bool) -> ConfigProxy { diff --git a/src/config/handler.rs b/src/config/handler.rs index 84a366eb..956b7826 100644 --- a/src/config/handler.rs +++ b/src/config/handler.rs @@ -768,7 +768,7 @@ impl ConfigProxyHandler { fn handle_quit(&self) { log::info!("Quitting"); - self.state.el.stop(); + self.state.ring.stop(); } fn handle_switch_to(&self, vtnr: u32) { diff --git a/src/event_loop.rs b/src/event_loop.rs deleted file mode 100644 index a97dcf6c..00000000 --- a/src/event_loop.rs +++ /dev/null @@ -1,208 +0,0 @@ -use { - crate::utils::{clonecell::UnsafeCellCloneSafe, copyhashmap::CopyHashMap, numcell::NumCell}, - std::{ - cell::{Cell, RefCell}, - collections::VecDeque, - rc::Rc, - }, - thiserror::Error, - uapi::{c, Errno, OwnedFd}, -}; - -#[derive(Debug, Error)] -pub enum EventLoopError { - #[error("Could not create an epoll fd: {0}")] - CreateFailed(crate::utils::oserror::OsError), - #[error("epoll_wait failed: {0}")] - WaitFailed(crate::utils::oserror::OsError), - #[error("A dispatcher returned a fatal error: {0}")] - DispatcherError(Box), - #[error("Could not insert an fd to wait on: {0}")] - InsertFailed(crate::utils::oserror::OsError), - #[error("Could not modify an fd to wait on: {0}")] - ModifyFailed(crate::utils::oserror::OsError), - #[error("Could not remove an fd to wait on: {0}")] - RemoveFailed(crate::utils::oserror::OsError), - #[error("Entry is not registered")] - NoEntry, - #[error("Event loop is already destroyed")] - Destroyed, -} - -#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)] -pub struct EventLoopId(u64); - -pub trait EventLoopDispatcher { - fn dispatch( - self: Rc, - fd: Option, - events: i32, - ) -> Result<(), Box>; -} - -#[derive(Clone)] -struct Entry { - fd: Option, - dispatcher: Rc, -} - -unsafe impl UnsafeCellCloneSafe for Entry {} - -pub struct EventLoop { - destroyed: Cell, - epoll: OwnedFd, - next_id: NumCell, - entries: CopyHashMap, - scheduled: RefCell>, -} - -impl EventLoop { - pub fn new() -> Result, EventLoopError> { - let epoll = match uapi::epoll_create1(c::EPOLL_CLOEXEC) { - Ok(e) => e, - Err(e) => return Err(EventLoopError::CreateFailed(e.into())), - }; - Ok(Rc::new(Self { - destroyed: Cell::new(false), - epoll, - next_id: NumCell::new(1), - entries: CopyHashMap::new(), - scheduled: RefCell::new(Default::default()), - })) - } - - fn check_destroyed(&self) -> Result<(), EventLoopError> { - if self.destroyed.get() { - return Err(EventLoopError::Destroyed); - } - Ok(()) - } - - pub fn id(&self) -> EventLoopId { - EventLoopId(self.next_id.fetch_add(1)) - } - - pub fn stop(&self) { - self.destroyed.set(true); - self.entries.clear(); - } - - pub fn insert( - &self, - id: EventLoopId, - fd: Option, - events: i32, - dispatcher: Rc, - ) -> Result<(), EventLoopError> { - self.check_destroyed()?; - let id = id.0; - if let Some(fd) = fd { - let event = c::epoll_event { - events: events as _, - u64: id, - }; - if let Err(e) = uapi::epoll_ctl(self.epoll.raw(), c::EPOLL_CTL_ADD, fd, Some(&event)) { - return Err(EventLoopError::InsertFailed(e.into())); - } - } - self.entries.set(id, Entry { fd, dispatcher }); - Ok(()) - } - - pub fn modify(&self, id: EventLoopId, events: i32) -> Result<(), EventLoopError> { - self.check_destroyed()?; - let id = id.0; - let entry = match self.entries.get(&id) { - Some(e) => e, - None => return Err(EventLoopError::NoEntry), - }; - if let Some(fd) = entry.fd { - let event = c::epoll_event { - events: events as _, - u64: id, - }; - if let Err(e) = uapi::epoll_ctl(self.epoll.raw(), c::EPOLL_CTL_MOD, fd, Some(&event)) { - return Err(EventLoopError::ModifyFailed(e.into())); - } - } - Ok(()) - } - - pub fn remove(&self, id: EventLoopId) -> Result<(), EventLoopError> { - self.check_destroyed()?; - let id = id.0; - let entry = match self.entries.remove(&id) { - Some(e) => e, - None => return Err(EventLoopError::NoEntry), - }; - if let Some(fd) = entry.fd { - if let Err(e) = uapi::epoll_ctl(self.epoll.raw(), c::EPOLL_CTL_DEL, fd, None) { - if e.0 != c::ENOENT { - return Err(EventLoopError::RemoveFailed(e.into())); - } - } - } - Ok(()) - } - - pub fn schedule(&self, id: EventLoopId) -> Result<(), EventLoopError> { - self.check_destroyed()?; - self.scheduled.borrow_mut().push_back(id.0); - Ok(()) - } - - pub fn run(&self) -> Result<(), EventLoopError> { - let res = self.run_(); - self.stop(); - res - } - - fn run_(&self) -> Result<(), EventLoopError> { - self.check_destroyed()?; - let mut buf = [c::epoll_event { events: 0, u64: 0 }; 16]; - 'outer: while !self.destroyed.get() { - while let Some(id) = self.scheduled.borrow_mut().pop_front() { - if self.destroyed.get() { - break 'outer; - } - if let Some(entry) = self.entries.get(&id) { - if let Err(e) = entry.dispatcher.clone().dispatch(entry.fd, 0) { - return Err(EventLoopError::DispatcherError(e)); - } - } - } - if self.destroyed.get() { - break 'outer; - } - let num = match uapi::epoll_wait(self.epoll.raw(), &mut buf, -1) { - Ok(n) => n, - Err(Errno(c::EINTR)) => continue, - Err(e) => return Err(EventLoopError::WaitFailed(e.into())), - }; - for event in &buf[..num] { - if self.destroyed.get() { - break 'outer; - } - let id = event.u64; - let entry = match self.entries.get(&id) { - Some(d) => d, - None => { - log::warn!( - "Client {} created an event but has already been removed", - id, - ); - continue; - } - }; - if let Err(e) = entry - .dispatcher - .clone() - .dispatch(entry.fd, event.events as i32) - { - return Err(EventLoopError::DispatcherError(e)); - } - } - } - Ok(()) - } -} diff --git a/src/forker.rs b/src/forker.rs index abf38a7f..b7c4455a 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -5,7 +5,6 @@ use { crate::{ async_engine::{AsyncEngine, SpawnedFuture}, compositor::{DISPLAY, WAYLAND_DISPLAY}, - event_loop::EventLoop, forker::{ clone3::{fork_with_pidfd, Forked}, io::{IoIn, IoOut}, @@ -328,14 +327,13 @@ impl Forker { let _ = Fd::new(socket).write_all(&msg); }) }); - let el = EventLoop::new().unwrap(); - let ae = AsyncEngine::install(&el).unwrap(); + let ae = AsyncEngine::new(); let ring = IoUring::new(&ae, 32).unwrap(); let wheel = Wheel::new(&ae, &ring).unwrap(); let forker = Rc::new(Forker { socket, ae: ae.clone(), - ring, + ring: ring.clone(), wheel, fds: RefCell::new(vec![]), outgoing: Default::default(), @@ -343,7 +341,7 @@ impl Forker { }); let _f1 = ae.spawn(forker.clone().incoming()); let _f2 = ae.spawn(forker.clone().outgoing()); - let _ = el.run(); + let _ = ring.run(); unreachable!(); } diff --git a/src/ifs/jay_compositor.rs b/src/ifs/jay_compositor.rs index d2c8048d..f74838b9 100644 --- a/src/ifs/jay_compositor.rs +++ b/src/ifs/jay_compositor.rs @@ -92,7 +92,7 @@ impl JayCompositor { fn quit(&self, parser: MsgParser<'_, '_>) -> Result<(), JayCompositorError> { let _req: Quit = self.client.parse(self, parser)?; log::info!("Quitting"); - self.client.state.el.stop(); + self.client.state.ring.stop(); Ok(()) } diff --git a/src/io_uring.rs b/src/io_uring.rs index f1c07957..a13ef4ae 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::{AsyncEngine, AsyncError, AsyncFd, FdStatus, Phase, SpawnedFuture}, + async_engine::AsyncEngine, io_uring::{ ops::{async_cancel::AsyncCancelTask, poll::PollTask, write::WriteTask}, pending_result::PendingResults, @@ -33,7 +33,10 @@ use { }, }, thiserror::Error, - uapi::c::{self}, + uapi::{ + c::{self}, + OwnedFd, + }, }; macro_rules! map_err { @@ -56,8 +59,6 @@ mod sys; pub enum IoUringError { #[error(transparent)] OsError(OsError), - #[error("The async engine returned an error")] - AsyncError(#[from] AsyncError), #[error("Could not create an io-uring")] CreateUring(#[source] OsError), #[error("The kernel does not support the IORING_FEAT_NODROP feature")] @@ -70,6 +71,8 @@ pub enum IoUringError { MapCqRing(#[source] OsError), #[error("The io-uring has already been destroyed")] Destroyed, + #[error("io_uring_enter failed")] + Enter(#[source] OsError), } pub struct IoUring { @@ -86,7 +89,7 @@ impl IoUring { pub fn new(eng: &Rc, entries: u32) -> Result, IoUringError> { let mut params = io_uring_params::default(); let fd = match io_uring_setup(entries, &mut params) { - Ok(f) => Rc::new(f), + Ok(f) => f, Err(e) => return Err(IoUringError::CreateUring(e.into())), }; if !params.features.contains(IORING_FEAT_NODROP) { @@ -172,10 +175,10 @@ impl IoUring { .cast(); std::slice::from_raw_parts(base, params.cq_entries as _) }; - let fd = eng.fd(&fd)?; let data = Rc::new(IoUringData { destroyed: Cell::new(false), fd, + eng: eng.clone(), _sqesmap_map: sqesmap_map, _sqmap_map: sqmap_map, sqmask, @@ -198,21 +201,26 @@ impl IoUring { cached_writes: Default::default(), cached_cancels: Default::default(), cached_polls: Default::default(), - reader: Cell::new(None), - submitter: Cell::new(None), }); - let submitter = eng.spawn2(Phase::Present, data.clone().submit()); - let reader = eng.spawn(data.clone().reader()); - data.reader.set(Some(reader)); - data.submitter.set(Some(submitter)); Ok(Rc::new(Self { ring: data })) } + + pub fn stop(&self) { + self.ring.kill(); + } + + pub fn run(&self) -> Result<(), IoUringError> { + let res = self.ring.run(); + self.ring.kill(); + res + } } struct IoUringData { destroyed: Cell, - fd: AsyncFd, + fd: OwnedFd, + eng: Rc, _sqesmap_map: Mmapped, _sqmap_map: Mmapped, @@ -240,9 +248,6 @@ struct IoUringData { cached_writes: Stack>, cached_cancels: Stack>, cached_polls: Stack>, - - reader: Cell>>, - submitter: Cell>>, } unsafe trait Task { @@ -256,20 +261,47 @@ unsafe trait Task { } impl IoUringData { - async fn reader(self: Rc) { + fn run(&self) -> Result<(), IoUringError> { + let mut to_submit = 0; loop { - if !self.dispatch_completions() { - match self.fd.readable().await { - Err(e) => { - log::error!("Could not wait for the fd to become readable: {}", e); - } - Ok(FdStatus::Err) => { - log::error!("Fd is in an error state"); - } - _ => continue, + loop { + self.eng.dispatch(); + if self.destroyed.get() { + return Ok(()); + } + if !self.dispatch_completions() { + break; + } + } + to_submit += self.encode(); + let res = if to_submit == 0 { + io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS) + } else if self.to_encode.is_empty() { + io_uring_enter(self.fd.raw(), to_submit as _, 1, IORING_ENTER_GETEVENTS) + } else { + io_uring_enter(self.fd.raw(), !0, 0, 0) + }; + let mut submitted_any = false; + match res { + Ok(n) => { + if n > 0 { + submitted_any = true; + } + to_submit -= n; + } + Err(e) => { + if !matches!(e.0, c::EAGAIN | c::EBUSY | c::EINTR) { + return Err(IoUringError::Enter(e)); + } + } + } + if to_submit > 0 && !submitted_any { + let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS); + if let Err(e) = res { + if e.0 != c::EINTR { + return Err(IoUringError::Enter(e)); + } } - self.kill(); - return; } } } @@ -327,48 +359,6 @@ impl IoUringData { encoded } - async fn submit(self: Rc) { - let mut to_submit = 0; - loop { - self.to_encode.non_empty().await; - to_submit += self.encode(); - if to_submit == 0 { - match self.fd.writable().await { - Err(e) => { - log::error!("Could not write for fd to become writable: {}", ErrorFmt(e)); - } - Ok(FdStatus::Err) => { - log::error!("Fd is in an error state"); - } - _ => continue, - } - self.kill(); - return; - } - while to_submit > 0 { - let res = io_uring_enter(self.fd.raw(), to_submit as _, 0, 0); - match res { - Ok(0) => { - panic!("io_uring_enter returned 0"); - } - Ok(n) => to_submit -= n, - Err(e) => match e.0 { - c::EAGAIN | c::EBUSY => { - log::debug!("waiting for completion events"); - self.cqes_consumed.clear(); - self.cqes_consumed.triggered().await; - } - _ => { - log::error!("io_uring_enter returned an error: {}", ErrorFmt(e)); - self.kill(); - return; - } - }, - } - } - } - } - fn id(&self) -> Cancellable { Cancellable { id: self.id_raw(), @@ -409,8 +399,6 @@ impl IoUringData { } fn kill(&self) { - self.reader.take(); - self.submitter.take(); let mut to_cancel = vec![]; for task in self.tasks.lock().values() { if !task.is_cancel() { diff --git a/src/main.rs b/src/main.rs index f9d85d5e..2515365b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,7 +46,6 @@ mod config; mod cursor; mod dbus; mod edid; -mod event_loop; mod fixed; mod forker; mod format; diff --git a/src/sighand.rs b/src/sighand.rs index 93c9bde0..201cb02b 100644 --- a/src/sighand.rs +++ b/src/sighand.rs @@ -1,7 +1,6 @@ use { crate::{ async_engine::{AsyncEngine, SpawnedFuture}, - event_loop::{EventLoop, EventLoopError}, io_uring::IoUring, utils::{errorfmt::ErrorFmt, oserror::OsError}, }, @@ -16,12 +15,9 @@ pub enum SighandError { BlockFailed(#[source] OsError), #[error("Could not create a signalfd")] CreateFailed(#[source] OsError), - #[error("The event loop caused an error")] - EventLoopError(#[from] EventLoopError), } pub fn install( - el: &Rc, eng: &Rc, ring: &Rc, ) -> Result, SighandError> { @@ -36,10 +32,10 @@ pub fn install( Ok(fd) => Rc::new(fd), Err(e) => return Err(SighandError::CreateFailed(e.into())), }; - Ok(eng.spawn(handle_signals(fd, ring.clone(), el.clone()))) + Ok(eng.spawn(handle_signals(fd, ring.clone()))) } -async fn handle_signals(fd: Rc, ring: Rc, el: Rc) { +async fn handle_signals(fd: Rc, ring: Rc) { let mut siginfo: c::signalfd_siginfo = uapi::pod_zeroed(); loop { if let Err(e) = ring.readable(&fd).await { @@ -66,7 +62,7 @@ async fn handle_signals(fd: Rc, ring: Rc, el: Rc) { log::info!("Received signal {}", sig); if matches!(sig, c::SIGINT | c::SIGTERM) { log::info!("Exiting"); - el.stop(); + ring.stop(); } } } diff --git a/src/state.rs b/src/state.rs index d7eb5026..32ec76c7 100644 --- a/src/state.rs +++ b/src/state.rs @@ -12,7 +12,6 @@ use { config::ConfigProxy, cursor::ServerCursors, dbus::Dbus, - event_loop::EventLoop, forker::ForkerProxy, globals::{Globals, GlobalsError, WaylandGlobal}, ifs::{ @@ -63,7 +62,6 @@ pub struct State { pub forker: CloneCell>>, pub default_keymap: Rc, pub eng: Rc, - pub el: Rc, pub render_ctx: CloneCell>>, pub render_ctx_version: NumCell, pub render_ctx_ever_initialized: Cell, diff --git a/src/tools/tool_client.rs b/src/tools/tool_client.rs index 68883332..2b240803 100644 --- a/src/tools/tool_client.rs +++ b/src/tools/tool_client.rs @@ -1,9 +1,8 @@ use { crate::{ - async_engine::{AsyncEngine, AsyncError, SpawnedFuture}, + async_engine::{AsyncEngine, SpawnedFuture}, client::{EventFormatter, RequestParser}, compositor::WAYLAND_DISPLAY, - event_loop::{EventLoop, EventLoopError}, io_uring::{IoUring, IoUringError}, logger::Logger, object::{ObjectId, WL_DISPLAY_ID}, @@ -44,12 +43,8 @@ use { #[derive(Debug, Error)] pub enum ToolClientError { - #[error("Could not create an event loop")] - CreateEventLoop(#[source] EventLoopError), #[error("Could not create a timer wheel")] CreateWheel(#[source] WheelError), - #[error("Could not create an async engine")] - CreateEngine(#[source] AsyncError), #[error("Could not create an io-uring")] CreateRing(#[source] IoUringError), #[error("XDG_RUNTIME_DIR is not set")] @@ -78,7 +73,6 @@ pub enum ToolClientError { pub struct ToolClient { pub logger: Arc, - pub el: Rc, pub ring: Rc, pub wheel: Rc, pub eng: Rc, @@ -118,21 +112,14 @@ impl ToolClient { f.await; std::process::exit(0); }); - if let Err(e) = self.el.run() { + if let Err(e) = self.ring.run() { fatal!("A fatal error occurred: {}", ErrorFmt(e)); } } pub fn try_new(level: Level) -> Result, ToolClientError> { let logger = Logger::install_stderr(level); - let el = match EventLoop::new() { - Ok(e) => e, - Err(e) => return Err(ToolClientError::CreateEventLoop(e)), - }; - let eng = match AsyncEngine::install(&el) { - Ok(e) => e, - Err(e) => return Err(ToolClientError::CreateEngine(e)), - }; + let eng = AsyncEngine::new(); let ring = match IoUring::new(&eng, 32) { Ok(e) => e, Err(e) => return Err(ToolClientError::CreateRing(e)), @@ -174,7 +161,6 @@ impl ToolClient { obj_ids.take(1); let slf = Rc::new(Self { logger, - el, ring, wheel, eng, diff --git a/src/utils/queue.rs b/src/utils/queue.rs index bcf88d88..789c727e 100644 --- a/src/utils/queue.rs +++ b/src/utils/queue.rs @@ -49,6 +49,10 @@ impl AsyncQueue { mem::take(&mut *self.data.borrow_mut()); self.waiter.take(); } + + pub fn is_empty(&self) -> bool { + self.data.borrow_mut().is_empty() + } } pub struct AsyncQueuePop<'a, T> { diff --git a/src/wheel.rs b/src/wheel.rs index 591b2f3e..cd465cf6 100644 --- a/src/wheel.rs +++ b/src/wheel.rs @@ -1,6 +1,6 @@ use { crate::{ - async_engine::{AsyncEngine, AsyncError, SpawnedFuture}, + async_engine::{AsyncEngine, SpawnedFuture}, io_uring::IoUring, time::{Time, TimeError}, utils::{ @@ -28,8 +28,6 @@ pub enum WheelError { CreateFailed(#[source] OsError), #[error("Could not set the timerfd")] SetFailed(#[source] OsError), - #[error("An async error occurred")] - AsyncError(#[from] AsyncError), #[error("Cannot determine the time")] TimeError(#[from] TimeError), #[error("The timer wheel is already destroyed")] diff --git a/src/xcon.rs b/src/xcon.rs index 0d8dfb3b..65ccde72 100644 --- a/src/xcon.rs +++ b/src/xcon.rs @@ -5,7 +5,7 @@ pub use crate::xcon::{ }; use { crate::{ - async_engine::{AsyncError, Phase, SpawnedFuture}, + async_engine::{Phase, SpawnedFuture}, compositor::DISPLAY, state::State, utils::{ @@ -87,8 +87,6 @@ pub enum XconError { #[error("Server requires additional authentication: {0}")] Authenticate(BString), #[error(transparent)] - AsyncError(#[from] AsyncError), - #[error(transparent)] BufIoError(#[from] BufIoError), #[error("The server did not send a reply to a request")] MissingReply,