From 98cc85e2d3f9c1c0576d613dde0b5d9a3c079a85 Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Thu, 12 May 2022 21:35:08 +0200 Subject: [PATCH] acceptor: rebase from EventLoop onto AsyncEngine --- src/acceptor.rs | 87 ++++++++++++++++++++--------------------------- src/compositor.rs | 3 +- 2 files changed, 38 insertions(+), 52 deletions(-) diff --git a/src/acceptor.rs b/src/acceptor.rs index 02401f6a..bc35c93f 100644 --- a/src/acceptor.rs +++ b/src/acceptor.rs @@ -1,7 +1,7 @@ use { crate::{ - client::ClientError, - event_loop::{EventLoopDispatcher, EventLoopError, EventLoopId}, + async_engine::SpawnedFuture, + event_loop::EventLoopError, state::State, utils::{errorfmt::ErrorFmt, oserror::OsError, xrd::xrd}, }, @@ -26,12 +26,6 @@ pub enum AcceptorError { OpenLockFile(#[source] OsError), #[error("Could not lock the lock file")] LockLockFile(#[source] OsError), - #[error("The wayland socket is in an error state")] - ErrorEvent, - #[error("Could not accept new connections")] - AcceptFailed(#[source] OsError), - #[error("Could not spawn an event handler for a new connection")] - SpawnFailed(#[source] ClientError), #[error("Could not bind the socket to an address")] BindFailed(#[source] OsError), #[error("All wayland addresses in the range 0..1000 are already in use")] @@ -41,9 +35,7 @@ pub enum AcceptorError { } pub struct Acceptor { - ids: [EventLoopId; 2], socket: AllocatedSocket, - global: Rc, } struct AllocatedSocket { @@ -148,7 +140,9 @@ fn allocate_socket() -> Result { } impl Acceptor { - pub fn install(state: &Rc) -> Result, AcceptorError> { + pub fn install( + state: &Rc, + ) -> Result<(Rc, Vec>), AcceptorError> { let socket = allocate_socket()?; log::info!("bound to socket {}", socket.path.display()); for fd in [&socket.secure, &socket.insecure] { @@ -156,24 +150,20 @@ impl Acceptor { return Err(AcceptorError::ListenFailed(e.into())); } } - let id1 = state.el.id(); - let id2 = state.el.id(); - let acc = Rc::new(Acceptor { - ids: [id1, id2], - socket, - global: state.clone(), - }); - state.el.insert( - id1, - Some(acc.socket.insecure.raw()), - c::EPOLLIN, - acc.clone(), - )?; - state - .el - .insert(id2, Some(acc.socket.secure.raw()), c::EPOLLIN, acc.clone())?; + let acc = Rc::new(Acceptor { socket }); + let mut futures = vec![]; + futures.push( + state + .eng + .spawn(accept(acc.socket.secure.clone(), state.clone(), true)), + ); + futures.push( + state + .eng + .spawn(accept(acc.socket.insecure.clone(), state.clone(), false)), + ); state.acceptor.set(Some(acc.clone())); - Ok(acc) + Ok((acc, futures)) } pub fn socket_name(&self) -> &str { @@ -186,39 +176,34 @@ impl Acceptor { } } -impl EventLoopDispatcher for Acceptor { - fn dispatch( - self: Rc, - fd: Option, - events: i32, - ) -> Result<(), Box> { - if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { - return Err(Box::new(AcceptorError::ErrorEvent)); +async fn accept(fd: Rc, state: Rc, secure: bool) { + loop { + if let Err(e) = state.ring.readable(&fd).await { + log::error!( + "Could not wait for the acceptor to become readable: {}", + ErrorFmt(e) + ); + break; } - let fd = fd.unwrap(); - let secure = fd == self.socket.secure.raw(); loop { let fd = match uapi::accept4( - fd, + fd.raw(), uapi::sockaddr_none_mut(), c::SOCK_NONBLOCK | c::SOCK_CLOEXEC, ) { Ok((fd, _)) => fd, Err(Errno(c::EAGAIN)) => break, - Err(e) => return Err(Box::new(AcceptorError::AcceptFailed(e.into()))), + Err(e) => { + log::error!("Could not accept a client: {}", ErrorFmt(OsError::from(e))); + break; + } }; - let id = self.global.clients.id(); - if let Err(e) = self.global.clients.spawn(id, &self.global, fd, secure) { - return Err(Box::new(AcceptorError::SpawnFailed(e))); + let id = state.clients.id(); + if let Err(e) = state.clients.spawn(id, &state, fd, secure) { + log::error!("Could not spawn a client: {}", ErrorFmt(e)); + break; } } - Ok(()) - } -} - -impl Drop for Acceptor { - fn drop(&mut self) { - let _ = self.global.el.remove(self.ids[0]); - let _ = self.global.el.remove(self.ids[1]); } + state.el.stop(); } diff --git a/src/compositor.rs b/src/compositor.rs index 6975b5da..0f0e6d6e 100644 --- a/src/compositor.rs +++ b/src/compositor.rs @@ -190,7 +190,7 @@ fn start_compositor2( }); state.tracker.register(ClientId::from_raw(0)); create_dummy_output(&state); - let acceptor = Acceptor::install(&state)?; + let (acceptor, acceptor_future) = Acceptor::install(&state)?; forker.install(&state); forker.setenv( WAYLAND_DISPLAY.as_bytes(), @@ -202,6 +202,7 @@ fn start_compositor2( let compositor = engine.spawn(start_compositor3(state.clone(), test_future)); el.run()?; drop(compositor); + drop(acceptor_future); drop(acceptor); drop(forker); engine.clear();