From 666e475032cc349b63fc7068dd4d25cbf3fe6d04 Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Tue, 22 Feb 2022 14:17:48 +0100 Subject: [PATCH] autocommit 2022-02-22 14:17:48 CET --- src/forker.rs | 75 ++++++++++++---------------------- src/forker/io.rs | 84 ++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/utils/buffd/buf_out.rs | 4 +- src/xwayland.rs | 0 5 files changed, 114 insertions(+), 50 deletions(-) create mode 100644 src/forker/io.rs create mode 100644 src/xwayland.rs diff --git a/src/forker.rs b/src/forker.rs index 0bdf08c1..d4707b03 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -1,10 +1,10 @@ mod clone3; +mod io; use crate::async_engine::{AsyncFd, SpawnedFuture}; use crate::forker::clone3::{fork_with_pidfd, Forked}; -use crate::utils::buffd::{BufFdIn, BufFdOut}; +use crate::utils::buffd::{BufFdError}; use crate::utils::copyhashmap::CopyHashMap; -use crate::utils::vec_ext::VecExt; use crate::{AsyncEngine, AsyncQueue, ErrorFmt, EventLoop, State, Wheel}; use bincode::{Decode, Encode}; use i4config::_private::bincode_ops; @@ -16,8 +16,10 @@ use std::io::Read; use std::io::Write; use std::os::unix::ffi::OsStrExt; use std::rc::Rc; +use bincode::error::{DecodeError, EncodeError}; use thiserror::Error; use uapi::{c, pipe2, Fd, IntoUstr, OwnedFd, UstrPtr}; +use crate::forker::io::{IoIn, IoOut}; pub struct ForkerProxy { pidfd: Rc, @@ -35,6 +37,14 @@ pub enum ForkerError { Socketpair(#[source] std::io::Error), #[error("Could not fork")] Fork(#[source] std::io::Error), + #[error("Could not read the next message")] + ReadFailed(#[source] BufFdError), + #[error("Could not write the next message")] + WriteFailed(#[source] BufFdError), + #[error("Could not decode the next message")] + DecodeFailed(#[source] DecodeError), + #[error("Could not encode the next message")] + EncodeFailed(#[source] EncodeError), } impl ForkerProxy { @@ -89,24 +99,16 @@ impl ForkerProxy { } async fn incoming(self: Rc, socket: AsyncFd) { - let mut buffd = BufFdIn::new(socket); - let mut buf = vec![]; + let mut io = IoIn::new(socket); loop { - let mut len = 0usize; - if let Err(e) = buffd.read_full(&mut len).await { - log::error!("Cannot read from the ol' forker: {}", ErrorFmt(e)); - self.task_in.take(); - return; - } - buf.clear(); - buf.reserve(len); - let space = buf.split_at_spare_mut_ext().1; - buffd.read_full(&mut space[..len]).await.unwrap(); - unsafe { - buf.set_len(len); - } - let (msg, _) = - bincode::decode_from_slice::(&buf, bincode_ops()).unwrap(); + let msg = match io.read_msg().await { + Ok(msg) => msg, + Err(e) => { + log::error!("Could not read from the ol' forker: {}", ErrorFmt(e)); + self.task_in.take(); + return; + } + }; self.handle_msg(msg); } } @@ -130,16 +132,10 @@ impl ForkerProxy { } async fn outgoing(self: Rc, state: Rc, socket: AsyncFd) { - let mut buffd = BufFdOut::new(socket); - let mut buf = vec![]; - let mut fds = vec![]; + let mut io = IoOut::new(socket); loop { let msg = self.outgoing.pop().await; - buf.clear(); - buf.extend_from_slice(uapi::as_bytes(&0usize)); - let len = bincode::encode_into_std_write(&msg, &mut buf, bincode_ops()).unwrap(); - let _ = (&mut buf[..]).write_all(uapi::as_bytes(&len)); - if let Err(e) = buffd.flush2(&buf, &mut fds).await { + if let Err(e) = io.write_msg(msg).await { log::error!("Could not write to the ol' forker: {}", ErrorFmt(e)); state.forker.set(None); self.task_out.take(); @@ -220,34 +216,17 @@ impl Forker { } async fn outgoing(self: Rc) { - let mut buffd = BufFdOut::new(self.socket.clone()); - let mut buf = vec![]; - let mut fds = vec![]; + let mut io = IoOut::new(self.socket.clone()); loop { let msg = self.outgoing.pop().await; - buf.clear(); - buf.extend_from_slice(uapi::as_bytes(&0usize)); - let len = bincode::encode_into_std_write(&msg, &mut buf, bincode_ops()).unwrap(); - let _ = (&mut buf[..]).write_all(uapi::as_bytes(&len)); - buffd.flush2(&buf, &mut fds).await.unwrap(); + io.write_msg(msg).await.unwrap(); } } async fn incoming(self: Rc) { - let mut buffd = BufFdIn::new(self.socket.clone()); - let mut buf = vec![]; + let mut io = IoIn::new(self.socket.clone()); loop { - let mut len = 0usize; - buffd.read_full(&mut len).await.unwrap(); - buf.clear(); - buf.reserve(len); - let space = buf.split_at_spare_mut_ext().1; - buffd.read_full(&mut space[..len]).await.unwrap(); - unsafe { - buf.set_len(len); - } - let (msg, _) = - bincode::decode_from_slice::(&buf, bincode_ops()).unwrap(); + let msg = io.read_msg().await.unwrap(); self.handle_msg(msg); } } diff --git a/src/forker/io.rs b/src/forker/io.rs new file mode 100644 index 00000000..f70f5257 --- /dev/null +++ b/src/forker/io.rs @@ -0,0 +1,84 @@ +use std::mem; +use std::rc::Rc; +use bincode::{Decode, Encode}; +use bincode::error::EncodeError; +use uapi::OwnedFd; +use i4config::_private::bincode_ops; +use crate::async_engine::AsyncFd; +use crate::ForkerError; +use crate::utils::buffd::{BufFdIn, BufFdOut}; +use crate::utils::vec_ext::VecExt; + +pub struct IoIn { + incoming: BufFdIn, + scratch: Vec, +} + +impl IoIn { + pub fn new(fd: AsyncFd) -> Self { + Self { + incoming: BufFdIn::new(fd), + scratch: vec![] + } + } + + pub fn pop_fd(&mut self) -> Option { + self.incoming.get_fd().ok() + } + + pub async fn read_msg(&mut self) -> Result { + let mut len = 0usize; + if let Err(e) = self.incoming.read_full(&mut len).await { + return Err(ForkerError::ReadFailed(e)); + } + self.scratch.clear(); + self.scratch.reserve(len); + let space = self.scratch.split_at_spare_mut_ext().1; + if let Err(e) = self.incoming.read_full(&mut space[..len]).await { + return Err(ForkerError::ReadFailed(e)); + } + unsafe { + self.scratch.set_len(len); + } + let res = bincode::decode_from_slice::(&&self.scratch, bincode_ops()); + match res { + Ok((msg, _)) => Ok(msg), + Err(e) => Err(ForkerError::DecodeFailed(e)), + } + } +} + +pub struct IoOut { + outgoing: BufFdOut, + scratch: Vec, + fds: Vec> +} + +impl IoOut { + pub fn new(fd: AsyncFd) -> Self { + Self { + outgoing: BufFdOut::new(fd), + scratch: vec![], + fds: vec![] + } + } + + pub fn push_fd(&mut self, fd: Rc) { + self.fds.push(fd); + } + + pub async fn write_msg(&mut self, msg: T) -> Result<(), ForkerError> { + self.scratch.clear(); + self.scratch.extend_from_slice(uapi::as_bytes(&0usize)); + let res = bincode::encode_into_std_write(&msg, &mut self.scratch, bincode_ops()); + let len = match res { + Ok(l) => l, + Err(e) => return Err(ForkerError::EncodeFailed(e)), + }; + self.scratch[..mem::size_of_val(&len)].copy_from_slice(uapi::as_bytes(&len)); + match self.outgoing.flush2(&self.scratch, &mut self.fds).await { + Ok(()) => Ok(()), + Err(e) => Err(ForkerError::WriteFailed(e)), + } + } +} diff --git a/src/main.rs b/src/main.rs index e5e1d222..32fe2f11 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,6 +81,7 @@ mod utils; mod wheel; mod wire; mod xkbcommon; +mod xwayland; fn main() { env_logger::builder() diff --git a/src/utils/buffd/buf_out.rs b/src/utils/buffd/buf_out.rs index 8deae34b..7a66dc44 100644 --- a/src/utils/buffd/buf_out.rs +++ b/src/utils/buffd/buf_out.rs @@ -157,7 +157,7 @@ impl BufFdOut { Ok(false) } - pub async fn flush2(&mut self, buf: &[u8], fds: &mut Vec) -> Result<(), BufFdError> { + pub async fn flush2(&mut self, buf: &[u8], fds: &mut Vec>) -> Result<(), BufFdError> { let mut read_pos = 0; while read_pos < buf.len() { if self.flush_sync2(&mut read_pos, buf, fds)? { @@ -171,7 +171,7 @@ impl BufFdOut { &mut self, read_pos: &mut usize, buf: &[u8], - fds: &mut Vec, + fds: &mut Vec>, ) -> Result { let mut cmsg_len = 0; let mut fds_opt = None; diff --git a/src/xwayland.rs b/src/xwayland.rs new file mode 100644 index 00000000..e69de29b