io_uring: move fd utility helpers into crate
This commit is contained in:
parent
9606e0892c
commit
e996d9528a
5 changed files with 21 additions and 19 deletions
12
src/utils.rs
12
src/utils.rs
|
|
@ -86,14 +86,20 @@ pub mod event_listener {
|
|||
}
|
||||
}
|
||||
pub mod linkedlist;
|
||||
pub mod line_logger;
|
||||
pub mod object_drop_queue;
|
||||
pub mod line_logger {
|
||||
pub use jay_io_uring::line_logger::*;
|
||||
}
|
||||
pub mod object_drop_queue {
|
||||
pub use jay_io_uring::object_drop_queue::*;
|
||||
}
|
||||
pub mod pending_serial;
|
||||
pub mod run_toplevel {
|
||||
pub use jay_async_engine::RunToplevel;
|
||||
}
|
||||
pub mod scroller;
|
||||
pub mod timer;
|
||||
pub mod timer {
|
||||
pub use jay_io_uring::timer::*;
|
||||
}
|
||||
|
||||
pub mod clonecell {
|
||||
pub use jay_utils::clonecell::*;
|
||||
|
|
|
|||
|
|
@ -1,36 +0,0 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{IoUring, IoUringError},
|
||||
utils::{buf::Buf, vecdeque_ext::VecDequeExt},
|
||||
},
|
||||
isnt::std_1::collections::IsntVecDequeExt,
|
||||
std::{collections::VecDeque, rc::Rc},
|
||||
uapi::OwnedFd,
|
||||
};
|
||||
|
||||
pub async fn log_lines(
|
||||
ring: &IoUring,
|
||||
fd: &Rc<OwnedFd>,
|
||||
mut f: impl FnMut(&[u8], &[u8]),
|
||||
) -> Result<(), IoUringError> {
|
||||
let mut buf = VecDeque::<u8>::new();
|
||||
let mut buf2 = Buf::new(1024);
|
||||
let mut done = false;
|
||||
while !done {
|
||||
let n = ring.read(fd, buf2.clone()).await?;
|
||||
buf.extend(&buf2[..n]);
|
||||
if n == 0 {
|
||||
done = true;
|
||||
}
|
||||
while let Some(pos) = buf.iter().position(|b| b == &b'\n') {
|
||||
let (left, right) = buf.get_slices(..pos);
|
||||
f(left, right);
|
||||
buf.drain(..=pos);
|
||||
}
|
||||
}
|
||||
if buf.is_not_empty() {
|
||||
let (left, right) = buf.as_slices();
|
||||
f(left, right);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -1,85 +0,0 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{IoUring, PendingPoll, PollCallback},
|
||||
utils::{errorfmt::ErrorFmt, oserror::OsError, stack::Stack},
|
||||
},
|
||||
std::{
|
||||
cell::{Cell, RefCell},
|
||||
rc::Rc,
|
||||
},
|
||||
uapi::{OwnedFd, c::c_short},
|
||||
};
|
||||
|
||||
pub struct ObjectDropQueue<T> {
|
||||
#[allow(dead_code)]
|
||||
ring: Rc<IoUring>,
|
||||
killed: Cell<bool>,
|
||||
pending: RefCell<Vec<Option<(T, PendingPoll)>>>,
|
||||
stack: Stack<Rc<Pollable<T>>>,
|
||||
}
|
||||
|
||||
struct Pollable<T> {
|
||||
queue: Rc<ObjectDropQueue<T>>,
|
||||
idx: usize,
|
||||
}
|
||||
|
||||
impl<T> ObjectDropQueue<T> {
|
||||
pub fn new(ring: &Rc<IoUring>) -> Self {
|
||||
Self {
|
||||
ring: ring.clone(),
|
||||
killed: Default::default(),
|
||||
pending: Default::default(),
|
||||
stack: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn push(self: &Rc<Self>, fd: &Rc<OwnedFd>, t: T)
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
if self.killed.get() {
|
||||
return;
|
||||
}
|
||||
let pending = &mut *self.pending.borrow_mut();
|
||||
let pollable = match self.stack.pop() {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
let pollable = Rc::new(Pollable {
|
||||
queue: self.clone(),
|
||||
idx: pending.len(),
|
||||
});
|
||||
pending.push(None);
|
||||
pollable
|
||||
}
|
||||
};
|
||||
let idx = pollable.idx;
|
||||
match self.ring.readable_external(fd, pollable) {
|
||||
Ok(p) => {
|
||||
pending[idx] = Some((t, p));
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Could not register object: {}", ErrorFmt(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn kill(&self) {
|
||||
self.killed.set(true);
|
||||
self.pending.take();
|
||||
self.stack.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> PollCallback for Pollable<T> {
|
||||
fn completed(self: Rc<Self>, res: Result<c_short, OsError>) {
|
||||
if let Err(e) = res {
|
||||
log::error!("Could not wait for fd to become readable: {}", ErrorFmt(e));
|
||||
}
|
||||
let q = &self.queue;
|
||||
if !q.killed.get() {
|
||||
q.pending.borrow_mut()[self.idx] = None;
|
||||
q.stack.push(self.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,69 +0,0 @@
|
|||
use {
|
||||
crate::{
|
||||
io_uring::{IoUring, IoUringError},
|
||||
utils::{
|
||||
buf::TypedBuf,
|
||||
oserror::{OsError, OsErrorExt2},
|
||||
},
|
||||
},
|
||||
std::{cell::RefCell, rc::Rc, time::Duration},
|
||||
thiserror::Error,
|
||||
uapi::{OwnedFd, c},
|
||||
};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum TimerError {
|
||||
#[error("Could not create a timer")]
|
||||
CreateTimer(#[source] OsError),
|
||||
#[error("Could not read from a timer")]
|
||||
TimerReadError(#[source] IoUringError),
|
||||
#[error("Could not set a timer")]
|
||||
SetTimer(#[source] OsError),
|
||||
#[error("The io-uring returned an error")]
|
||||
IoUringError(#[from] IoUringError),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TimerFd {
|
||||
fd: Rc<OwnedFd>,
|
||||
buf: Rc<RefCell<TypedBuf<u64>>>,
|
||||
}
|
||||
|
||||
impl TimerFd {
|
||||
pub fn new(clock_id: c::c_int) -> Result<Self, TimerError> {
|
||||
let fd = uapi::timerfd_create(clock_id, c::TFD_CLOEXEC)
|
||||
.map(Rc::new)
|
||||
.map_os_err(TimerError::CreateTimer)?;
|
||||
Ok(Self {
|
||||
fd,
|
||||
buf: Rc::new(RefCell::new(TypedBuf::new())),
|
||||
})
|
||||
}
|
||||
|
||||
#[expect(clippy::await_holding_refcell_ref)]
|
||||
pub async fn expired(&self, ring: &IoUring) -> Result<u64, TimerError> {
|
||||
let mut buf = self.buf.borrow_mut();
|
||||
if let Err(e) = ring.read(&self.fd, buf.buf()).await {
|
||||
return Err(TimerError::TimerReadError(e));
|
||||
}
|
||||
Ok(buf.t())
|
||||
}
|
||||
|
||||
pub fn program(
|
||||
&self,
|
||||
initial: Option<Duration>,
|
||||
periodic: Option<Duration>,
|
||||
) -> Result<(), TimerError> {
|
||||
let mut timerspec: c::itimerspec = uapi::pod_zeroed();
|
||||
if let Some(init) = initial {
|
||||
timerspec.it_value.tv_sec = init.as_secs() as _;
|
||||
timerspec.it_value.tv_nsec = init.subsec_nanos() as _;
|
||||
if let Some(per) = periodic {
|
||||
timerspec.it_interval.tv_sec = per.as_secs() as _;
|
||||
timerspec.it_interval.tv_nsec = per.subsec_nanos() as _;
|
||||
}
|
||||
}
|
||||
uapi::timerfd_settime(self.fd.raw(), 0, &timerspec).map_os_err(TimerError::SetTimer)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue