1
0
Fork 0
forked from wry/wry

autocommit 2022-02-06 19:56:51 CET

This commit is contained in:
Julian Orth 2022-02-06 19:56:51 +01:00
parent 1fdff156ec
commit 3f50b0c75e
37 changed files with 452 additions and 439 deletions

View file

@ -6,7 +6,7 @@ use crate::ifs::wl_display::WlDisplay;
use crate::ifs::wl_registry::{WlRegistry};
use crate::object::{Interface, Object, ObjectId, WL_DISPLAY_ID};
use crate::state::State;
use crate::utils::buffd::{MsgFormatter, MsgParser, MsgParserError};
use crate::utils::buffd::{MsgFormatter, MsgParser, MsgParserError, OutBufferSwapchain};
use crate::utils::numcell::NumCell;
use crate::utils::oneshot::{oneshot, OneshotTx};
use crate::utils::queue::AsyncQueue;
@ -19,6 +19,7 @@ use std::fmt::{Debug, Display, Formatter};
use std::mem;
use std::rc::Rc;
use uapi::{c, OwnedFd};
use crate::utils::asyncevent::AsyncEvent;
use crate::wire::WlRegistryId;
mod error;
@ -92,9 +93,9 @@ impl Clients {
checking_queue_size: Cell::new(false),
socket: global.eng.fd(&Rc::new(socket))?,
objects: Objects::new(),
events: AsyncQueue::new(),
swapchain: Default::default(),
flush_request: Default::default(),
shutdown: Cell::new(Some(send)),
shutdown_sent: Cell::new(false),
dispatch_frame_requests: AsyncQueue::new(),
});
let display = Rc::new(WlDisplay::new(&data));
@ -126,8 +127,7 @@ impl Clients {
if let Some(client) = self.clients.borrow_mut().remove(&client_id) {
log::info!("Shutting down client {}", client.data.id.0);
client.data.shutdown.replace(None).unwrap().send(());
client.data.events.push(WlEvent::Shutdown);
client.data.shutdown_sent.set(true);
client.data.flush_request.trigger();
self.shutdown_clients.borrow_mut().insert(client_id, client);
}
}
@ -158,42 +158,33 @@ pub struct ClientHolder {
impl Drop for ClientHolder {
fn drop(&mut self) {
self.data.objects.destroy();
self.data.events.clear();
self.data.dispatch_frame_requests.clear();
}
}
pub trait EventFormatter: Debug {
fn format(self: Box<Self>, fmt: &mut MsgFormatter<'_>);
fn format(self, fmt: &mut MsgFormatter<'_>);
fn id(&self) -> ObjectId;
fn interface(&self) -> Interface;
}
pub type DynEventFormatter = Box<dyn EventFormatter>;
pub trait RequestParser<'a>: Debug + Sized {
fn parse(parser: &mut MsgParser<'_, 'a>) -> Result<Self, MsgParserError>;
}
pub enum WlEvent {
Flush,
Shutdown,
Event(Box<dyn EventFormatter>),
}
pub struct Client {
pub id: ClientId,
pub state: Rc<State>,
checking_queue_size: Cell<bool>,
socket: AsyncFd,
pub objects: Objects,
events: AsyncQueue<WlEvent>,
swapchain: Rc<RefCell<OutBufferSwapchain>>,
flush_request: AsyncEvent,
shutdown: Cell<Option<OneshotTx<()>>>,
shutdown_sent: Cell<bool>,
pub dispatch_frame_requests: AsyncQueue<Rc<WlCallback>>,
}
const MAX_PENDING_EVENTS: usize = 10000;
const MAX_PENDING_BUFFERS: usize = 10;
impl Client {
pub fn invalid_request(&self, obj: &dyn Object, request: u32) {
@ -205,7 +196,10 @@ impl Client {
obj.interface().name(),
);
match self.display() {
Ok(d) => self.fatal_event(d.invalid_request(obj, request)),
Ok(d) => {
d.send_invalid_request(obj, request);
self.state.clients.shutdown(self.id);
},
Err(e) => {
log::error!(
"Could not retrieve display of client {}: {}",
@ -249,7 +243,10 @@ impl Client {
let msg = ErrorFmt(message).to_string();
log::error!("Client {}: A fatal error occurred: {}", self.id.0, msg,);
match self.display() {
Ok(d) => self.fatal_event(d.implementation_error(msg)),
Ok(d) => {
d.send_implementation_error(msg);
self.state.clients.shutdown(self.id);
},
Err(e) => {
log::error!(
"Could not retrieve display of client {}: {}",
@ -263,38 +260,39 @@ impl Client {
pub fn protocol_error(&self, obj: &dyn Object, code: u32, message: String) {
if let Ok(d) = self.display() {
self.fatal_event(d.error(obj.id(), code, message));
} else {
self.state.clients.shutdown(self.id);
d.send_error(obj.id(), code, message);
}
}
pub fn fatal_event(&self, event: Box<dyn EventFormatter>) {
self.events.push(WlEvent::Event(event));
self.state.clients.shutdown(self.id);
}
pub fn event(self: &Rc<Self>, event: Box<dyn EventFormatter>) {
self.event2(WlEvent::Event(event));
}
pub fn flush(self: &Rc<Self>) {
self.event2(WlEvent::Flush);
}
pub fn event2(self: &Rc<Self>, event: WlEvent) {
self.events.push(event);
if self.events.size() > MAX_PENDING_EVENTS {
if !self.checking_queue_size.replace(true) {
self.state.slow_clients.push(self.clone());
pub fn event<T: EventFormatter>(self: &Rc<Self>, event: T) {
if log::log_enabled!(log::Level::Trace) {
self.log_event(&event);
}
let mut fds = vec![];
let mut swapchain = self.swapchain.borrow_mut();
let mut fmt = MsgFormatter::new(&mut swapchain.cur, &mut fds);
event.format(&mut fmt);
fmt.write_len();
if swapchain.cur.is_full() {
swapchain.commit();
if swapchain.pending.len() > MAX_PENDING_BUFFERS {
if !self.checking_queue_size.replace(true) {
self.state.slow_clients.push(self.clone());
}
}
self.flush_request.trigger();
}
}
pub fn flush(&self) {
self.flush_request.trigger();
}
pub async fn check_queue_size(&self) {
if self.events.size() > MAX_PENDING_EVENTS {
if self.swapchain.borrow_mut().exceeds_limit() {
self.state.eng.yield_now().await;
if self.events.size() > MAX_PENDING_EVENTS {
if self.swapchain.borrow_mut().exceeds_limit() {
log::error!("Client {} is too slow at fetching events", self.id.0);
self.state.clients.kill(self.id);
return;
@ -307,7 +305,7 @@ impl Client {
self.objects.registries()
}
pub fn log_event(&self, event: &dyn EventFormatter) {
pub fn log_event<T: EventFormatter>(&self, event: &T) {
log::trace!(
"Client {} <= {}@{}.{:?}",
self.id,

View file

@ -156,7 +156,7 @@ impl Objects {
}
ids[pos] |= 1 << seg_offset;
} else {
client_data.event(client_data.display()?.delete_id(id));
client_data.display()?.send_delete_id(id);
}
Ok(())
}

View file

@ -1,6 +1,7 @@
use crate::client::{Client, ClientError, WlEvent};
use std::collections::VecDeque;
use crate::client::{Client, ClientError};
use crate::object::ObjectId;
use crate::utils::buffd::{BufFdIn, BufFdOut, MsgFormatter, MsgParser};
use crate::utils::buffd::{BufFdIn, BufFdOut, MsgParser};
use crate::utils::oneshot::OneshotRx;
use crate::utils::vec_ext::VecExt;
use crate::ErrorFmt;
@ -19,9 +20,7 @@ pub async fn client(data: Rc<Client>, shutdown: OneshotRx<()>) {
}
drop(recv);
drop(dispatch_fr);
if !data.shutdown_sent.get() {
data.events.push(WlEvent::Shutdown);
}
data.flush_request.trigger();
match data.state.eng.timeout(5000) {
Ok(timeout) => {
timeout.await;
@ -38,7 +37,7 @@ async fn dispatch_fr(data: Rc<Client>) {
loop {
let mut fr = data.dispatch_frame_requests.pop().await;
loop {
data.event(fr.done());
fr.send_done();
if let Err(e) = data.remove_obj(&*fr) {
log::error!("Could not remove frame object: {}", ErrorFmt(e));
return;
@ -66,7 +65,8 @@ async fn receive(data: Rc<Client>) {
let obj = match data.objects.get_obj(obj_id) {
Ok(obj) => obj,
_ => {
data.fatal_event(display.invalid_object(obj_id));
display.send_invalid_object(obj_id);
data.state.clients.shutdown(data.id);
return Err(ClientError::InvalidObject(obj_id));
}
};
@ -109,49 +109,27 @@ async fn receive(data: Rc<Client>) {
data.id.0,
e
);
if !data.shutdown_sent.get() {
data.fatal_event(display.implementation_error(e.to_string()));
}
display.send_implementation_error(e.to_string());
data.state.clients.shutdown(data.id);
}
}
}
async fn send(data: Rc<Client>) {
let send = async {
let mut buf = BufFdOut::new(data.socket.clone());
let mut flush_requested = false;
let mut out = BufFdOut::new(data.socket.clone());
let mut buffers = VecDeque::new();
loop {
let mut event = data.events.pop().await;
loop {
match event {
WlEvent::Flush => {
flush_requested = true;
}
WlEvent::Shutdown => {
buf.flush().await?;
return Ok(());
}
WlEvent::Event(e) => {
if log::log_enabled!(log::Level::Trace) {
data.log_event(&*e);
}
let mut fds = vec![];
let mut fmt = MsgFormatter::new(&mut buf, &mut fds);
e.format(&mut fmt);
fmt.write_len();
if buf.needs_flush() {
buf.flush().await?;
flush_requested = false;
}
}
}
event = match data.events.try_pop() {
Some(e) => e,
_ => break,
};
data.flush_request.triggered().await;
{
let mut swapchain = data.swapchain.borrow_mut();
swapchain.commit();
mem::swap(&mut swapchain.pending, &mut buffers);
}
if mem::take(&mut flush_requested) {
buf.flush().await?;
let mut timeout = None;
while let Some(mut cur) = buffers.pop_front() {
out.flush(&mut cur, &mut timeout).await?;
data.swapchain.borrow_mut().free.push(cur);
}
}
};