1
0
Fork 0
forked from wry/wry

refactor: split cargo workspace

This commit is contained in:
kossLAN 2026-06-05 11:56:21 -04:00
parent 5db14936e7
commit 1c21bd1259
695 changed files with 32023 additions and 44964 deletions

View file

@ -0,0 +1,24 @@
[package]
name = "jay-cpu-worker"
version = "0.1.0"
edition = "2024"
license = "GPL-3.0-only"
[dependencies]
jay-async-engine = { version = "0.1.0", path = "../async-engine" }
jay-geometry = { version = "0.1.0", path = "../geometry" }
jay-io-uring = { version = "0.1.0", path = "../io-uring" }
jay-tracy = { version = "0.1.0", path = "../tracy" }
jay-utils = { version = "0.1.0", path = "../utils" }
log = { version = "0.4.20", features = ["std"] }
parking_lot = "0.12.1"
thiserror = "2.0.11"
uapi = "0.2.13"
[dev-dependencies]
jay-wheel = { version = "0.1.0", path = "../wheel" }
[features]
it = []
tracy = ["jay-tracy/tracy", "jay-async-engine/tracy"]

View file

@ -0,0 +1,2 @@
pub mod img_copy;
pub mod read_write;

View file

@ -0,0 +1,60 @@
use {
crate::{AsyncCpuWork, CpuWork},
jay_geometry::Rect,
std::ptr,
};
pub struct ImgCopyWork {
pub src: *mut u8,
pub dst: *mut u8,
pub width: i32,
pub stride: i32,
pub bpp: i32,
pub rects: Vec<Rect>,
_priv: (),
}
unsafe impl Send for ImgCopyWork {}
impl ImgCopyWork {
pub unsafe fn new() -> Self {
Self {
src: ptr::null_mut(),
dst: ptr::null_mut(),
width: 0,
stride: 0,
bpp: 0,
rects: vec![],
_priv: (),
}
}
}
impl CpuWork for ImgCopyWork {
fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
jay_tracy::zone!("ImgCopyWork");
for rect in &self.rects {
let mut offset = rect.y1() * self.stride + rect.x1() * self.bpp;
if rect.width() == self.width {
let offset = offset as usize;
let len = rect.height() * self.stride;
unsafe {
ptr::copy_nonoverlapping(self.src.add(offset), self.dst.add(offset), len as _);
}
} else {
let len = rect.width() * self.bpp;
for _ in 0..rect.height() {
unsafe {
ptr::copy_nonoverlapping(
self.src.add(offset as _),
self.dst.add(offset as _),
len as _,
);
}
offset += self.stride;
}
}
}
None
}
}

View file

@ -0,0 +1,145 @@
use {
crate::{AsyncCpuWork, CompletedWork, CpuWork, WorkCompletion},
jay_async_engine::{AsyncEngine, SpawnedFuture},
jay_io_uring::{IoUring, IoUringError, IoUringTaskId},
std::{
any::Any,
ptr,
rc::Rc,
slice,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
},
},
thiserror::Error,
uapi::{Fd, c},
};
#[derive(Debug, Error)]
pub enum ReadWriteJobError {
#[error("An io_uring error occurred")]
IoUring(#[source] IoUringError),
#[error("The job was cancelled")]
Cancelled,
#[error("Tried to operate outside the bounds of the file descriptor")]
OutOfBounds,
}
pub struct ReadWriteWork {
cancel: Arc<CancelState>,
config: Option<Box<ReadWriteWorkConfig>>,
}
unsafe impl Send for ReadWriteWork {}
impl ReadWriteWork {
pub unsafe fn new() -> Self {
let cancel = Arc::new(CancelState::default());
ReadWriteWork {
cancel: cancel.clone(),
config: Some(Box::new(ReadWriteWorkConfig {
fd: -1,
offset: 0,
ptr: ptr::null_mut(),
len: 0,
write: false,
cancel,
result: None,
})),
}
}
pub fn config(&mut self) -> &mut ReadWriteWorkConfig {
self.config.as_mut().unwrap()
}
}
pub struct ReadWriteWorkConfig {
pub fd: c::c_int,
pub offset: usize,
pub ptr: *mut u8,
pub len: usize,
pub write: bool,
pub result: Option<Result<(), ReadWriteJobError>>,
cancel: Arc<CancelState>,
}
#[derive(Default)]
struct CancelState {
cancelled: AtomicBool,
cancel_id: AtomicU64,
}
impl CpuWork for ReadWriteWork {
fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
self.cancel.cancelled.store(false, Relaxed);
self.cancel.cancel_id.store(0, Relaxed);
self.config.take().map(|b| b as _)
}
fn cancel_async(&mut self, ring: &Rc<IoUring>) {
self.cancel.cancelled.store(true, Relaxed);
let id = self.cancel.cancel_id.load(Relaxed);
if id != 0 {
ring.cancel(IoUringTaskId::from_raw(id));
}
}
fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
let work = (work as Box<dyn Any>).downcast().unwrap();
self.config = Some(work);
}
}
impl AsyncCpuWork for ReadWriteWorkConfig {
fn run(
mut self: Box<Self>,
eng: &Rc<AsyncEngine>,
ring: &Rc<IoUring>,
completion: WorkCompletion,
) -> SpawnedFuture<CompletedWork> {
let ring = ring.clone();
eng.spawn("shm read/write", async move {
let res = loop {
if self.cancel.cancelled.load(Relaxed) {
break Err(ReadWriteJobError::Cancelled);
}
if self.len == 0 {
break Ok(());
};
let res = if self.write {
ring.write_no_cancel(
Fd::new(self.fd),
self.offset,
unsafe { slice::from_raw_parts(self.ptr, self.len) },
None,
|id| self.cancel.cancel_id.store(id.raw(), Relaxed),
)
.await
} else {
ring.read_no_cancel(
Fd::new(self.fd),
self.offset,
unsafe { slice::from_raw_parts_mut(self.ptr, self.len) },
|id| self.cancel.cancel_id.store(id.raw(), Relaxed),
)
.await
};
match res {
Ok(0) => break Err(ReadWriteJobError::OutOfBounds),
Ok(n) => {
self.len -= n;
self.offset += n;
unsafe {
self.ptr = self.ptr.add(n);
}
}
Err(e) => break Err(ReadWriteJobError::IoUring(e)),
}
};
self.result = Some(res);
completion.complete(self)
})
}
}

View file

@ -0,0 +1,509 @@
pub mod jobs;
#[cfg(test)]
mod tests;
use {
jay_async_engine::{AsyncEngine, SpawnedFuture},
jay_io_uring::IoUring,
jay_utils::{
buf::TypedBuf,
copyhashmap::CopyHashMap,
errorfmt::ErrorFmt,
numcell::NumCell,
oserror::{OsError, OsErrorExt2},
pipe::{Pipe, pipe},
ptr_ext::MutPtrExt,
queue::AsyncQueue,
stack::Stack,
},
parking_lot::{Condvar, Mutex},
std::{
any::Any,
cell::{Cell, RefCell},
collections::VecDeque,
mem,
ptr::NonNull,
rc::Rc,
sync::Arc,
thread,
},
thiserror::Error,
uapi::{OwnedFd, c},
};
pub trait CpuJob {
fn work(&mut self) -> &mut dyn CpuWork;
fn completed(self: Box<Self>);
}
pub trait CpuWork: Send {
fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>>;
fn cancel_async(&mut self, ring: &Rc<IoUring>) {
let _ = ring;
unreachable!();
}
fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
let _ = work;
unreachable!();
}
}
pub trait AsyncCpuWork: Any {
fn run(
self: Box<Self>,
eng: &Rc<AsyncEngine>,
ring: &Rc<IoUring>,
completion: WorkCompletion,
) -> SpawnedFuture<CompletedWork>;
}
pub struct WorkCompletion {
worker: Rc<Worker>,
id: CpuJobId,
}
pub struct CompletedWork(());
impl WorkCompletion {
pub fn complete(self, work: Box<dyn AsyncCpuWork>) -> CompletedWork {
let job = self.worker.async_jobs.remove(&self.id).unwrap();
unsafe {
job.work.deref_mut().async_work_done(work);
}
self.worker.send_completion(self.id);
CompletedWork(())
}
}
pub struct CpuWorker {
data: Rc<CpuWorkerData>,
_completions_listener: SpawnedFuture<()>,
_job_enqueuer: SpawnedFuture<()>,
}
#[must_use]
pub struct PendingJob {
id: CpuJobId,
thread_data: Rc<CpuWorkerData>,
job_data: Rc<PendingJobData>,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
enum PendingJobState {
#[default]
Waiting,
Abandoned,
Completed,
}
#[derive(Default)]
struct PendingJobData {
job: Cell<Option<NonNull<dyn CpuJob>>>,
state: Cell<PendingJobState>,
}
enum Job {
New {
id: CpuJobId,
work: *mut dyn CpuWork,
},
Cancel {
id: CpuJobId,
},
}
unsafe impl Send for Job {}
#[derive(Default)]
struct CompletedJobsExchange {
queue: VecDeque<CpuJobId>,
condvar: Option<Arc<Condvar>>,
}
struct CpuWorkerData {
next: CpuJobIds,
jobs_to_enqueue: AsyncQueue<Job>,
new_jobs: Arc<Mutex<VecDeque<Job>>>,
have_new_jobs: Rc<OwnedFd>,
completed_jobs_remote: Arc<Mutex<CompletedJobsExchange>>,
completed_jobs_local: RefCell<VecDeque<CpuJobId>>,
have_completed_jobs: Rc<OwnedFd>,
pending_jobs: CopyHashMap<CpuJobId, Rc<PendingJobData>>,
ring: Rc<IoUring>,
_stop: OwnedFd,
pending_job_data_cache: Stack<Rc<PendingJobData>>,
sync_wake_condvar: Arc<Condvar>,
}
#[derive(Debug)]
struct CpuJobIds {
next: NumCell<u64>,
}
impl Default for CpuJobIds {
fn default() -> Self {
Self {
next: NumCell::new(1),
}
}
}
impl CpuJobIds {
fn next(&self) -> CpuJobId {
CpuJobId(self.next.fetch_add(1))
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
struct CpuJobId(u64);
#[derive(Debug, Error)]
pub enum CpuWorkerError {
#[error("Could not create a pipe")]
Pipe(#[source] OsError),
#[error("Could not create an eventfd")]
EventFd(#[source] OsError),
#[error("Could not dup an eventfd")]
Dup(#[source] OsError),
}
impl PendingJob {
pub fn detach(self) {
match self.job_data.state.get() {
PendingJobState::Waiting => {
self.job_data.state.set(PendingJobState::Abandoned);
}
PendingJobState::Abandoned => {
unreachable!();
}
PendingJobState::Completed => {}
}
}
}
impl Drop for CpuWorker {
fn drop(&mut self) {
self.data.do_equeue_jobs();
if self.data.pending_jobs.is_not_empty() {
log::warn!("CpuWorker dropped with pending jobs. Completed jobs will not be triggered.")
}
}
}
impl Drop for PendingJob {
fn drop(&mut self) {
match self.job_data.state.get() {
PendingJobState::Waiting => {
log::warn!("PendingJob dropped before completion. Blocking.");
let data = &self.thread_data;
let id = self.id;
self.job_data.state.set(PendingJobState::Abandoned);
data.jobs_to_enqueue.push(Job::Cancel { id });
data.do_equeue_jobs();
loop {
data.dispatch_completions();
if !data.pending_jobs.contains(&id) {
break;
}
let mut remote = data.completed_jobs_remote.lock();
while remote.queue.is_empty() {
remote.condvar = Some(data.sync_wake_condvar.clone());
data.sync_wake_condvar.wait(&mut remote);
}
}
}
PendingJobState::Abandoned => {}
PendingJobState::Completed => {
self.thread_data
.pending_job_data_cache
.push(self.job_data.clone());
}
}
}
}
impl CpuWorkerData {
fn clear(&self) {
self.jobs_to_enqueue.clear();
self.new_jobs.lock().clear();
self.completed_jobs_remote.lock().queue.clear();
self.completed_jobs_local.borrow_mut().clear();
self.pending_jobs.clear();
self.pending_job_data_cache.take();
}
async fn wait_for_completions(self: Rc<Self>) {
let mut buf = TypedBuf::<u64>::new();
loop {
if let Err(e) = self.ring.read(&self.have_completed_jobs, buf.buf()).await {
log::error!("Could not wait for job completions: {}", ErrorFmt(e));
return;
}
self.dispatch_completions();
}
}
fn dispatch_completions(&self) {
let completions = &mut *self.completed_jobs_local.borrow_mut();
mem::swap(completions, &mut self.completed_jobs_remote.lock().queue);
while let Some(id) = completions.pop_front() {
let job_data = self.pending_jobs.remove(&id).unwrap();
let job = job_data.job.take().unwrap();
let job = unsafe { Box::from_raw(job.as_ptr()) };
match job_data.state.get() {
PendingJobState::Waiting => {
job_data.state.set(PendingJobState::Completed);
job.completed();
}
PendingJobState::Abandoned => {
self.pending_job_data_cache.push(job_data);
}
PendingJobState::Completed => {
unreachable!();
}
}
}
}
async fn equeue_jobs(self: Rc<Self>) {
loop {
self.jobs_to_enqueue.non_empty().await;
self.do_equeue_jobs();
}
}
fn do_equeue_jobs(&self) {
self.jobs_to_enqueue.move_to(&mut self.new_jobs.lock());
if let Err(e) = uapi::eventfd_write(self.have_new_jobs.raw(), 1) {
panic!("Could not signal eventfd: {}", ErrorFmt(e));
}
}
}
impl CpuWorker {
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Result<Self, CpuWorkerError> {
let new_jobs: Arc<Mutex<VecDeque<Job>>> = Default::default();
let completed_jobs: Arc<Mutex<CompletedJobsExchange>> = Default::default();
let Pipe {
read: stop_read,
write: stop_write,
} = pipe().map_err(CpuWorkerError::Pipe)?;
let have_new_jobs = uapi::eventfd(0, c::EFD_CLOEXEC).map_os_err(CpuWorkerError::EventFd)?;
let have_completed_jobs =
uapi::eventfd(0, c::EFD_CLOEXEC).map_os_err(CpuWorkerError::EventFd)?;
thread::Builder::new()
.name("cpu worker".to_string())
.spawn({
let new_jobs = new_jobs.clone();
let completed_jobs = completed_jobs.clone();
let have_new_jobs = uapi::fcntl_dupfd_cloexec(have_new_jobs.raw(), 0)
.map_os_err(CpuWorkerError::Dup)?;
let have_completed_jobs = uapi::fcntl_dupfd_cloexec(have_completed_jobs.raw(), 0)
.map_os_err(CpuWorkerError::Dup)?;
move || {
work(
new_jobs,
completed_jobs,
stop_write,
have_new_jobs,
have_completed_jobs,
)
}
})
.unwrap();
let data = Rc::new(CpuWorkerData {
next: Default::default(),
jobs_to_enqueue: Default::default(),
new_jobs,
have_new_jobs: Rc::new(have_new_jobs),
completed_jobs_remote: completed_jobs,
completed_jobs_local: Default::default(),
have_completed_jobs: Rc::new(have_completed_jobs),
pending_jobs: Default::default(),
ring: ring.clone(),
_stop: stop_read,
pending_job_data_cache: Default::default(),
sync_wake_condvar: Arc::new(Condvar::new()),
});
Ok(Self {
_completions_listener: eng.spawn(
"cpu worker completions",
data.clone().wait_for_completions(),
),
_job_enqueuer: eng.spawn("cpu worker enqueue", data.clone().equeue_jobs()),
data,
})
}
pub fn clear(&self) {
self.data.clear();
}
pub fn submit(&self, job: Box<dyn CpuJob>) -> PendingJob {
let mut job = NonNull::from(Box::leak(job));
let id = self.data.next.next();
self.data.jobs_to_enqueue.push(Job::New {
id,
work: unsafe { job.as_mut().work() },
});
let job_data = self.data.pending_job_data_cache.pop().unwrap_or_default();
job_data.job.set(Some(job));
job_data.state.set(PendingJobState::Waiting);
self.data.pending_jobs.set(id, job_data.clone());
PendingJob {
id,
thread_data: self.data.clone(),
job_data,
}
}
#[cfg(feature = "it")]
pub fn wait_idle(&self) -> bool {
let was_idle = self.data.pending_jobs.is_empty();
loop {
self.data.dispatch_completions();
if self.data.pending_jobs.is_empty() {
break;
}
let mut remote = self.data.completed_jobs_remote.lock();
while remote.queue.is_empty() {
remote.condvar = Some(self.data.sync_wake_condvar.clone());
self.data.sync_wake_condvar.wait(&mut remote);
}
}
was_idle
}
}
fn work(
new_jobs: Arc<Mutex<VecDeque<Job>>>,
completed_jobs: Arc<Mutex<CompletedJobsExchange>>,
stop: OwnedFd,
have_new_jobs: OwnedFd,
have_completed_jobs: OwnedFd,
) {
let eng = AsyncEngine::new();
let ring = IoUring::new(&eng, 32).unwrap();
let worker = Rc::new(Worker {
eng,
ring,
completed_jobs,
have_completed_jobs,
async_jobs: Default::default(),
stopped: Cell::new(false),
});
let _stop_listener = worker
.eng
.spawn("stop listener", worker.clone().handle_stop(stop));
let _new_job_listener = worker.eng.spawn(
"new job listener",
worker.clone().handle_new_jobs(new_jobs, have_new_jobs),
);
if let Err(e) = worker.ring.run() {
panic!("io_uring failed: {}", ErrorFmt(e));
}
}
struct Worker {
eng: Rc<AsyncEngine>,
ring: Rc<IoUring>,
completed_jobs: Arc<Mutex<CompletedJobsExchange>>,
have_completed_jobs: OwnedFd,
async_jobs: CopyHashMap<CpuJobId, AsyncJob>,
stopped: Cell<bool>,
}
struct AsyncJob {
_future: SpawnedFuture<CompletedWork>,
work: *mut dyn CpuWork,
}
impl Worker {
async fn handle_stop(self: Rc<Self>, stop: OwnedFd) {
let stop = Rc::new(stop);
if let Err(e) = self.ring.poll(&stop, 0).await {
log::error!(
"Could not wait for stop fd to become readable: {}",
ErrorFmt(e)
);
} else {
assert!(self.async_jobs.is_empty());
self.stopped.set(true);
self.ring.stop();
}
}
async fn handle_new_jobs(
self: Rc<Self>,
jobs_remote: Arc<Mutex<VecDeque<Job>>>,
new_jobs: OwnedFd,
) {
let mut buf = TypedBuf::<u64>::new();
let new_jobs = Rc::new(new_jobs);
let mut jobs = VecDeque::new();
loop {
if let Err(e) = self.ring.read(&new_jobs, buf.buf()).await {
if self.stopped.get() {
return;
}
panic!(
"Could not wait for new jobs fd to be signaled: {}",
ErrorFmt(e),
);
}
mem::swap(&mut jobs, &mut *jobs_remote.lock());
while let Some(job) = jobs.pop_front() {
self.handle_new_job(job);
}
}
}
fn handle_new_job(self: &Rc<Self>, job: Job) {
match job {
Job::Cancel { id } => {
let mut jobs = self.async_jobs.lock();
if let Some(job) = jobs.get_mut(&id) {
unsafe {
job.work.deref_mut().cancel_async(&self.ring);
}
}
}
Job::New { id, work } => match unsafe { work.deref_mut() }.run() {
None => {
self.send_completion(id);
return;
}
Some(w) => {
let completion = WorkCompletion {
worker: self.clone(),
id,
};
let future = w.run(&self.eng, &self.ring, completion);
self.async_jobs.set(
id,
AsyncJob {
_future: future,
work,
},
);
}
},
}
}
fn send_completion(&self, id: CpuJobId) {
let cv = {
let mut exchange = self.completed_jobs.lock();
exchange.queue.push_back(id);
exchange.condvar.take()
};
if let Some(cv) = cv {
cv.notify_all();
}
if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) {
panic!("Could not signal job completion: {}", ErrorFmt(e));
}
}
}

View file

@ -0,0 +1,111 @@
use {
crate::{AsyncCpuWork, CompletedWork, CpuJob, CpuWork, CpuWorker, WorkCompletion},
jay_async_engine::{AsyncEngine, SpawnedFuture},
jay_io_uring::IoUring,
jay_utils::asyncevent::AsyncEvent,
jay_wheel::Wheel,
std::{future::pending, rc::Rc, sync::Arc},
uapi::{OwnedFd, c::EFD_CLOEXEC},
};
struct Job {
ae: Rc<AsyncEvent>,
work: Work,
cancel: bool,
}
struct Work(Arc<OwnedFd>);
struct AsyncWork(Arc<OwnedFd>);
impl CpuJob for Job {
fn work(&mut self) -> &mut dyn CpuWork {
&mut self.work
}
fn completed(self: Box<Self>) {
if self.cancel {
unreachable!();
} else {
self.ae.trigger();
}
}
}
impl Drop for Job {
fn drop(&mut self) {
if self.cancel {
self.ae.trigger();
}
}
}
impl CpuWork for Work {
fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
Some(Box::new(AsyncWork(self.0.clone())))
}
fn cancel_async(&mut self, _ring: &Rc<IoUring>) {
uapi::eventfd_write(self.0.raw(), 1).unwrap();
}
fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
let _ = work;
}
}
impl AsyncCpuWork for AsyncWork {
fn run(
self: Box<Self>,
eng: &Rc<AsyncEngine>,
ring: &Rc<IoUring>,
completion: WorkCompletion,
) -> SpawnedFuture<CompletedWork> {
let ring = ring.clone();
eng.spawn("", async move {
let mut buf = [0; 8];
let res = ring
.read_no_cancel(self.0.borrow(), 0, &mut buf, |_| ())
.await;
res.unwrap();
completion.complete(self)
})
}
}
fn run(cancel: bool) {
let eng = AsyncEngine::new();
let ring = IoUring::new(&eng, 32).unwrap();
let ring2 = ring.clone();
let wheel = Wheel::new(&eng, &ring).unwrap();
let cpu = Rc::new(CpuWorker::new(&ring, &eng).unwrap());
let ae = Rc::new(AsyncEvent::default());
let eventfd = Arc::new(uapi::eventfd(0, EFD_CLOEXEC).unwrap());
let pending_job = cpu.submit(Box::new(Job {
ae: ae.clone(),
work: Work(eventfd.clone()),
cancel,
}));
let _fut1 = eng.spawn("", async move {
wheel.timeout(1).await.unwrap();
if cancel {
drop(pending_job);
} else {
uapi::eventfd_write(eventfd.raw(), 1).unwrap();
pending::<()>().await;
}
});
let _fut2 = eng.spawn("", async move {
ae.triggered().await;
ring2.stop();
});
ring.run().unwrap();
}
#[test]
fn cancel() {
run(true);
}
#[test]
fn complete() {
run(false);
}