From b0603f06393b236869050c24ac6c3b2e15e32dad Mon Sep 17 00:00:00 2001 From: kossLAN Date: Fri, 29 May 2026 19:31:48 -0400 Subject: [PATCH] forker: split worker implementation --- src/forker.rs | 306 +---------------------------------------- src/forker/worker.rs | 315 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+), 299 deletions(-) create mode 100644 src/forker/worker.rs diff --git a/src/forker.rs b/src/forker.rs index ad195ec1..5db6289d 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -1,41 +1,34 @@ mod io; mod protocol; +mod worker; use { crate::{ - async_engine::{AsyncEngine, SpawnedFuture}, - compositor::{DISPLAY, LIBEI_SOCKET, WAYLAND_DISPLAY}, + async_engine::SpawnedFuture, + compositor::LIBEI_SOCKET, forker::io::{IoIn, IoOut}, - forker::protocol::{ForkerMessage, ServerMessage, bincode_ops}, - io_uring::IoUring, + forker::protocol::{ForkerMessage, ServerMessage}, + forker::worker::Forker, state::State, utils::{ buffd::BufFdError, - clone3::{Forked, double_fork, fork_with_pidfd}, + clone3::double_fork, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, oserror::OsErrorExt2, - pipe::{Pipe, pipe}, - process_name::set_process_name, queue::AsyncQueue, }, xwayland, }, - ahash::AHashMap, - bincode::Options, log::Level, std::{ cell::{Cell, RefCell}, - env, - ffi::OsStr, - io::{Read, Write}, - os::unix::ffi::OsStrExt, rc::{Rc, Weak}, task::{Poll, Waker}, }, thiserror::Error, - uapi::{Errno, Fd, IntoUstr, OwnedFd, UstrPtr, c}, + uapi::{OwnedFd, c}, }; pub struct ForkerProxy { @@ -292,288 +285,3 @@ impl ForkerProxy { state.forker.set(None); } } - -struct Forker { - socket: Rc, - ae: Rc, - ring: Rc, - fds: RefCell>>, - outgoing: AsyncQueue, - pending_spawns: CopyHashMap>, -} - -impl Forker { - fn handle(ppid: c::pid_t, socket: OwnedFd) -> ! { - unsafe { - env::set_var("XDG_SESSION_TYPE", "wayland"); - env::remove_var(DISPLAY); - env::remove_var(WAYLAND_DISPLAY); - } - set_process_name("the ol' forker"); - setup_deathsig(ppid); - unsafe { - c::signal(c::SIGCHLD, c::SIG_IGN); - } - let socket = Rc::new(setup_fds(socket)); - std::panic::set_hook({ - let socket = socket.raw(); - Box::new(move |pi| { - let msg = ForkerMessage::Log { - level: log::Level::Error as _, - msg: format!("The ol' forker panicked: {}", pi), - }; - let msg = bincode_ops().serialize(&msg).unwrap(); - let _ = Fd::new(socket).write_all(&msg); - }) - }); - let ae = AsyncEngine::new(); - let ring = IoUring::new(&ae, 32).unwrap(); - let forker = Rc::new(Forker { - socket, - ae: ae.clone(), - ring: ring.clone(), - fds: RefCell::new(vec![]), - outgoing: Default::default(), - pending_spawns: Default::default(), - }); - let _f1 = ae.spawn("forker incoming", forker.clone().incoming()); - let _f2 = ae.spawn("forker outgoing", forker.clone().outgoing()); - let _ = ring.run(); - std::process::exit(1); - } - - async fn outgoing(self: Rc) { - let mut io = IoOut::new(&self.socket, &self.ring); - loop { - let msg = self.outgoing.pop().await; - for fd in self.fds.borrow_mut().drain(..) { - io.push_fd(fd); - } - if io.write_msg(msg).await.is_err() { - self.ring.stop(); - return; - } - } - } - - async fn incoming(self: Rc) { - let mut io = IoIn::new(&self.socket, &self.ring); - loop { - let msg = match io.read_msg().await { - Ok(m) => m, - _ => { - self.ring.stop(); - return; - } - }; - self.handle_msg(msg, &mut io); - } - } - - fn handle_msg(self: &Rc, msg: ServerMessage, io: &mut IoIn) { - match msg { - ServerMessage::SetEnv { var, val } => self.handle_set_env(&var, val), - ServerMessage::Spawn { - prog, - args, - env, - fds, - pidfd_id, - } => self.handle_spawn(prog, args, env, fds, io, pidfd_id), - } - } - - fn handle_set_env(self: &Rc, var: &[u8], val: Option>) { - let var = OsStr::from_bytes(var); - unsafe { - match val { - Some(val) => env::set_var(var, OsStr::from_bytes(&val)), - _ => env::remove_var(var), - } - } - } - - fn handle_spawn( - self: &Rc, - prog: String, - args: Vec, - env: Vec<(String, Option)>, - fds: Vec, - io: &mut IoIn, - pidfd_id: Option, - ) { - let fds = fds - .into_iter() - .map(|a| (a, Rc::try_unwrap(io.pop_fd().unwrap()).unwrap())) - .collect(); - self.spawn(prog, args, env, fds, pidfd_id) - } - - fn spawn( - self: &Rc, - prog: String, - args: Vec, - env: Vec<(String, Option)>, - fds: Vec<(i32, OwnedFd)>, - pidfd_id: Option, - ) { - let Pipe { read, mut write } = pipe().unwrap(); - let res = match fork_with_pidfd(false) { - Ok(o) => o, - Err(e) => { - self.fail_pidfd(pidfd_id); - self.outgoing.push(ForkerMessage::Log { - level: log::Level::Error as usize, - msg: ErrorFmt(e).to_string(), - }); - return; - } - }; - match res { - Forked::Parent { pid, pidfd } => { - drop(write); - let slf = self.clone(); - let spawn = self.ae.spawn("await spawn", async move { - let read = Rc::new(read); - if let Err(e) = slf.ring.readable(&read).await { - log::error!( - "Cannot wait for the child fd to become readable: {}", - ErrorFmt(e) - ); - slf.fail_pidfd(pidfd_id); - } else { - let mut s = String::new(); - let _ = Fd::new(read.raw()).read_to_string(&mut s); - if s.len() > 0 { - slf.outgoing.push(ForkerMessage::Log { - level: log::Level::Error as _, - msg: format!("Could not spawn `{}`: {}", prog, s), - }); - slf.fail_pidfd(pidfd_id); - } else { - if let Some(id) = pidfd_id { - slf.fds.borrow_mut().push(Rc::new(pidfd)); - slf.outgoing.push(ForkerMessage::PidFd { - id, - success: true, - pid, - }); - } - } - } - slf.pending_spawns.remove(&pid); - }); - self.pending_spawns.set(pid, spawn); - } - Forked::Child { .. } => { - let err: Result<(), SpawnError> = (|| { - if let Some(max_desired) = fds.iter().map(|v| v.0).max() { - write = uapi::fcntl_dupfd_cloexec(write.raw(), max_desired.wrapping_add(1)) - .map_os_err(SpawnError::Dupfd)?; - } - let fds = map_fds(fds)?; - for fd in fds { - let fd = fd.unwrap(); - let res: Result<_, Errno> = (|| { - uapi::fcntl_setfd(fd, uapi::fcntl_getfd(fd)? & !c::FD_CLOEXEC)?; - Ok(()) - })(); - res.map_os_err(SpawnError::Cloexec)?; - } - unsafe { - c::signal(c::SIGCHLD, c::SIG_DFL); - } - for (key, val) in env { - unsafe { - match val { - None => env::remove_var(&key), - Some(val) => env::set_var(&key, &val), - } - } - } - let prog = prog.into_ustr(); - let mut argsnt = UstrPtr::new(); - argsnt.push(&prog); - for arg in args { - argsnt.push(arg); - } - uapi::execvp(&prog, &argsnt).map_os_err(SpawnError::Exec)?; - Ok(()) - })(); - if let Err(e) = err { - let _ = write.write_all(ErrorFmt(e).to_string().as_bytes()); - } - std::process::exit(1); - } - } - } - - fn fail_pidfd(&self, pidfd_id: Option) { - if let Some(id) = pidfd_id { - self.outgoing.push(ForkerMessage::PidFd { - id, - success: false, - pid: 0, - }); - } - } -} - -#[derive(Debug, Error)] -enum SpawnError { - #[error("exec failed")] - Exec(#[source] crate::utils::oserror::OsError), - #[error("Could not unset cloexec flag")] - Cloexec(#[source] crate::utils::oserror::OsError), - #[error("dupfd faild")] - Dupfd(#[source] crate::utils::oserror::OsError), -} - -fn setup_fds(mut socket: OwnedFd) -> OwnedFd { - if socket.raw() != 0 { - uapi::dup3(socket.unwrap(), 0, 0).unwrap(); - socket = OwnedFd::new(0); - } - uapi::close_range(1, c::c_uint::MAX, 0).unwrap(); - uapi::dup3(socket.raw(), 3, c::O_CLOEXEC).unwrap(); - socket = OwnedFd::new(3); - let fd = uapi::open("/dev/null", c::O_RDWR, 0).unwrap().unwrap(); - assert!(fd == 0); - uapi::dup2(0, 1).unwrap(); - uapi::dup2(0, 2).unwrap(); - socket -} - -fn setup_deathsig(ppid: c::pid_t) { - unsafe { - let res = c::prctl(c::PR_SET_PDEATHSIG, c::SIGKILL as c::c_ulong); - uapi::map_err!(res).unwrap(); - if ppid != uapi::getppid() { - std::process::exit(0); - } - } -} - -fn map_fds(fds: Vec<(i32, OwnedFd)>) -> Result, SpawnError> { - let mut desired: Vec<_> = fds.iter().map(|v| v.0).collect(); - desired.sort_by(|a, b| b.cmp(a)); - let mut existing_to_desired: AHashMap<_, _> = fds.iter().map(|v| (v.1.raw(), v.0)).collect(); - let mut desired_to_existing: AHashMap<_, _> = fds.into_iter().map(|v| (v.0, v.1)).collect(); - for desired in desired { - let existing = desired_to_existing.get(&desired).unwrap().raw(); - if existing == desired { - continue; - } - if let Some(conflict_desired) = existing_to_desired.get(&desired).copied() { - let new = uapi::fcntl_dupfd_cloexec(desired, 0).map_os_err(SpawnError::Dupfd)?; - existing_to_desired.remove(&desired); - existing_to_desired.insert(new.raw(), conflict_desired); - desired_to_existing.insert(conflict_desired, new); - } - uapi::dup3(existing, desired, c::O_CLOEXEC).map_os_err(SpawnError::Dupfd)?; - existing_to_desired.remove(&existing); - existing_to_desired.insert(desired, desired); - desired_to_existing.insert(desired, OwnedFd::new(desired)); - } - Ok(desired_to_existing.into_values().collect()) -} diff --git a/src/forker/worker.rs b/src/forker/worker.rs new file mode 100644 index 00000000..584df6a0 --- /dev/null +++ b/src/forker/worker.rs @@ -0,0 +1,315 @@ +use { + crate::{ + async_engine::{AsyncEngine, SpawnedFuture}, + compositor::{DISPLAY, WAYLAND_DISPLAY}, + forker::{ + io::{IoIn, IoOut}, + protocol::{ForkerMessage, ServerMessage, bincode_ops}, + }, + io_uring::IoUring, + utils::{ + clone3::{Forked, fork_with_pidfd}, + copyhashmap::CopyHashMap, + errorfmt::ErrorFmt, + oserror::OsErrorExt2, + pipe::{Pipe, pipe}, + process_name::set_process_name, + queue::AsyncQueue, + }, + }, + ahash::AHashMap, + bincode::Options, + std::{ + cell::RefCell, + env, + ffi::OsStr, + io::{Read, Write}, + os::unix::ffi::OsStrExt, + rc::Rc, + }, + thiserror::Error, + uapi::{Errno, Fd, IntoUstr, OwnedFd, UstrPtr, c}, +}; + +pub(super) struct Forker { + socket: Rc, + ae: Rc, + ring: Rc, + fds: RefCell>>, + outgoing: AsyncQueue, + pending_spawns: CopyHashMap>, +} + +impl Forker { + pub(super) fn handle(ppid: c::pid_t, socket: OwnedFd) -> ! { + unsafe { + env::set_var("XDG_SESSION_TYPE", "wayland"); + env::remove_var(DISPLAY); + env::remove_var(WAYLAND_DISPLAY); + } + set_process_name("the ol' forker"); + setup_deathsig(ppid); + unsafe { + c::signal(c::SIGCHLD, c::SIG_IGN); + } + let socket = Rc::new(setup_fds(socket)); + std::panic::set_hook({ + let socket = socket.raw(); + Box::new(move |pi| { + let msg = ForkerMessage::Log { + level: log::Level::Error as _, + msg: format!("The ol' forker panicked: {}", pi), + }; + let msg = bincode_ops().serialize(&msg).unwrap(); + let _ = Fd::new(socket).write_all(&msg); + }) + }); + let ae = AsyncEngine::new(); + let ring = IoUring::new(&ae, 32).unwrap(); + let forker = Rc::new(Forker { + socket, + ae: ae.clone(), + ring: ring.clone(), + fds: RefCell::new(vec![]), + outgoing: Default::default(), + pending_spawns: Default::default(), + }); + let _f1 = ae.spawn("forker incoming", forker.clone().incoming()); + let _f2 = ae.spawn("forker outgoing", forker.clone().outgoing()); + let _ = ring.run(); + std::process::exit(1); + } + + async fn outgoing(self: Rc) { + let mut io = IoOut::new(&self.socket, &self.ring); + loop { + let msg = self.outgoing.pop().await; + for fd in self.fds.borrow_mut().drain(..) { + io.push_fd(fd); + } + if io.write_msg(msg).await.is_err() { + self.ring.stop(); + return; + } + } + } + + async fn incoming(self: Rc) { + let mut io = IoIn::new(&self.socket, &self.ring); + loop { + let msg = match io.read_msg().await { + Ok(m) => m, + _ => { + self.ring.stop(); + return; + } + }; + self.handle_msg(msg, &mut io); + } + } + + fn handle_msg(self: &Rc, msg: ServerMessage, io: &mut IoIn) { + match msg { + ServerMessage::SetEnv { var, val } => self.handle_set_env(&var, val), + ServerMessage::Spawn { + prog, + args, + env, + fds, + pidfd_id, + } => self.handle_spawn(prog, args, env, fds, io, pidfd_id), + } + } + + fn handle_set_env(self: &Rc, var: &[u8], val: Option>) { + let var = OsStr::from_bytes(var); + unsafe { + match val { + Some(val) => env::set_var(var, OsStr::from_bytes(&val)), + _ => env::remove_var(var), + } + } + } + + fn handle_spawn( + self: &Rc, + prog: String, + args: Vec, + env: Vec<(String, Option)>, + fds: Vec, + io: &mut IoIn, + pidfd_id: Option, + ) { + let fds = fds + .into_iter() + .map(|a| (a, Rc::try_unwrap(io.pop_fd().unwrap()).unwrap())) + .collect(); + self.spawn(prog, args, env, fds, pidfd_id) + } + + fn spawn( + self: &Rc, + prog: String, + args: Vec, + env: Vec<(String, Option)>, + fds: Vec<(i32, OwnedFd)>, + pidfd_id: Option, + ) { + let Pipe { read, mut write } = pipe().unwrap(); + let res = match fork_with_pidfd(false) { + Ok(o) => o, + Err(e) => { + self.fail_pidfd(pidfd_id); + self.outgoing.push(ForkerMessage::Log { + level: log::Level::Error as usize, + msg: ErrorFmt(e).to_string(), + }); + return; + } + }; + match res { + Forked::Parent { pid, pidfd } => { + drop(write); + let slf = self.clone(); + let spawn = self.ae.spawn("await spawn", async move { + let read = Rc::new(read); + if let Err(e) = slf.ring.readable(&read).await { + log::error!( + "Cannot wait for the child fd to become readable: {}", + ErrorFmt(e) + ); + slf.fail_pidfd(pidfd_id); + } else { + let mut s = String::new(); + let _ = Fd::new(read.raw()).read_to_string(&mut s); + if s.len() > 0 { + slf.outgoing.push(ForkerMessage::Log { + level: log::Level::Error as _, + msg: format!("Could not spawn `{}`: {}", prog, s), + }); + slf.fail_pidfd(pidfd_id); + } else if let Some(id) = pidfd_id { + slf.fds.borrow_mut().push(Rc::new(pidfd)); + slf.outgoing.push(ForkerMessage::PidFd { + id, + success: true, + pid, + }); + } + } + slf.pending_spawns.remove(&pid); + }); + self.pending_spawns.set(pid, spawn); + } + Forked::Child { .. } => { + let err: Result<(), SpawnError> = (|| { + if let Some(max_desired) = fds.iter().map(|v| v.0).max() { + write = uapi::fcntl_dupfd_cloexec(write.raw(), max_desired.wrapping_add(1)) + .map_os_err(SpawnError::Dupfd)?; + } + let fds = map_fds(fds)?; + for fd in fds { + let fd = fd.unwrap(); + let res: Result<_, Errno> = (|| { + uapi::fcntl_setfd(fd, uapi::fcntl_getfd(fd)? & !c::FD_CLOEXEC)?; + Ok(()) + })(); + res.map_os_err(SpawnError::Cloexec)?; + } + unsafe { + c::signal(c::SIGCHLD, c::SIG_DFL); + } + for (key, val) in env { + unsafe { + match val { + None => env::remove_var(&key), + Some(val) => env::set_var(&key, &val), + } + } + } + let prog = prog.into_ustr(); + let mut argsnt = UstrPtr::new(); + argsnt.push(&prog); + for arg in args { + argsnt.push(arg); + } + uapi::execvp(&prog, &argsnt).map_os_err(SpawnError::Exec)?; + Ok(()) + })(); + if let Err(e) = err { + let _ = write.write_all(ErrorFmt(e).to_string().as_bytes()); + } + std::process::exit(1); + } + } + } + + fn fail_pidfd(&self, pidfd_id: Option) { + if let Some(id) = pidfd_id { + self.outgoing.push(ForkerMessage::PidFd { + id, + success: false, + pid: 0, + }); + } + } +} + +#[derive(Debug, Error)] +enum SpawnError { + #[error("exec failed")] + Exec(#[source] crate::utils::oserror::OsError), + #[error("Could not unset cloexec flag")] + Cloexec(#[source] crate::utils::oserror::OsError), + #[error("dupfd faild")] + Dupfd(#[source] crate::utils::oserror::OsError), +} + +fn setup_fds(mut socket: OwnedFd) -> OwnedFd { + if socket.raw() != 0 { + uapi::dup3(socket.unwrap(), 0, 0).unwrap(); + socket = OwnedFd::new(0); + } + uapi::close_range(1, c::c_uint::MAX, 0).unwrap(); + uapi::dup3(socket.raw(), 3, c::O_CLOEXEC).unwrap(); + socket = OwnedFd::new(3); + let fd = uapi::open("/dev/null", c::O_RDWR, 0).unwrap().unwrap(); + assert!(fd == 0); + uapi::dup2(0, 1).unwrap(); + uapi::dup2(0, 2).unwrap(); + socket +} + +fn setup_deathsig(ppid: c::pid_t) { + unsafe { + let res = c::prctl(c::PR_SET_PDEATHSIG, c::SIGKILL as c::c_ulong); + uapi::map_err!(res).unwrap(); + if ppid != uapi::getppid() { + std::process::exit(0); + } + } +} + +fn map_fds(fds: Vec<(i32, OwnedFd)>) -> Result, SpawnError> { + let mut desired: Vec<_> = fds.iter().map(|v| v.0).collect(); + desired.sort_by(|a, b| b.cmp(a)); + let mut existing_to_desired: AHashMap<_, _> = fds.iter().map(|v| (v.1.raw(), v.0)).collect(); + let mut desired_to_existing: AHashMap<_, _> = fds.into_iter().map(|v| (v.0, v.1)).collect(); + for desired in desired { + let existing = desired_to_existing.get(&desired).unwrap().raw(); + if existing == desired { + continue; + } + if let Some(conflict_desired) = existing_to_desired.get(&desired).copied() { + let new = uapi::fcntl_dupfd_cloexec(desired, 0).map_os_err(SpawnError::Dupfd)?; + existing_to_desired.remove(&desired); + existing_to_desired.insert(new.raw(), conflict_desired); + desired_to_existing.insert(conflict_desired, new); + } + uapi::dup3(existing, desired, c::O_CLOEXEC).map_os_err(SpawnError::Dupfd)?; + existing_to_desired.remove(&existing); + existing_to_desired.insert(desired, desired); + desired_to_existing.insert(desired, OwnedFd::new(desired)); + } + Ok(desired_to_existing.into_values().collect()) +}