1
0
Fork 0
forked from wry/wry

autocommit 2022-01-25 16:45:44 CET

This commit is contained in:
Julian Orth 2022-01-25 16:45:44 +01:00
parent 0336bf3bde
commit c340df0d08
59 changed files with 3085 additions and 1710 deletions

View file

@ -35,7 +35,6 @@ use crate::ErrorFmt;
use ahash::AHashMap;
use std::cell::{Cell, RefCell, RefMut};
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::mem;
use std::rc::Rc;
use thiserror::Error;
@ -60,8 +59,6 @@ pub enum ClientError {
MessageSizeTooSmall,
#[error("The size of the message is not a multiple of 4")]
UnalignedMessage,
#[error("The outgoing buffer overflowed")]
OutBufferOverflow,
#[error("The requested client {0} does not exist")]
ClientDoesNotExist(ClientId),
#[error("There is no wl_region with id {0}")]
@ -204,6 +201,7 @@ impl Clients {
ClientId(self.next_client_id.fetch_add(1))
}
#[allow(dead_code)]
pub fn get(&self, id: ClientId) -> Result<Rc<Client>, ClientError> {
let clients = self.clients.borrow();
match clients.get(&id) {
@ -239,6 +237,7 @@ impl Clients {
let data = Rc::new(Client {
id,
state: global.clone(),
checking_queue_size: Cell::new(false),
socket: global.eng.fd(&Rc::new(socket))?,
objects: Objects::new(),
events: AsyncQueue::new(),
@ -332,6 +331,7 @@ pub enum WlEvent {
pub struct Client {
pub id: ClientId,
pub state: Rc<State>,
checking_queue_size: Cell<bool>,
socket: AsyncFd,
pub objects: Objects,
events: AsyncQueue<WlEvent>,
@ -401,39 +401,33 @@ impl Client {
self.state.clients.shutdown(self.id);
}
pub fn event_locked(&self, event: Box<dyn EventFormatter>) -> bool {
self.events.push(WlEvent::Event(event));
self.events.size() > MAX_PENDING_EVENTS
pub fn event(self: &Rc<Self>, event: Box<dyn EventFormatter>) {
self.event2(WlEvent::Event(event));
}
pub async fn event(&self, event: Box<dyn EventFormatter>) -> Result<(), ClientError> {
self.event2(WlEvent::Event(event)).await
pub fn flush(self: &Rc<Self>) {
self.event2(WlEvent::Flush);
}
pub async fn flush(&self) -> Result<(), ClientError> {
self.event2(WlEvent::Flush).await
}
async fn event2(&self, event: WlEvent) -> Result<(), ClientError> {
pub fn event2(self: &Rc<Self>, event: WlEvent) {
self.events.push(event);
self.check_queue_size().await
if self.events.size() > MAX_PENDING_EVENTS {
if !self.checking_queue_size.replace(true) {
self.state.slow_clients.push(self.clone());
}
}
}
pub fn event2_locked(&self, event: WlEvent) -> bool {
self.events.push(event);
self.events.size() > MAX_PENDING_EVENTS
}
pub async fn check_queue_size(&self) -> Result<(), ClientError> {
pub async fn check_queue_size(&self) {
if self.events.size() > MAX_PENDING_EVENTS {
self.state.eng.yield_now().await;
if self.events.size() > MAX_PENDING_EVENTS {
log::error!("Client {} is too slow at fetching events", self.id.0);
self.state.clients.kill(self.id);
return Err(ClientError::OutBufferOverflow);
return;
}
}
Ok(())
self.checking_queue_size.set(false);
}
pub fn get_buffer(&self, id: WlBufferId) -> Result<Rc<WlBuffer>, ClientError> {
@ -471,22 +465,6 @@ impl Client {
}
}
fn simple_add_obj<T: Object>(&self, obj: &Rc<T>, client: bool) -> Result<(), ClientError> {
if client {
self.objects.add_client_object(obj.clone())
} else {
self.objects.add_server_object(obj.clone());
Ok(())
}
}
fn simple_remove_obj<'a>(
&'a self,
id: ObjectId,
) -> impl Future<Output = Result<(), ClientError>> + 'a {
self.objects.remove_obj(self, id)
}
pub fn lock_registries(&self) -> RefMut<AHashMap<WlRegistryId, Rc<WlRegistry>>> {
self.objects.registries()
}
@ -501,36 +479,44 @@ impl Client {
event,
);
}
}
pub trait AddObj<T> {
type RemoveObj<'a>: Future<Output = Result<(), ClientError>> + 'a;
fn add_client_obj(&self, obj: &Rc<T>) -> Result<(), ClientError> {
pub fn add_client_obj<T: WaylandObject>(&self, obj: &Rc<T>) -> Result<(), ClientError> {
self.add_obj(obj, true)
}
fn add_server_obj(&self, obj: &Rc<T>) {
#[allow(dead_code)]
pub fn add_server_obj<T: WaylandObject>(&self, obj: &Rc<T>) {
self.add_obj(obj, false).expect("add_server_obj failed")
}
fn add_obj(&self, obj: &Rc<T>, client: bool) -> Result<(), ClientError>;
fn add_obj<T: WaylandObject>(&self, obj: &Rc<T>, client: bool) -> Result<(), ClientError> {
if client {
self.objects.add_client_object(obj.clone())?;
} else {
self.objects.add_server_object(obj.clone());
}
obj.clone().add(self);
Ok(())
}
fn remove_obj<'a>(&'a self, obj: &'a T) -> Self::RemoveObj<'a>;
pub fn remove_obj<T: WaylandObject>(self: &Rc<Self>, obj: &T) -> Result<(), ClientError> {
obj.remove(self);
self.objects.remove_obj(self, obj.id())
}
}
pub trait WaylandObject: Object {
fn add(self: Rc<Self>, client: &Client) {
let _ = client;
}
fn remove(&self, client: &Client) {
let _ = client;
}
}
macro_rules! simple_add_obj {
($ty:ty) => {
impl AddObj<$ty> for Client {
type RemoveObj<'a> = impl Future<Output = Result<(), ClientError>> + 'a;
fn add_obj(&self, obj: &Rc<$ty>, client: bool) -> Result<(), ClientError> {
self.simple_add_obj(obj, client)
}
fn remove_obj<'a>(&'a self, obj: &'a $ty) -> Self::RemoveObj<'a> {
self.simple_remove_obj(obj.id())
}
}
impl WaylandObject for $ty {}
};
}
@ -555,17 +541,12 @@ simple_add_obj!(WlDataSource);
macro_rules! dedicated_add_obj {
($ty:ty, $field:ident) => {
impl AddObj<$ty> for Client {
type RemoveObj<'a> = impl Future<Output = Result<(), ClientError>> + 'a;
fn add_obj(&self, obj: &Rc<$ty>, client: bool) -> Result<(), ClientError> {
self.simple_add_obj(obj, client)?;
self.objects.$field.set(obj.id().into(), obj.clone());
Ok(())
impl WaylandObject for $ty {
fn add(self: Rc<Self>, client: &Client) {
client.objects.$field.set(self.id().into(), self);
}
fn remove_obj<'a>(&'a self, obj: &'a $ty) -> Self::RemoveObj<'a> {
self.objects.$field.remove(&obj.id().into());
self.simple_remove_obj(obj.id())
fn remove(&self, client: &Client) {
client.objects.$field.remove(&self.id().into());
}
}
};

View file

@ -114,7 +114,7 @@ impl Objects {
Ok(())
}
pub async fn remove_obj(&self, client_data: &Client, id: ObjectId) -> Result<(), ClientError> {
pub fn remove_obj(&self, client_data: &Rc<Client>, id: ObjectId) -> Result<(), ClientError> {
let _obj = match self.registry.remove(&id) {
Some(o) => o,
_ => return Err(ClientError::UnknownId),
@ -129,9 +129,7 @@ impl Objects {
}
ids[pos] |= 1 << seg_offset;
}
client_data
.event(client_data.display()?.delete_id(id))
.await?;
client_data.event(client_data.display()?.delete_id(id));
Ok(())
}

View file

@ -1,4 +1,4 @@
use crate::client::{AddObj, Client, ClientError, WlEvent};
use crate::client::{Client, ClientError, WlEvent};
use crate::object::ObjectId;
use crate::utils::buffd::{BufFdIn, BufFdOut, MsgFormatter, MsgParser};
use crate::utils::oneshot::OneshotRx;
@ -38,11 +38,8 @@ async fn dispatch_fr(data: Rc<Client>) {
loop {
let mut fr = data.dispatch_frame_requests.pop().await;
loop {
if let Err(e) = data.event(fr.done()).await {
log::error!("Could not dispatch frame event: {}", ErrorFmt(e));
return;
}
if let Err(e) = data.remove_obj(&*fr).await {
data.event(fr.done());
if let Err(e) = data.remove_obj(&*fr) {
log::error!("Could not remove frame object: {}", ErrorFmt(e));
return;
}
@ -51,10 +48,7 @@ async fn dispatch_fr(data: Rc<Client>) {
_ => break,
};
}
if let Err(e) = data.event2(WlEvent::Flush).await {
log::error!("Could not dispatch frame event: {}", ErrorFmt(e));
return;
}
data.flush();
}
}
@ -97,10 +91,10 @@ async fn receive(data: Rc<Client>) {
}
// log::trace!("{:x?}", data_buf);
let parser = MsgParser::new(&mut buf, &data_buf[..]);
if let Err(e) = obj.handle_request(request, parser).await {
if let Err(e) = obj.handle_request(request, parser) {
return Err(ClientError::RequestError(Box::new(e)));
}
data.event2(WlEvent::Flush).await?;
data.flush();
}
};
let res: Result<(), ClientError> = recv.await;