1
0
Fork 0
forked from wry/wry

ei: add support for libei

This commit is contained in:
Julian Orth 2024-07-24 01:38:05 +02:00
parent 084fe50259
commit 40e87f8f91
69 changed files with 4340 additions and 72 deletions

View file

@ -0,0 +1,45 @@
use {
crate::{
ei::ei_object::{EiInterface, EiObjectId},
utils::buffd::{BufFdError, EiMsgParserError},
},
std::error::Error,
thiserror::Error,
};
#[derive(Debug, Error)]
pub enum EiClientError {
#[error("An error occurred reading from/writing to the client")]
Io(#[from] BufFdError),
#[error("An error occurred while processing a request")]
RequestError(#[source] Box<EiClientError>),
#[error("Client tried to invoke a non-existent method")]
InvalidMethod,
#[error("The message size is < 16")]
MessageSizeTooSmall,
#[error("The message size is > 2^16")]
MessageSizeTooLarge,
#[error("The size of the message is not a multiple of 4")]
UnalignedMessage,
#[error("The object id is unknown")]
UnknownId,
#[error("Client tried to access non-existent object {0}")]
InvalidObject(EiObjectId),
#[error("The id is already in use")]
IdAlreadyInUse,
#[error("The client object id is out of bounds")]
ClientIdOutOfBounds,
#[error("Could not process a `{}.{}` request", .interface.name(), .method)]
MethodError {
interface: EiInterface,
method: &'static str,
#[source]
error: Box<dyn Error + 'static>,
},
#[error("Could not add object {0} to the client")]
AddObjectError(EiObjectId, #[source] Box<EiClientError>),
}
#[derive(Debug, Error)]
#[error("Parsing failed")]
pub struct EiParserError(#[source] pub EiMsgParserError);

View file

@ -0,0 +1,83 @@
use {
crate::{
ei::{
ei_client::ei_error::EiClientError,
ei_ifs::ei_handshake::EiHandshake,
ei_object::{EiObject, EiObjectId},
},
utils::{copyhashmap::CopyHashMap, numcell::NumCell},
},
std::rc::Rc,
};
pub struct EiObjects {
registry: CopyHashMap<EiObjectId, Rc<dyn EiObject>>,
next_sever_id: NumCell<u64>,
}
pub const MIN_SERVER_ID: u64 = 0xff00_0000_0000_0000;
impl EiObjects {
pub fn new() -> Self {
Self {
registry: Default::default(),
next_sever_id: NumCell::new(MIN_SERVER_ID),
}
}
pub fn destroy(&self) {
for obj in self.registry.lock().values_mut() {
obj.break_loops();
}
self.registry.clear();
}
pub fn id<T>(&self) -> T
where
EiObjectId: Into<T>,
{
EiObjectId::from_raw(self.next_sever_id.fetch_add(1)).into()
}
pub fn get_obj(&self, id: EiObjectId) -> Option<Rc<dyn EiObject>> {
self.registry.get(&id)
}
pub fn add_server_object(&self, obj: Rc<dyn EiObject>) {
let id = obj.id();
assert!(id.raw() >= MIN_SERVER_ID);
assert!(!self.registry.contains(&id));
self.registry.set(id, obj.clone());
}
pub fn add_handshake(&self, obj: &Rc<EiHandshake>) {
assert_eq!(obj.id.raw(), 0);
assert!(self.registry.is_empty());
self.registry.set(obj.id.into(), obj.clone());
}
pub fn add_client_object(&self, obj: Rc<dyn EiObject>) -> Result<(), EiClientError> {
let id = obj.id();
let res = (|| {
if id.raw() == 0 || id.raw() >= MIN_SERVER_ID {
return Err(EiClientError::ClientIdOutOfBounds);
}
if self.registry.contains(&id) {
return Err(EiClientError::IdAlreadyInUse);
}
self.registry.set(id, obj.clone());
Ok(())
})();
if let Err(e) = res {
return Err(EiClientError::AddObjectError(id, Box::new(e)));
}
Ok(())
}
pub fn remove_obj(&self, id: EiObjectId) -> Result<(), EiClientError> {
match self.registry.remove(&id) {
Some(_) => Ok(()),
_ => Err(EiClientError::UnknownId),
}
}
}

View file

@ -0,0 +1,141 @@
use {
crate::{
async_engine::Phase,
ei::{
ei_client::{ei_error::EiClientError, EiClient},
ei_object::EiObjectId,
},
utils::{
buffd::{BufFdIn, BufFdOut, EiMsgParser},
errorfmt::ErrorFmt,
vec_ext::VecExt,
},
},
futures_util::{select, FutureExt},
std::{collections::VecDeque, mem, rc::Rc, time::Duration},
};
pub async fn ei_client(data: Rc<EiClient>) {
let mut recv = data.state.eng.spawn(receive(data.clone())).fuse();
let mut shutdown = data.shutdown.triggered().fuse();
let _send = data.state.eng.spawn2(Phase::PostLayout, send(data.clone()));
select! {
_ = recv => { },
_ = shutdown => { },
}
drop(recv);
data.flush_request.trigger();
match data.state.wheel.timeout(5000).await {
Ok(_) => {
log::error!("Could not shut down client {} within 5 seconds", data.id);
}
Err(e) => {
log::error!("Could not create a timeout: {}", ErrorFmt(e));
}
}
data.state.ei_clients.kill(data.id);
}
async fn receive(data: Rc<EiClient>) {
let recv = async {
let mut buf = BufFdIn::new(&data.socket, &data.state.ring);
let mut data_buf = Vec::<u32>::new();
loop {
let mut hdr = [0u32; 4];
buf.read_full(&mut hdr[..]).await?;
#[cfg(target_endian = "little")]
let obj_id = (hdr[0] as u64) | ((hdr[1] as u64) << 32);
#[cfg(target_endian = "big")]
let obj_id = (hdr[1] as u64) | ((hdr[0] as u64) << 32);
let obj_id = EiObjectId::from_raw(obj_id);
let len = hdr[2] as usize;
let request = hdr[3];
if len < 16 {
return Err(EiClientError::MessageSizeTooSmall);
}
if len > (u16::MAX as usize) {
return Err(EiClientError::MessageSizeTooLarge);
}
if len % 4 != 0 {
return Err(EiClientError::UnalignedMessage);
}
let len = (len - 16) / 4;
data_buf.clear();
data_buf.reserve(len);
let unused = data_buf.split_at_spare_mut_ext().1;
buf.read_full(&mut unused[..len]).await?;
unsafe {
data_buf.set_len(len);
}
let obj = match data.objects.get_obj(obj_id) {
Some(obj) => obj,
_ => match data.connection.get() {
None => {
return Err(EiClientError::InvalidObject(obj_id));
}
Some(c) => {
c.send_invalid_object(obj_id);
continue;
}
},
};
let parser = EiMsgParser::new(&mut buf, &data_buf[..]);
if let Err(e) = obj.handle_request(&data, request, parser) {
return Err(EiClientError::RequestError(Box::new(e)));
}
}
};
let res: Result<(), EiClientError> = recv.await;
if let Err(e) = res {
if data.disconnect_announced.get() {
log::info!("Client {} terminated the connection", data.id);
data.state.ei_clients.kill(data.id);
} else {
let e = ErrorFmt(e);
log::error!(
"An error occurred while trying to handle a message from client {}: {}",
data.id,
e
);
if let Some(c) = data.connection.get() {
c.send_disconnected(Some(&e.to_string()));
data.state.ei_clients.shutdown(data.id);
} else {
data.state.ei_clients.kill(data.id);
}
}
}
}
async fn send(data: Rc<EiClient>) {
let send = async {
let mut out = BufFdOut::new(&data.socket, &data.state.ring);
let mut buffers = VecDeque::new();
loop {
data.flush_request.triggered().await;
{
let mut swapchain = data.swapchain.borrow_mut();
swapchain.commit();
mem::swap(&mut swapchain.pending, &mut buffers);
}
let timeout = data.state.now() + Duration::from_millis(5000);
while let Some(mut cur) = buffers.pop_front() {
out.flush(&mut cur, timeout).await?;
data.swapchain.borrow_mut().free.push(cur);
}
}
};
let res: Result<(), EiClientError> = send.await;
if let Err(e) = res {
if data.disconnect_announced.get() {
log::info!("Client {} terminated the connection", data.id);
} else {
log::error!(
"An error occurred while sending data to client {}: {}",
data.id,
ErrorFmt(e)
);
}
}
data.state.ei_clients.kill(data.id);
}