1
0
Fork 0
forked from wry/wry

utils: add pipe util

This commit is contained in:
Julian Orth 2026-02-24 14:03:55 +01:00
parent 05476d68f3
commit 73451550ba
9 changed files with 107 additions and 30 deletions

View file

@ -7,8 +7,14 @@ use {
async_engine::{AsyncEngine, SpawnedFuture},
io_uring::IoUring,
utils::{
buf::TypedBuf, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, oserror::OsError,
ptr_ext::MutPtrExt, queue::AsyncQueue, stack::Stack,
buf::TypedBuf,
copyhashmap::CopyHashMap,
errorfmt::ErrorFmt,
oserror::OsError,
pipe::{Pipe, pipe},
ptr_ext::MutPtrExt,
queue::AsyncQueue,
stack::Stack,
},
},
parking_lot::{Condvar, Mutex},
@ -261,8 +267,10 @@ impl CpuWorker {
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Result<Self, CpuWorkerError> {
let new_jobs: Arc<Mutex<VecDeque<Job>>> = Default::default();
let completed_jobs: Arc<Mutex<CompletedJobsExchange>> = Default::default();
let (stop_read, stop_write) =
uapi::pipe2(c::O_CLOEXEC).map_err(|e| CpuWorkerError::Pipe(e.into()))?;
let Pipe {
read: stop_read,
write: stop_write,
} = pipe().map_err(CpuWorkerError::Pipe)?;
let have_new_jobs =
uapi::eventfd(0, c::EFD_CLOEXEC).map_err(|e| CpuWorkerError::EventFd(e.into()))?;
let have_completed_jobs =

View file

@ -13,6 +13,7 @@ use {
copyhashmap::CopyHashMap,
errorfmt::ErrorFmt,
numcell::NumCell,
pipe::{Pipe, pipe},
process_name::set_process_name,
queue::AsyncQueue,
},
@ -33,7 +34,7 @@ use {
task::{Poll, Waker},
},
thiserror::Error,
uapi::{Errno, Fd, IntoUstr, OwnedFd, UstrPtr, c, pipe2},
uapi::{Errno, Fd, IntoUstr, OwnedFd, UstrPtr, c},
};
pub struct ForkerProxy {
@ -446,7 +447,7 @@ impl Forker {
fds: Vec<(i32, OwnedFd)>,
pidfd_id: Option<u32>,
) {
let (read, mut write) = pipe2(c::O_CLOEXEC).unwrap();
let Pipe { read, mut write } = pipe().unwrap();
let res = match fork_with_pidfd(false) {
Ok(o) => o,
Err(e) => {

View file

@ -7,12 +7,13 @@ use {
clone3::{Forked, fork_with_pidfd},
errorfmt::ErrorFmt,
oserror::OsError,
pipe::{Pipe, pipe},
},
wire::{JayReexecId, jay_reexec::*},
},
std::{array::from_mut, cell::RefCell, rc::Rc},
thiserror::Error,
uapi::{OwnedFd, UstrPtr, c, close_range, dup2, pipe2, waitpid},
uapi::{OwnedFd, UstrPtr, c, close_range, dup2, waitpid},
};
pub struct JayReexec {
@ -39,17 +40,23 @@ impl JayReexec {
}
macro_rules! pipe {
() => {
match pipe2(c::O_CLOEXEC) {
match pipe() {
Ok(p) => p,
Err(e) => {
log::error!("Could not create pipe: {}", ErrorFmt(OsError::from(e)));
log::error!("Could not create pipe: {}", ErrorFmt(e));
return None;
}
}
};
}
let (p1, c1) = pipe!();
let (c2, p2) = pipe!();
let Pipe {
read: p1,
write: c1,
} = pipe!();
let Pipe {
read: c2,
write: p2,
} = pipe!();
if let Ok(f) = fork_with_pidfd(false) {
match f {
Forked::Parent { pid, .. } => {

View file

@ -6,7 +6,10 @@ use {
test_transport::TestTransport,
testrun::ParseFull,
},
utils::buffd::MsgParser,
utils::{
buffd::MsgParser,
pipe::{Pipe, pipe},
},
wire::{ZwlrDataControlOfferV1Id, zwlr_data_control_offer_v1::*},
},
ahash::AHashSet,
@ -14,7 +17,7 @@ use {
cell::{Cell, RefCell},
rc::Rc,
},
uapi::{OwnedFd, c},
uapi::OwnedFd,
};
pub struct TestDataControlOffer {
@ -33,7 +36,7 @@ impl TestDataControlOffer {
}
pub fn receive(&self, mime_type: &str) -> TestResult<Rc<OwnedFd>> {
let (read, write) = uapi::pipe2(c::O_CLOEXEC)?;
let Pipe { read, write } = pipe()?;
self.tran.send(Receive {
self_id: self.id,
mime_type,

View file

@ -33,6 +33,7 @@ use {
line_logger::log_lines,
numcell::NumCell,
oserror::OsError,
pipe::{Pipe, pipe},
process_name::set_process_name,
run_toplevel::RunToplevel,
xrd::xrd,
@ -130,9 +131,9 @@ impl PortalStartup {
}
pub fn run_from_compositor(level: Level) -> Result<PortalStartup, PortalError> {
let (read, write) = match uapi::pipe2(c::O_CLOEXEC) {
let Pipe { read, write } = match pipe() {
Ok(p) => p,
Err(e) => return Err(PortalError::CreatePipe(e.into())),
Err(e) => return Err(PortalError::CreatePipe(e)),
};
let fork = match fork_with_pidfd(false) {
Ok(f) => f,
@ -153,10 +154,10 @@ pub fn run_from_compositor(level: Level) -> Result<PortalStartup, PortalError> {
}
fn run(logger: Arc<Logger>, freestanding: bool) -> ! {
let (read, write) = match uapi::pipe2(c::O_CLOEXEC) {
let Pipe { read, write } = match pipe() {
Ok(p) => p,
Err(e) => {
fatal!("Could not create a pipe: {}", ErrorFmt(OsError::from(e)));
fatal!("Could not create a pipe: {}", ErrorFmt(e));
}
};
let fork = match fork_with_pidfd(false) {

View file

@ -44,6 +44,7 @@ pub mod page_size;
pub mod pending_serial;
pub mod pid_info;
pub mod pidfd_send_signal;
pub mod pipe;
pub mod process_name;
pub mod ptr_ext;
pub mod queue;

32
src/utils/pipe.rs Normal file
View file

@ -0,0 +1,32 @@
use {
crate::utils::oserror::OsError,
uapi::{OwnedFd, c, pipe2},
};
pub struct Pipe<L, R> {
pub read: L,
pub write: R,
}
pub fn pipe() -> Result<Pipe<OwnedFd, OwnedFd>, OsError> {
let (read, write) = pipe2(c::O_CLOEXEC)?;
Ok(Pipe { read, write })
}
impl<L, R> Pipe<L, R> {
#[expect(dead_code)]
pub fn map_read<Lprime>(self, map: impl FnOnce(L) -> Lprime) -> Pipe<Lprime, R> {
Pipe {
read: map(self.read),
write: self.write,
}
}
#[expect(dead_code)]
pub fn map_write<Rprime>(self, map: impl FnOnce(R) -> Rprime) -> Pipe<L, Rprime> {
Pipe {
read: self.read,
write: map(self.write),
}
}
}

View file

@ -15,7 +15,13 @@ use {
security_context_acceptor::AcceptorMetadata,
state::State,
user_session::import_environment,
utils::{buf::Buf, errorfmt::ErrorFmt, line_logger::log_lines, oserror::OsError},
utils::{
buf::Buf,
errorfmt::ErrorFmt,
line_logger::log_lines,
oserror::OsError,
pipe::{Pipe, pipe},
},
wire::WlSurfaceId,
xcon::XconError,
xwayland::{
@ -27,7 +33,7 @@ use {
run_on_drop::on_drop,
std::{num::ParseIntError, rc::Rc},
thiserror::Error,
uapi::{OwnedFd, c, pipe2},
uapi::{OwnedFd, c},
};
#[derive(Debug, Error)]
@ -140,13 +146,19 @@ async fn run(
forker: &Rc<ForkerProxy>,
socket: Rc<OwnedFd>,
) -> Result<(), XWaylandError> {
let (dfdread, dfdwrite) = match pipe2(c::O_CLOEXEC) {
let Pipe {
read: dfdread,
write: dfdwrite,
} = match pipe() {
Ok(p) => p,
Err(e) => return Err(XWaylandError::Pipe(e.into())),
Err(e) => return Err(XWaylandError::Pipe(e)),
};
let (stderr_read, stderr_write) = match pipe2(c::O_CLOEXEC) {
let Pipe {
read: stderr_read,
write: stderr_write,
} = match pipe() {
Ok(p) => p,
Err(e) => return Err(XWaylandError::Pipe(e.into())),
Err(e) => return Err(XWaylandError::Pipe(e)),
};
let wm = uapi::socketpair(c::AF_UNIX, c::SOCK_STREAM | c::SOCK_CLOEXEC, 0);
let (wm1, wm2) = match wm {
@ -244,7 +256,7 @@ struct XwaylandFeatures {
async fn detect_features(state: &State, forker: &ForkerProxy) -> XwaylandFeatures {
let mut features = Default::default();
let Ok((read, write)) = pipe2(c::O_CLOEXEC) else {
let Ok(Pipe { read, write }) = pipe() else {
return features;
};
forker.spawn(

View file

@ -25,9 +25,18 @@ use {
state::State,
tree::{Node, ToplevelNode},
utils::{
bitflags::BitflagsExt, buf::Buf, cell_ext::CellExt, clonecell::CloneCell,
copyhashmap::CopyHashMap, errorfmt::ErrorFmt, hash_map_ext::HashMapExt,
linkedlist::LinkedList, numcell::NumCell, oserror::OsError, rc_eq::rc_eq,
bitflags::BitflagsExt,
buf::Buf,
cell_ext::CellExt,
clonecell::CloneCell,
copyhashmap::CopyHashMap,
errorfmt::ErrorFmt,
hash_map_ext::HashMapExt,
linkedlist::LinkedList,
numcell::NumCell,
oserror::OsError,
pipe::{Pipe, pipe},
rc_eq::rc_eq,
},
wire::WlSurfaceId,
wire_xcon::{
@ -1687,10 +1696,13 @@ impl Wm {
log::error!("Peer requested unavailable target {}", mt);
break 'convert;
}
let (rx, tx) = match uapi::pipe2(c::O_CLOEXEC) {
let Pipe {
read: rx,
write: tx,
} = match pipe() {
Ok(p) => p,
Err(e) => {
log::error!("Could not create pipe: {}", OsError::from(e));
log::error!("Could not create pipe: {}", e);
break 'convert;
}
};