diff --git a/Cargo.lock b/Cargo.lock index 827cce6b..c27b3b6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -648,6 +648,16 @@ dependencies = [ "jay-utils", ] +[[package]] +name = "jay-bufio" +version = "0.1.0" +dependencies = [ + "jay-io-uring", + "jay-utils", + "thiserror", + "uapi", +] + [[package]] name = "jay-cmm" version = "0.1.0" @@ -682,6 +692,7 @@ dependencies = [ "jay-algorithms", "jay-ash", "jay-async-engine", + "jay-bufio", "jay-cmm", "jay-config", "jay-criteria", diff --git a/Cargo.toml b/Cargo.toml index ca160a6b..b4424131 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "tracy", "async-engine", "io-uring", + "bufio", "toml-config", "algorithms", "toml-spec", @@ -59,6 +60,7 @@ jay-time = { version = "0.1.0", path = "time" } jay-tracy = { version = "0.1.0", path = "tracy" } jay-async-engine = { version = "0.1.0", path = "async-engine" } jay-io-uring = { version = "0.1.0", path = "io-uring" } +jay-bufio = { version = "0.1.0", path = "bufio" } uapi = "0.2.13" thiserror = "2.0.11" diff --git a/bufio/Cargo.toml b/bufio/Cargo.toml new file mode 100644 index 00000000..4b214e77 --- /dev/null +++ b/bufio/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "jay-bufio" +version = "0.1.0" +edition = "2024" +license = "GPL-3.0-only" + +[dependencies] +jay-io-uring = { version = "0.1.0", path = "../io-uring" } +jay-utils = { version = "0.1.0", path = "../utils" } + +thiserror = "2.0.11" +uapi = "0.2.13" diff --git a/bufio/src/lib.rs b/bufio/src/lib.rs new file mode 100644 index 00000000..879b2be5 --- /dev/null +++ b/bufio/src/lib.rs @@ -0,0 +1,183 @@ +use { + jay_io_uring::{IoUring, IoUringError}, + jay_utils::{ + buf::{Buf, DynamicBuf}, + queue::AsyncQueue, + stack::Stack, + }, + std::{ + collections::VecDeque, + mem::{self}, + rc::Rc, + }, + thiserror::Error, + uapi::{OwnedFd, c}, +}; + +#[derive(Debug, Error)] +pub enum BufIoError { + #[error("Could not write to the socket")] + FlushError(#[source] IoUringError), + #[error("Could not read from the socket")] + ReadError(#[source] IoUringError), + #[error("The socket is closed")] + Closed, +} + +pub struct BufIoMessage { + pub fds: Vec>, + pub buf: Buf, +} + +struct MessageOffset { + msg: BufIoMessage, + offset: usize, +} + +pub struct BufIo { + fd: Rc, + ring: Rc, + bufs: Stack, + outgoing: AsyncQueue, +} + +pub struct BufIoIncoming { + bufio: Rc, + + buf: Buf, + buf_start: usize, + buf_end: usize, + pub fds: VecDeque>, +} + +struct Outgoing { + bufio: Rc, + + msgs: VecDeque, + bufs: Vec, +} + +impl BufIo { + pub fn new(fd: &Rc, ring: &Rc) -> Self { + Self { + fd: fd.clone(), + ring: ring.clone(), + bufs: Default::default(), + outgoing: Default::default(), + } + } + + pub fn shutdown(&self) { + let _ = uapi::shutdown(self.fd.raw(), c::SHUT_RDWR); + } + + pub fn buf(&self) -> DynamicBuf { + let buf = self.bufs.pop().unwrap_or_default(); + DynamicBuf::from_buf(buf) + } + + pub fn send(&self, msg: BufIoMessage) { + self.outgoing.push(msg); + } + + pub async fn outgoing(self: Rc) -> Result<(), BufIoError> { + let mut outgoing = Outgoing { + bufio: self, + msgs: Default::default(), + bufs: vec![], + }; + outgoing.run().await + } + + pub fn incoming(self: &Rc) -> BufIoIncoming { + BufIoIncoming { + bufio: self.clone(), + buf: Buf::new(4096), + buf_start: 0, + buf_end: 0, + fds: Default::default(), + } + } +} + +impl BufIoIncoming { + pub async fn fill_msg_buf( + &mut self, + mut n: usize, + buf: &mut Vec, + ) -> Result<(), BufIoError> { + while n > 0 { + if self.buf_start == self.buf_end { + self.buf_start = 0; + self.buf_end = 0; + let res = self + .bufio + .ring + .recvmsg(&self.bufio.fd, &mut [self.buf.clone()], &mut self.fds) + .await; + match res { + Ok(n) => self.buf_end = n, + Err(e) => return Err(BufIoError::ReadError(e)), + } + if self.buf_start == self.buf_end { + return Err(BufIoError::Closed); + } + } + let read = n.min(self.buf_end - self.buf_start); + let buf_start = self.buf_start; + buf.extend_from_slice(&self.buf[buf_start..buf_start + read]); + n -= read; + self.buf_start += read; + } + Ok(()) + } +} + +impl Outgoing { + async fn run(&mut self) -> Result<(), BufIoError> { + loop { + self.bufio.outgoing.non_empty().await; + if let Err(e) = self.try_flush().await { + return Err(BufIoError::FlushError(e)); + } + } + } + + async fn try_flush(&mut self) -> Result<(), IoUringError> { + loop { + while let Some(msg) = self.bufio.outgoing.try_pop() { + self.msgs.push_back(MessageOffset { msg, offset: 0 }); + } + if self.msgs.is_empty() { + return Ok(()); + } + let mut fds = Vec::new(); + for msg in &mut self.msgs { + if msg.msg.fds.len() > 0 { + if fds.len() > 0 || self.bufs.len() > 0 { + break; + } + fds = mem::take(&mut msg.msg.fds); + } + self.bufs.push(msg.msg.buf.slice(msg.offset..)); + } + let res = self + .bufio + .ring + .sendmsg(&self.bufio.fd, &mut self.bufs, fds, None) + .await; + self.bufs.clear(); + let mut n = res?; + while n > 0 { + let len = self.msgs[0].msg.buf.len() - self.msgs[0].offset; + if n < len { + self.msgs[0].offset += n; + break; + } + n -= len; + let msg = self.msgs.pop_front().unwrap(); + self.bufio.bufs.push(msg.msg.buf); + } + } + } +} diff --git a/src/utils/bufio.rs b/src/utils/bufio.rs index 5cbd5260..4df727e5 100644 --- a/src/utils/bufio.rs +++ b/src/utils/bufio.rs @@ -1,185 +1 @@ -use { - crate::{ - io_uring::{IoUring, IoUringError}, - utils::{ - buf::{Buf, DynamicBuf}, - queue::AsyncQueue, - stack::Stack, - }, - }, - std::{ - collections::VecDeque, - mem::{self}, - rc::Rc, - }, - thiserror::Error, - uapi::{OwnedFd, c}, -}; - -#[derive(Debug, Error)] -pub enum BufIoError { - #[error("Could not write to the socket")] - FlushError(#[source] IoUringError), - #[error("Could not read from the socket")] - ReadError(#[source] IoUringError), - #[error("The socket is closed")] - Closed, -} - -pub struct BufIoMessage { - pub fds: Vec>, - pub buf: Buf, -} - -struct MessageOffset { - msg: BufIoMessage, - offset: usize, -} - -pub struct BufIo { - fd: Rc, - ring: Rc, - bufs: Stack, - outgoing: AsyncQueue, -} - -pub struct BufIoIncoming { - bufio: Rc, - - buf: Buf, - buf_start: usize, - buf_end: usize, - pub fds: VecDeque>, -} - -struct Outgoing { - bufio: Rc, - - msgs: VecDeque, - bufs: Vec, -} - -impl BufIo { - pub fn new(fd: &Rc, ring: &Rc) -> Self { - Self { - fd: fd.clone(), - ring: ring.clone(), - bufs: Default::default(), - outgoing: Default::default(), - } - } - - pub fn shutdown(&self) { - let _ = uapi::shutdown(self.fd.raw(), c::SHUT_RDWR); - } - - pub fn buf(&self) -> DynamicBuf { - let buf = self.bufs.pop().unwrap_or_default(); - DynamicBuf::from_buf(buf) - } - - pub fn send(&self, msg: BufIoMessage) { - self.outgoing.push(msg); - } - - pub async fn outgoing(self: Rc) -> Result<(), BufIoError> { - let mut outgoing = Outgoing { - bufio: self, - msgs: Default::default(), - bufs: vec![], - }; - outgoing.run().await - } - - pub fn incoming(self: &Rc) -> BufIoIncoming { - BufIoIncoming { - bufio: self.clone(), - buf: Buf::new(4096), - buf_start: 0, - buf_end: 0, - fds: Default::default(), - } - } -} - -impl BufIoIncoming { - pub async fn fill_msg_buf( - &mut self, - mut n: usize, - buf: &mut Vec, - ) -> Result<(), BufIoError> { - while n > 0 { - if self.buf_start == self.buf_end { - self.buf_start = 0; - self.buf_end = 0; - let res = self - .bufio - .ring - .recvmsg(&self.bufio.fd, &mut [self.buf.clone()], &mut self.fds) - .await; - match res { - Ok(n) => self.buf_end = n, - Err(e) => return Err(BufIoError::ReadError(e)), - } - if self.buf_start == self.buf_end { - return Err(BufIoError::Closed); - } - } - let read = n.min(self.buf_end - self.buf_start); - let buf_start = self.buf_start; - buf.extend_from_slice(&self.buf[buf_start..buf_start + read]); - n -= read; - self.buf_start += read; - } - Ok(()) - } -} - -impl Outgoing { - async fn run(&mut self) -> Result<(), BufIoError> { - loop { - self.bufio.outgoing.non_empty().await; - if let Err(e) = self.try_flush().await { - return Err(BufIoError::FlushError(e)); - } - } - } - - async fn try_flush(&mut self) -> Result<(), IoUringError> { - loop { - while let Some(msg) = self.bufio.outgoing.try_pop() { - self.msgs.push_back(MessageOffset { msg, offset: 0 }); - } - if self.msgs.is_empty() { - return Ok(()); - } - let mut fds = Vec::new(); - for msg in &mut self.msgs { - if msg.msg.fds.len() > 0 { - if fds.len() > 0 || self.bufs.len() > 0 { - break; - } - fds = mem::take(&mut msg.msg.fds); - } - self.bufs.push(msg.msg.buf.slice(msg.offset..)); - } - let res = self - .bufio - .ring - .sendmsg(&self.bufio.fd, &mut self.bufs, fds, None) - .await; - self.bufs.clear(); - let mut n = res?; - while n > 0 { - let len = self.msgs[0].msg.buf.len() - self.msgs[0].offset; - if n < len { - self.msgs[0].offset += n; - break; - } - n -= len; - let msg = self.msgs.pop_front().unwrap(); - self.bufio.bufs.push(msg.msg.buf); - } - } - } -} +pub use jay_bufio::*;