1
0
Fork 0
forked from wry/wry

runtime: replace epoll by io-uring

This commit is contained in:
Julian Orth 2022-05-12 22:50:33 +02:00
parent 98cc85e2d3
commit 9416efeabe
21 changed files with 173 additions and 742 deletions

View file

@ -1,7 +1,6 @@
use {
crate::{
async_engine::SpawnedFuture,
event_loop::EventLoopError,
state::State,
utils::{errorfmt::ErrorFmt, oserror::OsError, xrd::xrd},
},
@ -30,8 +29,6 @@ pub enum AcceptorError {
BindFailed(#[source] OsError),
#[error("All wayland addresses in the range 0..1000 are already in use")]
AddressesInUse,
#[error("The event loop caused an error")]
EventLoopError(#[from] EventLoopError),
}
pub struct Acceptor {
@ -205,5 +202,5 @@ async fn accept(fd: Rc<OwnedFd>, state: Rc<State>, secure: bool) {
}
}
}
state.el.stop();
state.ring.stop();
}

View file

@ -1,35 +1,15 @@
mod ae_fd;
mod ae_queue;
mod ae_task;
mod ae_yield;
pub use {
crate::async_engine::ae_yield::Yield,
ae_fd::{AsyncFd, FdStatus},
ae_task::SpawnedFuture,
};
pub use {crate::async_engine::ae_yield::Yield, ae_task::SpawnedFuture};
use {
crate::{
event_loop::{EventLoop, EventLoopError},
utils::{copyhashmap::CopyHashMap, numcell::NumCell},
async_engine::ae_task::Runnable,
utils::{array, numcell::NumCell, syncqueue::SyncQueue},
},
ae_fd::AsyncFdData,
ae_queue::{DispatchQueue, Dispatcher},
std::{
cell::{Cell, RefCell},
future::Future,
rc::Rc,
},
thiserror::Error,
uapi::OwnedFd,
std::{cell::RefCell, collections::VecDeque, future::Future, rc::Rc, task::Waker},
};
#[derive(Debug, Error)]
pub enum AsyncError {
#[error("The event loop caused an error")]
EventLoopError(#[from] EventLoopError),
}
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum Phase {
EventHandling,
@ -40,70 +20,88 @@ pub enum Phase {
const NUM_PHASES: usize = 4;
pub struct AsyncEngine {
el: Rc<EventLoop>,
queue: Rc<DispatchQueue>,
fds: CopyHashMap<i32, Rc<AsyncFdData>>,
num_queued: NumCell<usize>,
queues: [SyncQueue<Runnable>; NUM_PHASES],
iteration: NumCell<u64>,
yields: SyncQueue<Waker>,
stash: RefCell<VecDeque<Runnable>>,
yield_stash: RefCell<VecDeque<Waker>>,
}
impl AsyncEngine {
pub fn install(el: &Rc<EventLoop>) -> Result<Rc<Self>, AsyncError> {
let queue = Dispatcher::install(el)?;
Ok(Rc::new(Self {
el: el.clone(),
queue,
fds: CopyHashMap::new(),
}))
}
pub fn clear(&self) {
for (_, fd) in self.fds.lock().drain() {
fd.readers.take();
fd.writers.take();
}
self.queue.clear();
}
pub fn spawn<T, F: Future<Output = T> + 'static>(&self, f: F) -> SpawnedFuture<T> {
self.queue.spawn(Phase::EventHandling, f)
}
pub fn spawn2<T, F: Future<Output = T> + 'static>(
&self,
phase: Phase,
f: F,
) -> SpawnedFuture<T> {
self.queue.spawn(phase, f)
}
pub fn fd(self: &Rc<Self>, fd: &Rc<OwnedFd>) -> Result<AsyncFd, AsyncError> {
let data = if let Some(afd) = self.fds.get(&fd.raw()) {
afd.ref_count.fetch_add(1);
afd
} else {
let id = self.el.id();
let afd = Rc::new(AsyncFdData {
ref_count: NumCell::new(1),
fd: fd.clone(),
id,
el: self.el.clone(),
write_registered: Cell::new(false),
read_registered: Cell::new(false),
readers: RefCell::new(vec![]),
writers: RefCell::new(vec![]),
});
self.el.insert(id, Some(fd.raw()), 0, afd.clone())?;
afd
};
Ok(AsyncFd {
engine: self.clone(),
data,
pub fn new() -> Rc<Self> {
Rc::new(Self {
num_queued: Default::default(),
queues: array::from_fn(|_| Default::default()),
iteration: Default::default(),
yields: Default::default(),
stash: Default::default(),
yield_stash: Default::default(),
})
}
pub fn yield_now(&self) -> Yield {
Yield {
iteration: self.queue.iteration(),
queue: self.queue.clone(),
pub fn clear(&self) {
self.stash.borrow_mut().clear();
self.yield_stash.borrow_mut().clear();
self.yields.take();
for queue in &self.queues {
queue.take();
}
}
pub fn spawn<T, F: Future<Output = T> + 'static>(self: &Rc<Self>, f: F) -> SpawnedFuture<T> {
self.spawn_(Phase::EventHandling, f)
}
pub fn spawn2<T, F: Future<Output = T> + 'static>(
self: &Rc<Self>,
phase: Phase,
f: F,
) -> SpawnedFuture<T> {
self.spawn_(phase, f)
}
pub fn yield_now(self: &Rc<Self>) -> Yield {
Yield {
iteration: self.iteration(),
queue: self.clone(),
}
}
pub fn dispatch(&self) {
let mut stash = self.stash.borrow_mut();
let mut yield_stash = self.yield_stash.borrow_mut();
while self.num_queued.get() > 0 {
self.iteration.fetch_add(1);
let mut phase = 0;
while phase < NUM_PHASES as usize {
self.queues[phase].swap(&mut *stash);
if stash.is_empty() {
phase += 1;
continue;
}
self.num_queued.fetch_sub(stash.len());
for runnable in stash.drain(..) {
runnable.run();
}
}
self.yields.swap(&mut *yield_stash);
for waker in yield_stash.drain(..) {
waker.wake();
}
}
}
fn push(&self, runnable: Runnable, phase: Phase) {
self.queues[phase as usize].push(runnable);
self.num_queued.fetch_add(1);
}
fn push_yield(&self, waker: Waker) {
self.yields.push(waker);
}
fn iteration(&self) -> u64 {
self.iteration.get()
}
}

View file

@ -1,196 +0,0 @@
use {
crate::{
async_engine::{AsyncEngine, AsyncError},
event_loop::{EventLoop, EventLoopDispatcher, EventLoopError, EventLoopId},
utils::numcell::NumCell,
},
std::{
cell::{Cell, RefCell},
error::Error,
fmt::{Debug, Formatter},
future::Future,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
},
uapi::{c, OwnedFd},
};
type Queue = RefCell<Vec<(Waker, Rc<Cell<Option<FdStatus>>>)>>;
pub(super) struct AsyncFdData {
pub(super) ref_count: NumCell<u64>,
pub(super) fd: Rc<OwnedFd>,
pub(super) id: EventLoopId,
pub(super) el: Rc<EventLoop>,
pub(super) write_registered: Cell<bool>,
pub(super) read_registered: Cell<bool>,
pub(super) readers: Queue,
pub(super) writers: Queue,
}
impl AsyncFdData {
fn update_interests(&self) -> Result<(), EventLoopError> {
let mut events = 0;
if self.write_registered.get() {
events |= c::EPOLLOUT;
}
if self.read_registered.get() {
events |= c::EPOLLIN;
}
let res = self.el.modify(self.id, events);
if res.is_err() {
if let Err(e) = self.el.remove(self.id) {
log::error!(
"Fatal error: Cannot remove file descriptor from event loop: {:?}",
e
);
self.el.stop();
}
}
res
}
fn poll(
&self,
woken: &Rc<Cell<Option<FdStatus>>>,
cx: &mut Context<'_>,
registered: impl Fn(&AsyncFdData) -> &Cell<bool>,
queue: impl Fn(&AsyncFdData) -> &Queue,
) -> Poll<Result<FdStatus, AsyncError>> {
if let Some(status) = woken.get() {
return Poll::Ready(Ok(status));
}
if !registered(self).get() {
registered(self).set(true);
if let Err(e) = self.update_interests() {
return Poll::Ready(Err(AsyncError::EventLoopError(e)));
}
}
queue(self)
.borrow_mut()
.push((cx.waker().clone(), woken.clone()));
Poll::Pending
}
}
impl EventLoopDispatcher for AsyncFdData {
fn dispatch(self: Rc<Self>, _fd: Option<i32>, events: i32) -> Result<(), Box<dyn Error>> {
let mut status = FdStatus::Ok;
if events & (c::EPOLLERR | c::EPOLLHUP) != 0 {
status = FdStatus::Err;
if let Err(e) = self.el.remove(self.id) {
return Err(Box::new(e));
}
}
let mut woke_any = false;
if events & c::EPOLLIN != 0 || status == FdStatus::Err {
let mut readers = self.readers.borrow_mut();
woke_any |= !readers.is_empty();
for (waker, woken) in readers.drain(..) {
woken.set(Some(status));
waker.wake();
}
}
if events & c::EPOLLOUT != 0 || status == FdStatus::Err {
let mut writers = self.writers.borrow_mut();
woke_any |= !writers.is_empty();
for (waker, woken) in writers.drain(..) {
woken.set(Some(status));
waker.wake();
}
}
if !woke_any && status == FdStatus::Ok {
self.read_registered.set(false);
self.write_registered.set(false);
if let Err(e) = self.update_interests() {
return Err(Box::new(e));
}
}
Ok(())
}
}
pub struct AsyncFd {
pub(super) engine: Rc<AsyncEngine>,
pub(super) data: Rc<AsyncFdData>,
}
impl Debug for AsyncFd {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncFd").finish_non_exhaustive()
}
}
impl Clone for AsyncFd {
fn clone(&self) -> Self {
self.data.ref_count.fetch_add(1);
Self {
engine: self.engine.clone(),
data: self.data.clone(),
}
}
}
impl Drop for AsyncFd {
fn drop(&mut self) {
if self.data.ref_count.fetch_sub(1) == 1 {
self.engine.fds.remove(&self.data.fd.raw());
let _ = self.data.el.remove(self.data.id);
}
}
}
impl AsyncFd {
pub fn raw(&self) -> i32 {
self.data.fd.raw()
}
pub fn readable(&self) -> AsyncFdReadable {
AsyncFdReadable {
fd: self,
woken: Rc::new(Cell::new(None)),
}
}
pub fn writable(&self) -> AsyncFdWritable {
AsyncFdWritable {
fd: self,
woken: Rc::new(Cell::new(None)),
}
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum FdStatus {
Ok,
Err,
}
pub struct AsyncFdReadable<'a> {
fd: &'a AsyncFd,
woken: Rc<Cell<Option<FdStatus>>>,
}
impl<'a> Future for AsyncFdReadable<'a> {
type Output = Result<FdStatus, AsyncError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let data = &self.fd.data;
data.poll(&self.woken, cx, |d| &d.read_registered, |d| &d.readers)
}
}
pub struct AsyncFdWritable<'a> {
fd: &'a AsyncFd,
woken: Rc<Cell<Option<FdStatus>>>,
}
impl<'a> Future for AsyncFdWritable<'a> {
type Output = Result<FdStatus, AsyncError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let data = &self.fd.data;
data.poll(&self.woken, cx, |d| &d.write_registered, |d| &d.writers)
}
}

View file

@ -1,115 +0,0 @@
use {
crate::{
async_engine::{ae_task::Runnable, AsyncError, Phase, NUM_PHASES},
event_loop::{EventLoop, EventLoopDispatcher, EventLoopId},
utils::{array, numcell::NumCell, syncqueue::SyncQueue},
},
std::{
cell::{Cell, RefCell},
collections::VecDeque,
error::Error,
rc::Rc,
task::Waker,
},
};
pub(super) struct Dispatcher {
queue: Rc<DispatchQueue>,
stash: RefCell<VecDeque<Runnable>>,
yield_stash: RefCell<VecDeque<Waker>>,
}
impl Dispatcher {
pub fn install(el: &Rc<EventLoop>) -> Result<Rc<DispatchQueue>, AsyncError> {
let id = el.id();
let queue = Rc::new(DispatchQueue {
id,
el: el.clone(),
dispatch_scheduled: Cell::new(false),
num_queued: Default::default(),
queues: array::from_fn(|_| Default::default()),
iteration: Default::default(),
yields: Default::default(),
});
let slf = Rc::new(Dispatcher {
queue: queue.clone(),
stash: Default::default(),
yield_stash: Default::default(),
});
el.insert(id, None, 0, slf)?;
Ok(queue)
}
}
impl EventLoopDispatcher for Dispatcher {
fn dispatch(self: Rc<Self>, _fd: Option<i32>, _events: i32) -> Result<(), Box<dyn Error>> {
let mut stash = self.stash.borrow_mut();
let mut yield_stash = self.yield_stash.borrow_mut();
while self.queue.num_queued.get() > 0 {
self.queue.iteration.fetch_add(1);
let mut phase = 0;
while phase < NUM_PHASES as usize {
self.queue.queues[phase].swap(&mut *stash);
if stash.is_empty() {
phase += 1;
continue;
}
self.queue.num_queued.fetch_sub(stash.len());
for runnable in stash.drain(..) {
runnable.run();
}
}
self.queue.yields.swap(&mut *yield_stash);
for waker in yield_stash.drain(..) {
waker.wake();
}
}
self.queue.dispatch_scheduled.set(false);
Ok(())
}
}
impl Drop for Dispatcher {
fn drop(&mut self) {
let _ = self.queue.el.remove(self.queue.id);
for queue in &self.queue.queues {
queue.swap(&mut VecDeque::new());
}
}
}
pub(super) struct DispatchQueue {
dispatch_scheduled: Cell<bool>,
id: EventLoopId,
el: Rc<EventLoop>,
num_queued: NumCell<usize>,
queues: [SyncQueue<Runnable>; NUM_PHASES],
iteration: NumCell<u64>,
yields: SyncQueue<Waker>,
}
impl DispatchQueue {
pub fn clear(&self) {
self.yields.take();
for queue in &self.queues {
queue.take();
}
}
pub fn push(&self, runnable: Runnable, phase: Phase) {
self.queues[phase as usize].push(runnable);
self.num_queued.fetch_add(1);
if !self.dispatch_scheduled.get() {
let _ = self.el.schedule(self.id);
self.dispatch_scheduled.set(true);
}
}
pub fn push_yield(&self, waker: Waker) {
self.yields.push(waker);
}
pub fn iteration(&self) -> u64 {
self.iteration.get()
}
}

View file

@ -1,6 +1,6 @@
use {
crate::{
async_engine::{ae_queue::DispatchQueue, Phase},
async_engine::{AsyncEngine, Phase},
utils::{
numcell::NumCell,
ptr_ext::{MutPtrExt, PtrExt},
@ -94,7 +94,7 @@ struct Task<T, F: Future<Output = T>> {
state: NumCell<u32>,
data: UnsafeCell<TaskData<T, F>>,
waker: Cell<Option<Waker>>,
queue: Rc<DispatchQueue>,
queue: Rc<AsyncEngine>,
}
pub(super) struct Runnable {
@ -119,8 +119,8 @@ impl Drop for Runnable {
}
}
impl DispatchQueue {
pub(super) fn spawn<T, F: Future<Output = T>>(
impl AsyncEngine {
pub(super) fn spawn_<T, F: Future<Output = T>>(
self: &Rc<Self>,
phase: Phase,
f: F,

View file

@ -1,5 +1,5 @@
use {
crate::async_engine::ae_queue::DispatchQueue,
crate::async_engine::AsyncEngine,
std::{
future::Future,
pin::Pin,
@ -10,7 +10,7 @@ use {
pub struct Yield {
pub(super) iteration: u64,
pub(super) queue: Rc<DispatchQueue>,
pub(super) queue: Rc<AsyncEngine>,
}
impl Future for Yield {

View file

@ -289,7 +289,7 @@ impl XBackend {
"Fatal error: Could not handle an event from the X server: {}",
ErrorFmt(e)
);
self.state.el.stop();
self.state.ring.stop();
return;
}
}
@ -851,7 +851,7 @@ impl XBackend {
}
fn handle_destroy(&self, event: &Event) -> Result<(), XBackendError> {
self.state.el.stop();
self.state.ring.stop();
let event: DestroyNotify = event.parse()?;
let output = match self.outputs.remove(&event.event) {
Some(o) => o,

View file

@ -1,6 +1,5 @@
use {
crate::{
async_engine::AsyncError,
client::ClientId,
object::{Interface, ObjectId},
utils::buffd::{BufFdError, MsgParserError},
@ -12,8 +11,6 @@ use {
#[derive(Debug, Error)]
pub enum ClientError {
#[error("An error occurred in the async engine")]
Async(#[from] AsyncError),
#[error("An error occurred reading from/writing to the client")]
Io(#[from] BufFdError),
#[error("An error occurred while processing a request")]

View file

@ -3,7 +3,7 @@ use crate::it::test_backend::TestBackend;
use {
crate::{
acceptor::{Acceptor, AcceptorError},
async_engine::{AsyncEngine, AsyncError, Phase, SpawnedFuture},
async_engine::{AsyncEngine, Phase, SpawnedFuture},
backend::{self, Backend},
backends::{
dummy::{DummyBackend, DummyOutput},
@ -14,7 +14,6 @@ use {
clientmem::{self, ClientMemError},
config::ConfigProxy,
dbus::Dbus,
event_loop::{EventLoop, EventLoopError},
forker,
globals::Globals,
ifs::{wl_output::WlOutputGlobal, wl_surface::NoneSurfaceExt},
@ -76,16 +75,12 @@ fn create_forker() -> Rc<ForkerProxy> {
pub enum CompositorError {
#[error("The client acceptor caused an error")]
AcceptorError(#[from] AcceptorError),
#[error("The event loop caused an error")]
EventLoopError(#[from] EventLoopError),
#[error("The signal handler caused an error")]
SighandError(#[from] SighandError),
#[error("The clientmem subsystem caused an error")]
ClientmemError(#[from] ClientMemError),
#[error("The timer subsystem caused an error")]
WheelError(#[from] WheelError),
#[error("The async subsystem caused an error")]
AsyncError(#[from] AsyncError),
#[error("The render backend caused an error")]
RenderError(#[from] RenderError),
#[error("Could not create an io-uring")]
@ -114,12 +109,11 @@ fn start_compositor2(
leaks::init();
render::init()?;
clientmem::init()?;
let el = EventLoop::new()?;
let xkb_ctx = XkbContext::new().unwrap();
let xkb_keymap = xkb_ctx.keymap_from_str(include_str!("keymap.xkb")).unwrap();
let engine = AsyncEngine::install(&el)?;
let engine = AsyncEngine::new();
let ring = IoUring::new(&engine, 32)?;
let _signal_future = sighand::install(&el, &engine, &ring)?;
let _signal_future = sighand::install(&engine, &ring)?;
let wheel = Wheel::new(&engine, &ring)?;
let (_run_toplevel_future, run_toplevel) = RunToplevel::install(&engine);
let node_ids = NodeIds::default();
@ -129,7 +123,6 @@ fn start_compositor2(
forker: Default::default(),
default_keymap: xkb_keymap,
eng: engine.clone(),
el: el.clone(),
render_ctx: Default::default(),
render_ctx_version: NumCell::new(1),
render_ctx_ever_initialized: Cell::new(false),
@ -186,7 +179,7 @@ fn start_compositor2(
tracker: Default::default(),
data_offer_ids: Default::default(),
drm_dev_ids: Default::default(),
ring,
ring: ring.clone(),
});
state.tracker.register(ClientId::from_raw(0));
create_dummy_output(&state);
@ -200,7 +193,7 @@ fn start_compositor2(
forker.setenv(key.as_bytes(), val.as_bytes());
}
let compositor = engine.spawn(start_compositor3(state.clone(), test_future));
el.run()?;
ring.run()?;
drop(compositor);
drop(acceptor_future);
drop(acceptor);
@ -222,7 +215,7 @@ async fn start_compositor3(state: Rc<State>, test_future: Option<TestFuture>) {
Some(b) => b,
_ => {
log::error!("Could not create a backend");
state.el.stop();
state.ring.stop();
return;
}
};
@ -249,7 +242,7 @@ async fn start_compositor3(state: Rc<State>, test_future: Option<TestFuture>) {
Err(e) => log::error!("Backend failed: {}", ErrorFmt(e.deref())),
_ => log::error!("Backend stopped without an error"),
}
state.el.stop();
state.ring.stop();
}
fn load_config(state: &Rc<State>, #[allow(unused_variables)] for_test: bool) -> ConfigProxy {

View file

@ -768,7 +768,7 @@ impl ConfigProxyHandler {
fn handle_quit(&self) {
log::info!("Quitting");
self.state.el.stop();
self.state.ring.stop();
}
fn handle_switch_to(&self, vtnr: u32) {

View file

@ -1,208 +0,0 @@
use {
crate::utils::{clonecell::UnsafeCellCloneSafe, copyhashmap::CopyHashMap, numcell::NumCell},
std::{
cell::{Cell, RefCell},
collections::VecDeque,
rc::Rc,
},
thiserror::Error,
uapi::{c, Errno, OwnedFd},
};
#[derive(Debug, Error)]
pub enum EventLoopError {
#[error("Could not create an epoll fd: {0}")]
CreateFailed(crate::utils::oserror::OsError),
#[error("epoll_wait failed: {0}")]
WaitFailed(crate::utils::oserror::OsError),
#[error("A dispatcher returned a fatal error: {0}")]
DispatcherError(Box<dyn std::error::Error>),
#[error("Could not insert an fd to wait on: {0}")]
InsertFailed(crate::utils::oserror::OsError),
#[error("Could not modify an fd to wait on: {0}")]
ModifyFailed(crate::utils::oserror::OsError),
#[error("Could not remove an fd to wait on: {0}")]
RemoveFailed(crate::utils::oserror::OsError),
#[error("Entry is not registered")]
NoEntry,
#[error("Event loop is already destroyed")]
Destroyed,
}
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct EventLoopId(u64);
pub trait EventLoopDispatcher {
fn dispatch(
self: Rc<Self>,
fd: Option<i32>,
events: i32,
) -> Result<(), Box<dyn std::error::Error>>;
}
#[derive(Clone)]
struct Entry {
fd: Option<i32>,
dispatcher: Rc<dyn EventLoopDispatcher>,
}
unsafe impl UnsafeCellCloneSafe for Entry {}
pub struct EventLoop {
destroyed: Cell<bool>,
epoll: OwnedFd,
next_id: NumCell<u64>,
entries: CopyHashMap<u64, Entry>,
scheduled: RefCell<VecDeque<u64>>,
}
impl EventLoop {
pub fn new() -> Result<Rc<Self>, EventLoopError> {
let epoll = match uapi::epoll_create1(c::EPOLL_CLOEXEC) {
Ok(e) => e,
Err(e) => return Err(EventLoopError::CreateFailed(e.into())),
};
Ok(Rc::new(Self {
destroyed: Cell::new(false),
epoll,
next_id: NumCell::new(1),
entries: CopyHashMap::new(),
scheduled: RefCell::new(Default::default()),
}))
}
fn check_destroyed(&self) -> Result<(), EventLoopError> {
if self.destroyed.get() {
return Err(EventLoopError::Destroyed);
}
Ok(())
}
pub fn id(&self) -> EventLoopId {
EventLoopId(self.next_id.fetch_add(1))
}
pub fn stop(&self) {
self.destroyed.set(true);
self.entries.clear();
}
pub fn insert(
&self,
id: EventLoopId,
fd: Option<i32>,
events: i32,
dispatcher: Rc<dyn EventLoopDispatcher>,
) -> Result<(), EventLoopError> {
self.check_destroyed()?;
let id = id.0;
if let Some(fd) = fd {
let event = c::epoll_event {
events: events as _,
u64: id,
};
if let Err(e) = uapi::epoll_ctl(self.epoll.raw(), c::EPOLL_CTL_ADD, fd, Some(&event)) {
return Err(EventLoopError::InsertFailed(e.into()));
}
}
self.entries.set(id, Entry { fd, dispatcher });
Ok(())
}
pub fn modify(&self, id: EventLoopId, events: i32) -> Result<(), EventLoopError> {
self.check_destroyed()?;
let id = id.0;
let entry = match self.entries.get(&id) {
Some(e) => e,
None => return Err(EventLoopError::NoEntry),
};
if let Some(fd) = entry.fd {
let event = c::epoll_event {
events: events as _,
u64: id,
};
if let Err(e) = uapi::epoll_ctl(self.epoll.raw(), c::EPOLL_CTL_MOD, fd, Some(&event)) {
return Err(EventLoopError::ModifyFailed(e.into()));
}
}
Ok(())
}
pub fn remove(&self, id: EventLoopId) -> Result<(), EventLoopError> {
self.check_destroyed()?;
let id = id.0;
let entry = match self.entries.remove(&id) {
Some(e) => e,
None => return Err(EventLoopError::NoEntry),
};
if let Some(fd) = entry.fd {
if let Err(e) = uapi::epoll_ctl(self.epoll.raw(), c::EPOLL_CTL_DEL, fd, None) {
if e.0 != c::ENOENT {
return Err(EventLoopError::RemoveFailed(e.into()));
}
}
}
Ok(())
}
pub fn schedule(&self, id: EventLoopId) -> Result<(), EventLoopError> {
self.check_destroyed()?;
self.scheduled.borrow_mut().push_back(id.0);
Ok(())
}
pub fn run(&self) -> Result<(), EventLoopError> {
let res = self.run_();
self.stop();
res
}
fn run_(&self) -> Result<(), EventLoopError> {
self.check_destroyed()?;
let mut buf = [c::epoll_event { events: 0, u64: 0 }; 16];
'outer: while !self.destroyed.get() {
while let Some(id) = self.scheduled.borrow_mut().pop_front() {
if self.destroyed.get() {
break 'outer;
}
if let Some(entry) = self.entries.get(&id) {
if let Err(e) = entry.dispatcher.clone().dispatch(entry.fd, 0) {
return Err(EventLoopError::DispatcherError(e));
}
}
}
if self.destroyed.get() {
break 'outer;
}
let num = match uapi::epoll_wait(self.epoll.raw(), &mut buf, -1) {
Ok(n) => n,
Err(Errno(c::EINTR)) => continue,
Err(e) => return Err(EventLoopError::WaitFailed(e.into())),
};
for event in &buf[..num] {
if self.destroyed.get() {
break 'outer;
}
let id = event.u64;
let entry = match self.entries.get(&id) {
Some(d) => d,
None => {
log::warn!(
"Client {} created an event but has already been removed",
id,
);
continue;
}
};
if let Err(e) = entry
.dispatcher
.clone()
.dispatch(entry.fd, event.events as i32)
{
return Err(EventLoopError::DispatcherError(e));
}
}
}
Ok(())
}
}

View file

@ -5,7 +5,6 @@ use {
crate::{
async_engine::{AsyncEngine, SpawnedFuture},
compositor::{DISPLAY, WAYLAND_DISPLAY},
event_loop::EventLoop,
forker::{
clone3::{fork_with_pidfd, Forked},
io::{IoIn, IoOut},
@ -328,14 +327,13 @@ impl Forker {
let _ = Fd::new(socket).write_all(&msg);
})
});
let el = EventLoop::new().unwrap();
let ae = AsyncEngine::install(&el).unwrap();
let ae = AsyncEngine::new();
let ring = IoUring::new(&ae, 32).unwrap();
let wheel = Wheel::new(&ae, &ring).unwrap();
let forker = Rc::new(Forker {
socket,
ae: ae.clone(),
ring,
ring: ring.clone(),
wheel,
fds: RefCell::new(vec![]),
outgoing: Default::default(),
@ -343,7 +341,7 @@ impl Forker {
});
let _f1 = ae.spawn(forker.clone().incoming());
let _f2 = ae.spawn(forker.clone().outgoing());
let _ = el.run();
let _ = ring.run();
unreachable!();
}

View file

@ -92,7 +92,7 @@ impl JayCompositor {
fn quit(&self, parser: MsgParser<'_, '_>) -> Result<(), JayCompositorError> {
let _req: Quit = self.client.parse(self, parser)?;
log::info!("Quitting");
self.client.state.el.stop();
self.client.state.ring.stop();
Ok(())
}

View file

@ -1,6 +1,6 @@
use {
crate::{
async_engine::{AsyncEngine, AsyncError, AsyncFd, FdStatus, Phase, SpawnedFuture},
async_engine::AsyncEngine,
io_uring::{
ops::{async_cancel::AsyncCancelTask, poll::PollTask, write::WriteTask},
pending_result::PendingResults,
@ -33,7 +33,10 @@ use {
},
},
thiserror::Error,
uapi::c::{self},
uapi::{
c::{self},
OwnedFd,
},
};
macro_rules! map_err {
@ -56,8 +59,6 @@ mod sys;
pub enum IoUringError {
#[error(transparent)]
OsError(OsError),
#[error("The async engine returned an error")]
AsyncError(#[from] AsyncError),
#[error("Could not create an io-uring")]
CreateUring(#[source] OsError),
#[error("The kernel does not support the IORING_FEAT_NODROP feature")]
@ -70,6 +71,8 @@ pub enum IoUringError {
MapCqRing(#[source] OsError),
#[error("The io-uring has already been destroyed")]
Destroyed,
#[error("io_uring_enter failed")]
Enter(#[source] OsError),
}
pub struct IoUring {
@ -86,7 +89,7 @@ impl IoUring {
pub fn new(eng: &Rc<AsyncEngine>, entries: u32) -> Result<Rc<Self>, IoUringError> {
let mut params = io_uring_params::default();
let fd = match io_uring_setup(entries, &mut params) {
Ok(f) => Rc::new(f),
Ok(f) => f,
Err(e) => return Err(IoUringError::CreateUring(e.into())),
};
if !params.features.contains(IORING_FEAT_NODROP) {
@ -172,10 +175,10 @@ impl IoUring {
.cast();
std::slice::from_raw_parts(base, params.cq_entries as _)
};
let fd = eng.fd(&fd)?;
let data = Rc::new(IoUringData {
destroyed: Cell::new(false),
fd,
eng: eng.clone(),
_sqesmap_map: sqesmap_map,
_sqmap_map: sqmap_map,
sqmask,
@ -198,21 +201,26 @@ impl IoUring {
cached_writes: Default::default(),
cached_cancels: Default::default(),
cached_polls: Default::default(),
reader: Cell::new(None),
submitter: Cell::new(None),
});
let submitter = eng.spawn2(Phase::Present, data.clone().submit());
let reader = eng.spawn(data.clone().reader());
data.reader.set(Some(reader));
data.submitter.set(Some(submitter));
Ok(Rc::new(Self { ring: data }))
}
pub fn stop(&self) {
self.ring.kill();
}
pub fn run(&self) -> Result<(), IoUringError> {
let res = self.ring.run();
self.ring.kill();
res
}
}
struct IoUringData {
destroyed: Cell<bool>,
fd: AsyncFd,
fd: OwnedFd,
eng: Rc<AsyncEngine>,
_sqesmap_map: Mmapped,
_sqmap_map: Mmapped,
@ -240,9 +248,6 @@ struct IoUringData {
cached_writes: Stack<Box<WriteTask>>,
cached_cancels: Stack<Box<AsyncCancelTask>>,
cached_polls: Stack<Box<PollTask>>,
reader: Cell<Option<SpawnedFuture<()>>>,
submitter: Cell<Option<SpawnedFuture<()>>>,
}
unsafe trait Task {
@ -256,20 +261,47 @@ unsafe trait Task {
}
impl IoUringData {
async fn reader(self: Rc<Self>) {
fn run(&self) -> Result<(), IoUringError> {
let mut to_submit = 0;
loop {
if !self.dispatch_completions() {
match self.fd.readable().await {
Err(e) => {
log::error!("Could not wait for the fd to become readable: {}", e);
}
Ok(FdStatus::Err) => {
log::error!("Fd is in an error state");
}
_ => continue,
loop {
self.eng.dispatch();
if self.destroyed.get() {
return Ok(());
}
if !self.dispatch_completions() {
break;
}
}
to_submit += self.encode();
let res = if to_submit == 0 {
io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS)
} else if self.to_encode.is_empty() {
io_uring_enter(self.fd.raw(), to_submit as _, 1, IORING_ENTER_GETEVENTS)
} else {
io_uring_enter(self.fd.raw(), !0, 0, 0)
};
let mut submitted_any = false;
match res {
Ok(n) => {
if n > 0 {
submitted_any = true;
}
to_submit -= n;
}
Err(e) => {
if !matches!(e.0, c::EAGAIN | c::EBUSY | c::EINTR) {
return Err(IoUringError::Enter(e));
}
}
}
if to_submit > 0 && !submitted_any {
let res = io_uring_enter(self.fd.raw(), 0, 1, IORING_ENTER_GETEVENTS);
if let Err(e) = res {
if e.0 != c::EINTR {
return Err(IoUringError::Enter(e));
}
}
self.kill();
return;
}
}
}
@ -327,48 +359,6 @@ impl IoUringData {
encoded
}
async fn submit(self: Rc<Self>) {
let mut to_submit = 0;
loop {
self.to_encode.non_empty().await;
to_submit += self.encode();
if to_submit == 0 {
match self.fd.writable().await {
Err(e) => {
log::error!("Could not write for fd to become writable: {}", ErrorFmt(e));
}
Ok(FdStatus::Err) => {
log::error!("Fd is in an error state");
}
_ => continue,
}
self.kill();
return;
}
while to_submit > 0 {
let res = io_uring_enter(self.fd.raw(), to_submit as _, 0, 0);
match res {
Ok(0) => {
panic!("io_uring_enter returned 0");
}
Ok(n) => to_submit -= n,
Err(e) => match e.0 {
c::EAGAIN | c::EBUSY => {
log::debug!("waiting for completion events");
self.cqes_consumed.clear();
self.cqes_consumed.triggered().await;
}
_ => {
log::error!("io_uring_enter returned an error: {}", ErrorFmt(e));
self.kill();
return;
}
},
}
}
}
}
fn id(&self) -> Cancellable {
Cancellable {
id: self.id_raw(),
@ -409,8 +399,6 @@ impl IoUringData {
}
fn kill(&self) {
self.reader.take();
self.submitter.take();
let mut to_cancel = vec![];
for task in self.tasks.lock().values() {
if !task.is_cancel() {

View file

@ -46,7 +46,6 @@ mod config;
mod cursor;
mod dbus;
mod edid;
mod event_loop;
mod fixed;
mod forker;
mod format;

View file

@ -1,7 +1,6 @@
use {
crate::{
async_engine::{AsyncEngine, SpawnedFuture},
event_loop::{EventLoop, EventLoopError},
io_uring::IoUring,
utils::{errorfmt::ErrorFmt, oserror::OsError},
},
@ -16,12 +15,9 @@ pub enum SighandError {
BlockFailed(#[source] OsError),
#[error("Could not create a signalfd")]
CreateFailed(#[source] OsError),
#[error("The event loop caused an error")]
EventLoopError(#[from] EventLoopError),
}
pub fn install(
el: &Rc<EventLoop>,
eng: &Rc<AsyncEngine>,
ring: &Rc<IoUring>,
) -> Result<SpawnedFuture<()>, SighandError> {
@ -36,10 +32,10 @@ pub fn install(
Ok(fd) => Rc::new(fd),
Err(e) => return Err(SighandError::CreateFailed(e.into())),
};
Ok(eng.spawn(handle_signals(fd, ring.clone(), el.clone())))
Ok(eng.spawn(handle_signals(fd, ring.clone())))
}
async fn handle_signals(fd: Rc<OwnedFd>, ring: Rc<IoUring>, el: Rc<EventLoop>) {
async fn handle_signals(fd: Rc<OwnedFd>, ring: Rc<IoUring>) {
let mut siginfo: c::signalfd_siginfo = uapi::pod_zeroed();
loop {
if let Err(e) = ring.readable(&fd).await {
@ -66,7 +62,7 @@ async fn handle_signals(fd: Rc<OwnedFd>, ring: Rc<IoUring>, el: Rc<EventLoop>) {
log::info!("Received signal {}", sig);
if matches!(sig, c::SIGINT | c::SIGTERM) {
log::info!("Exiting");
el.stop();
ring.stop();
}
}
}

View file

@ -12,7 +12,6 @@ use {
config::ConfigProxy,
cursor::ServerCursors,
dbus::Dbus,
event_loop::EventLoop,
forker::ForkerProxy,
globals::{Globals, GlobalsError, WaylandGlobal},
ifs::{
@ -63,7 +62,6 @@ pub struct State {
pub forker: CloneCell<Option<Rc<ForkerProxy>>>,
pub default_keymap: Rc<XkbKeymap>,
pub eng: Rc<AsyncEngine>,
pub el: Rc<EventLoop>,
pub render_ctx: CloneCell<Option<Rc<RenderContext>>>,
pub render_ctx_version: NumCell<u32>,
pub render_ctx_ever_initialized: Cell<bool>,

View file

@ -1,9 +1,8 @@
use {
crate::{
async_engine::{AsyncEngine, AsyncError, SpawnedFuture},
async_engine::{AsyncEngine, SpawnedFuture},
client::{EventFormatter, RequestParser},
compositor::WAYLAND_DISPLAY,
event_loop::{EventLoop, EventLoopError},
io_uring::{IoUring, IoUringError},
logger::Logger,
object::{ObjectId, WL_DISPLAY_ID},
@ -44,12 +43,8 @@ use {
#[derive(Debug, Error)]
pub enum ToolClientError {
#[error("Could not create an event loop")]
CreateEventLoop(#[source] EventLoopError),
#[error("Could not create a timer wheel")]
CreateWheel(#[source] WheelError),
#[error("Could not create an async engine")]
CreateEngine(#[source] AsyncError),
#[error("Could not create an io-uring")]
CreateRing(#[source] IoUringError),
#[error("XDG_RUNTIME_DIR is not set")]
@ -78,7 +73,6 @@ pub enum ToolClientError {
pub struct ToolClient {
pub logger: Arc<Logger>,
pub el: Rc<EventLoop>,
pub ring: Rc<IoUring>,
pub wheel: Rc<Wheel>,
pub eng: Rc<AsyncEngine>,
@ -118,21 +112,14 @@ impl ToolClient {
f.await;
std::process::exit(0);
});
if let Err(e) = self.el.run() {
if let Err(e) = self.ring.run() {
fatal!("A fatal error occurred: {}", ErrorFmt(e));
}
}
pub fn try_new(level: Level) -> Result<Rc<Self>, ToolClientError> {
let logger = Logger::install_stderr(level);
let el = match EventLoop::new() {
Ok(e) => e,
Err(e) => return Err(ToolClientError::CreateEventLoop(e)),
};
let eng = match AsyncEngine::install(&el) {
Ok(e) => e,
Err(e) => return Err(ToolClientError::CreateEngine(e)),
};
let eng = AsyncEngine::new();
let ring = match IoUring::new(&eng, 32) {
Ok(e) => e,
Err(e) => return Err(ToolClientError::CreateRing(e)),
@ -174,7 +161,6 @@ impl ToolClient {
obj_ids.take(1);
let slf = Rc::new(Self {
logger,
el,
ring,
wheel,
eng,

View file

@ -49,6 +49,10 @@ impl<T> AsyncQueue<T> {
mem::take(&mut *self.data.borrow_mut());
self.waiter.take();
}
pub fn is_empty(&self) -> bool {
self.data.borrow_mut().is_empty()
}
}
pub struct AsyncQueuePop<'a, T> {

View file

@ -1,6 +1,6 @@
use {
crate::{
async_engine::{AsyncEngine, AsyncError, SpawnedFuture},
async_engine::{AsyncEngine, SpawnedFuture},
io_uring::IoUring,
time::{Time, TimeError},
utils::{
@ -28,8 +28,6 @@ pub enum WheelError {
CreateFailed(#[source] OsError),
#[error("Could not set the timerfd")]
SetFailed(#[source] OsError),
#[error("An async error occurred")]
AsyncError(#[from] AsyncError),
#[error("Cannot determine the time")]
TimeError(#[from] TimeError),
#[error("The timer wheel is already destroyed")]

View file

@ -5,7 +5,7 @@ pub use crate::xcon::{
};
use {
crate::{
async_engine::{AsyncError, Phase, SpawnedFuture},
async_engine::{Phase, SpawnedFuture},
compositor::DISPLAY,
state::State,
utils::{
@ -87,8 +87,6 @@ pub enum XconError {
#[error("Server requires additional authentication: {0}")]
Authenticate(BString),
#[error(transparent)]
AsyncError(#[from] AsyncError),
#[error(transparent)]
BufIoError(#[from] BufIoError),
#[error("The server did not send a reply to a request")]
MissingReply,