use { crate::{AsyncCpuWork, CompletedWork, CpuJob, CpuWork, CpuWorker, WorkCompletion}, jay_async_engine::{AsyncEngine, SpawnedFuture}, jay_io_uring::IoUring, jay_utils::asyncevent::AsyncEvent, jay_wheel::Wheel, std::{future::pending, rc::Rc, sync::Arc}, uapi::{OwnedFd, c::EFD_CLOEXEC}, }; struct Job { ae: Rc, work: Work, cancel: bool, } struct Work(Arc); struct AsyncWork(Arc); impl CpuJob for Job { fn work(&mut self) -> &mut dyn CpuWork { &mut self.work } fn completed(self: Box) { if self.cancel { unreachable!(); } else { self.ae.trigger(); } } } impl Drop for Job { fn drop(&mut self) { if self.cancel { self.ae.trigger(); } } } impl CpuWork for Work { fn run(&mut self) -> Option> { Some(Box::new(AsyncWork(self.0.clone()))) } fn cancel_async(&mut self, _ring: &Rc) { uapi::eventfd_write(self.0.raw(), 1).unwrap(); } fn async_work_done(&mut self, work: Box) { let _ = work; } } impl AsyncCpuWork for AsyncWork { fn run( self: Box, eng: &Rc, ring: &Rc, completion: WorkCompletion, ) -> SpawnedFuture { let ring = ring.clone(); eng.spawn("", async move { let mut buf = [0; 8]; let res = ring .read_no_cancel(self.0.borrow(), 0, &mut buf, |_| ()) .await; res.unwrap(); completion.complete(self) }) } } fn run(cancel: bool) { let eng = AsyncEngine::new(); let ring = IoUring::new(&eng, 32).unwrap(); let ring2 = ring.clone(); let wheel = Wheel::new(&eng, &ring).unwrap(); let cpu = Rc::new(CpuWorker::new(&ring, &eng).unwrap()); let ae = Rc::new(AsyncEvent::default()); let eventfd = Arc::new(uapi::eventfd(0, EFD_CLOEXEC).unwrap()); let pending_job = cpu.submit(Box::new(Job { ae: ae.clone(), work: Work(eventfd.clone()), cancel, })); let _fut1 = eng.spawn("", async move { wheel.timeout(1).await.unwrap(); if cancel { drop(pending_job); } else { uapi::eventfd_write(eventfd.raw(), 1).unwrap(); pending::<()>().await; } }); let _fut2 = eng.spawn("", async move { ae.triggered().await; ring2.stop(); }); ring.run().unwrap(); } #[test] fn cancel() { run(true); } #[test] fn complete() { run(false); }