From b387c57d570c9eb2da37c7adf762c42e14ea22fc Mon Sep 17 00:00:00 2001 From: kossLAN Date: Fri, 29 May 2026 19:34:06 -0400 Subject: [PATCH] forker: split proxy boundary --- src/forker.rs | 287 +------------------------------------------- src/forker/error.rs | 28 +++++ src/forker/proxy.rs | 260 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 293 insertions(+), 282 deletions(-) create mode 100644 src/forker/error.rs create mode 100644 src/forker/proxy.rs diff --git a/src/forker.rs b/src/forker.rs index 5db6289d..b4d38cba 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -1,287 +1,10 @@ +mod error; mod io; mod protocol; +mod proxy; mod worker; -use { - crate::{ - async_engine::SpawnedFuture, - compositor::LIBEI_SOCKET, - forker::io::{IoIn, IoOut}, - forker::protocol::{ForkerMessage, ServerMessage}, - forker::worker::Forker, - state::State, - utils::{ - buffd::BufFdError, - clone3::double_fork, - copyhashmap::CopyHashMap, - errorfmt::ErrorFmt, - numcell::NumCell, - oserror::OsErrorExt2, - queue::AsyncQueue, - }, - xwayland, - }, - log::Level, - std::{ - cell::{Cell, RefCell}, - rc::{Rc, Weak}, - task::{Poll, Waker}, - }, - thiserror::Error, - uapi::{OwnedFd, c}, +pub use { + error::ForkerError, + proxy::ForkerProxy, }; - -pub struct ForkerProxy { - pidfd: Rc, - socket: Rc, - task_in: Cell>>, - task_out: Cell>>, - task_proc: Cell>>, - outgoing: AsyncQueue, - next_id: NumCell, - pending_pidfds: CopyHashMap>, - fds: RefCell>>, -} - -struct PidfdHandoff { - pidfd: Cell, c::pid_t), ForkerError>>>, - waiter: Cell>, -} - -#[derive(Debug, Error)] -pub enum ForkerError { - #[error("Could not create a socketpair")] - Socketpair(#[source] crate::utils::oserror::OsError), - #[error("Could not fork")] - Fork(#[source] crate::utils::oserror::OsError), - #[error("Could not read the next message")] - ReadFailed(#[source] BufFdError), - #[error("Could not write the next message")] - WriteFailed(#[source] BufFdError), - #[error("Could not decode the next message")] - DecodeFailed(#[source] bincode::Error), - #[error("Could not encode the next message")] - EncodeFailed(#[source] bincode::Error), - #[error("Could not fork")] - PidfdForkFailed, - #[error("Could not receive pidfd from child")] - RecvPidfd(#[source] crate::utils::oserror::OsError), - #[error("Could not read cmsg")] - CmsgRead(#[source] crate::utils::oserror::OsError), - #[error("Cmsg has an unexpected form")] - InvalidCmsg, -} - -impl ForkerProxy { - pub fn clear(&self) { - self.task_in.take(); - self.task_out.take(); - self.task_proc.take(); - self.outgoing.clear(); - } - - pub fn create(reaper_pid: c::pid_t) -> Result { - let (parent, child) = uapi::socketpair(c::AF_UNIX, c::SOCK_STREAM | c::SOCK_CLOEXEC, 0) - .map_os_err(ForkerError::Socketpair)?; - match double_fork()? { - Some(pidfd) => Ok(ForkerProxy { - pidfd: Rc::new(pidfd), - socket: Rc::new(parent), - task_in: Cell::new(None), - task_out: Cell::new(None), - task_proc: Cell::new(None), - outgoing: Default::default(), - next_id: Default::default(), - pending_pidfds: Default::default(), - fds: Default::default(), - }), - None => { - drop(parent); - Forker::handle(reaper_pid, child) - } - } - } - - pub fn install(self: &Rc, state: &Rc) { - state.forker.set(Some(self.clone())); - self.task_proc.set(Some(state.eng.spawn( - "forker check process", - self.clone().check_process(state.clone()), - ))); - self.task_in.set(Some( - state - .eng - .spawn("forker incoming", self.clone().incoming(state.clone())), - )); - self.task_out.set(Some( - state - .eng - .spawn("forker outgoing", self.clone().outgoing(state.clone())), - )); - } - - pub fn setenv(&self, key: &[u8], val: &[u8]) { - self.outgoing.push(ServerMessage::SetEnv { - var: key.to_vec(), - val: Some(val.to_vec()), - }) - } - - pub fn unsetenv(&self, key: &[u8]) { - self.outgoing.push(ServerMessage::SetEnv { - var: key.to_vec(), - val: None, - }) - } - - async fn pidfd(&self, id: u32) -> Result<(Rc, c::pid_t), ForkerError> { - let handoff = Rc::new(PidfdHandoff { - pidfd: Cell::new(None), - waiter: Cell::new(None), - }); - self.pending_pidfds.set(id, Rc::downgrade(&handoff)); - futures_util::future::poll_fn(|ctx| { - if let Some(pidfd) = handoff.pidfd.take() { - Poll::Ready(pidfd) - } else { - handoff.waiter.set(Some(ctx.waker().clone())); - Poll::Pending - } - }) - .await - } - - pub async fn xwayland( - &self, - state: &State, - stderr: Rc, - dfd: Rc, - listenfd: Rc, - wmfd: Rc, - waylandfd: Rc, - ) -> Result<(Rc, c::pid_t), ForkerError> { - let (prog, args) = xwayland::build_args(state, self).await; - let env = vec![ - ("WAYLAND_SOCKET".to_string(), Some("6".to_string())), - (LIBEI_SOCKET.to_string(), None), - ]; - let fds = vec![ - (2, stderr), - (3, dfd), - (4, listenfd), - (5, wmfd), - (6, waylandfd), - ]; - let pidfd_id = self.next_id.fetch_add(1); - self.spawn_(prog, args, env, fds, Some(pidfd_id)); - self.pidfd(pidfd_id).await - } - - pub fn spawn( - &self, - prog: String, - args: Vec, - env: Vec<(String, Option)>, - fds: Vec<(i32, Rc)>, - ) { - self.spawn_(prog, args, env, fds, None) - } - - fn spawn_( - &self, - prog: String, - args: Vec, - env: Vec<(String, Option)>, - fds: Vec<(i32, Rc)>, - pidfd_id: Option, - ) { - for (_, fd) in &fds { - self.fds.borrow_mut().push(fd.clone()); - } - let fds = fds.into_iter().map(|(a, _)| a).collect(); - self.outgoing.push(ServerMessage::Spawn { - prog, - args, - env, - fds, - pidfd_id, - }) - } - - async fn incoming(self: Rc, state: Rc) { - let mut io = IoIn::new(&self.socket, &state.ring); - loop { - let msg = match io.read_msg().await { - Ok(msg) => msg, - Err(e) => { - log::error!("Could not read from the ol' forker: {}", ErrorFmt(e)); - self.task_in.take(); - return; - } - }; - self.handle_msg(msg, &mut io); - } - } - - fn handle_msg(&self, msg: ForkerMessage, io: &mut IoIn) { - match msg { - ForkerMessage::Log { level, msg } => self.handle_log(level, &msg), - ForkerMessage::PidFd { id, success, pid } => self.handle_pidfd(id, success, io, pid), - } - } - - fn handle_pidfd(&self, id: u32, success: bool, io: &mut IoIn, pid: c::pid_t) { - let res = match success { - true => Ok((io.pop_fd().unwrap(), pid)), - _ => Err(ForkerError::PidfdForkFailed), - }; - if let Some(handoff) = self.pending_pidfds.remove(&id) - && let Some(handoff) = handoff.upgrade() - { - handoff.pidfd.set(Some(res)); - if let Some(w) = handoff.waiter.take() { - w.wake(); - } - } - } - - fn handle_log(&self, level: usize, msg: &str) { - let level = match level { - 1 => Level::Error, - 2 => Level::Warn, - 3 => Level::Info, - 4 => Level::Debug, - 5 => Level::Trace, - _ => Level::Error, - }; - log::log!(level, "{}", msg); - } - - async fn outgoing(self: Rc, state: Rc) { - let mut io = IoOut::new(&self.socket, &state.ring); - loop { - let msg = self.outgoing.pop().await; - for fd in self.fds.borrow_mut().drain(..) { - io.push_fd(fd); - } - if let Err(e) = io.write_msg(msg).await { - log::error!("Could not write to the ol' forker: {}", ErrorFmt(e)); - self.clear(); - state.forker.set(None); - return; - } - } - } - - async fn check_process(self: Rc, state: Rc) { - if let Err(e) = state.ring.readable(&self.pidfd).await { - log::error!( - "Cannot wait for the forker pidfd to become readable: {}", - ErrorFmt(e) - ); - } - log::error!("The ol' forker died. Cannot spawn further processes."); - self.clear(); - state.forker.set(None); - } -} diff --git a/src/forker/error.rs b/src/forker/error.rs new file mode 100644 index 00000000..6eadab71 --- /dev/null +++ b/src/forker/error.rs @@ -0,0 +1,28 @@ +use { + crate::utils::buffd::BufFdError, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum ForkerError { + #[error("Could not create a socketpair")] + Socketpair(#[source] crate::utils::oserror::OsError), + #[error("Could not fork")] + Fork(#[source] crate::utils::oserror::OsError), + #[error("Could not read the next message")] + ReadFailed(#[source] BufFdError), + #[error("Could not write the next message")] + WriteFailed(#[source] BufFdError), + #[error("Could not decode the next message")] + DecodeFailed(#[source] bincode::Error), + #[error("Could not encode the next message")] + EncodeFailed(#[source] bincode::Error), + #[error("Could not fork")] + PidfdForkFailed, + #[error("Could not receive pidfd from child")] + RecvPidfd(#[source] crate::utils::oserror::OsError), + #[error("Could not read cmsg")] + CmsgRead(#[source] crate::utils::oserror::OsError), + #[error("Cmsg has an unexpected form")] + InvalidCmsg, +} diff --git a/src/forker/proxy.rs b/src/forker/proxy.rs new file mode 100644 index 00000000..270df0f4 --- /dev/null +++ b/src/forker/proxy.rs @@ -0,0 +1,260 @@ +use { + crate::{ + async_engine::SpawnedFuture, + compositor::LIBEI_SOCKET, + forker::{ + ForkerError, + io::{IoIn, IoOut}, + protocol::{ForkerMessage, ServerMessage}, + worker::Forker, + }, + state::State, + utils::{ + clone3::double_fork, + copyhashmap::CopyHashMap, + errorfmt::ErrorFmt, + numcell::NumCell, + oserror::OsErrorExt2, + queue::AsyncQueue, + }, + xwayland, + }, + log::Level, + std::{ + cell::{Cell, RefCell}, + rc::{Rc, Weak}, + task::{Poll, Waker}, + }, + uapi::{OwnedFd, c}, +}; + +pub struct ForkerProxy { + pidfd: Rc, + socket: Rc, + task_in: Cell>>, + task_out: Cell>>, + task_proc: Cell>>, + outgoing: AsyncQueue, + next_id: NumCell, + pending_pidfds: CopyHashMap>, + fds: RefCell>>, +} + +struct PidfdHandoff { + pidfd: Cell, c::pid_t), ForkerError>>>, + waiter: Cell>, +} + +impl ForkerProxy { + pub fn clear(&self) { + self.task_in.take(); + self.task_out.take(); + self.task_proc.take(); + self.outgoing.clear(); + } + + pub fn create(reaper_pid: c::pid_t) -> Result { + let (parent, child) = uapi::socketpair(c::AF_UNIX, c::SOCK_STREAM | c::SOCK_CLOEXEC, 0) + .map_os_err(ForkerError::Socketpair)?; + match double_fork()? { + Some(pidfd) => Ok(ForkerProxy { + pidfd: Rc::new(pidfd), + socket: Rc::new(parent), + task_in: Cell::new(None), + task_out: Cell::new(None), + task_proc: Cell::new(None), + outgoing: Default::default(), + next_id: Default::default(), + pending_pidfds: Default::default(), + fds: Default::default(), + }), + None => { + drop(parent); + Forker::handle(reaper_pid, child) + } + } + } + + pub fn install(self: &Rc, state: &Rc) { + state.forker.set(Some(self.clone())); + self.task_proc.set(Some(state.eng.spawn( + "forker check process", + self.clone().check_process(state.clone()), + ))); + self.task_in.set(Some( + state + .eng + .spawn("forker incoming", self.clone().incoming(state.clone())), + )); + self.task_out.set(Some( + state + .eng + .spawn("forker outgoing", self.clone().outgoing(state.clone())), + )); + } + + pub fn setenv(&self, key: &[u8], val: &[u8]) { + self.outgoing.push(ServerMessage::SetEnv { + var: key.to_vec(), + val: Some(val.to_vec()), + }) + } + + pub fn unsetenv(&self, key: &[u8]) { + self.outgoing.push(ServerMessage::SetEnv { + var: key.to_vec(), + val: None, + }) + } + + async fn pidfd(&self, id: u32) -> Result<(Rc, c::pid_t), ForkerError> { + let handoff = Rc::new(PidfdHandoff { + pidfd: Cell::new(None), + waiter: Cell::new(None), + }); + self.pending_pidfds.set(id, Rc::downgrade(&handoff)); + futures_util::future::poll_fn(|ctx| { + if let Some(pidfd) = handoff.pidfd.take() { + Poll::Ready(pidfd) + } else { + handoff.waiter.set(Some(ctx.waker().clone())); + Poll::Pending + } + }) + .await + } + + pub async fn xwayland( + &self, + state: &State, + stderr: Rc, + dfd: Rc, + listenfd: Rc, + wmfd: Rc, + waylandfd: Rc, + ) -> Result<(Rc, c::pid_t), ForkerError> { + let (prog, args) = xwayland::build_args(state, self).await; + let env = vec![ + ("WAYLAND_SOCKET".to_string(), Some("6".to_string())), + (LIBEI_SOCKET.to_string(), None), + ]; + let fds = vec![ + (2, stderr), + (3, dfd), + (4, listenfd), + (5, wmfd), + (6, waylandfd), + ]; + let pidfd_id = self.next_id.fetch_add(1); + self.spawn_(prog, args, env, fds, Some(pidfd_id)); + self.pidfd(pidfd_id).await + } + + pub fn spawn( + &self, + prog: String, + args: Vec, + env: Vec<(String, Option)>, + fds: Vec<(i32, Rc)>, + ) { + self.spawn_(prog, args, env, fds, None) + } + + fn spawn_( + &self, + prog: String, + args: Vec, + env: Vec<(String, Option)>, + fds: Vec<(i32, Rc)>, + pidfd_id: Option, + ) { + for (_, fd) in &fds { + self.fds.borrow_mut().push(fd.clone()); + } + let fds = fds.into_iter().map(|(a, _)| a).collect(); + self.outgoing.push(ServerMessage::Spawn { + prog, + args, + env, + fds, + pidfd_id, + }) + } + + async fn incoming(self: Rc, state: Rc) { + let mut io = IoIn::new(&self.socket, &state.ring); + loop { + let msg = match io.read_msg().await { + Ok(msg) => msg, + Err(e) => { + log::error!("Could not read from the ol' forker: {}", ErrorFmt(e)); + self.task_in.take(); + return; + } + }; + self.handle_msg(msg, &mut io); + } + } + + fn handle_msg(&self, msg: ForkerMessage, io: &mut IoIn) { + match msg { + ForkerMessage::Log { level, msg } => self.handle_log(level, &msg), + ForkerMessage::PidFd { id, success, pid } => self.handle_pidfd(id, success, io, pid), + } + } + + fn handle_pidfd(&self, id: u32, success: bool, io: &mut IoIn, pid: c::pid_t) { + let res = match success { + true => Ok((io.pop_fd().unwrap(), pid)), + _ => Err(ForkerError::PidfdForkFailed), + }; + if let Some(handoff) = self.pending_pidfds.remove(&id) + && let Some(handoff) = handoff.upgrade() + { + handoff.pidfd.set(Some(res)); + if let Some(w) = handoff.waiter.take() { + w.wake(); + } + } + } + + fn handle_log(&self, level: usize, msg: &str) { + let level = match level { + 1 => Level::Error, + 2 => Level::Warn, + 3 => Level::Info, + 4 => Level::Debug, + 5 => Level::Trace, + _ => Level::Error, + }; + log::log!(level, "{}", msg); + } + + async fn outgoing(self: Rc, state: Rc) { + let mut io = IoOut::new(&self.socket, &state.ring); + loop { + let msg = self.outgoing.pop().await; + for fd in self.fds.borrow_mut().drain(..) { + io.push_fd(fd); + } + if let Err(e) = io.write_msg(msg).await { + log::error!("Could not write to the ol' forker: {}", ErrorFmt(e)); + self.clear(); + state.forker.set(None); + return; + } + } + } + + async fn check_process(self: Rc, state: Rc) { + if let Err(e) = state.ring.readable(&self.pidfd).await { + log::error!( + "Cannot wait for the forker pidfd to become readable: {}", + ErrorFmt(e) + ); + } + log::error!("The ol' forker died. Cannot spawn further processes."); + self.clear(); + state.forker.set(None); + } +}