1
0
Fork 0
forked from wry/wry

autocommit 2022-03-22 23:24:17 CET

This commit is contained in:
Julian Orth 2022-03-22 23:24:17 +01:00
parent 18806a38fb
commit 2ff60ff817
36 changed files with 4934 additions and 237 deletions

View file

@ -1,5 +1,6 @@
use crate::dbus::auth::handle_auth;
use crate::dbus::{DbusError, DbusHolder, DbusSocket};
use crate::utils::bufio::BufIo;
use crate::{org, AsyncEngine, ErrorFmt, NumCell, RunToplevel};
use std::cell::Cell;
use std::rc::Rc;
@ -46,14 +47,14 @@ fn connect(
if let Err(e) = uapi::connect(socket.raw(), &sadr) {
return Err(DbusError::Connect(e.into()));
}
let fd = eng.fd(&Rc::new(socket))?;
let socket = Rc::new(DbusSocket {
bus_name: name,
fd: eng.fd(&Rc::new(socket))?,
fd: fd.clone(),
bufio: Rc::new(BufIo::new(fd)),
eng: eng.clone(),
next_serial: NumCell::new(1),
bufs: Default::default(),
unique_name: Default::default(),
outgoing: Default::default(),
reply_handlers: Default::default(),
incoming: Default::default(),
outgoing_: Default::default(),

View file

@ -5,35 +5,24 @@ use super::{
use crate::dbus::{
CallError, DbusError, DbusSocket, Headers, Parser, MSG_ERROR, MSG_METHOD_RETURN, MSG_SIGNAL,
};
use crate::utils::bufio::BufIoIncoming;
use crate::utils::ptr_ext::{MutPtrExt, PtrExt};
use crate::ErrorFmt;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::rc::Rc;
use uapi::{c, Errno, MaybeUninitSliceExt, MsghdrMut, OwnedFd};
pub async fn handle_incoming(socket: Rc<DbusSocket>) {
let mut incoming = Incoming {
incoming: socket.bufio.incoming(),
socket,
buf: Box::new([MaybeUninit::uninit(); 4096]),
buf_start: 0,
buf_end: 0,
fds: Default::default(),
cmsg: Box::new([MaybeUninit::uninit(); 256]),
};
incoming.run().await;
}
pub struct Incoming {
socket: Rc<DbusSocket>,
buf: Box<[MaybeUninit<u8>; 4096]>,
buf_start: usize,
buf_end: usize,
fds: VecDeque<Rc<OwnedFd>>,
cmsg: Box<[MaybeUninit<u8>; 256]>,
incoming: BufIoIncoming,
}
impl Incoming {
@ -55,11 +44,12 @@ impl Incoming {
}
async fn handle_msg(&mut self) -> Result<(), DbusError> {
let msg_buf_data = UnsafeCell::new(self.socket.bufs.pop().unwrap_or_default());
let msg_buf_data = UnsafeCell::new(self.socket.bufio.buf());
let msg_buf = unsafe { msg_buf_data.get().deref_mut() };
msg_buf.clear();
const FIXED_HEADER_SIZE: usize = 16;
self.fill_msg_buf(FIXED_HEADER_SIZE, msg_buf).await?;
self.incoming
.fill_msg_buf(FIXED_HEADER_SIZE, msg_buf)
.await?;
let endianess = msg_buf[0];
if (endianess == b'l') != cfg!(target_endian = "little") {
return Err(DbusError::InvalidEndianess);
@ -75,16 +65,18 @@ impl Incoming {
let [body_len, _serial, headers_len] = fields2;
let dyn_header_len = headers_len + (headers_len.wrapping_neg() & 7);
let remaining = dyn_header_len + body_len;
self.fill_msg_buf(remaining as usize, msg_buf).await?;
self.incoming
.fill_msg_buf(remaining as usize, msg_buf)
.await?;
drop(msg_buf);
let msg_buf = unsafe { msg_buf_data.get().deref().deref() };
let headers = &msg_buf[FIXED_HEADER_SIZE..FIXED_HEADER_SIZE + headers_len as usize];
let headers = self.parse_headers(headers)?;
let unix_fds = headers.unix_fds.unwrap_or(0) as usize;
if self.fds.len() < unix_fds {
if self.incoming.fds.len() < unix_fds {
return Err(DbusError::TooFewFds);
}
let fds: Vec<_> = self.fds.drain(..unix_fds).collect();
let fds: Vec<_> = self.incoming.fds.drain(..unix_fds).collect();
let mut parser = Parser {
buf: &msg_buf,
pos: FIXED_HEADER_SIZE + dyn_header_len as usize,
@ -172,7 +164,7 @@ impl Incoming {
}
let msg_buf = msg_buf_data.into_inner();
if msg_buf.capacity() > 0 {
self.socket.bufs.push(msg_buf);
self.socket.bufio.add_buf(msg_buf);
}
Ok(())
}
@ -199,54 +191,4 @@ impl Incoming {
}
Ok(headers)
}
async fn fill_msg_buf(&mut self, mut n: usize, buf: &mut Vec<u8>) -> Result<(), DbusError> {
while n > 0 {
if self.buf_start == self.buf_end {
while let Err(e) = self.recvmsg() {
if e.0 != c::EAGAIN {
return Err(DbusError::ReadError(e.into()));
}
self.socket.fd.readable().await?;
}
if self.buf_start == self.buf_end {
return Err(DbusError::Closed);
}
}
let read = n.min(self.buf_end - self.buf_start);
let buf_start = self.buf_start % self.buf.len();
unsafe {
buf.extend_from_slice(
self.buf[buf_start..buf_start + read].slice_assume_init_ref(),
);
}
n -= read;
self.buf_start += read;
}
Ok(())
}
fn recvmsg(&mut self) -> Result<(), Errno> {
self.buf_start = 0;
self.buf_end = 0;
let mut iov = [&mut self.buf[..]];
let mut hdr = MsghdrMut {
iov: &mut iov[..],
control: Some(&mut self.cmsg[..]),
name: uapi::sockaddr_none_mut(),
flags: 0,
};
let (ivec, _, mut cmsg) =
uapi::recvmsg(self.socket.fd.raw(), &mut hdr, c::MSG_CMSG_CLOEXEC)?;
self.buf_end += ivec.len();
while cmsg.len() > 0 {
let (_, hdr, body) = uapi::cmsg_read(&mut cmsg)?;
if hdr.cmsg_level == c::SOL_SOCKET && hdr.cmsg_type == c::SCM_RIGHTS {
for fd in uapi::pod_iter(body)? {
self.fds.push_back(Rc::new(OwnedFd::new(fd)));
}
}
}
Ok(())
}
}

View file

@ -1,118 +1,10 @@
use crate::dbus::{DbusMessage, DbusSocket};
use crate::utils::vec_ext::{UninitVecExt, VecExt};
use crate::utils::vecstorage::VecStorage;
use crate::dbus::DbusSocket;
use crate::ErrorFmt;
use std::collections::VecDeque;
use std::mem;
use std::mem::MaybeUninit;
use std::ptr::NonNull;
use std::rc::Rc;
use uapi::{c, Errno, Msghdr};
pub async fn handle_outgoing(socket: Rc<DbusSocket>) {
let mut outgoing = Outgoing {
socket,
msgs: Default::default(),
cmsg: vec![],
fds: vec![],
iovecs: Default::default(),
};
outgoing.run().await
}
struct DbusMessageOffset {
msg: DbusMessage,
offset: usize,
}
struct Outgoing {
socket: Rc<DbusSocket>,
msgs: VecDeque<DbusMessageOffset>,
cmsg: Vec<MaybeUninit<u8>>,
fds: Vec<c::c_int>,
iovecs: VecStorage<NonNull<[u8]>>,
}
impl Outgoing {
async fn run(&mut self) {
loop {
self.socket.outgoing.non_empty().await;
while let Err(e) = self.try_flush() {
if e != Errno(c::EAGAIN) {
log::error!(
"{}: Could not send a message to the bus: {}",
self.socket.bus_name,
ErrorFmt(e)
);
self.socket.kill();
return;
}
if let Err(e) = self.socket.fd.writable().await {
log::error!(
"{}: Cannot wait for fd to become readable: {}",
self.socket.bus_name,
ErrorFmt(e)
);
self.socket.kill();
return;
}
}
}
}
fn try_flush(&mut self) -> Result<(), Errno> {
loop {
while let Some(msg) = self.socket.outgoing.try_pop() {
self.msgs.push_back(DbusMessageOffset { msg, offset: 0 });
}
if self.msgs.is_empty() {
return Ok(());
}
let mut iovecs = self.iovecs.take_as();
let mut fds = &[][..];
for msg in &mut self.msgs {
if msg.msg.fds.len() > 0 {
if fds.len() > 0 || iovecs.len() > 0 {
break;
}
fds = &msg.msg.fds;
}
iovecs.push(&msg.msg.buf[msg.offset..]);
}
self.cmsg.clear();
if fds.len() > 0 {
self.fds.clear();
self.fds.extend(fds.iter().map(|f| f.raw()));
let cmsg_space = uapi::cmsg_space(fds.len() * mem::size_of::<c::c_int>());
self.cmsg.reserve(cmsg_space);
let (_, mut spare) = self.cmsg.split_at_spare_mut_bytes_ext();
let hdr = c::cmsghdr {
cmsg_len: 0,
cmsg_level: c::SOL_SOCKET,
cmsg_type: c::SCM_RIGHTS,
};
let len = uapi::cmsg_write(&mut spare, hdr, &self.fds).unwrap();
self.cmsg.set_len_safe(len);
}
let msg = Msghdr {
iov: &iovecs[..],
control: Some(&self.cmsg[..]),
name: uapi::sockaddr_none_ref(),
};
let mut n = uapi::sendmsg(self.socket.fd.raw(), &msg, c::MSG_DONTWAIT)?;
drop(iovecs);
self.msgs[0].msg.fds.clear();
while n > 0 {
let len = self.msgs[0].msg.buf.len() - self.msgs[0].offset;
if n < len {
self.msgs[0].offset += n;
break;
}
n -= len;
let msg = self.msgs.pop_front().unwrap();
self.socket.bufs.push(msg.msg.buf);
}
}
if let Err(e) = socket.bufio.clone().outgoing().await {
log::error!("{}: {}", socket.bus_name, ErrorFmt(e));
}
socket.kill();
}

View file

@ -1,12 +1,13 @@
use crate::dbus::property::Get;
use crate::dbus::types::{ObjectPath, Signature, Variant};
use crate::dbus::{
AsyncProperty, AsyncReply, AsyncReplySlot, DbusError, DbusMessage, DbusSocket, DbusType,
Formatter, Headers, InterfaceSignalHandlers, Message, MethodCall, Parser, Property, Reply,
ReplyHandler, Signal, SignalHandler, SignalHandlerApi, SignalHandlerData, BUS_DEST, BUS_PATH,
HDR_DESTINATION, HDR_INTERFACE, HDR_MEMBER, HDR_PATH, HDR_SIGNATURE, HDR_UNIX_FDS,
MSG_METHOD_CALL, NO_REPLY_EXPECTED,
AsyncProperty, AsyncReply, AsyncReplySlot, DbusError, DbusSocket, DbusType, Formatter, Headers,
InterfaceSignalHandlers, Message, MethodCall, Parser, Property, Reply, ReplyHandler, Signal,
SignalHandler, SignalHandlerApi, SignalHandlerData, BUS_DEST, BUS_PATH, HDR_DESTINATION,
HDR_INTERFACE, HDR_MEMBER, HDR_PATH, HDR_SIGNATURE, HDR_UNIX_FDS, MSG_METHOD_CALL,
NO_REPLY_EXPECTED,
};
use crate::utils::bufio::BufIoMessage;
use crate::{org, ErrorFmt};
use std::cell::Cell;
use std::collections::hash_map::Entry;
@ -243,7 +244,7 @@ impl DbusSocket {
msg: &T,
) -> u32 {
let (msg, serial) = self.format_call(path, destination, flags, msg);
self.outgoing.push(msg);
self.bufio.send(msg);
serial
}
@ -253,12 +254,11 @@ impl DbusSocket {
destination: &str,
flags: u8,
msg: &T,
) -> (DbusMessage, u32) {
) -> (BufIoMessage, u32) {
let num_fds = msg.num_fds();
let mut fds = Vec::with_capacity(num_fds as _);
let serial = self.serial();
let mut buf = self.bufs.pop().unwrap_or_default();
buf.clear();
let mut buf = self.bufio.buf();
let mut fmt = Formatter::new(&mut fds, &mut buf);
self.format_header(
&mut fmt,
@ -276,7 +276,7 @@ impl DbusSocket {
msg.marshal(&mut fmt);
let body_len = (buf.len() - body_start) as u32;
buf[4..8].copy_from_slice(uapi::as_bytes(&body_len));
(DbusMessage { fds, buf }, serial)
(BufIoMessage { fds, buf }, serial)
}
fn format_header(
@ -345,13 +345,22 @@ where
) -> Result<(), DbusError> {
let msg = <T::Generic<'a> as Message>::unmarshal(parser)?;
(self.0)(Ok(&msg));
socket.bufs.push(buf);
socket.bufio.add_buf(buf);
Ok(())
}
}
struct AsyncReplyHandler<T: Message<'static>>(Rc<AsyncReplySlot<T>>);
impl<T: Message<'static>> AsyncReplyHandler<T> {
fn complete(self: Box<Self>, res: Result<Reply<T>, DbusError>) {
self.0.data.set(Some(res));
if let Some(waker) = self.0.waker.take() {
waker.wake();
}
}
}
unsafe impl<T> ReplyHandler for AsyncReplyHandler<T>
where
T: Message<'static>,
@ -376,16 +385,21 @@ where
) -> Result<(), DbusError> {
let msg = <T::Generic<'static> as Message<'static>>::unmarshal(unsafe {
mem::transmute::<&mut Parser<'a>, &mut Parser<'static>>(parser)
})?;
});
let msg = match msg {
Ok(msg) => msg,
Err(e) => {
let e = Rc::new(e);
self.complete(Err(DbusError::DbusError(e.clone())));
return Err(DbusError::DbusError(e.clone()));
}
};
let reply = Reply {
socket: socket.clone(),
buf,
t: msg,
};
self.0.data.set(Some(Ok(reply)));
if let Some(waker) = self.0.waker.take() {
waker.wake();
}
self.complete(Ok(reply));
Ok(())
}
}