cpu_worker: add worker-thread framework
This commit is contained in:
parent
874d0d0c59
commit
ed4ef3c8e7
10 changed files with 718 additions and 3 deletions
1
src/cpu_worker/jobs.rs
Normal file
1
src/cpu_worker/jobs.rs
Normal file
|
|
@ -0,0 +1 @@
|
|||
pub mod read_write;
|
||||
153
src/cpu_worker/jobs/read_write.rs
Normal file
153
src/cpu_worker/jobs/read_write.rs
Normal file
|
|
@ -0,0 +1,153 @@
|
|||
use {
|
||||
crate::{
|
||||
async_engine::{AsyncEngine, SpawnedFuture},
|
||||
cpu_worker::{AsyncCpuWork, CompletedWork, CpuWork, WorkCompletion},
|
||||
io_uring::{IoUring, IoUringError, IoUringTaskId},
|
||||
},
|
||||
std::{
|
||||
any::Any,
|
||||
ptr,
|
||||
rc::Rc,
|
||||
slice,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
|
||||
Arc,
|
||||
},
|
||||
},
|
||||
thiserror::Error,
|
||||
uapi::{c, Fd},
|
||||
};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ReadWriteJobError {
|
||||
#[error("An io_uring error occurred")]
|
||||
IoUring(#[source] IoUringError),
|
||||
#[error("The job was cancelled")]
|
||||
Cancelled,
|
||||
#[error("Tried to operate outside the bounds of the file descriptor")]
|
||||
OutOfBounds,
|
||||
}
|
||||
|
||||
pub struct ReadWriteWork {
|
||||
cancel: Arc<CancelState>,
|
||||
config: Option<Box<ReadWriteWorkConfig>>,
|
||||
}
|
||||
|
||||
unsafe impl Send for ReadWriteWork {}
|
||||
|
||||
impl ReadWriteWork {
|
||||
#[expect(dead_code)]
|
||||
pub unsafe fn new() -> Self {
|
||||
let cancel = Arc::new(CancelState::default());
|
||||
ReadWriteWork {
|
||||
cancel: cancel.clone(),
|
||||
config: Some(Box::new(ReadWriteWorkConfig {
|
||||
fd: -1,
|
||||
offset: 0,
|
||||
ptr: ptr::null_mut(),
|
||||
len: 0,
|
||||
write: false,
|
||||
cancel,
|
||||
result: None,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(dead_code)]
|
||||
pub fn config(&mut self) -> &mut ReadWriteWorkConfig {
|
||||
self.config.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReadWriteWorkConfig {
|
||||
pub fd: c::c_int,
|
||||
pub offset: usize,
|
||||
pub ptr: *mut u8,
|
||||
pub len: usize,
|
||||
pub write: bool,
|
||||
pub result: Option<Result<(), ReadWriteJobError>>,
|
||||
cancel: Arc<CancelState>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct CancelState {
|
||||
cancelled: AtomicBool,
|
||||
cancel_id: AtomicU64,
|
||||
}
|
||||
|
||||
impl CpuWork for ReadWriteWork {
|
||||
fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
|
||||
self.cancel.cancelled.store(false, Relaxed);
|
||||
self.cancel.cancel_id.store(0, Relaxed);
|
||||
self.config.take().map(|b| b as _)
|
||||
}
|
||||
|
||||
fn cancel_async(&mut self, ring: &Rc<IoUring>) {
|
||||
self.cancel.cancelled.store(true, Relaxed);
|
||||
let id = self.cancel.cancel_id.load(Relaxed);
|
||||
if id != 0 {
|
||||
ring.cancel(IoUringTaskId::from_raw(id));
|
||||
}
|
||||
}
|
||||
|
||||
fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
|
||||
let work = work.into_any().downcast().unwrap();
|
||||
self.config = Some(work);
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncCpuWork for ReadWriteWorkConfig {
|
||||
fn run(
|
||||
mut self: Box<Self>,
|
||||
eng: &Rc<AsyncEngine>,
|
||||
ring: &Rc<IoUring>,
|
||||
completion: WorkCompletion,
|
||||
) -> SpawnedFuture<CompletedWork> {
|
||||
let ring = ring.clone();
|
||||
eng.spawn(async move {
|
||||
let res = loop {
|
||||
if self.cancel.cancelled.load(Relaxed) {
|
||||
break Err(ReadWriteJobError::Cancelled);
|
||||
}
|
||||
if self.len == 0 {
|
||||
break Ok(());
|
||||
};
|
||||
let res = if self.write {
|
||||
ring.write_no_cancel(
|
||||
Fd::new(self.fd),
|
||||
self.offset,
|
||||
unsafe { slice::from_raw_parts(self.ptr, self.len) },
|
||||
None,
|
||||
|id| self.cancel.cancel_id.store(id.raw(), Relaxed),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
ring.read_no_cancel(
|
||||
Fd::new(self.fd),
|
||||
self.offset,
|
||||
unsafe { slice::from_raw_parts_mut(self.ptr, self.len) },
|
||||
|id| self.cancel.cancel_id.store(id.raw(), Relaxed),
|
||||
)
|
||||
.await
|
||||
};
|
||||
match res {
|
||||
Ok(0) => break Err(ReadWriteJobError::OutOfBounds),
|
||||
Ok(n) => {
|
||||
self.len -= n;
|
||||
self.offset += n;
|
||||
unsafe {
|
||||
self.ptr = self.ptr.add(n);
|
||||
}
|
||||
}
|
||||
Err(e) => break Err(ReadWriteJobError::IoUring(e)),
|
||||
}
|
||||
};
|
||||
self.result = Some(res);
|
||||
completion.complete(self)
|
||||
})
|
||||
}
|
||||
|
||||
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||
self
|
||||
}
|
||||
}
|
||||
117
src/cpu_worker/tests.rs
Normal file
117
src/cpu_worker/tests.rs
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
use {
|
||||
crate::{
|
||||
async_engine::{AsyncEngine, SpawnedFuture},
|
||||
cpu_worker::{AsyncCpuWork, CompletedWork, CpuJob, CpuWork, CpuWorker, WorkCompletion},
|
||||
io_uring::IoUring,
|
||||
utils::asyncevent::AsyncEvent,
|
||||
wheel::Wheel,
|
||||
},
|
||||
std::{any::Any, future::pending, rc::Rc, sync::Arc},
|
||||
uapi::{c::EFD_CLOEXEC, OwnedFd},
|
||||
};
|
||||
|
||||
struct Job {
|
||||
ae: Rc<AsyncEvent>,
|
||||
work: Work,
|
||||
cancel: bool,
|
||||
}
|
||||
struct Work(Arc<OwnedFd>);
|
||||
struct AsyncWork(Arc<OwnedFd>);
|
||||
|
||||
impl CpuJob for Job {
|
||||
fn work(&mut self) -> &mut dyn CpuWork {
|
||||
&mut self.work
|
||||
}
|
||||
|
||||
fn completed(self: Box<Self>) {
|
||||
if self.cancel {
|
||||
unreachable!();
|
||||
} else {
|
||||
self.ae.trigger();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Job {
|
||||
fn drop(&mut self) {
|
||||
if self.cancel {
|
||||
self.ae.trigger();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CpuWork for Work {
|
||||
fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
|
||||
Some(Box::new(AsyncWork(self.0.clone())))
|
||||
}
|
||||
|
||||
fn cancel_async(&mut self, _ring: &Rc<IoUring>) {
|
||||
uapi::eventfd_write(self.0.raw(), 1).unwrap();
|
||||
}
|
||||
|
||||
fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
|
||||
let _ = work;
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncCpuWork for AsyncWork {
|
||||
fn run(
|
||||
self: Box<Self>,
|
||||
eng: &Rc<AsyncEngine>,
|
||||
ring: &Rc<IoUring>,
|
||||
completion: WorkCompletion,
|
||||
) -> SpawnedFuture<CompletedWork> {
|
||||
let ring = ring.clone();
|
||||
eng.spawn(async move {
|
||||
let mut buf = [0; 8];
|
||||
let res = ring
|
||||
.read_no_cancel(self.0.borrow(), 0, &mut buf, |_| ())
|
||||
.await;
|
||||
res.unwrap();
|
||||
completion.complete(self)
|
||||
})
|
||||
}
|
||||
|
||||
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
fn run(cancel: bool) {
|
||||
let eng = AsyncEngine::new();
|
||||
let ring = IoUring::new(&eng, 32).unwrap();
|
||||
let ring2 = ring.clone();
|
||||
let wheel = Wheel::new(&eng, &ring).unwrap();
|
||||
let cpu = Rc::new(CpuWorker::new(&ring, &eng).unwrap());
|
||||
let ae = Rc::new(AsyncEvent::default());
|
||||
let eventfd = Arc::new(uapi::eventfd(0, EFD_CLOEXEC).unwrap());
|
||||
let pending_job = cpu.submit(Box::new(Job {
|
||||
ae: ae.clone(),
|
||||
work: Work(eventfd.clone()),
|
||||
cancel,
|
||||
}));
|
||||
let _fut1 = eng.spawn(async move {
|
||||
wheel.timeout(1).await.unwrap();
|
||||
if cancel {
|
||||
drop(pending_job);
|
||||
} else {
|
||||
uapi::eventfd_write(eventfd.raw(), 1).unwrap();
|
||||
pending::<()>().await;
|
||||
}
|
||||
});
|
||||
let _fut2 = eng.spawn(async move {
|
||||
ae.triggered().await;
|
||||
ring2.stop();
|
||||
});
|
||||
ring.run().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel() {
|
||||
run(true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn complete() {
|
||||
run(false);
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue