pub mod usr_ifs; pub mod usr_object; use { crate::{ async_engine::{AsyncEngine, SpawnedFuture}, client::{EventFormatter, MIN_SERVER_ID, RequestParser}, io_uring::{IoUring, IoUringError}, object::{Interface, ObjectId, Version, WL_DISPLAY_ID}, utils::{ asyncevent::AsyncEvent, bitfield::Bitfield, buffd::{ BufFdError, BufFdIn, BufFdOut, MsgFormatter, MsgParser, MsgParserError, OutBuffer, OutBufferSwapchain, }, clonecell::CloneCell, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, hash_map_ext::HashMapExt, oserror::OsError, vec_ext::VecExt, }, video::dmabuf::DmaBufIds, wheel::Wheel, wire::wl_display, wl_usr::{ usr_ifs::{ usr_wl_callback::UsrWlCallback, usr_wl_display::UsrWlDisplay, usr_wl_registry::UsrWlRegistry, }, usr_object::UsrObject, }, }, std::{ cell::{Cell, RefCell}, collections::VecDeque, error::Error, mem, rc::Rc, }, thiserror::Error, uapi::{OwnedFd, c}, }; #[derive(Debug, Error)] pub enum UsrConError { #[error("Could not create a socket")] CreateSocket(#[source] OsError), #[error("The socket path is too long")] SocketPathTooLong, #[error("Could not connect to the compositor")] Connect(#[source] IoUringError), #[error("The message length is smaller than 8 bytes")] MsgLenTooSmall, #[error("The size of the message is not a multiple of 4")] UnalignedMessage, #[error(transparent)] BufFdError(#[from] BufFdError), #[error("Could not read from the compositor")] Read(#[source] BufFdError), #[error("Could not write to the compositor")] Write(#[source] BufFdError), #[error("Server sent an event for object {0} that does not exist")] MissingObject(ObjectId), #[error("Could not process a `{}#{}.{}` event", .interface.name(), .id, .method)] MethodError { interface: Interface, id: ObjectId, method: &'static str, #[source] error: Box, }, #[error("Client tried to invoke a non-existent method")] InvalidMethod, } pub struct UsrCon { pub ring: Rc, pub _wheel: Rc, pub eng: Rc, pub server_id: u32, obj_ids: RefCell, objects: CopyHashMap>>, swapchain: Rc>, flush_request: AsyncEvent, incoming: Cell>>, outgoing: Cell>>, pub owner: CloneCell>>, dead: Cell, dma_buf_ids: Rc, } pub trait UsrConOwner { fn killed(&self); } impl UsrCon { pub async fn new( ring: &Rc, wheel: &Rc, eng: &Rc, dma_buf_ids: &Rc, path: &str, server_id: u32, ) -> Result, UsrConError> { let socket = match uapi::socket(c::AF_UNIX, c::SOCK_STREAM | c::SOCK_CLOEXEC, 0) { Ok(s) => Rc::new(s), Err(e) => return Err(UsrConError::CreateSocket(e.into())), }; let mut addr: c::sockaddr_un = uapi::pod_zeroed(); addr.sun_family = c::AF_UNIX as _; if path.len() >= addr.sun_path.len() { return Err(UsrConError::SocketPathTooLong); } let sun_path = uapi::as_bytes_mut(&mut addr.sun_path[..]); sun_path[..path.len()].copy_from_slice(path.as_bytes()); sun_path[path.len()] = 0; if let Err(e) = ring.connect(&socket, &addr).await { return Err(UsrConError::Connect(e)); } Ok(Self::from_socket( ring, wheel, eng, dma_buf_ids, &socket, server_id, )) } pub fn from_socket( ring: &Rc, wheel: &Rc, eng: &Rc, dma_buf_ids: &Rc, socket: &Rc, server_id: u32, ) -> Rc { let mut obj_ids = Bitfield::default(); obj_ids.take(0); obj_ids.take(1); let slf = Rc::new(Self { ring: ring.clone(), _wheel: wheel.clone(), eng: eng.clone(), server_id, obj_ids: RefCell::new(obj_ids), objects: Default::default(), swapchain: Default::default(), flush_request: Default::default(), incoming: Default::default(), outgoing: Default::default(), owner: Default::default(), dead: Cell::new(false), dma_buf_ids: dma_buf_ids.clone(), }); slf.objects.set( WL_DISPLAY_ID.into(), Some(Rc::new(UsrWlDisplay { id: WL_DISPLAY_ID, con: slf.clone(), version: Version(1), })), ); slf.incoming.set(Some( slf.eng.spawn( "wl_usr incoming", Incoming { con: slf.clone(), buf: BufFdIn::new(socket, &slf.ring), data: vec![], } .run(), ), )); slf.outgoing.set(Some( slf.eng.spawn( "wl_usr outgoing", Outgoing { con: slf.clone(), buf: BufFdOut::new(socket, &slf.ring), buffers: Default::default(), } .run(), ), )); slf } pub fn kill(&self) { self.dead.set(true); for obj in self.objects.lock().drain_values() { if let Some(obj) = obj { obj.break_loops(); } } self.incoming.take(); self.outgoing.take(); if let Some(owner) = self.owner.take() { owner.killed(); } } pub fn release_id(&self, id: u32) { self.obj_ids.borrow_mut().release(id); self.objects.remove(&ObjectId::from_raw(id)); } pub fn remove_obj(&self, obj: &impl UsrObject) { obj.destroy(); obj.break_loops(); if obj.id().raw() >= MIN_SERVER_ID { self.objects.remove(&obj.id()); } else { self.objects.set(obj.id(), None); } } pub fn add_object(&self, obj: Rc) { if !self.dead.get() { self.objects.set(obj.id(), Some(obj)); } } pub fn get_registry(self: &Rc) -> Rc { let registry = Rc::new(UsrWlRegistry { id: self.id(), con: self.clone(), owner: Default::default(), version: Version(1), }); self.request(wl_display::GetRegistry { self_id: WL_DISPLAY_ID, registry: registry.id, }); self.add_object(registry.clone()); registry } pub fn sync(self: &Rc, handler: F) where F: FnOnce() + 'static, { let callback = Rc::new(UsrWlCallback::new(self)); callback.owner.set(Some(Rc::new(Cell::new(Some(handler))))); self.request(wl_display::Sync { self_id: WL_DISPLAY_ID, callback: callback.id, }); self.add_object(callback); } pub fn parse<'a, R: RequestParser<'a>>( &self, obj: &impl UsrObject, mut parser: MsgParser<'_, 'a>, ) -> Result { let res = R::parse(&mut parser)?; parser.eof()?; log::trace!( "Server {} -> {}@{}.{:?}", self.server_id, obj.interface().name(), obj.id(), res ); Ok(res) } pub fn request(self: &Rc, event: T) { if self.dead.get() { return; } if log::log_enabled!(log::Level::Trace) { log::trace!( "Server {} <= {}@{}.{:?}", self.server_id, event.interface().name(), event.id(), event, ); } let mut fds = vec![]; let mut swapchain = self.swapchain.borrow_mut(); let mut fmt = MsgFormatter::new(&mut swapchain.cur, &mut fds); event.format(&mut fmt); fmt.write_len(); if swapchain.cur.is_full() { swapchain.commit(); } self.flush_request.trigger(); } pub fn id>(&self) -> T { let id = self.obj_ids.borrow_mut().acquire(); ObjectId::from_raw(id).into() } } struct Outgoing { con: Rc, buf: BufFdOut, buffers: VecDeque, } impl Outgoing { async fn run(mut self) { loop { self.con.flush_request.triggered().await; if let Err(e) = self.flush().await { log::error!( "Server {}: Could not process an outgoing message: {}", self.con.server_id, ErrorFmt(e) ); self.con.kill(); return; } } } async fn flush(&mut self) -> Result<(), UsrConError> { { let mut swapchain = self.con.swapchain.borrow_mut(); swapchain.commit(); mem::swap(&mut swapchain.pending, &mut self.buffers); } while let Some(mut cur) = self.buffers.pop_front() { if let Err(e) = self.buf.flush_no_timeout(&mut cur).await { return Err(UsrConError::Write(e)); } self.con.swapchain.borrow_mut().free.push(cur); } Ok(()) } } struct Incoming { con: Rc, buf: BufFdIn, data: Vec, } impl Incoming { async fn run(mut self) { loop { if let Err(e) = self.handle_msg().await { log::error!( "Server {}: Could not process an incoming message: {}", self.con.server_id, ErrorFmt(e) ); self.con.kill(); return; } } } async fn handle_msg(&mut self) -> Result<(), UsrConError> { let mut hdr = [0u32, 0]; if let Err(e) = self.buf.read_full(&mut hdr[..]).await { return Err(UsrConError::Read(e)); } let obj_id = ObjectId::from_raw(hdr[0]); let len = (hdr[1] >> 16) as usize; let event = hdr[1] & 0xffff; if len < 8 { return Err(UsrConError::MsgLenTooSmall); } if len % 4 != 0 { return Err(UsrConError::UnalignedMessage); } let len = len / 4 - 2; self.data.clear(); self.data.reserve(len); let unused = self.data.split_at_spare_mut_ext().1; if let Err(e) = self.buf.read_full(&mut unused[..len]).await { return Err(UsrConError::Read(e)); } unsafe { self.data.set_len(len); } if let Some(obj) = self.con.objects.get(&obj_id) { if let Some(obj) = obj { let parser = MsgParser::new(&mut self.buf, &self.data); obj.handle_event(&self.con, event, parser)?; } } else if obj_id.raw() < MIN_SERVER_ID { return Err(UsrConError::MissingObject(obj_id)); } else { // ignore events for server-created objects that were never added to the state } Ok(()) } }