diff --git a/src/async_engine.rs b/src/async_engine.rs index 2f669e08..62655792 100644 --- a/src/async_engine.rs +++ b/src/async_engine.rs @@ -1,7 +1,6 @@ mod ae_fd; mod ae_queue; mod ae_task; -mod ae_timeout; mod ae_timer; mod ae_yield; @@ -9,18 +8,15 @@ pub use { crate::async_engine::ae_yield::Yield, ae_fd::{AsyncFd, FdStatus}, ae_task::SpawnedFuture, - ae_timeout::Timeout, ae_timer::Timer, }; use { crate::{ event_loop::{EventLoop, EventLoopError}, utils::{copyhashmap::CopyHashMap, numcell::NumCell, oserror::OsError}, - wheel::{Wheel, WheelError}, }, ae_fd::AsyncFdData, ae_queue::{DispatchQueue, Dispatcher}, - ae_timeout::TimeoutData, std::{ cell::{Cell, RefCell}, future::Future, @@ -32,8 +28,6 @@ use { #[derive(Debug, Error)] pub enum AsyncError { - #[error("The timer wheel returned an error")] - WheelError(#[from] WheelError), #[error("The event loop caused an error")] EventLoopError(#[from] EventLoopError), #[error("Could not read from a timer")] @@ -54,37 +48,21 @@ pub enum Phase { const NUM_PHASES: usize = 4; pub struct AsyncEngine { - wheel: Rc, el: Rc, queue: Rc, fds: CopyHashMap>, } impl AsyncEngine { - pub fn install(el: &Rc, wheel: &Rc) -> Result, AsyncError> { + pub fn install(el: &Rc) -> Result, AsyncError> { let queue = Dispatcher::install(el)?; Ok(Rc::new(Self { - wheel: wheel.clone(), el: el.clone(), queue, fds: CopyHashMap::new(), })) } - pub fn timeout(&self, ms: u64) -> Result { - let data = Rc::new(TimeoutData { - expired: Cell::new(false), - waker: RefCell::new(None), - }); - let id = self.wheel.id(); - self.wheel.timeout(id, ms, data.clone())?; - Ok(Timeout { - id, - wheel: self.wheel.clone(), - data, - }) - } - pub fn timer(self: &Rc, clock_id: c::c_int) -> Result { Timer::new(self, clock_id) } diff --git a/src/async_engine/ae_fd.rs b/src/async_engine/ae_fd.rs index 8ae6b922..ad3e08ce 100644 --- a/src/async_engine/ae_fd.rs +++ b/src/async_engine/ae_fd.rs @@ -146,10 +146,6 @@ impl AsyncFd { self.data.fd.raw() } - pub fn eng(&self) -> &Rc { - &self.engine - } - pub fn readable(&self) -> AsyncFdReadable { AsyncFdReadable { fd: self, diff --git a/src/async_engine/ae_timeout.rs b/src/async_engine/ae_timeout.rs deleted file mode 100644 index bc2d94b2..00000000 --- a/src/async_engine/ae_timeout.rs +++ /dev/null @@ -1,51 +0,0 @@ -use { - crate::wheel::{Wheel, WheelDispatcher, WheelId}, - std::{ - cell::{Cell, RefCell}, - error::Error, - future::Future, - pin::Pin, - rc::Rc, - task::{Context, Poll, Waker}, - }, -}; - -pub(super) struct TimeoutData { - pub expired: Cell, - pub waker: RefCell>, -} - -impl WheelDispatcher for TimeoutData { - fn dispatch(self: Rc) -> Result<(), Box> { - self.expired.set(true); - if let Some(w) = self.waker.borrow_mut().take() { - w.wake(); - } - Ok(()) - } -} - -pub struct Timeout { - pub(super) id: WheelId, - pub(super) wheel: Rc, - pub(super) data: Rc, -} - -impl Future for Timeout { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.data.expired.get() { - Poll::Ready(()) - } else { - *self.data.waker.borrow_mut() = Some(cx.waker().clone()); - Poll::Pending - } - } -} - -impl Drop for Timeout { - fn drop(&mut self) { - self.wheel.remove(self.id); - } -} diff --git a/src/client/tasks.rs b/src/client/tasks.rs index 14e50482..70481bf6 100644 --- a/src/client/tasks.rs +++ b/src/client/tasks.rs @@ -23,9 +23,8 @@ pub async fn client(data: Rc) { } drop(recv); data.flush_request.trigger(); - match data.state.eng.timeout(5000) { - Ok(timeout) => { - timeout.await; + match data.state.wheel.timeout(5000).await { + Ok(_) => { log::error!("Could not shut down client {} within 5 seconds", data.id.0); } Err(e) => { @@ -101,7 +100,7 @@ async fn receive(data: Rc) { async fn send(data: Rc) { let send = async { - let mut out = BufFdOut::new(data.socket.clone()); + let mut out = BufFdOut::new(data.socket.clone(), &data.state.wheel); let mut buffers = VecDeque::new(); loop { data.flush_request.triggered().await; diff --git a/src/compositor.rs b/src/compositor.rs index 8d7f48ba..784bb3f7 100644 --- a/src/compositor.rs +++ b/src/compositor.rs @@ -118,8 +118,8 @@ fn start_compositor2( sighand::install(&el)?; let xkb_ctx = XkbContext::new().unwrap(); let xkb_keymap = xkb_ctx.keymap_from_str(include_str!("keymap.xkb")).unwrap(); - let wheel = Wheel::install(&el)?; - let engine = AsyncEngine::install(&el, &wheel)?; + let engine = AsyncEngine::install(&el)?; + let wheel = Wheel::new(&engine)?; let io_uring = IoUring::new(&engine, 32)?; let (_run_toplevel_future, run_toplevel) = RunToplevel::install(&engine); let node_ids = NodeIds::default(); diff --git a/src/forker.rs b/src/forker.rs index 0f47a710..7611ae45 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -241,7 +241,7 @@ impl ForkerProxy { } async fn outgoing(self: Rc, state: Rc, socket: AsyncFd) { - let mut io = IoOut::new(socket); + let mut io = IoOut::new(socket, &state.wheel); loop { let msg = self.outgoing.pop().await; for fd in self.fds.borrow_mut().drain(..) { @@ -305,6 +305,7 @@ enum ForkerMessage { struct Forker { socket: AsyncFd, ae: Rc, + wheel: Rc, fds: RefCell>>, outgoing: AsyncQueue, pending_spawns: CopyHashMap>, @@ -331,11 +332,12 @@ impl Forker { }) }); let el = EventLoop::new().unwrap(); - let wheel = Wheel::install(&el).unwrap(); - let ae = AsyncEngine::install(&el, &wheel).unwrap(); + let ae = AsyncEngine::install(&el).unwrap(); + let wheel = Wheel::new(&ae).unwrap(); let forker = Rc::new(Forker { socket: ae.fd(&socket).unwrap(), ae: ae.clone(), + wheel, fds: RefCell::new(vec![]), outgoing: Default::default(), pending_spawns: Default::default(), @@ -347,7 +349,7 @@ impl Forker { } async fn outgoing(self: Rc) { - let mut io = IoOut::new(self.socket.clone()); + let mut io = IoOut::new(self.socket.clone(), &self.wheel); loop { let msg = self.outgoing.pop().await; for fd in self.fds.borrow_mut().drain(..) { diff --git a/src/forker/io.rs b/src/forker/io.rs index 04877878..531ff93b 100644 --- a/src/forker/io.rs +++ b/src/forker/io.rs @@ -11,6 +11,7 @@ use { buffd::{BufFdIn, BufFdOut}, vec_ext::VecExt, }, + wheel::Wheel, }, jay_config::_private::bincode_ops, uapi::OwnedFd, @@ -62,9 +63,9 @@ pub struct IoOut { } impl IoOut { - pub fn new(fd: AsyncFd) -> Self { + pub fn new(fd: AsyncFd, wheel: &Rc) -> Self { Self { - outgoing: BufFdOut::new(fd), + outgoing: BufFdOut::new(fd, wheel), scratch: vec![], fds: vec![], } diff --git a/src/it.rs b/src/it.rs index 6b0a44f0..b0e14840 100644 --- a/src/it.rs +++ b/src/it.rs @@ -141,7 +141,7 @@ fn run_test(it_run: &ItRun, test: &'static dyn TestCase, cfg: Rc) { Box::new(async move { let future: Pin<_> = test.run(testrun.clone()).into(); let future = state.eng.spawn2(Phase::Present, future); - let timeout = state.eng.timeout(5000).unwrap(); + let timeout = state.wheel.timeout(5000); match future::select(future, timeout).await { Either::Left((Ok(..), _)) => {} Either::Left((Err(e), _)) => { diff --git a/src/it/test_transport.rs b/src/it/test_transport.rs index 7c155c8e..e90429d3 100644 --- a/src/it/test_transport.rs +++ b/src/it/test_transport.rs @@ -141,7 +141,7 @@ impl TestTransport { self.run.state.eng.spawn( Outgoing { tc: self.clone(), - buf: BufFdOut::new(self.fd.clone()), + buf: BufFdOut::new(self.fd.clone(), &self.run.state.wheel), buffers: Default::default(), } .run(), diff --git a/src/tools/tool_client.rs b/src/tools/tool_client.rs index ccf30ab0..09a779d7 100644 --- a/src/tools/tool_client.rs +++ b/src/tools/tool_client.rs @@ -127,14 +127,14 @@ impl ToolClient { Ok(e) => e, Err(e) => return Err(ToolClientError::CreateEventLoop(e)), }; - let wheel = match Wheel::install(&el) { - Ok(w) => w, - Err(e) => return Err(ToolClientError::CreateWheel(e)), - }; - let eng = match AsyncEngine::install(&el, &wheel) { + let eng = match AsyncEngine::install(&el) { Ok(e) => e, Err(e) => return Err(ToolClientError::CreateEngine(e)), }; + let wheel = match Wheel::new(&eng) { + Ok(w) => w, + Err(e) => return Err(ToolClientError::CreateWheel(e)), + }; let xrd = match xrd() { Some(d) => d, _ => return Err(ToolClientError::XrdNotSet), @@ -206,7 +206,7 @@ impl ToolClient { slf.eng.spawn( Outgoing { tc: slf.clone(), - buf: BufFdOut::new(fd.clone()), + buf: BufFdOut::new(fd.clone(), &slf.wheel), buffers: Default::default(), } .run(), diff --git a/src/utils/buffd/buf_out.rs b/src/utils/buffd/buf_out.rs index 2acdf088..3bbd2a42 100644 --- a/src/utils/buffd/buf_out.rs +++ b/src/utils/buffd/buf_out.rs @@ -1,7 +1,8 @@ use { crate::{ - async_engine::{AsyncFd, Timeout}, + async_engine::AsyncFd, utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE}, + wheel::{Wheel, WheelTimeoutFuture}, }, futures_util::{future::Fuse, select, FutureExt}, std::{ @@ -79,14 +80,16 @@ impl OutBufferSwapchain { pub struct BufFdOut { fd: AsyncFd, + wheel: Rc, cmsg_buf: Box<[MaybeUninit; CMSG_BUF_SIZE]>, fd_ids: Vec, } impl BufFdOut { - pub fn new(fd: AsyncFd) -> Self { + pub fn new(fd: AsyncFd, wheel: &Rc) -> Self { Self { fd, + wheel: wheel.clone(), cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]), fd_ids: vec![], } @@ -95,12 +98,12 @@ impl BufFdOut { pub async fn flush( &mut self, buf: &mut OutBuffer, - timeout: &mut Option>, + timeout: &mut Option>, ) -> Result<(), BufFdError> { while buf.read_pos < buf.write_pos { if self.flush_sync(buf)? { if timeout.is_none() { - *timeout = Some(self.fd.eng().timeout(5000)?.fuse()); + *timeout = Some(self.wheel.timeout(5000).fuse()); } select! { _ = timeout.as_mut().unwrap() => { diff --git a/src/wheel.rs b/src/wheel.rs index 7b0ab103..b51a0689 100644 --- a/src/wheel.rs +++ b/src/wheel.rs @@ -1,112 +1,174 @@ use { crate::{ - event_loop::{EventLoop, EventLoopDispatcher, EventLoopError, EventLoopId}, + async_engine::{AsyncEngine, AsyncError, AsyncFd, SpawnedFuture}, time::{Time, TimeError}, - utils::{copyhashmap::CopyHashMap, numcell::NumCell}, + utils::{ + copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, oserror::OsError, + stack::Stack, + }, }, std::{ cell::{Cell, RefCell}, cmp::Reverse, collections::BinaryHeap, - error::Error, + future::Future, + pin::Pin, rc::Rc, + task::{Context, Poll, Waker}, time::Duration, }, thiserror::Error, - uapi::{c, OwnedFd}, + uapi::c, }; #[derive(Debug, Error)] pub enum WheelError { - #[error("Could not create the timerfd: {0}")] - CreateFailed(crate::utils::oserror::OsError), - #[error("Could not set the timerfd: {0}")] - SetFailed(crate::utils::oserror::OsError), - #[error("The timerfd is in an error state")] - ErrorEvent, - #[error("An event loop error occurred: {0}")] - EventLoopError(#[from] EventLoopError), - #[error("Cannot determine the time: {0}")] + #[error("Could not create the timerfd")] + CreateFailed(#[source] OsError), + #[error("Could not set the timerfd")] + SetFailed(#[source] OsError), + #[error("An async error occurred")] + AsyncError(#[from] AsyncError), + #[error("Cannot determine the time")] TimeError(#[from] TimeError), #[error("The timer wheel is already destroyed")] Destroyed, -} - -pub trait WheelDispatcher { - fn dispatch(self: Rc) -> Result<(), Box>; + #[error("Could not read from the timerfd")] + Read(#[source] OsError), } #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] struct WheelEntry { expiration: Time, - id: WheelId, + id: u64, } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] -pub struct WheelId(u64); - pub struct Wheel { + data: Rc, +} + +impl Drop for Wheel { + fn drop(&mut self) { + self.data.kill(); + } +} + +struct WheelTimeoutData { + id: u64, + expired: Cell>>, + wheel: Rc, + waker: Cell>, +} + +impl WheelTimeoutData { + fn complete(&self, res: Result<(), WheelError>) { + self.expired.set(Some(res)); + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +pub struct WheelTimeoutFuture { + data: Rc, +} + +impl Drop for WheelTimeoutFuture { + fn drop(&mut self) { + self.data.wheel.dispatchers.remove(&self.data.id); + self.data.waker.set(None); + if !self.data.wheel.destroyed.get() { + self.data.expired.take(); + self.data.wheel.cached_futures.push(self.data.clone()); + } + } +} + +impl Future for WheelTimeoutFuture { + type Output = Result<(), WheelError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some(res) = self.data.expired.take() { + Poll::Ready(res) + } else { + self.data.waker.set(Some(cx.waker().clone())); + Poll::Pending + } + } +} + +pub struct WheelData { destroyed: Cell, - fd: OwnedFd, + fd: AsyncFd, next_id: NumCell, start: Time, current_expiration: Cell>, - dispatchers: CopyHashMap>, - periodic_dispatchers: CopyHashMap>, + dispatchers: CopyHashMap>, expirations: RefCell>>, - id: EventLoopId, - el: Rc, + dispatcher: Cell>>, + cached_futures: Stack>, } impl Wheel { - pub fn install(el: &Rc) -> Result, WheelError> { + pub fn new(eng: &Rc) -> Result, WheelError> { let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) { - Ok(fd) => fd, + Ok(fd) => Rc::new(fd), Err(e) => return Err(WheelError::CreateFailed(e.into())), }; - let id = el.id(); - let wheel = Rc::new(Self { + let fd = eng.fd(&fd)?; + let data = Rc::new(WheelData { destroyed: Cell::new(false), fd, - next_id: Default::default(), + next_id: NumCell::new(1), start: Time::now()?, - current_expiration: Cell::new(None), - dispatchers: CopyHashMap::new(), - periodic_dispatchers: Default::default(), - expirations: RefCell::new(Default::default()), - id, - el: el.clone(), + current_expiration: Default::default(), + dispatchers: Default::default(), + expirations: Default::default(), + dispatcher: Default::default(), + cached_futures: Default::default(), }); - let wrapper = Rc::new(WheelWrapper { - wheel: wheel.clone(), - }); - el.insert(id, Some(wheel.fd.raw()), c::EPOLLIN, wrapper)?; - Ok(wheel) + data.dispatcher + .set(Some(eng.spawn(data.clone().dispatch()))); + Ok(Rc::new(Wheel { data })) } - pub fn id(&self) -> WheelId { - WheelId(self.next_id.fetch_add(1)) + fn future(&self) -> WheelTimeoutFuture { + let data = self.data.cached_futures.pop().unwrap_or_else(|| { + Rc::new(WheelTimeoutData { + id: self.data.next_id.fetch_add(1), + expired: Cell::new(None), + wheel: self.data.clone(), + waker: Cell::new(None), + }) + }); + WheelTimeoutFuture { data } } - fn check_destroyed(&self) -> Result<(), WheelError> { - if self.destroyed.get() { - return Err(WheelError::Destroyed); + pub fn timeout(&self, ms: u64) -> WheelTimeoutFuture { + if self.data.destroyed.get() { + return WheelTimeoutFuture { + data: Rc::new(WheelTimeoutData { + id: 0, + expired: Cell::new(Some(Err(WheelError::Destroyed))), + wheel: self.data.clone(), + waker: Default::default(), + }), + }; } - Ok(()) - } - - pub fn timeout( - &self, - id: WheelId, - ms: u64, - dispatcher: Rc, - ) -> Result<(), WheelError> { - self.check_destroyed()?; - let expiration = (Time::now()? + Duration::from_millis(ms)).round_to_ms(); - let current = self.current_expiration.get(); - if current.is_none() || expiration - self.start < current.unwrap() - self.start { + let future = self.future(); + let now = match Time::now() { + Ok(n) => n, + Err(e) => { + future.data.expired.set(Some(Err(WheelError::TimeError(e)))); + return future; + } + }; + let expiration = (now + Duration::from_millis(ms)).round_to_ms(); + let current = self.data.current_expiration.get(); + if current.is_none() || expiration - self.data.start < current.unwrap() - self.data.start { + log::info!("programming timer {}", self.data.fd.raw()); let res = uapi::timerfd_settime( - self.fd.raw(), + self.data.fd.raw(), c::TFD_TIMER_ABSTIME, &c::itimerspec { it_interval: uapi::pod_zeroed(), @@ -114,101 +176,82 @@ impl Wheel { }, ); if let Err(e) = res { - return Err(WheelError::SetFailed(e.into())); + future + .data + .expired + .set(Some(Err(WheelError::SetFailed(e.into())))); + return future; } - self.current_expiration.set(Some(expiration)); - } - self.expirations - .borrow_mut() - .push(Reverse(WheelEntry { expiration, id })); - self.dispatchers.set(id, dispatcher); - Ok(()) - } - - #[allow(dead_code)] - pub fn periodic( - &self, - id: WheelId, - us: u64, - dispatcher: Rc, - ) -> Result<(), WheelError> { - self.check_destroyed()?; - let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) { - Ok(fd) => fd, - Err(e) => return Err(WheelError::CreateFailed(e.into())), - }; - let tv_sec = (us / 1_000_000) as _; - let tv_nsec = (us % 1_000_000 * 1_000) as _; - let res = uapi::timerfd_settime( - fd.raw(), - 0, - &c::itimerspec { - it_interval: c::timespec { tv_sec, tv_nsec }, - it_value: c::timespec { tv_sec, tv_nsec }, - }, - ); - if let Err(e) = res { - return Err(WheelError::SetFailed(e.into())); - } - let el_id = self.el.id(); - let pd = Rc::new(PeriodicDispatcher { - fd, - id: el_id, - el: self.el.clone(), - dispatcher, - }); - self.el - .insert(el_id, Some(pd.fd.raw()), c::EPOLLIN, pd.clone())?; - self.periodic_dispatchers.set(id, pd); - Ok(()) - } - - pub fn remove(&self, id: WheelId) { - // log::trace!("removing {:?} from wheel", id); - self.dispatchers.remove(&id); - if let Some(d) = self.periodic_dispatchers.remove(&id) { - let _ = self.el.remove(d.id); + self.data.current_expiration.set(Some(expiration)); } + self.data.expirations.borrow_mut().push(Reverse(WheelEntry { + expiration, + id: future.data.id, + })); + self.data + .dispatchers + .set(future.data.id, future.data.clone()); + future } } -struct WheelWrapper { - wheel: Rc, -} - -impl EventLoopDispatcher for WheelWrapper { - fn dispatch( - self: Rc, - _fd: Option, - events: i32, - ) -> Result<(), Box> { - if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { - return Err(Box::new(WheelError::ErrorEvent)); +impl WheelData { + fn kill(&self) { + self.destroyed.set(true); + self.dispatcher.set(None); + self.cached_futures.take(); + for (_, dispatcher) in self.dispatchers.lock().drain() { + dispatcher.complete(Err(WheelError::Destroyed)); } + } + + async fn dispatch(self: Rc) { + loop { + if let Err(e) = self.fd.readable().await { + log::error!( + "Could not wait for the timerfd to become readable: {}", + ErrorFmt(e) + ); + self.kill(); + return; + } + if let Err(e) = self.dispatch_once() { + log::error!("Could not dispatch wheel expirations: {}", ErrorFmt(e)); + self.kill(); + return; + } + } + } + + fn dispatch_once(&self) -> Result<(), WheelError> { let mut n = 0u64; - while uapi::read(self.wheel.fd.raw(), &mut n).is_ok() {} - let now = match Time::now() { - Ok(n) => n, - Err(e) => return Err(Box::new(e)), - }; - let dist = now - self.wheel.start; - let mut to_dispatch = vec![]; - { - let mut expirations = self.wheel.expirations.borrow_mut(); - while let Some(Reverse(entry)) = expirations.peek() { - if entry.expiration - self.wheel.start > dist { + loop { + if let Err(e) = uapi::read(self.fd.raw(), &mut n) { + if e.0 == c::EAGAIN { break; } - if let Some(dispatcher) = self.wheel.dispatchers.remove(&entry.id) { + return Err(WheelError::Read(e.into())); + } + } + let now = Time::now()?; + let dist = now - self.start; + let mut to_dispatch = vec![]; + { + let mut expirations = self.expirations.borrow_mut(); + while let Some(Reverse(entry)) = expirations.peek() { + if entry.expiration - self.start > dist { + break; + } + if let Some(dispatcher) = self.dispatchers.remove(&entry.id) { to_dispatch.push(dispatcher); } expirations.pop(); } - self.wheel.current_expiration.set(None); + self.current_expiration.set(None); while let Some(Reverse(entry)) = expirations.peek() { - if self.wheel.dispatchers.get(&entry.id).is_some() { + if self.dispatchers.get(&entry.id).is_some() { let res = uapi::timerfd_settime( - self.wheel.fd.raw(), + self.fd.raw(), c::TFD_TIMER_ABSTIME, &c::itimerspec { it_interval: uapi::pod_zeroed(), @@ -216,49 +259,17 @@ impl EventLoopDispatcher for WheelWrapper { }, ); if let Err(e) = res { - return Err(Box::new(WheelError::SetFailed(e.into()))); + return Err(WheelError::SetFailed(e.into())); } - self.wheel.current_expiration.set(Some(entry.expiration)); + self.current_expiration.set(Some(entry.expiration)); break; } expirations.pop(); } } for dispatcher in to_dispatch { - dispatcher.dispatch()?; + dispatcher.complete(Ok(())); } Ok(()) } } - -impl Drop for WheelWrapper { - fn drop(&mut self) { - self.wheel.destroyed.set(true); - self.wheel.dispatchers.clear(); - let _ = self.wheel.el.remove(self.wheel.id); - } -} - -struct PeriodicDispatcher { - fd: OwnedFd, - id: EventLoopId, - el: Rc, - dispatcher: Rc, -} - -impl EventLoopDispatcher for PeriodicDispatcher { - fn dispatch(self: Rc, _fd: Option, events: i32) -> Result<(), Box> { - if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { - return Err(Box::new(WheelError::ErrorEvent)); - } - let mut n = 0u64; - while uapi::read(self.fd.raw(), &mut n).is_ok() {} - self.dispatcher.clone().dispatch() - } -} - -impl Drop for PeriodicDispatcher { - fn drop(&mut self) { - let _ = self.el.remove(self.id); - } -} diff --git a/src/xwayland/xwm.rs b/src/xwayland/xwm.rs index 94a17e38..e9ec3622 100644 --- a/src/xwayland/xwm.rs +++ b/src/xwayland/xwm.rs @@ -2385,13 +2385,7 @@ struct XToWaylandTransfer { impl XToWaylandTransfer { async fn run(self) { - let timeout = match self.state.eng.timeout(5000) { - Ok(to) => to, - Err(e) => { - log::error!("Could not create a timeout: {}", ErrorFmt(e)); - return; - } - }; + let timeout = self.state.wheel.timeout(5000); pin_mut!(timeout); let mut pos = 0; while pos < self.data.len() {