use { crate::{ io_uring::{IoUring, IoUringError}, utils::{ buf::{Buf, DynamicBuf}, queue::AsyncQueue, stack::Stack, }, }, std::{ collections::VecDeque, mem::{self}, rc::Rc, }, thiserror::Error, uapi::{c, OwnedFd}, }; #[derive(Debug, Error)] pub enum BufIoError { #[error("Could not write to the socket")] FlushError(#[source] IoUringError), #[error("Could not read from the socket")] ReadError(#[source] IoUringError), #[error("The socket is closed")] Closed, } pub struct BufIoMessage { pub fds: Vec>, pub buf: Buf, } struct MessageOffset { msg: BufIoMessage, offset: usize, } pub struct BufIo { fd: Rc, ring: Rc, bufs: Stack, outgoing: AsyncQueue, } pub struct BufIoIncoming { bufio: Rc, buf: Buf, buf_start: usize, buf_end: usize, pub fds: VecDeque>, } struct Outgoing { bufio: Rc, msgs: VecDeque, bufs: Vec, } impl BufIo { pub fn new(fd: &Rc, ring: &Rc) -> Self { Self { fd: fd.clone(), ring: ring.clone(), bufs: Default::default(), outgoing: Default::default(), } } pub fn shutdown(&self) { let _ = uapi::shutdown(self.fd.raw(), c::SHUT_RDWR); } pub fn buf(&self) -> DynamicBuf { let buf = self.bufs.pop().unwrap_or_default(); DynamicBuf::from_buf(buf) } pub fn send(&self, msg: BufIoMessage) { self.outgoing.push(msg); } pub async fn outgoing(self: Rc) -> Result<(), BufIoError> { let mut outgoing = Outgoing { bufio: self, msgs: Default::default(), bufs: vec![], }; outgoing.run().await } pub fn incoming(self: &Rc) -> BufIoIncoming { BufIoIncoming { bufio: self.clone(), buf: Buf::new(4096), buf_start: 0, buf_end: 0, fds: Default::default(), } } } impl BufIoIncoming { pub async fn fill_msg_buf( &mut self, mut n: usize, buf: &mut Vec, ) -> Result<(), BufIoError> { while n > 0 { if self.buf_start == self.buf_end { self.buf_start = 0; self.buf_end = 0; let res = self .bufio .ring .recvmsg(&self.bufio.fd, &mut [self.buf.clone()], &mut self.fds) .await; match res { Ok(n) => self.buf_end = n, Err(e) => return Err(BufIoError::ReadError(e)), } if self.buf_start == self.buf_end { return Err(BufIoError::Closed); } } let read = n.min(self.buf_end - self.buf_start); let buf_start = self.buf_start; buf.extend_from_slice(&self.buf[buf_start..buf_start + read]); n -= read; self.buf_start += read; } Ok(()) } } impl Outgoing { async fn run(&mut self) -> Result<(), BufIoError> { loop { self.bufio.outgoing.non_empty().await; if let Err(e) = self.try_flush().await { return Err(BufIoError::FlushError(e)); } } } async fn try_flush(&mut self) -> Result<(), IoUringError> { loop { while let Some(msg) = self.bufio.outgoing.try_pop() { self.msgs.push_back(MessageOffset { msg, offset: 0 }); } if self.msgs.is_empty() { return Ok(()); } let mut fds = Vec::new(); for msg in &mut self.msgs { if msg.msg.fds.len() > 0 { if fds.len() > 0 || self.bufs.len() > 0 { break; } fds = mem::take(&mut msg.msg.fds); } self.bufs.push(msg.msg.buf.slice(msg.offset..)); } let res = self .bufio .ring .sendmsg(&self.bufio.fd, &mut self.bufs, fds, None) .await; self.bufs.clear(); let mut n = res?; while n > 0 { let len = self.msgs[0].msg.buf.len() - self.msgs[0].offset; if n < len { self.msgs[0].offset += n; break; } n -= len; let msg = self.msgs.pop_front().unwrap(); self.bufio.bufs.push(msg.msg.buf); } } } }