use { crate::{ async_engine::{AsyncEngine, Phase}, tracy::ZoneName, utils::{ numcell::NumCell, ptr_ext::{MutPtrExt, PtrExt}, }, }, std::{ cell::{Cell, UnsafeCell}, future::Future, mem::ManuallyDrop, pin::Pin, ptr, rc::Rc, task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, }, }; #[must_use] pub struct SpawnedFuture { vtable: &'static SpawnedFutureVtable, data: *mut u8, } impl Future for SpawnedFuture { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { unsafe { (self.vtable.poll)(self.data, cx) } } } impl Drop for SpawnedFuture { fn drop(&mut self) { unsafe { (self.vtable.drop)(self.data); } } } struct SpawnedFutureVTableProxy(T, F); impl> SpawnedFutureVTableProxy { const VTABLE: &'static SpawnedFutureVtable = &SpawnedFutureVtable { poll: Self::poll, drop: Self::drop, }; unsafe fn poll(data: *mut u8, ctx: &mut Context<'_>) -> Poll { unsafe { let task = (data as *const Task).deref(); if &task.state & COMPLETED == 0 { task.waker.set(Some(ctx.waker().clone())); Poll::Pending } else if &task.state & EMPTIED == 0 { task.state.or_assign(EMPTIED); Poll::Ready(ptr::read(&*task.data.get().deref().result)) } else { panic!("Future polled after it has already been emptied"); } } } unsafe fn drop(data: *mut u8) { unsafe { { let task = (data as *const Task).deref(); task.state.or_assign(CANCELLED); if &task.state & RUNNING == 0 { task.drop_data(); } } Task::::dec_ref_count(data as _); } } } struct SpawnedFutureVtable { poll: unsafe fn(data: *mut u8, ctx: &mut Context<'_>) -> Poll, drop: unsafe fn(data: *mut u8), } union TaskData> { result: ManuallyDrop, future: ManuallyDrop, } const RUNNING: u32 = 1; const RUN_AGAIN: u32 = 2; const COMPLETED: u32 = 4; const EMPTIED: u32 = 8; const CANCELLED: u32 = 16; struct Task> { ref_count: NumCell, phase: Phase, state: NumCell, data: UnsafeCell>, waker: Cell>, queue: Rc, #[cfg_attr(not(feature = "tracy"), expect(dead_code))] zone: ZoneName, } pub(super) struct Runnable { data: *const u8, run: unsafe fn(data: *const u8, run: bool), } impl Runnable { pub(super) fn run(self) { let slf = ManuallyDrop::new(self); unsafe { (slf.run)(slf.data, true); } } } impl Drop for Runnable { fn drop(&mut self) { unsafe { (self.run)(self.data, false); } } } impl AsyncEngine { pub(super) fn spawn_>( self: &Rc, #[cfg_attr(not(feature = "tracy"), expect(unused_variables))] name: &str, phase: Phase, f: F, ) -> SpawnedFuture { let f = Box::new(Task { ref_count: NumCell::new(1), phase, state: NumCell::new(0), data: UnsafeCell::new(TaskData { future: ManuallyDrop::new(f), }), waker: Cell::new(None), queue: self.clone(), zone: create_zone_name!("task:{}", name), }); unsafe { f.schedule_run(); } let f = Box::into_raw(f); SpawnedFuture { vtable: SpawnedFutureVTableProxy::::VTABLE, data: f as _, } } } impl> Task { const VTABLE: &'static RawWakerVTable = &RawWakerVTable::new( Self::waker_clone, Self::waker_wake, Self::waker_wake_by_ref, Self::waker_drop, ); unsafe fn run_proxy(data: *const u8, run: bool) { unsafe { let task = data as *const Self; if run { task.deref().run(); } else { Self::task_runnable_dropped(task); } Self::dec_ref_count(task); } } #[cold] unsafe fn task_runnable_dropped(task: *const Self) { unsafe { let task = task.deref(); task.state.and_assign(!RUNNING); if task.state.get() & CANCELLED != 0 { task.drop_data(); } } } unsafe fn dec_ref_count(slf: *const Self) { unsafe { if slf.deref().ref_count.fetch_sub(1) == 1 { drop(Box::from_raw(slf as *mut Self)); } } } unsafe fn inc_ref_count(&self) { self.ref_count.fetch_add(1); } unsafe fn waker_clone(data: *const ()) -> RawWaker { unsafe { let task = &mut *(data as *mut Self); task.inc_ref_count(); RawWaker::new(data, Self::VTABLE) } } unsafe fn waker_wake(data: *const ()) { unsafe { Self::waker_wake_by_ref(data); Self::waker_drop(data); } } unsafe fn waker_wake_by_ref(data: *const ()) { unsafe { (data as *const Self).deref().schedule_run(); } } unsafe fn waker_drop(data: *const ()) { unsafe { Self::dec_ref_count(data as _) } } unsafe fn schedule_run(&self) { unsafe { if &self.state & (COMPLETED | CANCELLED) == 0 { if &self.state & RUNNING == 0 { self.state.or_assign(RUNNING); self.inc_ref_count(); let data = self as *const _ as _; self.queue.push( Runnable { data, run: Self::run_proxy, }, self.phase, ); } else { self.state.or_assign(RUN_AGAIN); } } } } unsafe fn run(&self) { unsafe { if &self.state & CANCELLED == 0 { let data = self.data.get().deref_mut(); self.inc_ref_count(); let raw_waker = RawWaker::new(self as *const _ as _, Self::VTABLE); let waker = Waker::from_raw(raw_waker); let mut ctx = Context::from_waker(&waker); let poll = { dynamic_zone!(self.zone); Pin::new_unchecked(&mut *data.future).poll(&mut ctx) }; if let Poll::Ready(d) = poll { ManuallyDrop::drop(&mut data.future); ptr::write(&mut data.result, ManuallyDrop::new(d)); self.state.or_assign(COMPLETED); if let Some(waker) = self.waker.take() { waker.wake(); } } } self.state.and_assign(!RUNNING); if &self.state & CANCELLED != 0 { self.drop_data(); } else if &self.state & RUN_AGAIN != 0 { self.state.and_assign(!RUN_AGAIN); self.schedule_run() } } } unsafe fn drop_data(&self) { unsafe { if &self.state & COMPLETED == 0 { ManuallyDrop::drop(&mut self.data.get().deref_mut().future); } else if &self.state & EMPTIED == 0 { ManuallyDrop::drop(&mut self.data.get().deref_mut().result); } } } }