diff --git a/src/cpu_worker.rs b/src/cpu_worker.rs index a17888b7..d693f534 100644 --- a/src/cpu_worker.rs +++ b/src/cpu_worker.rs @@ -11,7 +11,7 @@ use { ptr_ext::MutPtrExt, queue::AsyncQueue, stack::Stack, }, }, - parking_lot::Mutex, + parking_lot::{Condvar, Mutex}, std::{ any::Any, cell::{Cell, RefCell}, @@ -113,18 +113,25 @@ enum Job { unsafe impl Send for Job {} +#[derive(Default)] +struct CompletedJobsExchange { + queue: VecDeque, + condvar: Option>, +} + struct CpuWorkerData { next: CpuJobIds, jobs_to_enqueue: AsyncQueue, new_jobs: Arc>>, have_new_jobs: Rc, - completed_jobs_remote: Arc>>, + completed_jobs_remote: Arc>, completed_jobs_local: RefCell>, have_completed_jobs: Rc, pending_jobs: CopyHashMap>, ring: Rc, _stop: OwnedFd, pending_job_data_cache: Stack>, + sync_wake_condvar: Arc, } linear_ids!(CpuJobIds, CpuJobId, u64); @@ -172,12 +179,16 @@ impl Drop for PendingJob { self.job_data.state.set(PendingJobState::Abandoned); data.jobs_to_enqueue.push(Job::Cancel { id }); data.do_equeue_jobs(); - let mut buf = 0u64; - while data.pending_jobs.contains(&id) { - if let Err(e) = uapi::read(data.have_completed_jobs.raw(), &mut buf) { - panic!("Could not wait for job completions: {}", ErrorFmt(e)); - } + loop { data.dispatch_completions(); + if !data.pending_jobs.contains(&id) { + break; + } + let mut remote = data.completed_jobs_remote.lock(); + while remote.queue.is_empty() { + remote.condvar = Some(data.sync_wake_condvar.clone()); + data.sync_wake_condvar.wait(&mut remote); + } } } PendingJobState::Abandoned => {} @@ -204,7 +215,7 @@ impl CpuWorkerData { fn dispatch_completions(&self) { let completions = &mut *self.completed_jobs_local.borrow_mut(); - mem::swap(completions, &mut *self.completed_jobs_remote.lock()); + mem::swap(completions, &mut self.completed_jobs_remote.lock().queue); while let Some(id) = completions.pop_front() { let job_data = self.pending_jobs.remove(&id).unwrap(); let job = job_data.job.take().unwrap(); @@ -242,7 +253,7 @@ impl CpuWorkerData { impl CpuWorker { pub fn new(ring: &Rc, eng: &Rc) -> Result { let new_jobs: Arc>> = Default::default(); - let completed_jobs: Arc>> = Default::default(); + let completed_jobs: Arc> = Default::default(); let (stop_read, stop_write) = uapi::pipe2(c::O_CLOEXEC).map_err(|e| CpuWorkerError::Pipe(e.into()))?; let have_new_jobs = @@ -281,6 +292,7 @@ impl CpuWorker { ring: ring.clone(), _stop: stop_read, pending_job_data_cache: Default::default(), + sync_wake_condvar: Arc::new(Condvar::new()), }); Ok(Self { _completions_listener: eng.spawn( @@ -313,7 +325,7 @@ impl CpuWorker { fn work( new_jobs: Arc>>, - completed_jobs: Arc>>, + completed_jobs: Arc>, stop: OwnedFd, have_new_jobs: OwnedFd, have_completed_jobs: OwnedFd, @@ -343,7 +355,7 @@ fn work( struct Worker { eng: Rc, ring: Rc, - completed_jobs: Arc>>, + completed_jobs: Arc>, have_completed_jobs: OwnedFd, async_jobs: CopyHashMap, stopped: Cell, @@ -428,7 +440,14 @@ impl Worker { } fn send_completion(&self, id: CpuJobId) { - self.completed_jobs.lock().push_back(id); + let cv = { + let mut exchange = self.completed_jobs.lock(); + exchange.queue.push_back(id); + exchange.condvar.take() + }; + if let Some(cv) = cv { + cv.notify_all(); + } if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) { panic!("Could not signal job completion: {}", ErrorFmt(e)); }