cpu_worker: fix blocking wait for completions
io_uring reads of eventfds must not be mixed with blocking reads since io_uring might be reading in a thread, stealing events from the blocking read.
This commit is contained in:
parent
f004afdebe
commit
58f82d889b
1 changed files with 31 additions and 12 deletions
|
|
@ -11,7 +11,7 @@ use {
|
||||||
ptr_ext::MutPtrExt, queue::AsyncQueue, stack::Stack,
|
ptr_ext::MutPtrExt, queue::AsyncQueue, stack::Stack,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
parking_lot::Mutex,
|
parking_lot::{Condvar, Mutex},
|
||||||
std::{
|
std::{
|
||||||
any::Any,
|
any::Any,
|
||||||
cell::{Cell, RefCell},
|
cell::{Cell, RefCell},
|
||||||
|
|
@ -113,18 +113,25 @@ enum Job {
|
||||||
|
|
||||||
unsafe impl Send for Job {}
|
unsafe impl Send for Job {}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct CompletedJobsExchange {
|
||||||
|
queue: VecDeque<CpuJobId>,
|
||||||
|
condvar: Option<Arc<Condvar>>,
|
||||||
|
}
|
||||||
|
|
||||||
struct CpuWorkerData {
|
struct CpuWorkerData {
|
||||||
next: CpuJobIds,
|
next: CpuJobIds,
|
||||||
jobs_to_enqueue: AsyncQueue<Job>,
|
jobs_to_enqueue: AsyncQueue<Job>,
|
||||||
new_jobs: Arc<Mutex<VecDeque<Job>>>,
|
new_jobs: Arc<Mutex<VecDeque<Job>>>,
|
||||||
have_new_jobs: Rc<OwnedFd>,
|
have_new_jobs: Rc<OwnedFd>,
|
||||||
completed_jobs_remote: Arc<Mutex<VecDeque<CpuJobId>>>,
|
completed_jobs_remote: Arc<Mutex<CompletedJobsExchange>>,
|
||||||
completed_jobs_local: RefCell<VecDeque<CpuJobId>>,
|
completed_jobs_local: RefCell<VecDeque<CpuJobId>>,
|
||||||
have_completed_jobs: Rc<OwnedFd>,
|
have_completed_jobs: Rc<OwnedFd>,
|
||||||
pending_jobs: CopyHashMap<CpuJobId, Rc<PendingJobData>>,
|
pending_jobs: CopyHashMap<CpuJobId, Rc<PendingJobData>>,
|
||||||
ring: Rc<IoUring>,
|
ring: Rc<IoUring>,
|
||||||
_stop: OwnedFd,
|
_stop: OwnedFd,
|
||||||
pending_job_data_cache: Stack<Rc<PendingJobData>>,
|
pending_job_data_cache: Stack<Rc<PendingJobData>>,
|
||||||
|
sync_wake_condvar: Arc<Condvar>,
|
||||||
}
|
}
|
||||||
|
|
||||||
linear_ids!(CpuJobIds, CpuJobId, u64);
|
linear_ids!(CpuJobIds, CpuJobId, u64);
|
||||||
|
|
@ -172,12 +179,16 @@ impl Drop for PendingJob {
|
||||||
self.job_data.state.set(PendingJobState::Abandoned);
|
self.job_data.state.set(PendingJobState::Abandoned);
|
||||||
data.jobs_to_enqueue.push(Job::Cancel { id });
|
data.jobs_to_enqueue.push(Job::Cancel { id });
|
||||||
data.do_equeue_jobs();
|
data.do_equeue_jobs();
|
||||||
let mut buf = 0u64;
|
loop {
|
||||||
while data.pending_jobs.contains(&id) {
|
|
||||||
if let Err(e) = uapi::read(data.have_completed_jobs.raw(), &mut buf) {
|
|
||||||
panic!("Could not wait for job completions: {}", ErrorFmt(e));
|
|
||||||
}
|
|
||||||
data.dispatch_completions();
|
data.dispatch_completions();
|
||||||
|
if !data.pending_jobs.contains(&id) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let mut remote = data.completed_jobs_remote.lock();
|
||||||
|
while remote.queue.is_empty() {
|
||||||
|
remote.condvar = Some(data.sync_wake_condvar.clone());
|
||||||
|
data.sync_wake_condvar.wait(&mut remote);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PendingJobState::Abandoned => {}
|
PendingJobState::Abandoned => {}
|
||||||
|
|
@ -204,7 +215,7 @@ impl CpuWorkerData {
|
||||||
|
|
||||||
fn dispatch_completions(&self) {
|
fn dispatch_completions(&self) {
|
||||||
let completions = &mut *self.completed_jobs_local.borrow_mut();
|
let completions = &mut *self.completed_jobs_local.borrow_mut();
|
||||||
mem::swap(completions, &mut *self.completed_jobs_remote.lock());
|
mem::swap(completions, &mut self.completed_jobs_remote.lock().queue);
|
||||||
while let Some(id) = completions.pop_front() {
|
while let Some(id) = completions.pop_front() {
|
||||||
let job_data = self.pending_jobs.remove(&id).unwrap();
|
let job_data = self.pending_jobs.remove(&id).unwrap();
|
||||||
let job = job_data.job.take().unwrap();
|
let job = job_data.job.take().unwrap();
|
||||||
|
|
@ -242,7 +253,7 @@ impl CpuWorkerData {
|
||||||
impl CpuWorker {
|
impl CpuWorker {
|
||||||
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Result<Self, CpuWorkerError> {
|
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Result<Self, CpuWorkerError> {
|
||||||
let new_jobs: Arc<Mutex<VecDeque<Job>>> = Default::default();
|
let new_jobs: Arc<Mutex<VecDeque<Job>>> = Default::default();
|
||||||
let completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>> = Default::default();
|
let completed_jobs: Arc<Mutex<CompletedJobsExchange>> = Default::default();
|
||||||
let (stop_read, stop_write) =
|
let (stop_read, stop_write) =
|
||||||
uapi::pipe2(c::O_CLOEXEC).map_err(|e| CpuWorkerError::Pipe(e.into()))?;
|
uapi::pipe2(c::O_CLOEXEC).map_err(|e| CpuWorkerError::Pipe(e.into()))?;
|
||||||
let have_new_jobs =
|
let have_new_jobs =
|
||||||
|
|
@ -281,6 +292,7 @@ impl CpuWorker {
|
||||||
ring: ring.clone(),
|
ring: ring.clone(),
|
||||||
_stop: stop_read,
|
_stop: stop_read,
|
||||||
pending_job_data_cache: Default::default(),
|
pending_job_data_cache: Default::default(),
|
||||||
|
sync_wake_condvar: Arc::new(Condvar::new()),
|
||||||
});
|
});
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
_completions_listener: eng.spawn(
|
_completions_listener: eng.spawn(
|
||||||
|
|
@ -313,7 +325,7 @@ impl CpuWorker {
|
||||||
|
|
||||||
fn work(
|
fn work(
|
||||||
new_jobs: Arc<Mutex<VecDeque<Job>>>,
|
new_jobs: Arc<Mutex<VecDeque<Job>>>,
|
||||||
completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>,
|
completed_jobs: Arc<Mutex<CompletedJobsExchange>>,
|
||||||
stop: OwnedFd,
|
stop: OwnedFd,
|
||||||
have_new_jobs: OwnedFd,
|
have_new_jobs: OwnedFd,
|
||||||
have_completed_jobs: OwnedFd,
|
have_completed_jobs: OwnedFd,
|
||||||
|
|
@ -343,7 +355,7 @@ fn work(
|
||||||
struct Worker {
|
struct Worker {
|
||||||
eng: Rc<AsyncEngine>,
|
eng: Rc<AsyncEngine>,
|
||||||
ring: Rc<IoUring>,
|
ring: Rc<IoUring>,
|
||||||
completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>,
|
completed_jobs: Arc<Mutex<CompletedJobsExchange>>,
|
||||||
have_completed_jobs: OwnedFd,
|
have_completed_jobs: OwnedFd,
|
||||||
async_jobs: CopyHashMap<CpuJobId, AsyncJob>,
|
async_jobs: CopyHashMap<CpuJobId, AsyncJob>,
|
||||||
stopped: Cell<bool>,
|
stopped: Cell<bool>,
|
||||||
|
|
@ -428,7 +440,14 @@ impl Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_completion(&self, id: CpuJobId) {
|
fn send_completion(&self, id: CpuJobId) {
|
||||||
self.completed_jobs.lock().push_back(id);
|
let cv = {
|
||||||
|
let mut exchange = self.completed_jobs.lock();
|
||||||
|
exchange.queue.push_back(id);
|
||||||
|
exchange.condvar.take()
|
||||||
|
};
|
||||||
|
if let Some(cv) = cv {
|
||||||
|
cv.notify_all();
|
||||||
|
}
|
||||||
if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) {
|
if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) {
|
||||||
panic!("Could not signal job completion: {}", ErrorFmt(e));
|
panic!("Could not signal job completion: {}", ErrorFmt(e));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue