1
0
Fork 0
forked from wry/wry

autocommit 2022-02-22 14:17:48 CET

This commit is contained in:
Julian Orth 2022-02-22 14:17:48 +01:00
parent 145d1c15b7
commit 666e475032
5 changed files with 114 additions and 50 deletions

View file

@ -1,10 +1,10 @@
mod clone3;
mod io;
use crate::async_engine::{AsyncFd, SpawnedFuture};
use crate::forker::clone3::{fork_with_pidfd, Forked};
use crate::utils::buffd::{BufFdIn, BufFdOut};
use crate::utils::buffd::{BufFdError};
use crate::utils::copyhashmap::CopyHashMap;
use crate::utils::vec_ext::VecExt;
use crate::{AsyncEngine, AsyncQueue, ErrorFmt, EventLoop, State, Wheel};
use bincode::{Decode, Encode};
use i4config::_private::bincode_ops;
@ -16,8 +16,10 @@ use std::io::Read;
use std::io::Write;
use std::os::unix::ffi::OsStrExt;
use std::rc::Rc;
use bincode::error::{DecodeError, EncodeError};
use thiserror::Error;
use uapi::{c, pipe2, Fd, IntoUstr, OwnedFd, UstrPtr};
use crate::forker::io::{IoIn, IoOut};
pub struct ForkerProxy {
pidfd: Rc<OwnedFd>,
@ -35,6 +37,14 @@ pub enum ForkerError {
Socketpair(#[source] std::io::Error),
#[error("Could not fork")]
Fork(#[source] std::io::Error),
#[error("Could not read the next message")]
ReadFailed(#[source] BufFdError),
#[error("Could not write the next message")]
WriteFailed(#[source] BufFdError),
#[error("Could not decode the next message")]
DecodeFailed(#[source] DecodeError),
#[error("Could not encode the next message")]
EncodeFailed(#[source] EncodeError),
}
impl ForkerProxy {
@ -89,24 +99,16 @@ impl ForkerProxy {
}
async fn incoming(self: Rc<Self>, socket: AsyncFd) {
let mut buffd = BufFdIn::new(socket);
let mut buf = vec![];
let mut io = IoIn::new(socket);
loop {
let mut len = 0usize;
if let Err(e) = buffd.read_full(&mut len).await {
log::error!("Cannot read from the ol' forker: {}", ErrorFmt(e));
self.task_in.take();
return;
}
buf.clear();
buf.reserve(len);
let space = buf.split_at_spare_mut_ext().1;
buffd.read_full(&mut space[..len]).await.unwrap();
unsafe {
buf.set_len(len);
}
let (msg, _) =
bincode::decode_from_slice::<ForkerMessage, _>(&buf, bincode_ops()).unwrap();
let msg = match io.read_msg().await {
Ok(msg) => msg,
Err(e) => {
log::error!("Could not read from the ol' forker: {}", ErrorFmt(e));
self.task_in.take();
return;
}
};
self.handle_msg(msg);
}
}
@ -130,16 +132,10 @@ impl ForkerProxy {
}
async fn outgoing(self: Rc<Self>, state: Rc<State>, socket: AsyncFd) {
let mut buffd = BufFdOut::new(socket);
let mut buf = vec![];
let mut fds = vec![];
let mut io = IoOut::new(socket);
loop {
let msg = self.outgoing.pop().await;
buf.clear();
buf.extend_from_slice(uapi::as_bytes(&0usize));
let len = bincode::encode_into_std_write(&msg, &mut buf, bincode_ops()).unwrap();
let _ = (&mut buf[..]).write_all(uapi::as_bytes(&len));
if let Err(e) = buffd.flush2(&buf, &mut fds).await {
if let Err(e) = io.write_msg(msg).await {
log::error!("Could not write to the ol' forker: {}", ErrorFmt(e));
state.forker.set(None);
self.task_out.take();
@ -220,34 +216,17 @@ impl Forker {
}
async fn outgoing(self: Rc<Self>) {
let mut buffd = BufFdOut::new(self.socket.clone());
let mut buf = vec![];
let mut fds = vec![];
let mut io = IoOut::new(self.socket.clone());
loop {
let msg = self.outgoing.pop().await;
buf.clear();
buf.extend_from_slice(uapi::as_bytes(&0usize));
let len = bincode::encode_into_std_write(&msg, &mut buf, bincode_ops()).unwrap();
let _ = (&mut buf[..]).write_all(uapi::as_bytes(&len));
buffd.flush2(&buf, &mut fds).await.unwrap();
io.write_msg(msg).await.unwrap();
}
}
async fn incoming(self: Rc<Self>) {
let mut buffd = BufFdIn::new(self.socket.clone());
let mut buf = vec![];
let mut io = IoIn::new(self.socket.clone());
loop {
let mut len = 0usize;
buffd.read_full(&mut len).await.unwrap();
buf.clear();
buf.reserve(len);
let space = buf.split_at_spare_mut_ext().1;
buffd.read_full(&mut space[..len]).await.unwrap();
unsafe {
buf.set_len(len);
}
let (msg, _) =
bincode::decode_from_slice::<ServerMessage, _>(&buf, bincode_ops()).unwrap();
let msg = io.read_msg().await.unwrap();
self.handle_msg(msg);
}
}