1
0
Fork 0
forked from wry/wry

async_engine: move scheduler into workspace crate

This commit is contained in:
kossLAN 2026-05-29 09:19:14 -04:00
parent b7ecf700fa
commit 03d3876888
No known key found for this signature in database
7 changed files with 205 additions and 182 deletions

288
async-engine/src/ae_task.rs Normal file
View file

@ -0,0 +1,288 @@
use {
crate::{AsyncEngine, Phase},
jay_tracy::ZoneName,
jay_utils::{
numcell::NumCell,
ptr_ext::{MutPtrExt, PtrExt},
},
std::{
cell::{Cell, UnsafeCell},
future::Future,
mem::ManuallyDrop,
pin::Pin,
ptr,
rc::Rc,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
},
};
#[must_use]
pub struct SpawnedFuture<T: 'static> {
vtable: &'static SpawnedFutureVtable<T>,
data: *mut u8,
}
impl<T> Future for SpawnedFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe { (self.vtable.poll)(self.data, cx) }
}
}
impl<T> Drop for SpawnedFuture<T> {
fn drop(&mut self) {
unsafe {
(self.vtable.drop)(self.data);
}
}
}
struct SpawnedFutureVTableProxy<T, F>(T, F);
impl<T: 'static, F: Future<Output = T>> SpawnedFutureVTableProxy<T, F> {
const VTABLE: &'static SpawnedFutureVtable<T> = &SpawnedFutureVtable {
poll: Self::poll,
drop: Self::drop,
};
unsafe fn poll(data: *mut u8, ctx: &mut Context<'_>) -> Poll<T> {
unsafe {
let task = (data as *const Task<T, F>).deref();
if &task.state & COMPLETED == 0 {
task.waker.set(Some(ctx.waker().clone()));
Poll::Pending
} else if &task.state & EMPTIED == 0 {
task.state.or_assign(EMPTIED);
Poll::Ready(ptr::read(&*task.data.get().deref().result))
} else {
panic!("Future polled after it has already been emptied");
}
}
}
unsafe fn drop(data: *mut u8) {
unsafe {
{
let task = (data as *const Task<T, F>).deref();
task.state.or_assign(CANCELLED);
if &task.state & RUNNING == 0 {
task.drop_data();
}
}
Task::<T, F>::dec_ref_count(data as _);
}
}
}
struct SpawnedFutureVtable<T> {
poll: unsafe fn(data: *mut u8, ctx: &mut Context<'_>) -> Poll<T>,
drop: unsafe fn(data: *mut u8),
}
union TaskData<T, F: Future<Output = T>> {
result: ManuallyDrop<T>,
future: ManuallyDrop<F>,
}
const RUNNING: u32 = 1;
const RUN_AGAIN: u32 = 2;
const COMPLETED: u32 = 4;
const EMPTIED: u32 = 8;
const CANCELLED: u32 = 16;
struct Task<T, F: Future<Output = T>> {
ref_count: NumCell<u64>,
phase: Phase,
state: NumCell<u32>,
data: UnsafeCell<TaskData<T, F>>,
waker: Cell<Option<Waker>>,
queue: Rc<AsyncEngine>,
#[cfg_attr(not(feature = "tracy"), expect(dead_code))]
zone: ZoneName,
}
pub(super) struct Runnable {
data: *const u8,
run: unsafe fn(data: *const u8, run: bool),
}
impl Runnable {
pub(super) fn run(self) {
let slf = ManuallyDrop::new(self);
unsafe {
(slf.run)(slf.data, true);
}
}
}
impl Drop for Runnable {
fn drop(&mut self) {
unsafe {
(self.run)(self.data, false);
}
}
}
impl AsyncEngine {
pub(super) fn spawn_<T, F: Future<Output = T>>(
self: &Rc<Self>,
#[cfg_attr(not(feature = "tracy"), expect(unused_variables))] name: &str,
phase: Phase,
f: F,
) -> SpawnedFuture<T> {
let f = Box::new(Task {
ref_count: NumCell::new(1),
phase,
state: NumCell::new(0),
data: UnsafeCell::new(TaskData {
future: ManuallyDrop::new(f),
}),
waker: Cell::new(None),
queue: self.clone(),
zone: jay_tracy::create_zone_name!("task:{}", name),
});
unsafe {
f.schedule_run();
}
let f = Box::into_raw(f);
SpawnedFuture {
vtable: SpawnedFutureVTableProxy::<T, F>::VTABLE,
data: f as _,
}
}
}
impl<T, F: Future<Output = T>> Task<T, F> {
const VTABLE: &'static RawWakerVTable = &RawWakerVTable::new(
Self::waker_clone,
Self::waker_wake,
Self::waker_wake_by_ref,
Self::waker_drop,
);
unsafe fn run_proxy(data: *const u8, run: bool) {
unsafe {
let task = data as *const Self;
if run {
task.deref().run();
} else {
Self::task_runnable_dropped(task);
}
Self::dec_ref_count(task);
}
}
#[cold]
unsafe fn task_runnable_dropped(task: *const Self) {
unsafe {
let task = task.deref();
task.state.and_assign(!RUNNING);
if task.state.get() & CANCELLED != 0 {
task.drop_data();
}
}
}
unsafe fn dec_ref_count(slf: *const Self) {
unsafe {
if slf.deref().ref_count.fetch_sub(1) == 1 {
drop(Box::from_raw(slf as *mut Self));
}
}
}
unsafe fn inc_ref_count(&self) {
self.ref_count.fetch_add(1);
}
unsafe fn waker_clone(data: *const ()) -> RawWaker {
unsafe {
let task = &mut *(data as *mut Self);
task.inc_ref_count();
RawWaker::new(data, Self::VTABLE)
}
}
unsafe fn waker_wake(data: *const ()) {
unsafe {
Self::waker_wake_by_ref(data);
Self::waker_drop(data);
}
}
unsafe fn waker_wake_by_ref(data: *const ()) {
unsafe {
(data as *const Self).deref().schedule_run();
}
}
unsafe fn waker_drop(data: *const ()) {
unsafe { Self::dec_ref_count(data as _) }
}
unsafe fn schedule_run(&self) {
unsafe {
if &self.state & (COMPLETED | CANCELLED) == 0 {
if &self.state & RUNNING == 0 {
self.state.or_assign(RUNNING);
self.inc_ref_count();
let data = self as *const _ as _;
self.queue.push(
Runnable {
data,
run: Self::run_proxy,
},
self.phase,
);
} else {
self.state.or_assign(RUN_AGAIN);
}
}
}
}
unsafe fn run(&self) {
unsafe {
if &self.state & CANCELLED == 0 {
let data = self.data.get().deref_mut();
self.inc_ref_count();
let raw_waker = RawWaker::new(self as *const _ as _, Self::VTABLE);
let waker = Waker::from_raw(raw_waker);
let mut ctx = Context::from_waker(&waker);
let poll = {
jay_tracy::dynamic_zone!(self.zone);
Pin::new_unchecked(&mut *data.future).poll(&mut ctx)
};
if let Poll::Ready(d) = poll {
ManuallyDrop::drop(&mut data.future);
ptr::write(&mut data.result, ManuallyDrop::new(d));
self.state.or_assign(COMPLETED);
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}
self.state.and_assign(!RUNNING);
if &self.state & CANCELLED != 0 {
self.drop_data();
} else if &self.state & RUN_AGAIN != 0 {
self.state.and_assign(!RUN_AGAIN);
self.schedule_run()
}
}
}
unsafe fn drop_data(&self) {
unsafe {
if &self.state & COMPLETED == 0 {
ManuallyDrop::drop(&mut self.data.get().deref_mut().future);
} else if &self.state & EMPTIED == 0 {
ManuallyDrop::drop(&mut self.data.get().deref_mut().result);
}
}
}
}

View file

@ -0,0 +1,27 @@
use {
crate::AsyncEngine,
std::{
future::Future,
pin::Pin,
rc::Rc,
task::{Context, Poll},
},
};
pub struct Yield {
pub(super) iteration: u64,
pub(super) queue: Rc<AsyncEngine>,
}
impl Future for Yield {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.queue.iteration() > self.iteration {
Poll::Ready(())
} else {
self.queue.push_yield(cx.waker().clone());
Poll::Pending
}
}
}

168
async-engine/src/lib.rs Normal file
View file

@ -0,0 +1,168 @@
mod ae_task;
mod ae_yield;
pub use {ae_task::SpawnedFuture, ae_yield::Yield};
use {
crate::ae_task::Runnable,
jay_time::Time,
jay_utils::{array, numcell::NumCell, syncqueue::SyncQueue},
std::{
cell::{Cell, RefCell},
collections::VecDeque,
future::Future,
rc::Rc,
task::Waker,
},
};
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum Phase {
EventHandling,
Layout,
PostLayout,
Present,
}
const NUM_PHASES: usize = 4;
pub struct AsyncEngine {
num_queued: NumCell<usize>,
queues: [SyncQueue<Runnable>; NUM_PHASES],
iteration: NumCell<u64>,
yields: SyncQueue<Waker>,
stash: RefCell<VecDeque<Runnable>>,
yield_stash: RefCell<VecDeque<Waker>>,
stopped: Cell<bool>,
now: Cell<Option<Time>>,
#[cfg(feature = "it")]
idle: Cell<Option<Waker>>,
}
impl AsyncEngine {
pub fn new() -> Rc<Self> {
Rc::new(Self {
num_queued: Default::default(),
queues: array::from_fn(|_| Default::default()),
iteration: Default::default(),
yields: Default::default(),
stash: Default::default(),
yield_stash: Default::default(),
stopped: Cell::new(false),
now: Default::default(),
#[cfg(feature = "it")]
idle: Default::default(),
})
}
pub fn stop(&self) {
self.stopped.set(true);
}
pub fn clear(&self) {
self.stash.borrow_mut().clear();
self.yield_stash.borrow_mut().clear();
self.yields.take();
for queue in &self.queues {
queue.take();
}
}
pub fn spawn<T, F: Future<Output = T> + 'static>(
self: &Rc<Self>,
name: &str,
f: F,
) -> SpawnedFuture<T> {
self.spawn_(name, Phase::EventHandling, f)
}
pub fn spawn2<T, F: Future<Output = T> + 'static>(
self: &Rc<Self>,
name: &str,
phase: Phase,
f: F,
) -> SpawnedFuture<T> {
self.spawn_(name, phase, f)
}
pub fn yield_now(self: &Rc<Self>) -> Yield {
Yield {
iteration: self.iteration(),
queue: self.clone(),
}
}
pub fn dispatch(&self) {
let mut stash = self.stash.borrow_mut();
let mut yield_stash = self.yield_stash.borrow_mut();
loop {
if self.num_queued.get() == 0 {
#[cfg(feature = "it")]
if let Some(idle) = self.idle.take() {
idle.wake();
continue;
}
break;
}
self.now.take();
let mut phase = 0;
while phase < NUM_PHASES {
self.queues[phase].swap(&mut *stash);
if stash.is_empty() {
phase += 1;
continue;
}
self.num_queued.fetch_sub(stash.len());
while let Some(runnable) = stash.pop_front() {
runnable.run();
if self.stopped.get() {
return;
}
}
}
self.iteration.fetch_add(1);
self.yields.swap(&mut *yield_stash);
while let Some(waker) = yield_stash.pop_front() {
waker.wake();
}
}
}
#[cfg(feature = "it")]
pub async fn idle(&self) {
use std::{future::poll_fn, task::Poll};
let mut register = true;
poll_fn(|ctx| {
if register {
self.idle.set(Some(ctx.waker().clone()));
register = false;
Poll::Pending
} else {
Poll::Ready(())
}
})
.await
}
fn push(&self, runnable: Runnable, phase: Phase) {
self.queues[phase as usize].push(runnable);
self.num_queued.fetch_add(1);
}
fn push_yield(&self, waker: Waker) {
self.yields.push(waker);
}
pub fn iteration(&self) -> u64 {
self.iteration.get()
}
pub fn now(&self) -> Time {
match self.now.get() {
Some(t) => t,
None => {
let now = Time::now_unchecked();
self.now.set(Some(now));
now
}
}
}
}