1
0
Fork 0
forked from wry/wry
wry/src/dbus.rs

739 lines
20 KiB
Rust

pub use jay_dbus_core::*;
use {
crate::{
async_engine::{AsyncEngine, SpawnedFuture},
io_uring::IoUring,
utils::{
bufio::BufIo,
clonecell::CloneCell,
copyhashmap::CopyHashMap,
numcell::NumCell,
run_toplevel::RunToplevel,
stack::Stack,
vecstorage::VecStorage,
xrd::{XRD, xrd},
},
},
ahash::AHashMap,
std::{
borrow::{Borrow, Cow},
cell::{Cell, RefCell},
fmt::Debug,
future::Future,
marker::PhantomData,
mem,
ops::Deref,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
},
uapi::OwnedFd,
};
mod auth;
mod holder;
mod incoming;
mod outgoing;
mod socket;
/// Name of the environment variable that `Dbus::new` reads to locate the session bus.
const DBUS_SESSION_BUS_ADDRESS: &str = "DBUS_SESSION_BUS_ADDRESS";
/// Entry point for obtaining connections to the system bus and the session bus.
pub struct Dbus {
/// Async engine passed to the holders when a connection is requested.
eng: Rc<AsyncEngine>,
/// io_uring instance passed to the holders when a connection is requested.
ring: Rc<IoUring>,
/// Holder for the system bus connection.
system: Rc<DbusHolder>,
/// Holder for the session bus connection.
session: Rc<DbusHolder>,
/// Filesystem path of the session bus socket, or `None` if it could not be
/// determined in `Dbus::new`.
user_path: Option<String>,
}
impl Dbus {
    /// Creates the bus manager and determines the session bus socket path.
    ///
    /// The path is taken from the `DBUS_SESSION_BUS_ADDRESS` environment
    /// variable. The variable may contain several `;`-separated addresses;
    /// they are inspected in order and the first one of the form
    /// `unix:path=...` wins. Addresses that cannot be used are skipped with a
    /// warning. If the variable is not set, `{xrd}/bus` is used, where `xrd`
    /// is the value returned by `xrd()`.
    ///
    /// Both holders start out without a socket.
    pub fn new(eng: &Rc<AsyncEngine>, ring: &Rc<IoUring>, run_toplevel: &Rc<RunToplevel>) -> Self {
        // Decodes a value of a D-Bus server address.
        //
        // https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
        //
        // Returns `None` if the value contains a byte that must be escaped, a
        // truncated or non-hexadecimal `%XX` sequence, or does not decode to
        // valid UTF-8.
        fn unescape_value(escaped: &str) -> Option<String> {
            let mut unescaped = Vec::with_capacity(escaped.len());
            let mut bytes = escaped.bytes();
            // `while let` instead of `for`: the `%` arm pulls two more bytes
            // out of the same iterator.
            while let Some(c) = bytes.next() {
                match c {
                    b'-'
                    | b'0'..=b'9'
                    | b'A'..=b'Z'
                    | b'a'..=b'z'
                    | b'_'
                    | b'/'
                    | b'.'
                    | b'\\'
                    | b'*' => {
                        unescaped.push(c);
                    }
                    b'%' => {
                        let hi = (bytes.next()? as char).to_digit(16)?;
                        let lo = (bytes.next()? as char).to_digit(16)?;
                        // Both digits are < 16, so the combined value fits in a byte.
                        unescaped.push((hi << 4 | lo) as u8);
                    }
                    _ => return None,
                }
            }
            String::from_utf8(unescaped).ok()
        }

        // Extracts the socket path from a single address of the form
        // `unix:path=...[,key=value...]`. Logs the reason and returns `None`
        // if the address cannot be used.
        fn unix_socket_path(address: &str) -> Option<String> {
            let Some((transport, attrs)) = address.split_once(':') else {
                log::warn!("{DBUS_SESSION_BUS_ADDRESS} is invalid");
                return None;
            };
            if transport != "unix" {
                log::warn!("{DBUS_SESSION_BUS_ADDRESS} has unsupported transport {transport}");
                return None;
            }
            for attr in attrs.split(',') {
                let Some((k, v)) = attr.split_once('=') else {
                    log::warn!("{DBUS_SESSION_BUS_ADDRESS} is invalid");
                    return None;
                };
                if k != "path" {
                    continue;
                }
                let Some(path) = unescape_value(v) else {
                    log::warn!("{DBUS_SESSION_BUS_ADDRESS} is invalid");
                    return None;
                };
                return Some(path);
            }
            // The address is well-formed but uses a unix variant other than
            // `path=` (for example `abstract=`), which is not handled here.
            log::warn!("{DBUS_SESSION_BUS_ADDRESS} has a unix address without a path");
            None
        }

        // Determines the session bus socket path from the environment.
        fn session_bus_path() -> Option<String> {
            let Ok(addr) = std::env::var(DBUS_SESSION_BUS_ADDRESS) else {
                if let Some(xrd) = xrd() {
                    return Some(format!("{xrd}/bus"));
                }
                log::warn!("Neither {DBUS_SESSION_BUS_ADDRESS} nor {XRD} is set");
                return None;
            };
            // The variable holds a `;`-separated list of addresses that are
            // meant to be tried in order, so keep going after an unusable one.
            let path = addr
                .split(';')
                .filter(|candidate| !candidate.is_empty())
                .find_map(unix_socket_path);
            if path.is_none() {
                log::warn!("{DBUS_SESSION_BUS_ADDRESS} contains no usable address");
            }
            path
        }

        let user_path = session_bus_path();
        log::info!("dbus path = {:?}", user_path);
        Self {
            eng: eng.clone(),
            ring: ring.clone(),
            system: Rc::new(DbusHolder::new(run_toplevel)),
            session: Rc::new(DbusHolder::new(run_toplevel)),
            user_path,
        }
    }

    /// Clears both the system and the session holder.
    pub fn clear(&self) {
        self.system.clear();
        self.session.clear();
    }

    /// Returns the system bus socket obtained from the system holder for
    /// `/var/run/dbus/system_bus_socket`.
    pub async fn system(&self) -> Result<Rc<DbusSocket>, DbusError> {
        self.system
            .get(
                &self.eng,
                &self.ring,
                "/var/run/dbus/system_bus_socket",
                "System bus",
            )
            .await
    }

    /// Returns the session bus socket obtained from the session holder.
    ///
    /// # Errors
    ///
    /// Returns [`DbusError::SessionBusAddressNotSet`] if no session bus path
    /// could be determined in [`Dbus::new`]; otherwise whatever the holder
    /// reports.
    pub async fn session(&self) -> Result<Rc<DbusSocket>, DbusError> {
        let Some(path) = self.user_path.as_deref() else {
            return Err(DbusError::SessionBusAddressNotSet);
        };
        self.session
            .get(&self.eng, &self.ring, path, "Session bus")
            .await
    }
}
/// Type-erased handler for the reply to a message that was sent earlier.
///
/// Boxed handlers are stored in `DbusSocket::reply_handlers`, keyed by a `u32`.
///
/// # Safety
///
/// NOTE(review): the contract that implementors must uphold is not visible in
/// this part of the file; document it here once confirmed.
unsafe trait ReplyHandler {
/// Returns the signature associated with this handler
/// (presumably the signature the reply body must have; confirm at the call site).
fn signature(&self) -> &str;
/// Consumes the handler and reports `error` instead of a reply.
fn handle_error(self: Box<Self>, socket: &Rc<DbusSocket>, error: DbusError);
/// Consumes the handler and processes a reply.
///
/// NOTE(review): `buf` is handed over by value; presumably it is the buffer
/// that `parser` reads from. Verify against the caller.
fn handle(
self: Box<Self>,
socket: &Rc<DbusSocket>,
headers: &Headers,
parser: &mut Parser,
buf: Vec<u8>,
) -> Result<(), DbusError>;
}
/// State of a single bus connection.
pub struct DbusSocket {
/// Static label of the bus (presumably the "System bus" / "Session bus"
/// strings passed by `Dbus`; confirm in the holder module).
bus_name: &'static str,
/// File descriptor of the connection.
fd: Rc<OwnedFd>,
ring: Rc<IoUring>,
/// Stack of spare byte buffers; `Reply` pushes its buffer here when dropped.
in_bufs: Stack<Vec<u8>>,
bufio: Rc<BufIo>,
eng: Rc<AsyncEngine>,
/// Counter for serial numbers (presumably of outgoing messages; confirm).
next_serial: NumCell<u32>,
unique_name: CloneCell<Rc<String>>,
/// Pending reply handlers; `AsyncReply` removes its entry by serial on drop.
reply_handlers: CopyHashMap<u32, Box<dyn ReplyHandler>>,
/// Spawned task handles; `DbusHolder` takes all three out when it is dropped.
incoming: Cell<Option<SpawnedFuture<()>>>,
outgoing_: Cell<Option<SpawnedFuture<()>>>,
auth: Cell<Option<SpawnedFuture<()>>>,
dead: Cell<bool>,
/// Reusable storage for header fields as `(u8, Variant)` pairs.
headers: RefCell<VecStorage<(u8, Variant<'static>)>>,
run_toplevel: Rc<RunToplevel>,
/// Signal handlers keyed by a pair of static strings
/// (presumably `(interface, member)`; confirm where entries are inserted).
signal_handlers: RefCell<AHashMap<(&'static str, &'static str), InterfaceSignalHandlers>>,
/// Exported objects; `DbusObject` removes its entry by path on drop.
objects: CopyHashMap<Cow<'static, str>, Rc<DbusObjectData>>,
}
/// Map key identifying an interface member with `'static` strings.
///
/// Wraps a `MemberHandlerKey<'static>` so that a `Borrow` impl can be provided
/// for lookups with shorter-lived keys.
#[derive(Hash, Eq, PartialEq)]
struct MemberHandlerOwnedKey {
key: MemberHandlerKey<'static>,
}
/// Borrowed `(interface, member)` pair used to look up method and property handlers.
#[derive(Hash, Eq, PartialEq)]
struct MemberHandlerKey<'a> {
/// Interface name.
interface: &'a str,
/// Member name within the interface (a method or property name).
member: &'a str,
}
/// Allows a map keyed by `MemberHandlerOwnedKey` to be queried with a
/// `MemberHandlerKey` of any lifetime.
///
/// Both types derive `Hash`, `Eq` and `PartialEq`, and the owned key consists
/// of nothing but the wrapped key, so hashing and equality agree as `Borrow`
/// requires.
impl<'a> Borrow<MemberHandlerKey<'a>> for MemberHandlerOwnedKey {
    fn borrow(&self) -> &MemberHandlerKey<'a> {
        let Self { key } = self;
        key
    }
}
// Header field codes (see the header fields table of the D-Bus specification).
const HDR_PATH: u8 = 1;
const HDR_INTERFACE: u8 = 2;
const HDR_MEMBER: u8 = 3;
const HDR_ERROR_NAME: u8 = 4;
const HDR_REPLY_SERIAL: u8 = 5;
const HDR_DESTINATION: u8 = 6;
const HDR_SENDER: u8 = 7;
const HDR_SIGNATURE: u8 = 8;
const HDR_UNIX_FDS: u8 = 9;
// Message type codes.
const MSG_METHOD_CALL: u8 = 1;
const MSG_METHOD_RETURN: u8 = 2;
const MSG_ERROR: u8 = 3;
const MSG_SIGNAL: u8 = 4;
// Message header flag bits.
const NO_REPLY_EXPECTED: u8 = 0x1;
#[expect(dead_code)]
const NO_AUTO_START: u8 = 0x2;
#[expect(dead_code)]
const ALLOW_INTERACTIVE_AUTHORIZATION: u8 = 0x4;
// Flag bits for the `RequestName` call of the message bus.
#[expect(dead_code)]
pub const DBUS_NAME_FLAG_ALLOW_REPLACEMENT: u32 = 0x1;
#[expect(dead_code)]
pub const DBUS_NAME_FLAG_REPLACE_EXISTING: u32 = 0x2;
pub const DBUS_NAME_FLAG_DO_NOT_QUEUE: u32 = 0x4;
// Result codes of the `RequestName` call of the message bus.
pub const DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER: u32 = 1;
#[expect(dead_code)]
pub const DBUS_REQUEST_NAME_REPLY_IN_QUEUE: u32 = 2;
#[expect(dead_code)]
pub const DBUS_REQUEST_NAME_REPLY_EXISTS: u32 = 3;
#[expect(dead_code)]
pub const DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER: u32 = 4;
/// Bus name of the message bus itself.
pub const BUS_DEST: &str = "org.freedesktop.DBus";
/// Object path of the message bus itself.
pub const BUS_PATH: &str = "/org/freedesktop/DBus";
/// Header fields of a message; every field is optional and defaults to `None`.
///
/// The fields mirror the `HDR_*` constants one to one.
#[derive(Default, Debug)]
struct Headers<'a> {
path: Option<ObjectPath<'a>>,
interface: Option<Cow<'a, str>>,
member: Option<Cow<'a, str>>,
error_name: Option<Cow<'a, str>>,
reply_serial: Option<u32>,
destination: Option<Cow<'a, str>>,
sender: Option<Cow<'a, str>>,
signature: Option<Signature<'a>>,
unix_fds: Option<u32>,
}
/// Slot that holds at most one bus connection.
struct DbusHolder {
/// The current socket, or `None` if there is none (the initial state).
socket: CloneCell<Option<Rc<DbusSocket>>>,
run_toplevel: Rc<RunToplevel>,
}
impl DbusHolder {
    /// Creates a holder that does not contain a socket yet.
    pub fn new(run_toplevel: &Rc<RunToplevel>) -> Self {
        let run_toplevel = Rc::clone(run_toplevel);
        Self {
            socket: Default::default(),
            run_toplevel,
        }
    }

    /// Removes the socket from the holder, if there is one, and clears it.
    pub fn clear(&self) {
        let Some(socket) = self.socket.take() else {
            return;
        };
        socket.clear();
    }
}
/// When the holder goes away, the task handles stored on its socket are taken
/// out and dropped in the order auth, outgoing, incoming.
impl Drop for DbusHolder {
    fn drop(&mut self) {
        let Some(socket) = self.socket.take() else {
            return;
        };
        drop(socket.auth.take());
        drop(socket.outgoing_.take());
        drop(socket.incoming.take());
    }
}
/// A received reply of message type `T`.
///
/// When dropped, `buf` is pushed back onto the socket's `in_bufs` stack.
pub struct Reply<T: Message<'static>> {
socket: Rc<DbusSocket>,
/// Byte buffer returned to the socket on drop
/// (presumably the storage that `t` borrows from; confirm at the construction site).
buf: Vec<u8>,
/// The message, stored with a `'static` lifetime; `Reply::get` hands it out
/// with the lifetime of the borrow of `self` instead.
t: T::Generic<'static>,
}
/// The value of property `T`, backed by the `GetReply` it was received in.
pub struct PropertyValue<T: Property> {
reply: Reply<GetReply<'static, T::Type>>,
}
/// Formats the property value itself, exactly as returned by `PropertyValue::get`.
impl<T: Property> Debug for PropertyValue<T>
where
    for<'a> <T::Type as DbusType<'static>>::Generic<'a>: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let value = self.get();
        Debug::fmt(value, f)
    }
}
impl<T: Property> PropertyValue<T> {
    /// Returns a reference to the `value` field of the underlying reply.
    pub fn get<'a>(&'a self) -> &'a <T::Type as DbusType<'static>>::Generic<'a> {
        let reply = self.reply.get();
        &reply.value
    }
}
/// Formats the contained message, exactly as returned by `Reply::get`.
impl<T: Message<'static>> Debug for Reply<T>
where
    for<'a> T::Generic<'a>: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = self.get();
        Debug::fmt(message, f)
    }
}
impl<T: Message<'static>> Reply<T> {
/// Returns the message with its lifetime parameter tied to the borrow of `self`.
pub fn get<'a>(&'a self) -> &'a T::Generic<'a> {
// SAFETY: NOTE(review) — the transmute only replaces the `'static` lifetime
// argument of `T::Generic` with `'a`, so the returned reference cannot
// outlive `self`. Presumably `t` borrows from `buf`, which this struct owns
// and only gives away in `Drop`; confirm where `Reply` is constructed.
unsafe { mem::transmute(&self.t) }
}
}
/// Hands the reply's byte buffer back to the socket so that it can be reused.
impl<T: Message<'static>> Drop for Reply<T> {
    fn drop(&mut self) {
        // Leave an empty vector behind and push the real one onto the stack.
        let buf = mem::take(&mut self.buf);
        self.socket.in_bufs.push(buf);
    }
}
/// Shared slot through which the result of a call reaches its `AsyncReply`.
struct AsyncReplySlot<T: Message<'static>> {
/// The result; `AsyncReply::poll` takes it out once it is present.
data: Cell<Option<Result<Reply<T>, DbusError>>>,
/// Waker stored by `AsyncReply::poll` while no result is available.
waker: Cell<Option<Waker>>,
}
/// Future that resolves to the reply of message type `T` or to an error.
///
/// Dropping it removes the entry for `serial` from the socket's reply handlers.
pub struct AsyncReply<T: Message<'static>> {
socket: Rc<DbusSocket>,
/// Key under which the reply handler is registered on the socket.
serial: u32,
slot: Rc<AsyncReplySlot<T>>,
}
/// Future that resolves to the value of property `T`.
///
/// Wraps the `AsyncReply` for the underlying `GetReply` and converts its
/// output into a `PropertyValue`.
#[pin_project::pin_project]
pub struct AsyncProperty<T: Property> {
#[pin]
reply: AsyncReply<GetReply<'static, T::Type>>,
}
/// Unregisters the reply handler for this future's serial from the socket.
impl<T: Message<'static>> Drop for AsyncReply<T> {
    fn drop(&mut self) {
        let serial = self.serial;
        self.socket.reply_handlers.remove(&serial);
    }
}
impl<T: Message<'static>> Future for AsyncReply<T> {
    type Output = Result<Reply<T>, DbusError>;

    /// Completes with the slot's content once it has been filled; until then
    /// stores the current waker in the slot and stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.slot.data.take() {
            Some(result) => Poll::Ready(result),
            None => {
                let waker = cx.waker().clone();
                self.slot.waker.set(Some(waker));
                Poll::Pending
            }
        }
    }
}
impl<T: Property> Future for AsyncProperty<T> {
    type Output = Result<PropertyValue<T>, DbusError>;

    /// Polls the wrapped reply future and wraps a successful reply in a
    /// `PropertyValue`; errors and the pending state are passed through.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        match this.reply.poll(cx) {
            Poll::Ready(Ok(reply)) => Poll::Ready(Ok(PropertyValue { reply })),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// A registered handler for signals of type `T`.
struct SignalHandlerData<T, F> {
/// Optional path exposed through `SignalHandlerApi::path`
/// (presumably the object path the handler is restricted to; confirm).
path: Option<String>,
/// Rule string exposed through `SignalHandlerApi::rule`
/// (presumably the match rule registered with the bus; confirm).
rule: String,
/// Callback invoked with every unmarshalled signal.
handler: F,
_phantom: PhantomData<T>,
}
/// Type-erased view of a `SignalHandlerData`.
trait SignalHandlerApi {
/// Interface of the handled signal.
fn interface(&self) -> &'static str;
/// Member name of the handled signal.
fn member(&self) -> &'static str;
/// Signature of the handled signal.
fn signature(&self) -> &'static str;
/// The handler's optional path.
fn path(&self) -> Option<&str>;
/// The handler's rule string.
fn rule(&self) -> &str;
/// Unmarshals a signal from `parser` and passes it to the callback.
fn handle(&self, parser: &mut Parser) -> Result<(), DbusError>;
}
/// The static metadata comes from the signal type `T`; path and rule come
/// from the stored fields.
impl<T, F> SignalHandlerApi for SignalHandlerData<T, F>
where
    T: Signal<'static>,
    F: for<'a> Fn(T::Generic<'a>),
{
    fn interface(&self) -> &'static str {
        T::INTERFACE
    }

    fn member(&self) -> &'static str {
        T::MEMBER
    }

    fn signature(&self) -> &'static str {
        T::SIGNATURE
    }

    fn path(&self) -> Option<&str> {
        self.path.as_ref().map(|path| path.as_str())
    }

    fn rule(&self) -> &str {
        self.rule.as_str()
    }

    /// Unmarshals the signal first, so the callback is not invoked if
    /// unmarshalling fails.
    fn handle<'a>(&self, parser: &mut Parser<'a>) -> Result<(), DbusError> {
        let signal = T::Generic::<'a>::unmarshal(parser)?;
        (self.handler)(signal);
        Ok(())
    }
}
/// Guard for a registered signal handler.
///
/// Dropping the guard removes the handler from the socket, hence `#[must_use]`.
#[must_use]
pub struct SignalHandler {
socket: Rc<DbusSocket>,
data: Rc<dyn SignalHandlerApi>,
}
/// Removes the handler from the socket it was registered on.
impl Drop for SignalHandler {
    fn drop(&mut self) {
        let handler = &*self.data;
        self.socket.remove_signal_handler(handler);
    }
}
/// The signal handlers stored under one key of `DbusSocket::signal_handlers`.
struct InterfaceSignalHandlers {
/// At most one handler that is not keyed by a string
/// (presumably the handler registered without a path; confirm).
unconditional: Option<Rc<dyn SignalHandlerApi>>,
/// Handlers keyed by a string (presumably the object path; confirm).
conditional: AHashMap<String, Rc<dyn SignalHandlerApi>>,
}
/// Shared state of an exported object.
struct DbusObjectData {
/// Path of the object; also its key in `DbusSocket::objects`.
path: Cow<'static, str>,
/// Method handlers keyed by interface and member name.
methods: CopyHashMap<MemberHandlerOwnedKey, Rc<dyn MethodHandlerApi>>,
/// Property values keyed by interface and property name.
properties: CopyHashMap<MemberHandlerOwnedKey, Rc<dyn PropertyHandlerApi>>,
}
/// Handle to an exported object.
///
/// Dropping the handle removes the object's path from the socket's object map.
pub struct DbusObject {
socket: Rc<DbusSocket>,
data: Rc<DbusObjectData>,
}
/// Removes the object's path from the socket's object map.
impl Drop for DbusObject {
    fn drop(&mut self) {
        let path = &self.data.path;
        self.socket.objects.remove(path);
    }
}
impl DbusObject {
pub fn add_method<T, F>(&self, handler: F)
where
T: MethodCall<'static>,
F: for<'a> Fn(T::Generic<'a>, PendingReply<T::Reply>) + 'static,
{
let rhd = Rc::new(MethodHandlerData {
handler,
_phantom: Default::default(),
});
let key = MemberHandlerOwnedKey {
key: MemberHandlerKey {
interface: T::INTERFACE,
member: T::MEMBER,
},
};
self.data.methods.set(key, rhd);
}
pub fn set_property<T>(&self, value: Variant<'static>)
where
T: Property + 'static,
{
self.emit_signal(
&crate::wire_dbus::org::freedesktop::dbus::properties::PropertiesChanged {
interface_name: T::INTERFACE.into(),
changed_properties: Cow::Borrowed(&[DictEntry {
key: T::PROPERTY.into(),
value: borrow_variant(&value),
}]),
invalidated_properties: Default::default(),
},
);
let phd = Rc::new(PropertyHandlerData::<T> {
data: value,
_phantom: Default::default(),
});
let key = MemberHandlerOwnedKey {
key: MemberHandlerKey {
interface: T::INTERFACE,
member: T::PROPERTY,
},
};
self.data.properties.set(key, phd);
}
pub fn emit_signal<'a, T: Signal<'a>>(&self, signal: &T) {
self.socket.emit_signal(&self.data.path, signal);
}
pub fn path(&self) -> &str {
&self.data.path
}
}
/// Type-erased view of a stored property value.
trait PropertyHandlerApi {
/// Interface the property belongs to.
fn interface(&self) -> &'static str;
/// Name of the property.
fn member(&self) -> &'static str;
/// The current value, borrowed from the stored data.
fn value<'a>(&'a self) -> Variant<'a>;
}
/// The stored value of property `T`.
struct PropertyHandlerData<T> {
/// The value as passed to `DbusObject::set_property`.
data: Variant<'static>,
_phantom: PhantomData<T>,
}
/// Interface and name come from the property type `T`; the value comes from
/// the stored variant.
impl<T> PropertyHandlerApi for PropertyHandlerData<T>
where
    T: Property,
{
    fn interface(&self) -> &'static str {
        T::INTERFACE
    }

    fn member(&self) -> &'static str {
        T::PROPERTY
    }

    fn value<'a>(&'a self) -> Variant<'a> {
        let stored = &self.data;
        borrow_variant(stored)
    }
}
/// Builds a `Variant<'a>` that mirrors `value`.
///
/// Scalars are copied, string-like payloads are re-borrowed instead of being
/// copied, file descriptors and array types are cloned, and containers are
/// rebuilt recursively.
fn borrow_variant<'a>(value: &'a Variant<'static>) -> Variant<'a> {
    match value {
        Variant::U8(n) => Variant::U8(*n),
        Variant::Bool(b) => Variant::Bool(*b),
        Variant::I16(n) => Variant::I16(*n),
        Variant::U16(n) => Variant::U16(*n),
        Variant::I32(n) => Variant::I32(*n),
        Variant::U32(n) => Variant::U32(*n),
        Variant::I64(n) => Variant::I64(*n),
        Variant::U64(n) => Variant::U64(*n),
        Variant::F64(x) => Variant::F64(*x),
        Variant::String(s) => Variant::String(Cow::Borrowed(&**s)),
        Variant::ObjectPath(path) => Variant::ObjectPath(ObjectPath(Cow::Borrowed(&*path.0))),
        Variant::Signature(sig) => Variant::Signature(Signature(Cow::Borrowed(&*sig.0))),
        Variant::Variant(inner) => {
            let borrowed = borrow_variant(inner);
            Variant::Variant(Box::new(borrowed))
        }
        Variant::Fd(fd) => Variant::Fd(fd.clone()),
        Variant::Array(ty, items) => {
            let element_ty = ty.clone();
            let borrowed = items.iter().map(borrow_variant).collect();
            Variant::Array(element_ty, borrowed)
        }
        Variant::DictEntry(key, val) => {
            let key = Box::new(borrow_variant(key));
            let val = Box::new(borrow_variant(val));
            Variant::DictEntry(key, val)
        }
        Variant::Struct(fields) => {
            let borrowed = fields.iter().map(borrow_variant).collect();
            Variant::Struct(borrowed)
        }
    }
}
/// Handle passed to a method handler for answering one method call with a
/// reply of type `T` or with an error.
pub struct PendingReply<T> {
/// If `false`, `ok` and `err` send nothing.
reply_expected: bool,
socket: Rc<DbusSocket>,
/// Destination passed to `send_reply` / `send_error`.
destination: String,
/// Serial passed to `send_reply` / `send_error`.
serial: u32,
_phantom: PhantomData<T>,
}
impl<T> PendingReply<T> {
    /// Returns the `reply_expected` flag this pending reply was created with.
    #[expect(dead_code)]
    pub fn reply_expected(&self) -> bool {
        let Self { reply_expected, .. } = self;
        *reply_expected
    }

    /// Sends `msg` as an error to the stored destination for the stored
    /// serial. Does nothing if no reply is expected.
    pub fn err(&self, msg: &str) {
        if !self.reply_expected {
            return;
        }
        self.socket.send_error(&self.destination, self.serial, msg);
    }
}
impl<T> PendingReply<T>
where
    T: Message<'static>,
{
    /// Sends `msg` as the reply to the stored destination for the stored
    /// serial. Does nothing if no reply is expected.
    pub fn ok<'a>(&self, msg: &T::Generic<'a>) {
        if !self.reply_expected {
            return;
        }
        self.socket.send_reply(&self.destination, self.serial, msg);
    }

    /// Answers with `ok` for `Ok` results and with `err` for `Err` results.
    #[expect(dead_code)]
    pub fn complete<'a>(&self, res: Result<&T::Generic<'a>, &str>) {
        let msg = match res {
            Ok(msg) => msg,
            Err(reason) => return self.err(reason),
        };
        self.ok(msg)
    }
}
/// Type-erased handler for a method call on an exported object.
trait MethodHandlerApi {
/// Signature of the method call handled by this handler.
fn signature(&self) -> &'static str;
/// Handles one method call.
///
/// `dest` and `serial` are what implementations pass on to
/// `send_reply` / `send_error`; `reply_expected` tells whether an answer
/// should be sent at all; the call's arguments are read from `parser`.
fn handle(
&self,
object: &DbusObjectData,
socket: &Rc<DbusSocket>,
dest: &str,
serial: u32,
reply_expected: bool,
parser: &mut Parser,
) -> Result<(), DbusError>;
}
/// A user-supplied handler for method calls of type `T`.
struct MethodHandlerData<T, F> {
/// Callback invoked with the unmarshalled call and a `PendingReply`.
handler: F,
_phantom: PhantomData<T>,
}
impl<T, F> MethodHandlerApi for MethodHandlerData<T, F>
where
T: MethodCall<'static>,
F: for<'a> Fn(T::Generic<'a>, PendingReply<T::Reply>) + 'static,
{
fn signature(&self) -> &'static str {
T::SIGNATURE
}
fn handle<'a>(
&self,
_object: &DbusObjectData,
socket: &Rc<DbusSocket>,
dest: &str,
serial: u32,
reply_expected: bool,
parser: &mut Parser<'a>,
) -> Result<(), DbusError> {
let msg = T::Generic::<'a>::unmarshal(parser)?;
let pr = PendingReply {
reply_expected,
socket: socket.clone(),
destination: dest.to_string(),
serial,
_phantom: Default::default(),
};
(self.handler)(msg, pr);
Ok(())
}
}
/// Method handler that answers `Get` calls of the properties interface from
/// an object's stored properties.
struct PropertyGetHandlerProxy;

impl MethodHandlerApi for PropertyGetHandlerProxy {
    fn signature(&self) -> &'static str {
        use crate::wire_dbus::org::freedesktop::dbus::properties;
        properties::Get::SIGNATURE
    }

    /// Replies with the stored value of the requested property, or with the
    /// error "Property does not exist" if the object has no such property.
    ///
    /// Returns immediately, without reading the call, if no reply is expected.
    fn handle<'a>(
        &self,
        object: &DbusObjectData,
        socket: &Rc<DbusSocket>,
        dest: &str,
        serial: u32,
        reply_expected: bool,
        parser: &mut Parser<'a>,
    ) -> Result<(), DbusError> {
        use crate::wire_dbus::org::freedesktop::dbus::properties;

        if !reply_expected {
            return Ok(());
        }
        let request = properties::Get::unmarshal(parser)?;
        let key = MemberHandlerKey {
            interface: request.interface_name.deref(),
            member: request.property_name.deref(),
        };
        let Some(property) = object.properties.get(&key) else {
            socket.send_error(dest, serial, "Property does not exist");
            return Ok(());
        };
        let reply = properties::GetReply {
            value: property.value(),
        };
        socket.send_reply(dest, serial, &reply);
        Ok(())
    }
}
/// Method handler that answers `GetAll` calls of the properties interface
/// from an object's stored properties.
struct PropertyGetAllHandlerProxy;

impl MethodHandlerApi for PropertyGetAllHandlerProxy {
    fn signature(&self) -> &'static str {
        use crate::wire_dbus::org::freedesktop::dbus::properties;
        properties::GetAll::SIGNATURE
    }

    /// Replies with all stored properties whose interface equals the
    /// requested interface name; the list may be empty.
    ///
    /// Returns immediately, without reading the call, if no reply is expected.
    fn handle<'a>(
        &self,
        object: &DbusObjectData,
        socket: &Rc<DbusSocket>,
        dest: &str,
        serial: u32,
        reply_expected: bool,
        parser: &mut Parser<'a>,
    ) -> Result<(), DbusError> {
        use crate::wire_dbus::org::freedesktop::dbus::properties;

        if !reply_expected {
            return Ok(());
        }
        let request = properties::GetAll::unmarshal(parser)?;
        // The entries borrow their values from the locked map, so the guard
        // has to stay alive until the reply has been sent.
        let all_props = object.properties.lock();
        let props: Vec<_> = all_props
            .values()
            .filter(|property| property.interface() == request.interface_name)
            .map(|property| DictEntry {
                key: property.member().into(),
                value: property.value(),
            })
            .collect();
        let reply = properties::GetAllReply {
            props: props.into(),
        };
        socket.send_reply(dest, serial, &reply);
        Ok(())
    }
}
/// Re-exports of the names listed below, taken from this module, `std` and
/// `uapi`, so that they can be pulled in with a single glob import.
pub mod prelude {
pub use {
super::{
DbusError, DbusType, Formatter, Message, MethodCall, Parser, Property, Signal,
types::{Bool, DictEntry, ObjectPath, Variant},
},
std::{borrow::Cow, rc::Rc},
uapi::OwnedFd,
};
}