1
0
Fork 0
forked from wry/wry

acceptor: rebase from EventLoop onto AsyncEngine

This commit is contained in:
Julian Orth 2022-05-12 21:35:08 +02:00
parent 3037ee439c
commit 98cc85e2d3
2 changed files with 38 additions and 52 deletions

View file

@ -1,7 +1,7 @@
use { use {
crate::{ crate::{
client::ClientError, async_engine::SpawnedFuture,
event_loop::{EventLoopDispatcher, EventLoopError, EventLoopId}, event_loop::EventLoopError,
state::State, state::State,
utils::{errorfmt::ErrorFmt, oserror::OsError, xrd::xrd}, utils::{errorfmt::ErrorFmt, oserror::OsError, xrd::xrd},
}, },
@ -26,12 +26,6 @@ pub enum AcceptorError {
OpenLockFile(#[source] OsError), OpenLockFile(#[source] OsError),
#[error("Could not lock the lock file")] #[error("Could not lock the lock file")]
LockLockFile(#[source] OsError), LockLockFile(#[source] OsError),
#[error("The wayland socket is in an error state")]
ErrorEvent,
#[error("Could not accept new connections")]
AcceptFailed(#[source] OsError),
#[error("Could not spawn an event handler for a new connection")]
SpawnFailed(#[source] ClientError),
#[error("Could not bind the socket to an address")] #[error("Could not bind the socket to an address")]
BindFailed(#[source] OsError), BindFailed(#[source] OsError),
#[error("All wayland addresses in the range 0..1000 are already in use")] #[error("All wayland addresses in the range 0..1000 are already in use")]
@ -41,9 +35,7 @@ pub enum AcceptorError {
} }
pub struct Acceptor { pub struct Acceptor {
ids: [EventLoopId; 2],
socket: AllocatedSocket, socket: AllocatedSocket,
global: Rc<State>,
} }
struct AllocatedSocket { struct AllocatedSocket {
@ -148,7 +140,9 @@ fn allocate_socket() -> Result<AllocatedSocket, AcceptorError> {
} }
impl Acceptor { impl Acceptor {
pub fn install(state: &Rc<State>) -> Result<Rc<Acceptor>, AcceptorError> { pub fn install(
state: &Rc<State>,
) -> Result<(Rc<Acceptor>, Vec<SpawnedFuture<()>>), AcceptorError> {
let socket = allocate_socket()?; let socket = allocate_socket()?;
log::info!("bound to socket {}", socket.path.display()); log::info!("bound to socket {}", socket.path.display());
for fd in [&socket.secure, &socket.insecure] { for fd in [&socket.secure, &socket.insecure] {
@ -156,24 +150,20 @@ impl Acceptor {
return Err(AcceptorError::ListenFailed(e.into())); return Err(AcceptorError::ListenFailed(e.into()));
} }
} }
let id1 = state.el.id(); let acc = Rc::new(Acceptor { socket });
let id2 = state.el.id(); let mut futures = vec![];
let acc = Rc::new(Acceptor { futures.push(
ids: [id1, id2], state
socket, .eng
global: state.clone(), .spawn(accept(acc.socket.secure.clone(), state.clone(), true)),
}); );
state.el.insert( futures.push(
id1, state
Some(acc.socket.insecure.raw()), .eng
c::EPOLLIN, .spawn(accept(acc.socket.insecure.clone(), state.clone(), false)),
acc.clone(), );
)?;
state
.el
.insert(id2, Some(acc.socket.secure.raw()), c::EPOLLIN, acc.clone())?;
state.acceptor.set(Some(acc.clone())); state.acceptor.set(Some(acc.clone()));
Ok(acc) Ok((acc, futures))
} }
pub fn socket_name(&self) -> &str { pub fn socket_name(&self) -> &str {
@ -186,39 +176,34 @@ impl Acceptor {
} }
} }
impl EventLoopDispatcher for Acceptor { async fn accept(fd: Rc<OwnedFd>, state: Rc<State>, secure: bool) {
fn dispatch( loop {
self: Rc<Self>, if let Err(e) = state.ring.readable(&fd).await {
fd: Option<i32>, log::error!(
events: i32, "Could not wait for the acceptor to become readable: {}",
) -> Result<(), Box<dyn std::error::Error>> { ErrorFmt(e)
if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { );
return Err(Box::new(AcceptorError::ErrorEvent)); break;
} }
let fd = fd.unwrap();
let secure = fd == self.socket.secure.raw();
loop { loop {
let fd = match uapi::accept4( let fd = match uapi::accept4(
fd, fd.raw(),
uapi::sockaddr_none_mut(), uapi::sockaddr_none_mut(),
c::SOCK_NONBLOCK | c::SOCK_CLOEXEC, c::SOCK_NONBLOCK | c::SOCK_CLOEXEC,
) { ) {
Ok((fd, _)) => fd, Ok((fd, _)) => fd,
Err(Errno(c::EAGAIN)) => break, Err(Errno(c::EAGAIN)) => break,
Err(e) => return Err(Box::new(AcceptorError::AcceptFailed(e.into()))), Err(e) => {
log::error!("Could not accept a client: {}", ErrorFmt(OsError::from(e)));
break;
}
}; };
let id = self.global.clients.id(); let id = state.clients.id();
if let Err(e) = self.global.clients.spawn(id, &self.global, fd, secure) { if let Err(e) = state.clients.spawn(id, &state, fd, secure) {
return Err(Box::new(AcceptorError::SpawnFailed(e))); log::error!("Could not spawn a client: {}", ErrorFmt(e));
break;
} }
} }
Ok(())
}
}
impl Drop for Acceptor {
fn drop(&mut self) {
let _ = self.global.el.remove(self.ids[0]);
let _ = self.global.el.remove(self.ids[1]);
} }
state.el.stop();
} }

View file

@ -190,7 +190,7 @@ fn start_compositor2(
}); });
state.tracker.register(ClientId::from_raw(0)); state.tracker.register(ClientId::from_raw(0));
create_dummy_output(&state); create_dummy_output(&state);
let acceptor = Acceptor::install(&state)?; let (acceptor, acceptor_future) = Acceptor::install(&state)?;
forker.install(&state); forker.install(&state);
forker.setenv( forker.setenv(
WAYLAND_DISPLAY.as_bytes(), WAYLAND_DISPLAY.as_bytes(),
@ -202,6 +202,7 @@ fn start_compositor2(
let compositor = engine.spawn(start_compositor3(state.clone(), test_future)); let compositor = engine.spawn(start_compositor3(state.clone(), test_future));
el.run()?; el.run()?;
drop(compositor); drop(compositor);
drop(acceptor_future);
drop(acceptor); drop(acceptor);
drop(forker); drop(forker);
engine.clear(); engine.clear();