From a9aad0c6139303c41d5131c3ab2b235f02b114de Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Sat, 28 Sep 2024 22:26:01 +0200 Subject: [PATCH] it: wait for async engine and cpu worker to become idle --- src/async_engine.rs | 30 +++++++++++++++++++++++++- src/cpu_worker.rs | 17 +++++++++++++++ src/it/test_client.rs | 3 +-- src/it/test_ifs/test_jay_compositor.rs | 2 ++ src/state.rs | 10 +++++++++ 5 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/async_engine.rs b/src/async_engine.rs index ca118545..e936dd46 100644 --- a/src/async_engine.rs +++ b/src/async_engine.rs @@ -35,6 +35,8 @@ pub struct AsyncEngine { yield_stash: RefCell>, stopped: Cell, now: Cell>, + #[cfg(feature = "it")] + idle: Cell>, } impl AsyncEngine { @@ -48,6 +50,8 @@ impl AsyncEngine { yield_stash: Default::default(), stopped: Cell::new(false), now: Default::default(), + #[cfg(feature = "it")] + idle: Default::default(), }) } @@ -91,7 +95,15 @@ impl AsyncEngine { pub fn dispatch(&self) { let mut stash = self.stash.borrow_mut(); let mut yield_stash = self.yield_stash.borrow_mut(); - while self.num_queued.get() > 0 { + loop { + if self.num_queued.get() == 0 { + #[cfg(feature = "it")] + if let Some(idle) = self.idle.take() { + idle.wake(); + continue; + } + break; + } self.now.take(); self.iteration.fetch_add(1); let mut phase = 0; @@ -116,6 +128,22 @@ impl AsyncEngine { } } + #[cfg(feature = "it")] + pub async fn idle(&self) { + use std::{future::poll_fn, task::Poll}; + let mut register = true; + poll_fn(|ctx| { + if register { + self.idle.set(Some(ctx.waker().clone())); + register = false; + Poll::Pending + } else { + Poll::Ready(()) + } + }) + .await + } + fn push(&self, runnable: Runnable, phase: Phase) { self.queues[phase as usize].push(runnable); self.num_queued.fetch_add(1); diff --git a/src/cpu_worker.rs b/src/cpu_worker.rs index d693f534..fb88dfb4 100644 --- a/src/cpu_worker.rs +++ b/src/cpu_worker.rs @@ -321,6 +321,23 @@ impl CpuWorker { job_data, } } + + #[cfg(feature = "it")] + pub fn wait_idle(&self) -> bool { + let was_idle = self.data.pending_jobs.is_empty(); + loop { + self.data.dispatch_completions(); + if self.data.pending_jobs.is_empty() { + break; + } + let mut remote = self.data.completed_jobs_remote.lock(); + while remote.queue.is_empty() { + remote.condvar = Some(self.data.sync_wake_condvar.clone()); + self.data.sync_wake_condvar.wait(&mut remote); + } + } + was_idle + } } fn work( diff --git a/src/it/test_client.rs b/src/it/test_client.rs index 83db6ace..79f846d1 100644 --- a/src/it/test_client.rs +++ b/src/it/test_client.rs @@ -85,10 +85,9 @@ impl TestClient { } pub async fn sync(&self) { - self.run.state.eng.yield_now().await; self.run.sync().await; self.tran.sync().await; - self.run.state.eng.yield_now().await; + self.run.state.idle().await; } pub async fn take_screenshot(&self, include_cursor: bool) -> Result, TestError> { diff --git a/src/it/test_ifs/test_jay_compositor.rs b/src/it/test_ifs/test_jay_compositor.rs index 71298877..9e57540b 100644 --- a/src/it/test_ifs/test_jay_compositor.rs +++ b/src/it/test_ifs/test_jay_compositor.rs @@ -54,6 +54,8 @@ impl TestJayCompositor { &self, include_cursor: bool, ) -> Result<(DmaBuf, Option>), TestError> { + self.tran.sync().await; + self.tran.run.state.idle().await; let js = Rc::new(TestJayScreenshot { id: self.tran.id(), state: self.tran.run.state.clone(), diff --git a/src/state.rs b/src/state.rs index 45c30708..a4b0feaf 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1218,6 +1218,16 @@ impl State { output.vblank(); } } + + #[cfg(feature = "it")] + pub async fn idle(&self) { + loop { + self.eng.idle().await; + if self.cpu_worker.wait_idle() { + break; + } + } + } } #[derive(Debug, Error)]