autocommit 2022-02-24 16:30:11 CET
This commit is contained in:
parent
666e475032
commit
7d28d30666
39 changed files with 1670 additions and 209 deletions
248
src/forker.rs
248
src/forker.rs
|
|
@ -3,23 +3,24 @@ mod io;
|
|||
|
||||
use crate::async_engine::{AsyncFd, SpawnedFuture};
|
||||
use crate::forker::clone3::{fork_with_pidfd, Forked};
|
||||
use crate::utils::buffd::{BufFdError};
|
||||
use crate::forker::io::{IoIn, IoOut};
|
||||
use crate::utils::buffd::BufFdError;
|
||||
use crate::utils::copyhashmap::CopyHashMap;
|
||||
use crate::{AsyncEngine, AsyncQueue, ErrorFmt, EventLoop, State, Wheel};
|
||||
use crate::{xwayland, AsyncEngine, AsyncQueue, ErrorFmt, EventLoop, NumCell, State, Wheel};
|
||||
use bincode::error::{DecodeError, EncodeError};
|
||||
use bincode::{Decode, Encode};
|
||||
use i4config::_private::bincode_ops;
|
||||
use log::Level;
|
||||
use std::cell::Cell;
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::env;
|
||||
use std::ffi::OsStr;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::rc::Rc;
|
||||
use bincode::error::{DecodeError, EncodeError};
|
||||
use std::rc::{Rc, Weak};
|
||||
use std::task::{Poll, Waker};
|
||||
use thiserror::Error;
|
||||
use uapi::{c, pipe2, Fd, IntoUstr, OwnedFd, UstrPtr};
|
||||
use crate::forker::io::{IoIn, IoOut};
|
||||
use uapi::{c, pipe2, Errno, Fd, IntoUstr, OwnedFd, UstrPtr};
|
||||
|
||||
pub struct ForkerProxy {
|
||||
pidfd: Rc<OwnedFd>,
|
||||
|
|
@ -29,6 +30,14 @@ pub struct ForkerProxy {
|
|||
task_out: Cell<Option<SpawnedFuture<()>>>,
|
||||
task_proc: Cell<Option<SpawnedFuture<()>>>,
|
||||
outgoing: AsyncQueue<ServerMessage>,
|
||||
next_id: NumCell<u32>,
|
||||
pending_pidfds: CopyHashMap<u32, Weak<PidfdHandoff>>,
|
||||
fds: RefCell<Vec<Rc<OwnedFd>>>,
|
||||
}
|
||||
|
||||
struct PidfdHandoff {
|
||||
pidfd: Cell<Option<Result<OwnedFd, ForkerError>>>,
|
||||
waiter: Cell<Option<Waker>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
|
@ -45,6 +54,8 @@ pub enum ForkerError {
|
|||
DecodeFailed(#[source] DecodeError),
|
||||
#[error("Could not encode the next message")]
|
||||
EncodeFailed(#[source] EncodeError),
|
||||
#[error("Could not fork")]
|
||||
PidfdForkFailed,
|
||||
}
|
||||
|
||||
impl ForkerProxy {
|
||||
|
|
@ -67,6 +78,9 @@ impl ForkerProxy {
|
|||
task_out: Cell::new(None),
|
||||
task_proc: Cell::new(None),
|
||||
outgoing: Default::default(),
|
||||
next_id: Default::default(),
|
||||
pending_pidfds: Default::default(),
|
||||
fds: Default::default(),
|
||||
}),
|
||||
Forked::Child { .. } => Forker::handle(pid, child),
|
||||
}
|
||||
|
|
@ -90,12 +104,65 @@ impl ForkerProxy {
|
|||
pub fn setenv(&self, key: &[u8], val: &[u8]) {
|
||||
self.outgoing.push(ServerMessage::SetEnv {
|
||||
var: key.to_vec(),
|
||||
val: val.to_vec(),
|
||||
val: Some(val.to_vec()),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn spawn(&self, prog: String, args: Vec<String>, env: Vec<(String, String)>) {
|
||||
self.outgoing.push(ServerMessage::Spawn { prog, args, env })
|
||||
pub fn unsetenv(&self, key: &[u8]) {
|
||||
self.outgoing.push(ServerMessage::SetEnv {
|
||||
var: key.to_vec(),
|
||||
val: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn pidfd(&self, id: u32) -> Result<OwnedFd, ForkerError> {
|
||||
let handoff = Rc::new(PidfdHandoff {
|
||||
pidfd: Cell::new(None),
|
||||
waiter: Cell::new(None),
|
||||
});
|
||||
self.pending_pidfds.set(id, Rc::downgrade(&handoff));
|
||||
futures::future::poll_fn(|ctx| {
|
||||
if let Some(pidfd) = handoff.pidfd.take() {
|
||||
Poll::Ready(pidfd)
|
||||
} else {
|
||||
handoff.waiter.set(Some(ctx.waker().clone()));
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn xwayland(
|
||||
&self,
|
||||
stderr: Rc<OwnedFd>,
|
||||
dfd: Rc<OwnedFd>,
|
||||
listenfd: Rc<OwnedFd>,
|
||||
wmfd: Rc<OwnedFd>,
|
||||
waylandfd: Rc<OwnedFd>,
|
||||
) -> Result<OwnedFd, ForkerError> {
|
||||
self.fds.borrow_mut().extend([stderr, dfd, listenfd, wmfd, waylandfd]);
|
||||
let id = self.next_id.fetch_add(1);
|
||||
self.outgoing.push(ServerMessage::Xwayland { id });
|
||||
self.pidfd(id).await
|
||||
}
|
||||
|
||||
pub fn spawn(
|
||||
&self,
|
||||
prog: String,
|
||||
args: Vec<String>,
|
||||
env: Vec<(String, String)>,
|
||||
stderr: Option<Rc<OwnedFd>>,
|
||||
) {
|
||||
let have_stderr = stderr.is_some();
|
||||
if let Some(stderr) = stderr {
|
||||
self.fds.borrow_mut().push(stderr);
|
||||
}
|
||||
self.outgoing.push(ServerMessage::Spawn {
|
||||
prog,
|
||||
args,
|
||||
env,
|
||||
stderr: have_stderr,
|
||||
})
|
||||
}
|
||||
|
||||
async fn incoming(self: Rc<Self>, socket: AsyncFd) {
|
||||
|
|
@ -109,13 +176,29 @@ impl ForkerProxy {
|
|||
return;
|
||||
}
|
||||
};
|
||||
self.handle_msg(msg);
|
||||
self.handle_msg(msg, &mut io);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_msg(&self, msg: ForkerMessage) {
|
||||
fn handle_msg(&self, msg: ForkerMessage, io: &mut IoIn) {
|
||||
match msg {
|
||||
ForkerMessage::Log { level, msg } => self.handle_log(level, &msg),
|
||||
ForkerMessage::PidFd { id, success } => self.handle_pidfd(id, success, io),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_pidfd(&self, id: u32, success: bool, io: &mut IoIn) {
|
||||
let res = match success {
|
||||
true => Ok(io.pop_fd().unwrap()),
|
||||
_ => Err(ForkerError::PidfdForkFailed),
|
||||
};
|
||||
if let Some(handoff) = self.pending_pidfds.remove(&id) {
|
||||
if let Some(handoff) = handoff.upgrade() {
|
||||
handoff.pidfd.set(Some(res));
|
||||
if let Some(w) = handoff.waiter.take() {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -135,6 +218,9 @@ impl ForkerProxy {
|
|||
let mut io = IoOut::new(socket);
|
||||
loop {
|
||||
let msg = self.outgoing.pop().await;
|
||||
for fd in self.fds.borrow_mut().drain(..) {
|
||||
io.push_fd(fd);
|
||||
}
|
||||
if let Err(e) = io.write_msg(msg).await {
|
||||
log::error!("Could not write to the ol' forker: {}", ErrorFmt(e));
|
||||
state.forker.set(None);
|
||||
|
|
@ -159,23 +245,29 @@ impl ForkerProxy {
|
|||
enum ServerMessage {
|
||||
SetEnv {
|
||||
var: Vec<u8>,
|
||||
val: Vec<u8>,
|
||||
val: Option<Vec<u8>>,
|
||||
},
|
||||
Spawn {
|
||||
prog: String,
|
||||
args: Vec<String>,
|
||||
env: Vec<(String, String)>,
|
||||
stderr: bool,
|
||||
},
|
||||
Xwayland {
|
||||
id: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Encode, Decode)]
|
||||
enum ForkerMessage {
|
||||
Log { level: usize, msg: String },
|
||||
PidFd { id: u32, success: bool },
|
||||
}
|
||||
|
||||
struct Forker {
|
||||
socket: AsyncFd,
|
||||
ae: Rc<AsyncEngine>,
|
||||
fds: RefCell<Vec<Rc<OwnedFd>>>,
|
||||
outgoing: AsyncQueue<ForkerMessage>,
|
||||
pending_spawns: CopyHashMap<c::pid_t, SpawnedFuture<()>>,
|
||||
}
|
||||
|
|
@ -206,6 +298,7 @@ impl Forker {
|
|||
let forker = Rc::new(Forker {
|
||||
socket: ae.fd(&socket).unwrap(),
|
||||
ae: ae.clone(),
|
||||
fds: RefCell::new(vec![]),
|
||||
outgoing: Default::default(),
|
||||
pending_spawns: Default::default(),
|
||||
});
|
||||
|
|
@ -219,6 +312,9 @@ impl Forker {
|
|||
let mut io = IoOut::new(self.socket.clone());
|
||||
loop {
|
||||
let msg = self.outgoing.pop().await;
|
||||
for fd in self.fds.borrow_mut().drain(..) {
|
||||
io.push_fd(fd);
|
||||
}
|
||||
io.write_msg(msg).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
|
@ -227,26 +323,76 @@ impl Forker {
|
|||
let mut io = IoIn::new(self.socket.clone());
|
||||
loop {
|
||||
let msg = io.read_msg().await.unwrap();
|
||||
self.handle_msg(msg);
|
||||
self.handle_msg(msg, &mut io);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_msg(self: &Rc<Self>, msg: ServerMessage) {
|
||||
fn handle_msg(self: &Rc<Self>, msg: ServerMessage, io: &mut IoIn) {
|
||||
match msg {
|
||||
ServerMessage::SetEnv { var, val } => self.handle_set_env(&var, &val),
|
||||
ServerMessage::Spawn { prog, args, env } => self.handle_spawn(prog, args, env),
|
||||
ServerMessage::SetEnv { var, val } => self.handle_set_env(&var, val),
|
||||
ServerMessage::Spawn {
|
||||
prog,
|
||||
args,
|
||||
env,
|
||||
stderr,
|
||||
} => self.handle_spawn(prog, args, env, stderr, io),
|
||||
ServerMessage::Xwayland { id } => self.handle_xwayland(io, id),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_set_env(self: &Rc<Self>, var: &[u8], val: &[u8]) {
|
||||
env::set_var(OsStr::from_bytes(var), OsStr::from_bytes(val));
|
||||
fn handle_set_env(self: &Rc<Self>, var: &[u8], val: Option<Vec<u8>>) {
|
||||
let var = OsStr::from_bytes(var);
|
||||
match val {
|
||||
Some(val) => env::set_var(var, OsStr::from_bytes(&val)),
|
||||
_ => env::remove_var(var),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_spawn(self: &Rc<Self>, prog: String, args: Vec<String>, env: Vec<(String, String)>) {
|
||||
fn handle_xwayland(self: &Rc<Self>, io: &mut IoIn, id: u32) {
|
||||
let stderr = io.pop_fd();
|
||||
let fds = vec![
|
||||
io.pop_fd().unwrap(),
|
||||
io.pop_fd().unwrap(),
|
||||
io.pop_fd().unwrap(),
|
||||
io.pop_fd().unwrap(),
|
||||
];
|
||||
let (prog, args) = xwayland::build_args(&fds);
|
||||
let env = vec![("WAYLAND_SOCKET".to_string(), fds[3].raw().to_string())];
|
||||
self.spawn(prog, args, env, stderr, fds, Some(id));
|
||||
}
|
||||
|
||||
fn handle_spawn(
|
||||
self: &Rc<Self>,
|
||||
prog: String,
|
||||
args: Vec<String>,
|
||||
env: Vec<(String, String)>,
|
||||
stderr: bool,
|
||||
io: &mut IoIn,
|
||||
) {
|
||||
let stderr = match stderr {
|
||||
true => io.pop_fd(),
|
||||
_ => None,
|
||||
};
|
||||
self.spawn(prog, args, env, stderr, vec![], None)
|
||||
}
|
||||
|
||||
fn spawn(
|
||||
self: &Rc<Self>,
|
||||
prog: String,
|
||||
args: Vec<String>,
|
||||
env: Vec<(String, String)>,
|
||||
stderr: Option<OwnedFd>,
|
||||
fds: Vec<OwnedFd>,
|
||||
pidfd_id: Option<u32>,
|
||||
) {
|
||||
let (read, mut write) = pipe2(c::O_CLOEXEC).unwrap();
|
||||
let res = match fork_with_pidfd(false) {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
if let Some(id) = pidfd_id {
|
||||
self.outgoing
|
||||
.push(ForkerMessage::PidFd { id, success: false });
|
||||
}
|
||||
self.outgoing.push(ForkerMessage::Log {
|
||||
level: log::Level::Error as usize,
|
||||
msg: ErrorFmt(e).to_string(),
|
||||
|
|
@ -255,7 +401,12 @@ impl Forker {
|
|||
}
|
||||
};
|
||||
match res {
|
||||
Forked::Parent { pid, .. } => {
|
||||
Forked::Parent { pid, pidfd } => {
|
||||
if let Some(id) = pidfd_id {
|
||||
self.fds.borrow_mut().push(Rc::new(pidfd));
|
||||
self.outgoing
|
||||
.push(ForkerMessage::PidFd { id, success: true });
|
||||
}
|
||||
drop(write);
|
||||
let slf = self.clone();
|
||||
let spawn = self.ae.spawn(async move {
|
||||
|
|
@ -274,20 +425,39 @@ impl Forker {
|
|||
self.pending_spawns.set(pid, spawn);
|
||||
}
|
||||
Forked::Child { .. } => {
|
||||
unsafe {
|
||||
c::signal(c::SIGCHLD, c::SIG_DFL);
|
||||
}
|
||||
for (key, val) in env {
|
||||
env::set_var(&key, &val);
|
||||
}
|
||||
let prog = prog.into_ustr();
|
||||
let mut argsnt = UstrPtr::new();
|
||||
argsnt.push(&prog);
|
||||
for arg in args {
|
||||
argsnt.push(arg);
|
||||
}
|
||||
if let Err(e) = uapi::execvp(&prog, &argsnt) {
|
||||
let _ = write.write_all(std::io::Error::from(e).to_string().as_bytes());
|
||||
let err = (|| {
|
||||
if let Some(stderr) = stderr {
|
||||
uapi::dup2(stderr.raw(), 2).unwrap();
|
||||
}
|
||||
for fd in fds {
|
||||
let fd = fd.unwrap();
|
||||
let res: Result<_, Errno> = (|| {
|
||||
uapi::fcntl_setfd(fd, uapi::fcntl_getfd(fd)? & !c::FD_CLOEXEC)?;
|
||||
Ok(())
|
||||
})();
|
||||
if let Err(e) = res {
|
||||
return Err(SpawnError::Cloexec(e.into()));
|
||||
}
|
||||
}
|
||||
unsafe {
|
||||
c::signal(c::SIGCHLD, c::SIG_DFL);
|
||||
}
|
||||
for (key, val) in env {
|
||||
env::set_var(&key, &val);
|
||||
}
|
||||
let prog = prog.into_ustr();
|
||||
let mut argsnt = UstrPtr::new();
|
||||
argsnt.push(&prog);
|
||||
for arg in args {
|
||||
argsnt.push(arg);
|
||||
}
|
||||
if let Err(e) = uapi::execvp(&prog, &argsnt) {
|
||||
return Err(SpawnError::Exec(e.into()));
|
||||
}
|
||||
Ok(())
|
||||
})();
|
||||
if let Err(e) = err {
|
||||
let _ = write.write_all(ErrorFmt(e).to_string().as_bytes());
|
||||
}
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
|
@ -295,6 +465,14 @@ impl Forker {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
enum SpawnError {
|
||||
#[error("exec failed")]
|
||||
Exec(#[source] std::io::Error),
|
||||
#[error("Could not unset cloexec flag")]
|
||||
Cloexec(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
fn setup_fds(mut socket: OwnedFd) -> OwnedFd {
|
||||
if socket.raw() != 0 {
|
||||
uapi::dup3(socket.unwrap(), 0, 0).unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue