diff --git a/Cargo.lock b/Cargo.lock index e81a6f12..cc2a504c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -350,6 +350,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -374,8 +380,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", + "futures-io", "futures-macro", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -532,8 +540,11 @@ name = "jay-config" version = "0.1.0" dependencies = [ "bincode", + "futures-util", "log", "serde", + "thiserror", + "uapi", ] [[package]] @@ -691,18 +702,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", @@ -969,18 +980,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", diff --git a/jay-config/Cargo.toml b/jay-config/Cargo.toml index 773dff34..896649dc 100644 --- a/jay-config/Cargo.toml +++ b/jay-config/Cargo.toml @@ -9,3 +9,6 @@ description = "Configuration crate for the Jay compositor" bincode = "1.3.3" serde = { version = "1.0.196", features = ["derive"] } log = "0.4.14" +futures-util = { version = "0.3.30", features = ["io"] } +uapi = "0.2.10" +thiserror = "1.0.57" diff --git a/jay-config/src/_private.rs b/jay-config/src/_private.rs index 7e7da600..04a26cf2 100644 --- a/jay-config/src/_private.rs +++ b/jay-config/src/_private.rs @@ -1,6 +1,7 @@ pub mod client; pub mod ipc; mod logging; +pub(crate) mod string_error; use { crate::video::Mode, @@ -58,3 +59,6 @@ impl WireMode { } } } + +#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub struct PollableId(pub u64); diff --git a/jay-config/src/_private/client.rs b/jay-config/src/_private/client.rs index 64faadad..496692bc 100644 --- a/jay-config/src/_private/client.rs +++ b/jay-config/src/_private/client.rs @@ -5,12 +5,13 @@ use { _private::{ bincode_ops, ipc::{ClientMessage, InitMessage, Response, ServerMessage}, - logging, Config, ConfigEntry, ConfigEntryGen, WireMode, VERSION, + logging, Config, ConfigEntry, ConfigEntryGen, PollableId, WireMode, VERSION, }, exec::Command, input::{acceleration::AccelProfile, capability::Capability, InputDevice, Seat}, keyboard::Keymap, logging::LogLevel, + tasks::JoinSlot, theme::{colors::Colorable, sized::Resizable, Color}, timer::Timer, video::{ @@ -20,13 +21,22 @@ use { Axis, Direction, ModifiedKeySym, PciId, Workspace, }, bincode::Options, + futures_util::task::ArcWake, std::{ cell::{Cell, RefCell}, - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, VecDeque}, + future::Future, + mem, ops::Deref, + pin::Pin, ptr, rc::Rc, slice, + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, Mutex, + }, + task::{Context, Poll, Waker}, time::Duration, }, }; @@ -50,6 +60,40 @@ pub(crate) struct Client { on_idle: RefCell>>, bufs: RefCell>>, reload: Cell, + read_interests: RefCell>, + write_interests: RefCell>, + tasks: Tasks, +} + +struct Interest { + result: Option>, + waker: Option, +} + +#[derive(Default)] +struct Tasks { + last_id: Cell, + ready_front: RefCell>, + ready_back: Arc, + tasks: RefCell>>>, +} + +#[derive(Default)] +struct TasksBackBuffer { + any: AtomicBool, + tasks: Mutex>, +} + +impl TasksBackBuffer { + fn append(&self, task: u64) { + self.tasks.lock().unwrap().push_back(task); + self.any.store(true, Relaxed); + } +} + +struct Task { + task: Pin>>, + waker: Waker, } impl Drop for Client { @@ -138,6 +182,9 @@ pub unsafe extern "C" fn init( on_idle: Default::default(), bufs: Default::default(), reload: Cell::new(false), + read_interests: Default::default(), + write_interests: Default::default(), + tasks: Default::default(), }); let init = slice::from_raw_parts(init, size); client.handle_init_msg(init); @@ -729,7 +776,122 @@ impl Client { }) } + pub fn create_pollable(&self, fd: i32) -> Result { + let res = self.send_with_response(&ClientMessage::AddPollable { fd }); + get_response!( + res, + Err("Compositor did not send a response".to_string()), + AddPollable { id } + ); + id + } + + pub fn remove_pollable(&self, id: PollableId) { + self.send(&ClientMessage::RemovePollable { id }); + self.write_interests.borrow_mut().remove(&id); + self.read_interests.borrow_mut().remove(&id); + } + + pub fn poll_io( + &self, + pollable: PollableId, + writable: bool, + ctx: &mut Context<'_>, + ) -> Poll> { + let interests = match writable { + true => &self.write_interests, + false => &self.read_interests, + }; + let mut interests = interests.borrow_mut(); + match interests.entry(pollable) { + Entry::Occupied(mut o) => { + let interest = o.get_mut(); + if interest.result.is_some() { + Poll::Ready(o.remove().result.unwrap()) + } else { + interest.waker = Some(ctx.waker().clone()); + Poll::Pending + } + } + Entry::Vacant(v) => { + self.send(&ClientMessage::AddInterest { pollable, writable }); + v.insert(Interest { + result: None, + waker: Some(ctx.waker().clone()), + }); + Poll::Pending + } + } + } + fn handle_msg(&self, msg: &[u8]) { + self.handle_msg2(msg); + self.dispatch_futures(); + } + + fn dispatch_futures(&self) { + let futures = &self.tasks; + if !futures.ready_back.any.load(Relaxed) { + return; + } + let mut ready = futures.ready_front.borrow_mut(); + loop { + mem::swap(&mut *ready, &mut *futures.ready_back.tasks.lock().unwrap()); + futures.ready_back.any.store(false, Relaxed); + while let Some(id) = ready.pop_front() { + let fut = futures.tasks.borrow_mut().get(&id).cloned(); + if let Some(fut) = fut { + let mut fut = fut.borrow_mut(); + let fut = &mut *fut; + if let Poll::Ready(()) = + fut.task.as_mut().poll(&mut Context::from_waker(&fut.waker)) + { + futures.tasks.borrow_mut().remove(&id); + } + } + } + if !futures.ready_back.any.load(Relaxed) { + return; + } + } + } + + pub fn spawn_task(&self, f: impl Future + 'static) -> Rc> { + struct Waker(Arc, u64); + impl ArcWake for Waker { + fn wake_by_ref(arc_self: &Arc) { + arc_self.0.append(arc_self.1); + } + } + let tasks = &self.tasks; + let id = tasks.last_id.get() + 1; + tasks.last_id.set(id); + let waker = futures_util::task::waker(Arc::new(Waker(tasks.ready_back.clone(), id))); + tasks.ready_back.append(id); + let slot = Rc::new(JoinSlot { + task_id: id, + slot: Cell::new(None), + waker: Cell::new(None), + }); + let slot2 = slot.clone(); + let task = Rc::new(RefCell::new(Task { + task: Box::pin(async move { + slot2.slot.set(Some(f.await)); + if let Some(waker) = slot2.waker.take() { + waker.wake(); + } + }), + waker, + })); + tasks.tasks.borrow_mut().insert(id, task); + slot + } + + pub fn abort_task(&self, id: u64) { + self.tasks.tasks.borrow_mut().remove(&id); + } + + fn handle_msg2(&self, msg: &[u8]) { let res = bincode_ops().deserialize::(msg); let msg = match res { Ok(msg) => msg, @@ -813,6 +975,19 @@ impl Client { handler(); } } + ServerMessage::InterestReady { id, writable, res } => { + let interests = match writable { + true => &self.write_interests, + false => &self.read_interests, + }; + let mut interests = interests.borrow_mut(); + if let Some(interest) = interests.get_mut(&id) { + interest.result = Some(res); + if let Some(waker) = interest.waker.take() { + waker.wake(); + } + } + } } } diff --git a/jay-config/src/_private/ipc.rs b/jay-config/src/_private/ipc.rs index 24751afe..15c2be29 100644 --- a/jay-config/src/_private/ipc.rs +++ b/jay-config/src/_private/ipc.rs @@ -7,7 +7,7 @@ use { timer::Timer, video::{connector_type::ConnectorType, Connector, DrmDevice, GfxApi, Transform}, Axis, Direction, PciId, Workspace, - _private::WireMode, + _private::{PollableId, WireMode}, }, serde::{Deserialize, Serialize}, std::time::Duration, @@ -57,6 +57,11 @@ pub enum ServerMessage { }, Idle, DevicesEnumerated, + InterestReady { + id: PollableId, + writable: bool, + res: Result<(), String>, + }, } #[derive(Serialize, Deserialize, Debug)] @@ -360,6 +365,16 @@ pub enum ClientMessage<'a> { connector: Connector, mode: WireMode, }, + AddPollable { + fd: i32, + }, + RemovePollable { + id: PollableId, + }, + AddInterest { + pollable: PollableId, + writable: bool, + }, } #[derive(Serialize, Deserialize, Debug)] @@ -465,6 +480,9 @@ pub enum Response { ConnectorModes { modes: Vec, }, + AddPollable { + id: Result, + }, } #[derive(Serialize, Deserialize, Debug)] diff --git a/jay-config/src/_private/string_error.rs b/jay-config/src/_private/string_error.rs new file mode 100644 index 00000000..9b8cfcf7 --- /dev/null +++ b/jay-config/src/_private/string_error.rs @@ -0,0 +1,15 @@ +use std::{ + error::Error, + fmt::{Display, Formatter}, +}; + +#[derive(Debug)] +pub struct StringError(pub String); + +impl Display for StringError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +impl Error for StringError {} diff --git a/jay-config/src/io.rs b/jay-config/src/io.rs new file mode 100644 index 00000000..ff116e89 --- /dev/null +++ b/jay-config/src/io.rs @@ -0,0 +1,194 @@ +//! Tools for IO operations. + +use { + crate::_private::PollableId, + futures_util::{io::AsyncRead, AsyncWrite}, + std::{ + future::poll_fn, + io::{self, ErrorKind, IoSlice, IoSliceMut, Read, Write}, + os::fd::{AsFd, AsRawFd}, + pin::Pin, + task::{ready, Context, Poll}, + }, + thiserror::Error, + uapi::c, +}; + +#[derive(Debug, Error)] +enum AsyncError { + #[error("Could not retrieve the file description flags")] + GetFl(#[source] io::Error), + #[error("Could not set the file description flags")] + SetFl(#[source] io::Error), + #[error("This configuration has already been destroyed")] + Destroyed, + #[error("The compositor could not create the necessary data structures: {0}")] + CompositorSetup(String), + #[error("Could not poll the file description: {0}")] + Poll(String), +} + +impl From for io::Error { + fn from(value: AsyncError) -> Self { + io::Error::new(ErrorKind::Other, value) + } +} + +/// An async adapter for types implementing [`AsFd`]. +pub struct Async { + id: PollableIdWrapper, + t: Option, +} + +impl Unpin for Async {} + +struct PollableIdWrapper { + id: PollableId, +} + +impl Drop for PollableIdWrapper { + fn drop(&mut self) { + get!().remove_pollable(self.id); + } +} + +impl Async +where + T: AsFd, +{ + /// Creates a new async adapter. + /// + /// This takes ownership of the file description and duplicates the file descriptor. + /// You should not modify the file description while this object is in use, otherwise + /// the behavior is undefined. + pub fn new(t: T) -> Result { + Ok(Self::new_(t)?) + } + + fn new_(t: T) -> Result { + let fd = t.as_fd(); + let fl = uapi::fcntl_getfl(fd.as_raw_fd()) + .map_err(|e| AsyncError::GetFl(io::Error::from_raw_os_error(e.0)))?; + uapi::fcntl_setfl(fd.as_raw_fd(), fl | c::O_NONBLOCK) + .map_err(|e| AsyncError::SetFl(io::Error::from_raw_os_error(e.0)))?; + let id = get!(Err(AsyncError::Destroyed)) + .create_pollable(fd.as_raw_fd()) + .map_err(AsyncError::CompositorSetup)?; + Ok(Self { + id: PollableIdWrapper { id }, + t: Some(t), + }) + } +} + +impl Async { + /// Unwraps the underlying object. + /// + /// Note that the underlying object is still non-blocking at this point. + pub fn unwrap(self) -> T { + self.t.unwrap() + } + + fn poll_(&self, writable: bool, cx: &mut Context<'_>) -> Poll> { + get!(Poll::Ready(Err(AsyncError::Destroyed))) + .poll_io(self.id.id, writable, cx) + .map_err(AsyncError::Poll) + } + + async fn poll(&self, writable: bool) -> Result<(), io::Error> { + poll_fn(|cx| self.poll_(writable, cx)).await?; + Ok(()) + } + + /// Waits for the file description to become readable. + pub async fn readable(&self) -> Result<(), io::Error> { + self.poll(false).await + } + + /// Waits for the file description to become writable. + pub async fn writable(&self) -> Result<(), io::Error> { + self.poll(true).await + } +} + +impl AsRef for Async { + fn as_ref(&self) -> &T { + self.t.as_ref().unwrap() + } +} + +impl AsMut for Async { + fn as_mut(&mut self) -> &mut T { + self.t.as_mut().unwrap() + } +} + +fn poll_io( + slf: &mut Async, + writable: bool, + cx: &mut Context<'_>, + mut f: impl FnMut(&mut Async) -> io::Result, +) -> Poll> { + loop { + ready!(slf.poll_(writable, cx))?; + match f(slf) { + Err(e) if e.kind() == ErrorKind::WouldBlock => {} + res => return Poll::Ready(res), + } + } +} + +impl AsyncRead for Async +where + T: Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + poll_io(self.get_mut(), false, cx, |slf| slf.as_mut().read(buf)) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + poll_io(self.get_mut(), false, cx, |slf| { + slf.as_mut().read_vectored(bufs) + }) + } +} + +impl AsyncWrite for Async +where + T: Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().write(buf)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + poll_io(self.get_mut(), true, cx, |slf| { + slf.as_mut().write_vectored(bufs) + }) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().flush()) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + self.get_mut().t.take(); + Poll::Ready(Ok(())) + } +} diff --git a/jay-config/src/lib.rs b/jay-config/src/lib.rs index c007e646..e0ac9b58 100644 --- a/jay-config/src/lib.rs +++ b/jay-config/src/lib.rs @@ -53,9 +53,11 @@ pub mod _private; pub mod embedded; pub mod exec; pub mod input; +pub mod io; pub mod keyboard; pub mod logging; pub mod status; +pub mod tasks; pub mod theme; pub mod timer; pub mod video; diff --git a/jay-config/src/macros.rs b/jay-config/src/macros.rs index 597809eb..d470b6f2 100644 --- a/jay-config/src/macros.rs +++ b/jay-config/src/macros.rs @@ -16,6 +16,20 @@ macro_rules! config { }; } +macro_rules! try_get { + () => {{ + #[allow(unused_unsafe)] + unsafe { + let client = crate::_private::client::CLIENT.with(|client| client.get()); + if client.is_null() { + None + } else { + Some(&*client) + } + } + }}; +} + macro_rules! get { () => {{ get!(Default::default()) diff --git a/jay-config/src/tasks.rs b/jay-config/src/tasks.rs new file mode 100644 index 00000000..5e5e4d35 --- /dev/null +++ b/jay-config/src/tasks.rs @@ -0,0 +1,69 @@ +//! Tools for async task management. + +use std::{ + cell::Cell, + fmt::{Debug, Formatter}, + future::Future, + pin::Pin, + rc::Rc, + task::{Context, Poll, Waker}, +}; + +/// Spawns an asynchronous task that will run in the background. +pub fn spawn(f: F) -> JoinHandle +where + T: 'static, + F: Future + 'static, +{ + let slot = match try_get!() { + None => Rc::new(JoinSlot { + task_id: 0, + slot: Cell::new(None), + waker: Cell::new(None), + }), + Some(c) => c.spawn_task(f), + }; + JoinHandle { slot } +} + +pub(crate) struct JoinSlot { + pub task_id: u64, + pub slot: Cell>, + pub waker: Cell>, +} + +/// A handle to join or abort a spawned task. +/// +/// When the handle is dropped, the task continues to run in the background. +pub struct JoinHandle { + slot: Rc>, +} + +impl Debug for JoinHandle { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JoinHandle") + .field("task_id", &self.slot.task_id) + .finish_non_exhaustive() + } +} + +impl Unpin for JoinHandle {} + +impl JoinHandle { + /// Aborts the task immediately. + pub fn abort(self) { + get!().abort_task(self.slot.task_id); + } +} + +impl Future for JoinHandle { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some(t) = self.slot.slot.take() { + return Poll::Ready(t); + } + self.slot.waker.set(Some(cx.waker().clone())); + Poll::Pending + } +} diff --git a/src/config.rs b/src/config.rs index f85b668f..975e4b0b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -175,6 +175,8 @@ impl ConfigProxy { timer_ids: NumCell::new(1), timers_by_name: Default::default(), timers_by_id: Default::default(), + pollable_id: Default::default(), + pollables: Default::default(), }); let init_msg = bincode_ops() .serialize(&InitMessage::V1(V1InitMessage {})) diff --git a/src/config/handler.rs b/src/config/handler.rs index 276d13c3..d18e453f 100644 --- a/src/config/handler.rs +++ b/src/config/handler.rs @@ -8,15 +8,18 @@ use { compositor::MAX_EXTENTS, config::ConfigProxy, ifs::wl_seat::{SeatId, WlSeatGlobal}, + io_uring::TaskResultExt, scale::Scale, state::{ConnectorData, DeviceHandlerData, DrmDevData, OutputData, State}, theme::{Color, ThemeSized, DEFAULT_FONT}, tree::{ContainerNode, ContainerSplit, FloatNode, Node, NodeVisitorBase, OutputNode}, utils::{ + asyncevent::AsyncEvent, copyhashmap::CopyHashMap, debug_fn::debug_fn, errorfmt::ErrorFmt, numcell::NumCell, + oserror::OsError, stack::Stack, timer::{TimerError, TimerFd}, }, @@ -27,7 +30,7 @@ use { _private::{ bincode_ops, ipc::{ClientMessage, Response, ServerMessage}, - WireMode, + PollableId, WireMode, }, input::{ acceleration::{AccelProfile, ACCEL_PROFILE_ADAPTIVE, ACCEL_PROFILE_FLAT}, @@ -48,7 +51,7 @@ use { log::Level, std::{cell::Cell, ops::Deref, rc::Rc, time::Duration}, thiserror::Error, - uapi::c, + uapi::{c, fcntl_dupfd_cloexec}, }; pub(super) struct ConfigProxyHandler { @@ -70,6 +73,16 @@ pub(super) struct ConfigProxyHandler { pub timer_ids: NumCell, pub timers_by_name: CopyHashMap, Rc>, pub timers_by_id: CopyHashMap>, + + pub pollable_id: NumCell, + pub pollables: CopyHashMap>, +} + +pub struct Pollable { + write_trigger: Rc, + _write_future: SpawnedFuture<()>, + read_trigger: Rc, + _read_future: SpawnedFuture<()>, } pub(super) struct TimerData { @@ -85,6 +98,8 @@ impl ConfigProxyHandler { self.timers_by_name.clear(); self.timers_by_id.clear(); + + self.pollables.clear(); } pub fn send(&self, msg: &ServerMessage) { @@ -1027,6 +1042,74 @@ impl ConfigProxyHandler { Ok(()) } + fn handle_add_pollable(self: &Rc, fd: i32) -> Result<(), CphError> { + let fd = match fcntl_dupfd_cloexec(fd, 0) { + Ok(fd) => Rc::new(fd), + Err(e) => { + let err = format!( + "Could not invoke F_DUPFD_CLOEXEC: {}", + ErrorFmt(OsError::from(e)) + ); + log::error!("{}", err); + self.respond(Response::AddPollable { id: Err(err) }); + return Ok(()); + } + }; + let id = self.pollable_id.fetch_add(1); + let id = PollableId(id); + let create = |writable: bool, events: c::c_short| { + let event = Rc::new(AsyncEvent::default()); + let slf = self.clone(); + let trigger = event.clone(); + let fd = fd.clone(); + let future = self.state.eng.spawn(async move { + loop { + trigger.triggered().await; + let res = slf.state.ring.poll(&fd, events).await.merge(); + if let Err(e) = &res { + log::warn!("Could not poll fd: {}", ErrorFmt(e)); + } + let res = res.map_err(|e| ErrorFmt(e).to_string()).map(drop); + slf.send(&ServerMessage::InterestReady { id, writable, res }); + } + }); + (event, future) + }; + let (read_trigger, _read_future) = create(false, c::POLLIN); + let (write_trigger, _write_future) = create(true, c::POLLOUT); + self.pollables.set( + id, + Rc::new(Pollable { + write_trigger, + _write_future, + read_trigger, + _read_future, + }), + ); + self.respond(Response::AddPollable { id: Ok(id) }); + Ok(()) + } + + fn handle_remove_pollable(self: &Rc, id: PollableId) { + self.pollables.remove(&id); + } + + fn handle_add_interest( + self: &Rc, + id: PollableId, + writable: bool, + ) -> Result<(), CphError> { + let Some(pollable) = self.pollables.get(&id) else { + return Err(CphError::PollableDoesNotExist); + }; + let trigger = match writable { + true => &pollable.write_trigger, + false => &pollable.read_trigger, + }; + trigger.trigger(); + Ok(()) + } + fn spaces_change(&self) { struct V; impl NodeVisitorBase for V { @@ -1408,6 +1491,13 @@ impl ConfigProxyHandler { ClientMessage::ConnectorSetMode { connector, mode } => self .handle_connector_set_mode(connector, mode) .wrn("connector_set_mode")?, + ClientMessage::AddPollable { fd } => { + self.handle_add_pollable(fd).wrn("add_pollable")? + } + ClientMessage::RemovePollable { id } => self.handle_remove_pollable(id), + ClientMessage::AddInterest { pollable, writable } => self + .handle_add_interest(pollable, writable) + .wrn("add_interest")?, } Ok(()) } @@ -1465,6 +1555,8 @@ enum CphError { ScaleTooLarge(f64), #[error("Tried to set a negative cursor size")] NegativeCursorSize, + #[error("Config referred to a pollable that does not exist")] + PollableDoesNotExist, } trait WithRequestName { diff --git a/src/it/test_config.rs b/src/it/test_config.rs index e98dc8a7..1768786a 100644 --- a/src/it/test_config.rs +++ b/src/it/test_config.rs @@ -106,6 +106,7 @@ unsafe extern "C" fn handle_msg(data: *const u8, msg: *const u8, size: usize) { ServerMessage::DelDrmDev { .. } => {} ServerMessage::Idle => {} ServerMessage::DevicesEnumerated => {} + ServerMessage::InterestReady { .. } => {} } }