diff --git a/Cargo.lock b/Cargo.lock index 15a9d18b..59b70408 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,6 +695,7 @@ dependencies = [ "jay-bufio", "jay-cmm", "jay-config", + "jay-cpu-worker", "jay-criteria", "jay-dbus-core", "jay-edid", @@ -759,6 +760,22 @@ dependencies = [ name = "jay-config-schema" version = "0.1.0" +[[package]] +name = "jay-cpu-worker" +version = "0.1.0" +dependencies = [ + "jay-async-engine", + "jay-geometry", + "jay-io-uring", + "jay-tracy", + "jay-utils", + "jay-wheel", + "log", + "parking_lot", + "thiserror", + "uapi", +] + [[package]] name = "jay-criteria" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 711e9cd7..0bf04b70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "tree-types", "eventfd-cache", "wheel", + "cpu-worker", "toml-config", "algorithms", "toml-spec", @@ -75,6 +76,7 @@ jay-wire-buf = { version = "0.1.0", path = "wire-buf" } jay-tree-types = { version = "0.1.0", path = "tree-types" } jay-eventfd-cache = { version = "0.1.0", path = "eventfd-cache" } jay-wheel = { version = "0.1.0", path = "wheel" } +jay-cpu-worker = { version = "0.1.0", path = "cpu-worker" } uapi = "0.2.13" thiserror = "2.0.11" @@ -137,5 +139,5 @@ opt-level = 3 [features] rc_tracking = [] -it = ["jay-async-engine/it"] -tracy = ["jay-tracy/tracy", "jay-async-engine/tracy"] +it = ["jay-async-engine/it", "jay-cpu-worker/it"] +tracy = ["jay-tracy/tracy", "jay-async-engine/tracy", "jay-cpu-worker/tracy"] diff --git a/cpu-worker/Cargo.toml b/cpu-worker/Cargo.toml new file mode 100644 index 00000000..7ba8dbf5 --- /dev/null +++ b/cpu-worker/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "jay-cpu-worker" +version = "0.1.0" +edition = "2024" +license = "GPL-3.0-only" + +[dependencies] +jay-async-engine = { version = "0.1.0", path = "../async-engine" } +jay-geometry = { version = "0.1.0", path = "../geometry" } +jay-io-uring = { version = "0.1.0", path = "../io-uring" } +jay-tracy = { version = "0.1.0", path = "../tracy" } +jay-utils = { version = "0.1.0", path = "../utils" } + +log = { version = "0.4.20", features = ["std"] } +parking_lot = "0.12.1" +thiserror = "2.0.11" +uapi = "0.2.13" + +[dev-dependencies] +jay-wheel = { version = "0.1.0", path = "../wheel" } + +[features] +it = [] +tracy = ["jay-tracy/tracy", "jay-async-engine/tracy"] diff --git a/src/cpu_worker/jobs.rs b/cpu-worker/src/jobs.rs similarity index 100% rename from src/cpu_worker/jobs.rs rename to cpu-worker/src/jobs.rs diff --git a/src/cpu_worker/jobs/img_copy.rs b/cpu-worker/src/jobs/img_copy.rs similarity index 92% rename from src/cpu_worker/jobs/img_copy.rs rename to cpu-worker/src/jobs/img_copy.rs index f50a7b25..8162cf27 100644 --- a/src/cpu_worker/jobs/img_copy.rs +++ b/cpu-worker/src/jobs/img_copy.rs @@ -1,8 +1,6 @@ use { - crate::{ - cpu_worker::{AsyncCpuWork, CpuWork}, - rect::Rect, - }, + crate::{AsyncCpuWork, CpuWork}, + jay_geometry::Rect, std::ptr, }; @@ -34,7 +32,7 @@ impl ImgCopyWork { impl CpuWork for ImgCopyWork { fn run(&mut self) -> Option> { - zone!("ImgCopyWork"); + jay_tracy::zone!("ImgCopyWork"); for rect in &self.rects { let mut offset = rect.y1() * self.stride + rect.x1() * self.bpp; if rect.width() == self.width { diff --git a/src/cpu_worker/jobs/read_write.rs b/cpu-worker/src/jobs/read_write.rs similarity index 95% rename from src/cpu_worker/jobs/read_write.rs rename to cpu-worker/src/jobs/read_write.rs index 6ebcfba2..79d8cb29 100644 --- a/src/cpu_worker/jobs/read_write.rs +++ b/cpu-worker/src/jobs/read_write.rs @@ -1,9 +1,7 @@ use { - crate::{ - async_engine::{AsyncEngine, SpawnedFuture}, - cpu_worker::{AsyncCpuWork, CompletedWork, CpuWork, WorkCompletion}, - io_uring::{IoUring, IoUringError, IoUringTaskId}, - }, + crate::{AsyncCpuWork, CompletedWork, CpuWork, WorkCompletion}, + jay_async_engine::{AsyncEngine, SpawnedFuture}, + jay_io_uring::{IoUring, IoUringError, IoUringTaskId}, std::{ any::Any, ptr, diff --git a/cpu-worker/src/lib.rs b/cpu-worker/src/lib.rs new file mode 100644 index 00000000..51045df1 --- /dev/null +++ b/cpu-worker/src/lib.rs @@ -0,0 +1,509 @@ +pub mod jobs; +#[cfg(test)] +mod tests; + +use { + jay_async_engine::{AsyncEngine, SpawnedFuture}, + jay_io_uring::IoUring, + jay_utils::{ + buf::TypedBuf, + copyhashmap::CopyHashMap, + errorfmt::ErrorFmt, + numcell::NumCell, + oserror::{OsError, OsErrorExt2}, + pipe::{Pipe, pipe}, + ptr_ext::MutPtrExt, + queue::AsyncQueue, + stack::Stack, + }, + parking_lot::{Condvar, Mutex}, + std::{ + any::Any, + cell::{Cell, RefCell}, + collections::VecDeque, + mem, + ptr::NonNull, + rc::Rc, + sync::Arc, + thread, + }, + thiserror::Error, + uapi::{OwnedFd, c}, +}; + +pub trait CpuJob { + fn work(&mut self) -> &mut dyn CpuWork; + fn completed(self: Box); +} + +pub trait CpuWork: Send { + fn run(&mut self) -> Option>; + + fn cancel_async(&mut self, ring: &Rc) { + let _ = ring; + unreachable!(); + } + + fn async_work_done(&mut self, work: Box) { + let _ = work; + unreachable!(); + } +} + +pub trait AsyncCpuWork: Any { + fn run( + self: Box, + eng: &Rc, + ring: &Rc, + completion: WorkCompletion, + ) -> SpawnedFuture; +} + +pub struct WorkCompletion { + worker: Rc, + id: CpuJobId, +} + +pub struct CompletedWork(()); + +impl WorkCompletion { + pub fn complete(self, work: Box) -> CompletedWork { + let job = self.worker.async_jobs.remove(&self.id).unwrap(); + unsafe { + job.work.deref_mut().async_work_done(work); + } + self.worker.send_completion(self.id); + CompletedWork(()) + } +} + +pub struct CpuWorker { + data: Rc, + _completions_listener: SpawnedFuture<()>, + _job_enqueuer: SpawnedFuture<()>, +} + +#[must_use] +pub struct PendingJob { + id: CpuJobId, + thread_data: Rc, + job_data: Rc, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)] +enum PendingJobState { + #[default] + Waiting, + Abandoned, + Completed, +} + +#[derive(Default)] +struct PendingJobData { + job: Cell>>, + state: Cell, +} + +enum Job { + New { + id: CpuJobId, + work: *mut dyn CpuWork, + }, + Cancel { + id: CpuJobId, + }, +} + +unsafe impl Send for Job {} + +#[derive(Default)] +struct CompletedJobsExchange { + queue: VecDeque, + condvar: Option>, +} + +struct CpuWorkerData { + next: CpuJobIds, + jobs_to_enqueue: AsyncQueue, + new_jobs: Arc>>, + have_new_jobs: Rc, + completed_jobs_remote: Arc>, + completed_jobs_local: RefCell>, + have_completed_jobs: Rc, + pending_jobs: CopyHashMap>, + ring: Rc, + _stop: OwnedFd, + pending_job_data_cache: Stack>, + sync_wake_condvar: Arc, +} + +#[derive(Debug)] +struct CpuJobIds { + next: NumCell, +} + +impl Default for CpuJobIds { + fn default() -> Self { + Self { + next: NumCell::new(1), + } + } +} + +impl CpuJobIds { + fn next(&self) -> CpuJobId { + CpuJobId(self.next.fetch_add(1)) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)] +struct CpuJobId(u64); + +#[derive(Debug, Error)] +pub enum CpuWorkerError { + #[error("Could not create a pipe")] + Pipe(#[source] OsError), + #[error("Could not create an eventfd")] + EventFd(#[source] OsError), + #[error("Could not dup an eventfd")] + Dup(#[source] OsError), +} + +impl PendingJob { + pub fn detach(self) { + match self.job_data.state.get() { + PendingJobState::Waiting => { + self.job_data.state.set(PendingJobState::Abandoned); + } + PendingJobState::Abandoned => { + unreachable!(); + } + PendingJobState::Completed => {} + } + } +} + +impl Drop for CpuWorker { + fn drop(&mut self) { + self.data.do_equeue_jobs(); + if self.data.pending_jobs.is_not_empty() { + log::warn!("CpuWorker dropped with pending jobs. Completed jobs will not be triggered.") + } + } +} + +impl Drop for PendingJob { + fn drop(&mut self) { + match self.job_data.state.get() { + PendingJobState::Waiting => { + log::warn!("PendingJob dropped before completion. Blocking."); + let data = &self.thread_data; + let id = self.id; + self.job_data.state.set(PendingJobState::Abandoned); + data.jobs_to_enqueue.push(Job::Cancel { id }); + data.do_equeue_jobs(); + loop { + data.dispatch_completions(); + if !data.pending_jobs.contains(&id) { + break; + } + let mut remote = data.completed_jobs_remote.lock(); + while remote.queue.is_empty() { + remote.condvar = Some(data.sync_wake_condvar.clone()); + data.sync_wake_condvar.wait(&mut remote); + } + } + } + PendingJobState::Abandoned => {} + PendingJobState::Completed => { + self.thread_data + .pending_job_data_cache + .push(self.job_data.clone()); + } + } + } +} + +impl CpuWorkerData { + fn clear(&self) { + self.jobs_to_enqueue.clear(); + self.new_jobs.lock().clear(); + self.completed_jobs_remote.lock().queue.clear(); + self.completed_jobs_local.borrow_mut().clear(); + self.pending_jobs.clear(); + self.pending_job_data_cache.take(); + } + + async fn wait_for_completions(self: Rc) { + let mut buf = TypedBuf::::new(); + loop { + if let Err(e) = self.ring.read(&self.have_completed_jobs, buf.buf()).await { + log::error!("Could not wait for job completions: {}", ErrorFmt(e)); + return; + } + self.dispatch_completions(); + } + } + + fn dispatch_completions(&self) { + let completions = &mut *self.completed_jobs_local.borrow_mut(); + mem::swap(completions, &mut self.completed_jobs_remote.lock().queue); + while let Some(id) = completions.pop_front() { + let job_data = self.pending_jobs.remove(&id).unwrap(); + let job = job_data.job.take().unwrap(); + let job = unsafe { Box::from_raw(job.as_ptr()) }; + match job_data.state.get() { + PendingJobState::Waiting => { + job_data.state.set(PendingJobState::Completed); + job.completed(); + } + PendingJobState::Abandoned => { + self.pending_job_data_cache.push(job_data); + } + PendingJobState::Completed => { + unreachable!(); + } + } + } + } + + async fn equeue_jobs(self: Rc) { + loop { + self.jobs_to_enqueue.non_empty().await; + self.do_equeue_jobs(); + } + } + + fn do_equeue_jobs(&self) { + self.jobs_to_enqueue.move_to(&mut self.new_jobs.lock()); + if let Err(e) = uapi::eventfd_write(self.have_new_jobs.raw(), 1) { + panic!("Could not signal eventfd: {}", ErrorFmt(e)); + } + } +} + +impl CpuWorker { + pub fn new(ring: &Rc, eng: &Rc) -> Result { + let new_jobs: Arc>> = Default::default(); + let completed_jobs: Arc> = Default::default(); + let Pipe { + read: stop_read, + write: stop_write, + } = pipe().map_err(CpuWorkerError::Pipe)?; + let have_new_jobs = uapi::eventfd(0, c::EFD_CLOEXEC).map_os_err(CpuWorkerError::EventFd)?; + let have_completed_jobs = + uapi::eventfd(0, c::EFD_CLOEXEC).map_os_err(CpuWorkerError::EventFd)?; + thread::Builder::new() + .name("cpu worker".to_string()) + .spawn({ + let new_jobs = new_jobs.clone(); + let completed_jobs = completed_jobs.clone(); + let have_new_jobs = uapi::fcntl_dupfd_cloexec(have_new_jobs.raw(), 0) + .map_os_err(CpuWorkerError::Dup)?; + let have_completed_jobs = uapi::fcntl_dupfd_cloexec(have_completed_jobs.raw(), 0) + .map_os_err(CpuWorkerError::Dup)?; + move || { + work( + new_jobs, + completed_jobs, + stop_write, + have_new_jobs, + have_completed_jobs, + ) + } + }) + .unwrap(); + let data = Rc::new(CpuWorkerData { + next: Default::default(), + jobs_to_enqueue: Default::default(), + new_jobs, + have_new_jobs: Rc::new(have_new_jobs), + completed_jobs_remote: completed_jobs, + completed_jobs_local: Default::default(), + have_completed_jobs: Rc::new(have_completed_jobs), + pending_jobs: Default::default(), + ring: ring.clone(), + _stop: stop_read, + pending_job_data_cache: Default::default(), + sync_wake_condvar: Arc::new(Condvar::new()), + }); + Ok(Self { + _completions_listener: eng.spawn( + "cpu worker completions", + data.clone().wait_for_completions(), + ), + _job_enqueuer: eng.spawn("cpu worker enqueue", data.clone().equeue_jobs()), + data, + }) + } + + pub fn clear(&self) { + self.data.clear(); + } + + pub fn submit(&self, job: Box) -> PendingJob { + let mut job = NonNull::from(Box::leak(job)); + let id = self.data.next.next(); + self.data.jobs_to_enqueue.push(Job::New { + id, + work: unsafe { job.as_mut().work() }, + }); + let job_data = self.data.pending_job_data_cache.pop().unwrap_or_default(); + job_data.job.set(Some(job)); + job_data.state.set(PendingJobState::Waiting); + self.data.pending_jobs.set(id, job_data.clone()); + PendingJob { + id, + thread_data: self.data.clone(), + job_data, + } + } + + #[cfg(feature = "it")] + pub fn wait_idle(&self) -> bool { + let was_idle = self.data.pending_jobs.is_empty(); + loop { + self.data.dispatch_completions(); + if self.data.pending_jobs.is_empty() { + break; + } + let mut remote = self.data.completed_jobs_remote.lock(); + while remote.queue.is_empty() { + remote.condvar = Some(self.data.sync_wake_condvar.clone()); + self.data.sync_wake_condvar.wait(&mut remote); + } + } + was_idle + } +} + +fn work( + new_jobs: Arc>>, + completed_jobs: Arc>, + stop: OwnedFd, + have_new_jobs: OwnedFd, + have_completed_jobs: OwnedFd, +) { + let eng = AsyncEngine::new(); + let ring = IoUring::new(&eng, 32).unwrap(); + let worker = Rc::new(Worker { + eng, + ring, + completed_jobs, + have_completed_jobs, + async_jobs: Default::default(), + stopped: Cell::new(false), + }); + let _stop_listener = worker + .eng + .spawn("stop listener", worker.clone().handle_stop(stop)); + let _new_job_listener = worker.eng.spawn( + "new job listener", + worker.clone().handle_new_jobs(new_jobs, have_new_jobs), + ); + if let Err(e) = worker.ring.run() { + panic!("io_uring failed: {}", ErrorFmt(e)); + } +} + +struct Worker { + eng: Rc, + ring: Rc, + completed_jobs: Arc>, + have_completed_jobs: OwnedFd, + async_jobs: CopyHashMap, + stopped: Cell, +} + +struct AsyncJob { + _future: SpawnedFuture, + work: *mut dyn CpuWork, +} + +impl Worker { + async fn handle_stop(self: Rc, stop: OwnedFd) { + let stop = Rc::new(stop); + if let Err(e) = self.ring.poll(&stop, 0).await { + log::error!( + "Could not wait for stop fd to become readable: {}", + ErrorFmt(e) + ); + } else { + assert!(self.async_jobs.is_empty()); + self.stopped.set(true); + self.ring.stop(); + } + } + + async fn handle_new_jobs( + self: Rc, + jobs_remote: Arc>>, + new_jobs: OwnedFd, + ) { + let mut buf = TypedBuf::::new(); + let new_jobs = Rc::new(new_jobs); + let mut jobs = VecDeque::new(); + loop { + if let Err(e) = self.ring.read(&new_jobs, buf.buf()).await { + if self.stopped.get() { + return; + } + panic!( + "Could not wait for new jobs fd to be signaled: {}", + ErrorFmt(e), + ); + } + mem::swap(&mut jobs, &mut *jobs_remote.lock()); + while let Some(job) = jobs.pop_front() { + self.handle_new_job(job); + } + } + } + + fn handle_new_job(self: &Rc, job: Job) { + match job { + Job::Cancel { id } => { + let mut jobs = self.async_jobs.lock(); + if let Some(job) = jobs.get_mut(&id) { + unsafe { + job.work.deref_mut().cancel_async(&self.ring); + } + } + } + Job::New { id, work } => match unsafe { work.deref_mut() }.run() { + None => { + self.send_completion(id); + return; + } + Some(w) => { + let completion = WorkCompletion { + worker: self.clone(), + id, + }; + let future = w.run(&self.eng, &self.ring, completion); + self.async_jobs.set( + id, + AsyncJob { + _future: future, + work, + }, + ); + } + }, + } + } + + fn send_completion(&self, id: CpuJobId) { + let cv = { + let mut exchange = self.completed_jobs.lock(); + exchange.queue.push_back(id); + exchange.condvar.take() + }; + if let Some(cv) = cv { + cv.notify_all(); + } + if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) { + panic!("Could not signal job completion: {}", ErrorFmt(e)); + } + } +} diff --git a/src/cpu_worker/tests.rs b/cpu-worker/src/tests.rs similarity index 90% rename from src/cpu_worker/tests.rs rename to cpu-worker/src/tests.rs index c77a8620..00382e03 100644 --- a/src/cpu_worker/tests.rs +++ b/cpu-worker/src/tests.rs @@ -1,11 +1,9 @@ use { - crate::{ - async_engine::{AsyncEngine, SpawnedFuture}, - cpu_worker::{AsyncCpuWork, CompletedWork, CpuJob, CpuWork, CpuWorker, WorkCompletion}, - io_uring::IoUring, - utils::asyncevent::AsyncEvent, - wheel::Wheel, - }, + crate::{AsyncCpuWork, CompletedWork, CpuJob, CpuWork, CpuWorker, WorkCompletion}, + jay_async_engine::{AsyncEngine, SpawnedFuture}, + jay_io_uring::IoUring, + jay_utils::asyncevent::AsyncEvent, + jay_wheel::Wheel, std::{future::pending, rc::Rc, sync::Arc}, uapi::{OwnedFd, c::EFD_CLOEXEC}, }; diff --git a/src/cpu_worker.rs b/src/cpu_worker.rs index ff223806..e28586b1 100644 --- a/src/cpu_worker.rs +++ b/src/cpu_worker.rs @@ -1,490 +1 @@ -pub mod jobs; -#[cfg(test)] -mod tests; - -use { - crate::{ - async_engine::{AsyncEngine, SpawnedFuture}, - io_uring::IoUring, - utils::{ - buf::TypedBuf, - copyhashmap::CopyHashMap, - errorfmt::ErrorFmt, - oserror::{OsError, OsErrorExt2}, - pipe::{Pipe, pipe}, - ptr_ext::MutPtrExt, - queue::AsyncQueue, - stack::Stack, - }, - }, - parking_lot::{Condvar, Mutex}, - std::{ - any::Any, - cell::{Cell, RefCell}, - collections::VecDeque, - mem, - ptr::NonNull, - rc::Rc, - sync::Arc, - thread, - }, - thiserror::Error, - uapi::{OwnedFd, c}, -}; - -pub trait CpuJob { - fn work(&mut self) -> &mut dyn CpuWork; - fn completed(self: Box); -} - -pub trait CpuWork: Send { - fn run(&mut self) -> Option>; - - fn cancel_async(&mut self, ring: &Rc) { - let _ = ring; - unreachable!(); - } - - fn async_work_done(&mut self, work: Box) { - let _ = work; - unreachable!(); - } -} - -pub trait AsyncCpuWork: Any { - fn run( - self: Box, - eng: &Rc, - ring: &Rc, - completion: WorkCompletion, - ) -> SpawnedFuture; -} - -pub struct WorkCompletion { - worker: Rc, - id: CpuJobId, -} - -pub struct CompletedWork(()); - -impl WorkCompletion { - pub fn complete(self, work: Box) -> CompletedWork { - let job = self.worker.async_jobs.remove(&self.id).unwrap(); - unsafe { - job.work.deref_mut().async_work_done(work); - } - self.worker.send_completion(self.id); - CompletedWork(()) - } -} - -pub struct CpuWorker { - data: Rc, - _completions_listener: SpawnedFuture<()>, - _job_enqueuer: SpawnedFuture<()>, -} - -#[must_use] -pub struct PendingJob { - id: CpuJobId, - thread_data: Rc, - job_data: Rc, -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)] -enum PendingJobState { - #[default] - Waiting, - Abandoned, - Completed, -} - -#[derive(Default)] -struct PendingJobData { - job: Cell>>, - state: Cell, -} - -enum Job { - New { - id: CpuJobId, - work: *mut dyn CpuWork, - }, - Cancel { - id: CpuJobId, - }, -} - -unsafe impl Send for Job {} - -#[derive(Default)] -struct CompletedJobsExchange { - queue: VecDeque, - condvar: Option>, -} - -struct CpuWorkerData { - next: CpuJobIds, - jobs_to_enqueue: AsyncQueue, - new_jobs: Arc>>, - have_new_jobs: Rc, - completed_jobs_remote: Arc>, - completed_jobs_local: RefCell>, - have_completed_jobs: Rc, - pending_jobs: CopyHashMap>, - ring: Rc, - _stop: OwnedFd, - pending_job_data_cache: Stack>, - sync_wake_condvar: Arc, -} - -linear_ids!(CpuJobIds, CpuJobId, u64); - -#[derive(Debug, Error)] -pub enum CpuWorkerError { - #[error("Could not create a pipe")] - Pipe(#[source] OsError), - #[error("Could not create an eventfd")] - EventFd(#[source] OsError), - #[error("Could not dup an eventfd")] - Dup(#[source] OsError), -} - -impl PendingJob { - pub fn detach(self) { - match self.job_data.state.get() { - PendingJobState::Waiting => { - self.job_data.state.set(PendingJobState::Abandoned); - } - PendingJobState::Abandoned => { - unreachable!(); - } - PendingJobState::Completed => {} - } - } -} - -impl Drop for CpuWorker { - fn drop(&mut self) { - self.data.do_equeue_jobs(); - if self.data.pending_jobs.is_not_empty() { - log::warn!("CpuWorker dropped with pending jobs. Completed jobs will not be triggered.") - } - } -} - -impl Drop for PendingJob { - fn drop(&mut self) { - match self.job_data.state.get() { - PendingJobState::Waiting => { - log::warn!("PendingJob dropped before completion. Blocking."); - let data = &self.thread_data; - let id = self.id; - self.job_data.state.set(PendingJobState::Abandoned); - data.jobs_to_enqueue.push(Job::Cancel { id }); - data.do_equeue_jobs(); - loop { - data.dispatch_completions(); - if !data.pending_jobs.contains(&id) { - break; - } - let mut remote = data.completed_jobs_remote.lock(); - while remote.queue.is_empty() { - remote.condvar = Some(data.sync_wake_condvar.clone()); - data.sync_wake_condvar.wait(&mut remote); - } - } - } - PendingJobState::Abandoned => {} - PendingJobState::Completed => { - self.thread_data - .pending_job_data_cache - .push(self.job_data.clone()); - } - } - } -} - -impl CpuWorkerData { - fn clear(&self) { - self.jobs_to_enqueue.clear(); - self.new_jobs.lock().clear(); - self.completed_jobs_remote.lock().queue.clear(); - self.completed_jobs_local.borrow_mut().clear(); - self.pending_jobs.clear(); - self.pending_job_data_cache.take(); - } - - async fn wait_for_completions(self: Rc) { - let mut buf = TypedBuf::::new(); - loop { - if let Err(e) = self.ring.read(&self.have_completed_jobs, buf.buf()).await { - log::error!("Could not wait for job completions: {}", ErrorFmt(e)); - return; - } - self.dispatch_completions(); - } - } - - fn dispatch_completions(&self) { - let completions = &mut *self.completed_jobs_local.borrow_mut(); - mem::swap(completions, &mut self.completed_jobs_remote.lock().queue); - while let Some(id) = completions.pop_front() { - let job_data = self.pending_jobs.remove(&id).unwrap(); - let job = job_data.job.take().unwrap(); - let job = unsafe { Box::from_raw(job.as_ptr()) }; - match job_data.state.get() { - PendingJobState::Waiting => { - job_data.state.set(PendingJobState::Completed); - job.completed(); - } - PendingJobState::Abandoned => { - self.pending_job_data_cache.push(job_data); - } - PendingJobState::Completed => { - unreachable!(); - } - } - } - } - - async fn equeue_jobs(self: Rc) { - loop { - self.jobs_to_enqueue.non_empty().await; - self.do_equeue_jobs(); - } - } - - fn do_equeue_jobs(&self) { - self.jobs_to_enqueue.move_to(&mut self.new_jobs.lock()); - if let Err(e) = uapi::eventfd_write(self.have_new_jobs.raw(), 1) { - panic!("Could not signal eventfd: {}", ErrorFmt(e)); - } - } -} - -impl CpuWorker { - pub fn new(ring: &Rc, eng: &Rc) -> Result { - let new_jobs: Arc>> = Default::default(); - let completed_jobs: Arc> = Default::default(); - let Pipe { - read: stop_read, - write: stop_write, - } = pipe().map_err(CpuWorkerError::Pipe)?; - let have_new_jobs = uapi::eventfd(0, c::EFD_CLOEXEC).map_os_err(CpuWorkerError::EventFd)?; - let have_completed_jobs = - uapi::eventfd(0, c::EFD_CLOEXEC).map_os_err(CpuWorkerError::EventFd)?; - thread::Builder::new() - .name("cpu worker".to_string()) - .spawn({ - let new_jobs = new_jobs.clone(); - let completed_jobs = completed_jobs.clone(); - let have_new_jobs = uapi::fcntl_dupfd_cloexec(have_new_jobs.raw(), 0) - .map_os_err(CpuWorkerError::Dup)?; - let have_completed_jobs = uapi::fcntl_dupfd_cloexec(have_completed_jobs.raw(), 0) - .map_os_err(CpuWorkerError::Dup)?; - move || { - work( - new_jobs, - completed_jobs, - stop_write, - have_new_jobs, - have_completed_jobs, - ) - } - }) - .unwrap(); - let data = Rc::new(CpuWorkerData { - next: Default::default(), - jobs_to_enqueue: Default::default(), - new_jobs, - have_new_jobs: Rc::new(have_new_jobs), - completed_jobs_remote: completed_jobs, - completed_jobs_local: Default::default(), - have_completed_jobs: Rc::new(have_completed_jobs), - pending_jobs: Default::default(), - ring: ring.clone(), - _stop: stop_read, - pending_job_data_cache: Default::default(), - sync_wake_condvar: Arc::new(Condvar::new()), - }); - Ok(Self { - _completions_listener: eng.spawn( - "cpu worker completions", - data.clone().wait_for_completions(), - ), - _job_enqueuer: eng.spawn("cpu worker enqueue", data.clone().equeue_jobs()), - data, - }) - } - - pub fn clear(&self) { - self.data.clear(); - } - - pub fn submit(&self, job: Box) -> PendingJob { - let mut job = NonNull::from(Box::leak(job)); - let id = self.data.next.next(); - self.data.jobs_to_enqueue.push(Job::New { - id, - work: unsafe { job.as_mut().work() }, - }); - let job_data = self.data.pending_job_data_cache.pop().unwrap_or_default(); - job_data.job.set(Some(job)); - job_data.state.set(PendingJobState::Waiting); - self.data.pending_jobs.set(id, job_data.clone()); - PendingJob { - id, - thread_data: self.data.clone(), - job_data, - } - } - - #[cfg(feature = "it")] - pub fn wait_idle(&self) -> bool { - let was_idle = self.data.pending_jobs.is_empty(); - loop { - self.data.dispatch_completions(); - if self.data.pending_jobs.is_empty() { - break; - } - let mut remote = self.data.completed_jobs_remote.lock(); - while remote.queue.is_empty() { - remote.condvar = Some(self.data.sync_wake_condvar.clone()); - self.data.sync_wake_condvar.wait(&mut remote); - } - } - was_idle - } -} - -fn work( - new_jobs: Arc>>, - completed_jobs: Arc>, - stop: OwnedFd, - have_new_jobs: OwnedFd, - have_completed_jobs: OwnedFd, -) { - let eng = AsyncEngine::new(); - let ring = IoUring::new(&eng, 32).unwrap(); - let worker = Rc::new(Worker { - eng, - ring, - completed_jobs, - have_completed_jobs, - async_jobs: Default::default(), - stopped: Cell::new(false), - }); - let _stop_listener = worker - .eng - .spawn("stop listener", worker.clone().handle_stop(stop)); - let _new_job_listener = worker.eng.spawn( - "new job listener", - worker.clone().handle_new_jobs(new_jobs, have_new_jobs), - ); - if let Err(e) = worker.ring.run() { - panic!("io_uring failed: {}", ErrorFmt(e)); - } -} - -struct Worker { - eng: Rc, - ring: Rc, - completed_jobs: Arc>, - have_completed_jobs: OwnedFd, - async_jobs: CopyHashMap, - stopped: Cell, -} - -struct AsyncJob { - _future: SpawnedFuture, - work: *mut dyn CpuWork, -} - -impl Worker { - async fn handle_stop(self: Rc, stop: OwnedFd) { - let stop = Rc::new(stop); - if let Err(e) = self.ring.poll(&stop, 0).await { - log::error!( - "Could not wait for stop fd to become readable: {}", - ErrorFmt(e) - ); - } else { - assert!(self.async_jobs.is_empty()); - self.stopped.set(true); - self.ring.stop(); - } - } - - async fn handle_new_jobs( - self: Rc, - jobs_remote: Arc>>, - new_jobs: OwnedFd, - ) { - let mut buf = TypedBuf::::new(); - let new_jobs = Rc::new(new_jobs); - let mut jobs = VecDeque::new(); - loop { - if let Err(e) = self.ring.read(&new_jobs, buf.buf()).await { - if self.stopped.get() { - return; - } - panic!( - "Could not wait for new jobs fd to be signaled: {}", - ErrorFmt(e), - ); - } - mem::swap(&mut jobs, &mut *jobs_remote.lock()); - while let Some(job) = jobs.pop_front() { - self.handle_new_job(job); - } - } - } - - fn handle_new_job(self: &Rc, job: Job) { - match job { - Job::Cancel { id } => { - let mut jobs = self.async_jobs.lock(); - if let Some(job) = jobs.get_mut(&id) { - unsafe { - job.work.deref_mut().cancel_async(&self.ring); - } - } - } - Job::New { id, work } => match unsafe { work.deref_mut() }.run() { - None => { - self.send_completion(id); - return; - } - Some(w) => { - let completion = WorkCompletion { - worker: self.clone(), - id, - }; - let future = w.run(&self.eng, &self.ring, completion); - self.async_jobs.set( - id, - AsyncJob { - _future: future, - work, - }, - ); - } - }, - } - } - - fn send_completion(&self, id: CpuJobId) { - let cv = { - let mut exchange = self.completed_jobs.lock(); - exchange.queue.push_back(id); - exchange.condvar.take() - }; - if let Some(cv) = cv { - cv.notify_all(); - } - if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) { - panic!("Could not signal job completion: {}", ErrorFmt(e)); - } - } -} +pub use jay_cpu_worker::*;