From 22b7fb2ced450b4f4982a0badad31572910a1ffa Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Fri, 13 May 2022 19:26:55 +0200 Subject: [PATCH] tests: fix ol' forker dumping core in tests --- src/async_engine.rs | 17 ++++++++++++++++- src/forker.rs | 15 ++++++++++++--- src/io_uring.rs | 1 + src/io_uring/ops/async_cancel.rs | 5 ++++- src/it.rs | 2 +- 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/async_engine.rs b/src/async_engine.rs index f8faf271..dfdbfb95 100644 --- a/src/async_engine.rs +++ b/src/async_engine.rs @@ -7,7 +7,13 @@ use { async_engine::ae_task::Runnable, utils::{array, numcell::NumCell, syncqueue::SyncQueue}, }, - std::{cell::RefCell, collections::VecDeque, future::Future, rc::Rc, task::Waker}, + std::{ + cell::{Cell, RefCell}, + collections::VecDeque, + future::Future, + rc::Rc, + task::Waker, + }, }; #[derive(Copy, Clone, Eq, PartialEq)] @@ -26,6 +32,7 @@ pub struct AsyncEngine { yields: SyncQueue, stash: RefCell>, yield_stash: RefCell>, + stopped: Cell, } impl AsyncEngine { @@ -37,9 +44,14 @@ impl AsyncEngine { yields: Default::default(), stash: Default::default(), yield_stash: Default::default(), + stopped: Cell::new(false), }) } + pub fn stop(&self) { + self.stopped.set(true); + } + pub fn clear(&self) { self.stash.borrow_mut().clear(); self.yield_stash.borrow_mut().clear(); @@ -83,6 +95,9 @@ impl AsyncEngine { self.num_queued.fetch_sub(stash.len()); for runnable in stash.drain(..) { runnable.run(); + if self.stopped.get() { + return; + } } } self.yields.swap(&mut *yield_stash); diff --git a/src/forker.rs b/src/forker.rs index cb3cb4a9..b97545d9 100644 --- a/src/forker.rs +++ b/src/forker.rs @@ -338,7 +338,7 @@ impl Forker { let _f1 = ae.spawn(forker.clone().incoming()); let _f2 = ae.spawn(forker.clone().outgoing()); let _ = ring.run(); - unreachable!(); + std::process::exit(1); } async fn outgoing(self: Rc) { @@ -348,14 +348,23 @@ impl Forker { for fd in self.fds.borrow_mut().drain(..) { io.push_fd(fd); } - io.write_msg(msg).await.unwrap(); + if io.write_msg(msg).await.is_err() { + self.ring.stop(); + return; + } } } async fn incoming(self: Rc) { let mut io = IoIn::new(&self.socket, &self.ring); loop { - let msg = io.read_msg().await.unwrap(); + let msg = match io.read_msg().await { + Ok(m) => m, + _ => { + self.ring.stop(); + return; + } + }; self.handle_msg(msg, &mut io); } } diff --git a/src/io_uring.rs b/src/io_uring.rs index 595211f6..bdae93f3 100644 --- a/src/io_uring.rs +++ b/src/io_uring.rs @@ -409,6 +409,7 @@ impl IoUringData { } fn kill(&self) { + self.eng.stop(); let mut to_cancel = vec![]; for task in self.tasks.lock().values() { if !task.is_cancel() { diff --git a/src/io_uring/ops/async_cancel.rs b/src/io_uring/ops/async_cancel.rs index 1fb514b3..4d26dca7 100644 --- a/src/io_uring/ops/async_cancel.rs +++ b/src/io_uring/ops/async_cancel.rs @@ -7,6 +7,7 @@ use { utils::errorfmt::ErrorFmt, }, std::cell::Cell, + uapi::c, }; pub struct AsyncCancelTask { @@ -36,7 +37,9 @@ unsafe impl Task for AsyncCancelTask { fn complete(self: Box, ring: &IoUringData, res: i32) { if let Err(e) = map_err!(res) { - log::debug!("Could not cancel task: {}", ErrorFmt(e)); + if e.0 != c::ENOENT { + log::debug!("Could not cancel task: {}", ErrorFmt(e)); + } } ring.cached_cancels.push(self); } diff --git a/src/it.rs b/src/it.rs index b0e14840..c727523a 100644 --- a/src/it.rs +++ b/src/it.rs @@ -152,7 +152,7 @@ fn run_test(it_run: &ItRun, test: &'static dyn TestCase, cfg: Rc) { } } errors.set(testrun.errors.take()); - state.el.stop(); + state.ring.stop(); pending().await }) }));