From 4efb187e24b7f9d9bcb63f031ff7970ea4f33a1b Mon Sep 17 00:00:00 2001 From: Julian Orth Date: Sat, 8 Jan 2022 23:04:44 +0100 Subject: [PATCH] autocommit 2022-01-08 23:04:44 CET --- src/async_engine.rs | 129 ++++++++++++++++++++++--------------------- src/utils/numcell.rs | 41 +++++++++++++- src/utils/ptr_ext.rs | 3 + 3 files changed, 109 insertions(+), 64 deletions(-) diff --git a/src/async_engine.rs b/src/async_engine.rs index dbd831db..4eb7a6c7 100644 --- a/src/async_engine.rs +++ b/src/async_engine.rs @@ -171,6 +171,9 @@ mod timeout { mod task { use crate::async_engine::queue::DispatchQueue; + use crate::utils::ptr_ext::{MutPtrExt, PtrExt}; + use crate::NumCell; + use std::cell::{Cell, UnsafeCell}; use std::future::Future; use std::mem::ManuallyDrop; use std::pin::Pin; @@ -208,25 +211,27 @@ mod task { }; unsafe fn poll(data: *mut u8, ctx: &mut Context<'_>) -> Poll { - let task = &mut *(data as *mut Task); - if task.state & COMPLETED == 0 { - task.waker = Some(ctx.waker().clone()); + let task = (data as *const Task).deref(); + if &task.state & COMPLETED == 0 { + task.waker.set(Some(ctx.waker().clone())); Poll::Pending - } else if task.state & EMPTIED == 0 { - task.state |= EMPTIED; - Poll::Ready(ptr::read(&*task.data.result)) + } else if &task.state & EMPTIED == 0 { + task.state.or_assign(EMPTIED); + Poll::Ready(ptr::read(&*task.data.get().deref().result)) } else { panic!("Future polled after it has already been emptied"); } } unsafe fn drop(data: *mut u8) { - let task = &mut *(data as *mut Task); - task.state |= CANCELLED; - if task.state & RUNNING == 0 { - task.drop_data(); + { + let task = (data as *const Task).deref(); + task.state.or_assign(CANCELLED); + if &task.state & RUNNING == 0 { + task.drop_data(); + } } - task.dec_ref_count(); + Task::::dec_ref_count(data as _); } } @@ -240,23 +245,23 @@ mod task { future: ManuallyDrop, } - const RUNNING: usize = 1; - const RUN_AGAIN: usize = 2; - const COMPLETED: usize = 4; - const EMPTIED: usize = 8; - const CANCELLED: usize = 16; + const RUNNING: u32 = 1; + const RUN_AGAIN: u32 = 2; + const COMPLETED: u32 = 4; + const EMPTIED: u32 = 8; + const CANCELLED: u32 = 16; struct Task> { - ref_count: u64, - state: usize, - data: TaskData, - waker: Option, + ref_count: NumCell, + state: NumCell, + data: UnsafeCell>, + waker: Cell>, queue: Rc, } pub(super) struct Runnable { - data: *mut u8, - run: unsafe fn(data: *mut u8, run: bool), + data: *const u8, + run: unsafe fn(data: *const u8, run: bool), } impl Runnable { @@ -278,13 +283,13 @@ mod task { impl DispatchQueue { pub(super) fn spawn>(self: &Rc, f: F) -> SpawnedFuture { - let mut f = Box::new(Task { - ref_count: 1, - state: 0, - data: TaskData { + let f = Box::new(Task { + ref_count: NumCell::new(1), + state: NumCell::new(0), + data: UnsafeCell::new(TaskData { future: ManuallyDrop::new(f), - }, - waker: None, + }), + waker: Cell::new(None), queue: self.clone(), }); unsafe { @@ -306,23 +311,22 @@ mod task { Self::waker_drop, ); - unsafe fn run_proxy(data: *mut u8, run: bool) { - let task = &mut *(data as *mut Self); + unsafe fn run_proxy(data: *const u8, run: bool) { + let task = data as *const Self; if run { - task.run(); + task.deref().run(); } - task.dec_ref_count(); + Self::dec_ref_count(task); } - unsafe fn dec_ref_count(&mut self) { - self.ref_count -= 1; - if self.ref_count == 0 { - Box::from_raw(self); + unsafe fn dec_ref_count(slf: *const Self) { + if slf.deref().ref_count.fetch_sub(1) == 1 { + Box::from_raw(slf as *mut Self); } } - unsafe fn inc_ref_count(&mut self) { - self.ref_count += 1; + unsafe fn inc_ref_count(&self) { + self.ref_count.fetch_add(1); } unsafe fn waker_clone(data: *const ()) -> RawWaker { @@ -337,63 +341,62 @@ mod task { } unsafe fn waker_wake_by_ref(data: *const ()) { - let task = &mut *(data as *mut Self); - task.schedule_run(); + (data as *const Self).deref().schedule_run(); } unsafe fn waker_drop(data: *const ()) { - let task = &mut *(data as *mut Self); - task.dec_ref_count(); + Self::dec_ref_count(data as _) } - unsafe fn schedule_run(&mut self) { - if self.state & (COMPLETED | CANCELLED) == 0 { - if self.state & RUNNING == 0 { - self.state |= RUNNING; + unsafe fn schedule_run(&self) { + if &self.state & (COMPLETED | CANCELLED) == 0 { + if &self.state & RUNNING == 0 { + self.state.or_assign(RUNNING); self.inc_ref_count(); - let data = self as *mut _ as _; + let data = self as *const _ as _; self.queue.push(Runnable { data, run: Self::run_proxy, }); } else { - self.state |= RUN_AGAIN; + self.state.or_assign(RUN_AGAIN); } } } - unsafe fn run(&mut self) { - if self.state & CANCELLED == 0 { + unsafe fn run(&self) { + if &self.state & CANCELLED == 0 { + let data = self.data.get().deref_mut(); self.inc_ref_count(); let raw_waker = RawWaker::new(self as *const _ as _, Self::VTABLE); let waker = Waker::from_raw(raw_waker); let mut ctx = Context::from_waker(&waker); - if let Poll::Ready(d) = Pin::new_unchecked(&mut *self.data.future).poll(&mut ctx) { - ManuallyDrop::drop(&mut self.data.future); - ptr::write(&mut self.data.result, ManuallyDrop::new(d)); - self.state |= COMPLETED; + if let Poll::Ready(d) = Pin::new_unchecked(&mut *data.future).poll(&mut ctx) { + ManuallyDrop::drop(&mut data.future); + ptr::write(&mut data.result, ManuallyDrop::new(d)); + self.state.or_assign(COMPLETED); if let Some(waker) = self.waker.take() { waker.wake(); } } } - self.state &= !RUNNING; + self.state.and_assign(!RUNNING); - if self.state & CANCELLED != 0 { + if &self.state & CANCELLED != 0 { self.drop_data(); - } else if self.state & RUN_AGAIN != 0 { - self.state &= !RUN_AGAIN; + } else if &self.state & RUN_AGAIN != 0 { + self.state.and_assign(!RUN_AGAIN); self.schedule_run() } } - unsafe fn drop_data(&mut self) { - if self.state & COMPLETED == 0 { - ManuallyDrop::drop(&mut self.data.future); - } else if self.state & EMPTIED == 0 { - ManuallyDrop::drop(&mut self.data.result); + unsafe fn drop_data(&self) { + if &self.state & COMPLETED == 0 { + ManuallyDrop::drop(&mut self.data.get().deref_mut().future); + } else if &self.state & EMPTIED == 0 { + ManuallyDrop::drop(&mut self.data.get().deref_mut().result); } } } diff --git a/src/utils/numcell.rs b/src/utils/numcell.rs index 1c2007c1..de8c0f1a 100644 --- a/src/utils/numcell.rs +++ b/src/utils/numcell.rs @@ -1,5 +1,5 @@ use std::cell::Cell; -use std::ops::{Add, Sub}; +use std::ops::{Add, BitAnd, BitOr, Sub}; #[derive(Default)] pub struct NumCell { @@ -7,14 +7,17 @@ pub struct NumCell { } impl NumCell { + #[inline(always)] pub fn new(t: T) -> Self { Self { t: Cell::new(t) } } + #[inline(always)] pub fn replace(&self, n: T) -> T { self.t.replace(n) } + #[inline(always)] pub fn load(&self) -> T where T: Copy, @@ -22,6 +25,7 @@ impl NumCell { self.t.get() } + #[inline(always)] pub fn fetch_add(&self, n: T) -> T where T: Copy + Add, @@ -31,6 +35,7 @@ impl NumCell { res } + #[inline(always)] pub fn fetch_sub(&self, n: T) -> T where T: Copy + Sub, @@ -39,4 +44,38 @@ impl NumCell { self.t.set(res - n); res } + + #[inline(always)] + pub fn or_assign(&self, n: T) + where + T: Copy + BitOr, + { + self.t.set(self.t.get() | n); + } + + #[inline(always)] + pub fn and_assign(&self, n: T) + where + T: Copy + BitAnd, + { + self.t.set(self.t.get() & n); + } +} + +impl + Copy> BitOr for &'_ NumCell { + type Output = T; + + #[inline(always)] + fn bitor(self, rhs: T) -> Self::Output { + self.t.get() | rhs + } +} + +impl + Copy> BitAnd for &'_ NumCell { + type Output = T; + + #[inline(always)] + fn bitand(self, rhs: T) -> Self::Output { + self.t.get() & rhs + } } diff --git a/src/utils/ptr_ext.rs b/src/utils/ptr_ext.rs index bf098523..31e73ec2 100644 --- a/src/utils/ptr_ext.rs +++ b/src/utils/ptr_ext.rs @@ -7,18 +7,21 @@ pub trait MutPtrExt { } impl PtrExt for *const T { + #[inline(always)] unsafe fn deref<'a>(self) -> &'a T { &*self } } impl PtrExt for *mut T { + #[inline(always)] unsafe fn deref<'a>(self) -> &'a T { &*self } } impl MutPtrExt for *mut T { + #[inline(always)] unsafe fn deref_mut<'a>(self) -> &'a mut T { &mut *self }