diff --git a/src/async_engine.rs b/src/async_engine.rs index 83e3a538..2f669e08 100644 --- a/src/async_engine.rs +++ b/src/async_engine.rs @@ -1,9 +1,16 @@ +mod ae_fd; +mod ae_queue; +mod ae_task; +mod ae_timeout; +mod ae_timer; +mod ae_yield; + pub use { - crate::async_engine::yield_::Yield, - fd::{AsyncFd, FdStatus}, - task::SpawnedFuture, - timeout::Timeout, - timer::Timer, + crate::async_engine::ae_yield::Yield, + ae_fd::{AsyncFd, FdStatus}, + ae_task::SpawnedFuture, + ae_timeout::Timeout, + ae_timer::Timer, }; use { crate::{ @@ -11,15 +18,15 @@ use { utils::{copyhashmap::CopyHashMap, numcell::NumCell, oserror::OsError}, wheel::{Wheel, WheelError}, }, - fd::AsyncFdData, - queue::{DispatchQueue, Dispatcher}, + ae_fd::AsyncFdData, + ae_queue::{DispatchQueue, Dispatcher}, + ae_timeout::TimeoutData, std::{ cell::{Cell, RefCell}, future::Future, rc::Rc, }, thiserror::Error, - timeout::TimeoutData, uapi::{c, OwnedFd}, }; @@ -134,712 +141,3 @@ impl AsyncEngine { } } } - -mod yield_ { - use { - crate::async_engine::queue::DispatchQueue, - std::{ - future::Future, - pin::Pin, - rc::Rc, - task::{Context, Poll}, - }, - }; - - pub struct Yield { - pub(super) iteration: u64, - pub(super) queue: Rc, - } - - impl Future for Yield { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.queue.iteration() > self.iteration { - Poll::Ready(()) - } else { - self.queue.push_yield(cx.waker().clone()); - Poll::Pending - } - } - } -} - -mod timer { - use { - crate::async_engine::{AsyncEngine, AsyncError, AsyncFd}, - std::{rc::Rc, time::Duration}, - uapi::c, - }; - - #[derive(Clone)] - pub struct Timer { - fd: AsyncFd, - } - - impl Timer { - pub(super) fn new(eng: &Rc, clock_id: c::c_int) -> Result { - let fd = match uapi::timerfd_create(clock_id, c::TFD_CLOEXEC | c::TFD_NONBLOCK) { - Ok(fd) => fd, - Err(e) => return Err(AsyncError::CreateTimer(e.into())), - }; - let afd = eng.fd(&Rc::new(fd))?; - Ok(Self { fd: afd }) - } - - pub async fn expired(&self) -> Result { - self.fd.readable().await?; - let mut buf = 0u64; - if let Err(e) = uapi::read(self.fd.raw(), &mut buf) { - return Err(AsyncError::TimerReadError(e.into())); - } - Ok(buf) - } - - pub fn program( - &self, - initial: Option, - periodic: Option, - ) -> Result<(), AsyncError> { - let mut timerspec: c::itimerspec = uapi::pod_zeroed(); - if let Some(init) = initial { - timerspec.it_value.tv_sec = init.as_secs() as _; - timerspec.it_value.tv_nsec = init.subsec_nanos() as _; - if let Some(per) = periodic { - timerspec.it_interval.tv_sec = per.as_secs() as _; - timerspec.it_interval.tv_nsec = per.subsec_nanos() as _; - } - } - if let Err(e) = uapi::timerfd_settime(self.fd.raw(), 0, &timerspec) { - return Err(AsyncError::SetTimer(e.into())); - } - Ok(()) - } - } -} - -mod timeout { - use { - crate::wheel::{Wheel, WheelDispatcher, WheelId}, - std::{ - cell::{Cell, RefCell}, - error::Error, - future::Future, - pin::Pin, - rc::Rc, - task::{Context, Poll, Waker}, - }, - }; - - pub(super) struct TimeoutData { - pub expired: Cell, - pub waker: RefCell>, - } - - impl WheelDispatcher for TimeoutData { - fn dispatch(self: Rc) -> Result<(), Box> { - self.expired.set(true); - if let Some(w) = self.waker.borrow_mut().take() { - w.wake(); - } - Ok(()) - } - } - - pub struct Timeout { - pub(super) id: WheelId, - pub(super) wheel: Rc, - pub(super) data: Rc, - } - - impl Future for Timeout { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.data.expired.get() { - Poll::Ready(()) - } else { - *self.data.waker.borrow_mut() = Some(cx.waker().clone()); - Poll::Pending - } - } - } - - impl Drop for Timeout { - fn drop(&mut self) { - self.wheel.remove(self.id); - } - } -} - -mod task { - use { - crate::{ - async_engine::{queue::DispatchQueue, Phase}, - utils::{ - numcell::NumCell, - ptr_ext::{MutPtrExt, PtrExt}, - }, - }, - std::{ - cell::{Cell, UnsafeCell}, - future::Future, - mem::ManuallyDrop, - pin::Pin, - ptr, - rc::Rc, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, - }, - }; - - #[must_use] - pub struct SpawnedFuture { - vtable: &'static SpawnedFutureVtable, - data: *mut u8, - } - - impl Future for SpawnedFuture { - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - unsafe { (self.vtable.poll)(self.data, cx) } - } - } - - impl Drop for SpawnedFuture { - fn drop(&mut self) { - unsafe { - (self.vtable.drop)(self.data); - } - } - } - - struct SpawnedFutureVTableProxy(T, F); - - impl> SpawnedFutureVTableProxy { - const VTABLE: &'static SpawnedFutureVtable = &SpawnedFutureVtable { - poll: Self::poll, - drop: Self::drop, - }; - - unsafe fn poll(data: *mut u8, ctx: &mut Context<'_>) -> Poll { - let task = (data as *const Task).deref(); - if &task.state & COMPLETED == 0 { - task.waker.set(Some(ctx.waker().clone())); - Poll::Pending - } else if &task.state & EMPTIED == 0 { - task.state.or_assign(EMPTIED); - Poll::Ready(ptr::read(&*task.data.get().deref().result)) - } else { - panic!("Future polled after it has already been emptied"); - } - } - - unsafe fn drop(data: *mut u8) { - { - let task = (data as *const Task).deref(); - task.state.or_assign(CANCELLED); - if &task.state & RUNNING == 0 { - task.drop_data(); - } - } - Task::::dec_ref_count(data as _); - } - } - - struct SpawnedFutureVtable { - poll: unsafe fn(data: *mut u8, ctx: &mut Context<'_>) -> Poll, - drop: unsafe fn(data: *mut u8), - } - - union TaskData> { - result: ManuallyDrop, - future: ManuallyDrop, - } - - const RUNNING: u32 = 1; - const RUN_AGAIN: u32 = 2; - const COMPLETED: u32 = 4; - const EMPTIED: u32 = 8; - const CANCELLED: u32 = 16; - - struct Task> { - ref_count: NumCell, - phase: Phase, - state: NumCell, - data: UnsafeCell>, - waker: Cell>, - queue: Rc, - } - - pub(super) struct Runnable { - data: *const u8, - run: unsafe fn(data: *const u8, run: bool), - } - - impl Runnable { - pub(super) fn run(self) { - let slf = ManuallyDrop::new(self); - unsafe { - (slf.run)(slf.data, true); - } - } - } - - impl Drop for Runnable { - fn drop(&mut self) { - unsafe { - (self.run)(self.data, false); - } - } - } - - impl DispatchQueue { - pub(super) fn spawn>( - self: &Rc, - phase: Phase, - f: F, - ) -> SpawnedFuture { - let f = Box::new(Task { - ref_count: NumCell::new(1), - phase, - state: NumCell::new(0), - data: UnsafeCell::new(TaskData { - future: ManuallyDrop::new(f), - }), - waker: Cell::new(None), - queue: self.clone(), - }); - unsafe { - f.schedule_run(); - } - let f = Box::into_raw(f); - SpawnedFuture { - vtable: SpawnedFutureVTableProxy::::VTABLE, - data: f as _, - } - } - } - - impl> Task { - const VTABLE: &'static RawWakerVTable = &RawWakerVTable::new( - Self::waker_clone, - Self::waker_wake, - Self::waker_wake_by_ref, - Self::waker_drop, - ); - - unsafe fn run_proxy(data: *const u8, run: bool) { - let task = data as *const Self; - if run { - task.deref().run(); - } - Self::dec_ref_count(task); - } - - unsafe fn dec_ref_count(slf: *const Self) { - if slf.deref().ref_count.fetch_sub(1) == 1 { - Box::from_raw(slf as *mut Self); - } - } - - unsafe fn inc_ref_count(&self) { - self.ref_count.fetch_add(1); - } - - unsafe fn waker_clone(data: *const ()) -> RawWaker { - let task = &mut *(data as *mut Self); - task.inc_ref_count(); - RawWaker::new(data, Self::VTABLE) - } - - unsafe fn waker_wake(data: *const ()) { - Self::waker_wake_by_ref(data); - Self::waker_drop(data); - } - - unsafe fn waker_wake_by_ref(data: *const ()) { - (data as *const Self).deref().schedule_run(); - } - - unsafe fn waker_drop(data: *const ()) { - Self::dec_ref_count(data as _) - } - - unsafe fn schedule_run(&self) { - if &self.state & (COMPLETED | CANCELLED) == 0 { - if &self.state & RUNNING == 0 { - self.state.or_assign(RUNNING); - self.inc_ref_count(); - let data = self as *const _ as _; - self.queue.push( - Runnable { - data, - run: Self::run_proxy, - }, - self.phase, - ); - } else { - self.state.or_assign(RUN_AGAIN); - } - } - } - - unsafe fn run(&self) { - if &self.state & CANCELLED == 0 { - let data = self.data.get().deref_mut(); - self.inc_ref_count(); - let raw_waker = RawWaker::new(self as *const _ as _, Self::VTABLE); - let waker = Waker::from_raw(raw_waker); - - let mut ctx = Context::from_waker(&waker); - if let Poll::Ready(d) = Pin::new_unchecked(&mut *data.future).poll(&mut ctx) { - ManuallyDrop::drop(&mut data.future); - ptr::write(&mut data.result, ManuallyDrop::new(d)); - self.state.or_assign(COMPLETED); - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } - } - - self.state.and_assign(!RUNNING); - - if &self.state & CANCELLED != 0 { - self.drop_data(); - } else if &self.state & RUN_AGAIN != 0 { - self.state.and_assign(!RUN_AGAIN); - self.schedule_run() - } - } - - unsafe fn drop_data(&self) { - if &self.state & COMPLETED == 0 { - ManuallyDrop::drop(&mut self.data.get().deref_mut().future); - } else if &self.state & EMPTIED == 0 { - ManuallyDrop::drop(&mut self.data.get().deref_mut().result); - } - } - } -} - -mod queue { - use { - crate::{ - async_engine::{task::Runnable, AsyncError, Phase, NUM_PHASES}, - event_loop::{EventLoop, EventLoopDispatcher, EventLoopId}, - utils::{array, numcell::NumCell, syncqueue::SyncQueue}, - }, - std::{ - cell::{Cell, RefCell}, - collections::VecDeque, - error::Error, - rc::Rc, - task::Waker, - }, - }; - - pub(super) struct Dispatcher { - queue: Rc, - stash: RefCell>, - yield_stash: RefCell>, - } - - impl Dispatcher { - pub fn install(el: &Rc) -> Result, AsyncError> { - let id = el.id(); - let queue = Rc::new(DispatchQueue { - id, - el: el.clone(), - dispatch_scheduled: Cell::new(false), - num_queued: Default::default(), - queues: array::from_fn(|_| Default::default()), - iteration: Default::default(), - yields: Default::default(), - }); - let slf = Rc::new(Dispatcher { - queue: queue.clone(), - stash: Default::default(), - yield_stash: Default::default(), - }); - el.insert(id, None, 0, slf)?; - Ok(queue) - } - } - - impl EventLoopDispatcher for Dispatcher { - fn dispatch(self: Rc, _fd: Option, _events: i32) -> Result<(), Box> { - let mut stash = self.stash.borrow_mut(); - let mut yield_stash = self.yield_stash.borrow_mut(); - while self.queue.num_queued.get() > 0 { - self.queue.iteration.fetch_add(1); - let mut phase = 0; - while phase < NUM_PHASES as usize { - self.queue.queues[phase].swap(&mut *stash); - if stash.is_empty() { - phase += 1; - continue; - } - self.queue.num_queued.fetch_sub(stash.len()); - for runnable in stash.drain(..) { - runnable.run(); - } - } - self.queue.yields.swap(&mut *yield_stash); - for waker in yield_stash.drain(..) { - waker.wake(); - } - } - self.queue.dispatch_scheduled.set(false); - Ok(()) - } - } - - impl Drop for Dispatcher { - fn drop(&mut self) { - let _ = self.queue.el.remove(self.queue.id); - for queue in &self.queue.queues { - queue.swap(&mut VecDeque::new()); - } - } - } - - pub(super) struct DispatchQueue { - dispatch_scheduled: Cell, - id: EventLoopId, - el: Rc, - num_queued: NumCell, - queues: [SyncQueue; NUM_PHASES], - iteration: NumCell, - yields: SyncQueue, - } - - impl DispatchQueue { - pub fn clear(&self) { - self.yields.take(); - for queue in &self.queues { - queue.take(); - } - } - - pub fn push(&self, runnable: Runnable, phase: Phase) { - self.queues[phase as usize].push(runnable); - self.num_queued.fetch_add(1); - if !self.dispatch_scheduled.get() { - let _ = self.el.schedule(self.id); - self.dispatch_scheduled.set(true); - } - } - - pub fn push_yield(&self, waker: Waker) { - self.yields.push(waker); - } - - pub fn iteration(&self) -> u64 { - self.iteration.get() - } - } -} - -mod fd { - use { - crate::{ - async_engine::{AsyncEngine, AsyncError}, - event_loop::{EventLoop, EventLoopDispatcher, EventLoopError, EventLoopId}, - utils::numcell::NumCell, - }, - std::{ - cell::{Cell, RefCell}, - error::Error, - fmt::{Debug, Formatter}, - future::Future, - pin::Pin, - rc::Rc, - task::{Context, Poll, Waker}, - }, - uapi::{c, OwnedFd}, - }; - - type Queue = RefCell>>)>>; - - pub(super) struct AsyncFdData { - pub(super) ref_count: NumCell, - pub(super) fd: Rc, - pub(super) id: EventLoopId, - pub(super) el: Rc, - pub(super) write_registered: Cell, - pub(super) read_registered: Cell, - pub(super) readers: Queue, - pub(super) writers: Queue, - } - - impl AsyncFdData { - fn update_interests(&self) -> Result<(), EventLoopError> { - let mut events = 0; - if self.write_registered.get() { - events |= c::EPOLLOUT; - } - if self.read_registered.get() { - events |= c::EPOLLIN; - } - let res = self.el.modify(self.id, events); - if res.is_err() { - if let Err(e) = self.el.remove(self.id) { - log::error!( - "Fatal error: Cannot remove file descriptor from event loop: {:?}", - e - ); - self.el.stop(); - } - } - res - } - - fn poll( - &self, - woken: &Rc>>, - cx: &mut Context<'_>, - registered: impl Fn(&AsyncFdData) -> &Cell, - queue: impl Fn(&AsyncFdData) -> &Queue, - ) -> Poll> { - if let Some(status) = woken.get() { - return Poll::Ready(Ok(status)); - } - if !registered(self).get() { - registered(self).set(true); - if let Err(e) = self.update_interests() { - return Poll::Ready(Err(AsyncError::EventLoopError(e))); - } - } - queue(self) - .borrow_mut() - .push((cx.waker().clone(), woken.clone())); - Poll::Pending - } - } - - impl EventLoopDispatcher for AsyncFdData { - fn dispatch(self: Rc, _fd: Option, events: i32) -> Result<(), Box> { - let mut status = FdStatus::Ok; - if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { - status = FdStatus::Err; - if let Err(e) = self.el.remove(self.id) { - return Err(Box::new(e)); - } - } - let mut woke_any = false; - if events & c::EPOLLIN != 0 || status == FdStatus::Err { - let mut readers = self.readers.borrow_mut(); - woke_any |= !readers.is_empty(); - for (waker, woken) in readers.drain(..) { - woken.set(Some(status)); - waker.wake(); - } - } - if events & c::EPOLLOUT != 0 || status == FdStatus::Err { - let mut writers = self.writers.borrow_mut(); - woke_any |= !writers.is_empty(); - for (waker, woken) in writers.drain(..) { - woken.set(Some(status)); - waker.wake(); - } - } - if !woke_any && status == FdStatus::Ok { - self.read_registered.set(false); - self.write_registered.set(false); - if let Err(e) = self.update_interests() { - return Err(Box::new(e)); - } - } - Ok(()) - } - } - - pub struct AsyncFd { - pub(super) engine: Rc, - pub(super) data: Rc, - } - - impl Debug for AsyncFd { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AsyncFd").finish_non_exhaustive() - } - } - - impl Clone for AsyncFd { - fn clone(&self) -> Self { - self.data.ref_count.fetch_add(1); - Self { - engine: self.engine.clone(), - data: self.data.clone(), - } - } - } - - impl Drop for AsyncFd { - fn drop(&mut self) { - if self.data.ref_count.fetch_sub(1) == 1 { - self.engine.fds.remove(&self.data.fd.raw()); - let _ = self.data.el.remove(self.data.id); - } - } - } - - impl AsyncFd { - pub fn raw(&self) -> i32 { - self.data.fd.raw() - } - - pub fn eng(&self) -> &Rc { - &self.engine - } - - pub fn readable(&self) -> AsyncFdReadable { - AsyncFdReadable { - fd: self, - woken: Rc::new(Cell::new(None)), - } - } - - pub fn writable(&self) -> AsyncFdWritable { - AsyncFdWritable { - fd: self, - woken: Rc::new(Cell::new(None)), - } - } - } - - #[derive(Copy, Clone, Debug, Eq, PartialEq)] - pub enum FdStatus { - Ok, - Err, - } - - pub struct AsyncFdReadable<'a> { - fd: &'a AsyncFd, - woken: Rc>>, - } - - impl<'a> Future for AsyncFdReadable<'a> { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let data = &self.fd.data; - data.poll(&self.woken, cx, |d| &d.read_registered, |d| &d.readers) - } - } - - pub struct AsyncFdWritable<'a> { - fd: &'a AsyncFd, - woken: Rc>>, - } - - impl<'a> Future for AsyncFdWritable<'a> { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let data = &self.fd.data; - data.poll(&self.woken, cx, |d| &d.write_registered, |d| &d.writers) - } - } -} diff --git a/src/async_engine/ae_fd.rs b/src/async_engine/ae_fd.rs new file mode 100644 index 00000000..8ae6b922 --- /dev/null +++ b/src/async_engine/ae_fd.rs @@ -0,0 +1,200 @@ +use { + crate::{ + async_engine::{AsyncEngine, AsyncError}, + event_loop::{EventLoop, EventLoopDispatcher, EventLoopError, EventLoopId}, + utils::numcell::NumCell, + }, + std::{ + cell::{Cell, RefCell}, + error::Error, + fmt::{Debug, Formatter}, + future::Future, + pin::Pin, + rc::Rc, + task::{Context, Poll, Waker}, + }, + uapi::{c, OwnedFd}, +}; + +type Queue = RefCell>>)>>; + +pub(super) struct AsyncFdData { + pub(super) ref_count: NumCell, + pub(super) fd: Rc, + pub(super) id: EventLoopId, + pub(super) el: Rc, + pub(super) write_registered: Cell, + pub(super) read_registered: Cell, + pub(super) readers: Queue, + pub(super) writers: Queue, +} + +impl AsyncFdData { + fn update_interests(&self) -> Result<(), EventLoopError> { + let mut events = 0; + if self.write_registered.get() { + events |= c::EPOLLOUT; + } + if self.read_registered.get() { + events |= c::EPOLLIN; + } + let res = self.el.modify(self.id, events); + if res.is_err() { + if let Err(e) = self.el.remove(self.id) { + log::error!( + "Fatal error: Cannot remove file descriptor from event loop: {:?}", + e + ); + self.el.stop(); + } + } + res + } + + fn poll( + &self, + woken: &Rc>>, + cx: &mut Context<'_>, + registered: impl Fn(&AsyncFdData) -> &Cell, + queue: impl Fn(&AsyncFdData) -> &Queue, + ) -> Poll> { + if let Some(status) = woken.get() { + return Poll::Ready(Ok(status)); + } + if !registered(self).get() { + registered(self).set(true); + if let Err(e) = self.update_interests() { + return Poll::Ready(Err(AsyncError::EventLoopError(e))); + } + } + queue(self) + .borrow_mut() + .push((cx.waker().clone(), woken.clone())); + Poll::Pending + } +} + +impl EventLoopDispatcher for AsyncFdData { + fn dispatch(self: Rc, _fd: Option, events: i32) -> Result<(), Box> { + let mut status = FdStatus::Ok; + if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { + status = FdStatus::Err; + if let Err(e) = self.el.remove(self.id) { + return Err(Box::new(e)); + } + } + let mut woke_any = false; + if events & c::EPOLLIN != 0 || status == FdStatus::Err { + let mut readers = self.readers.borrow_mut(); + woke_any |= !readers.is_empty(); + for (waker, woken) in readers.drain(..) { + woken.set(Some(status)); + waker.wake(); + } + } + if events & c::EPOLLOUT != 0 || status == FdStatus::Err { + let mut writers = self.writers.borrow_mut(); + woke_any |= !writers.is_empty(); + for (waker, woken) in writers.drain(..) { + woken.set(Some(status)); + waker.wake(); + } + } + if !woke_any && status == FdStatus::Ok { + self.read_registered.set(false); + self.write_registered.set(false); + if let Err(e) = self.update_interests() { + return Err(Box::new(e)); + } + } + Ok(()) + } +} + +pub struct AsyncFd { + pub(super) engine: Rc, + pub(super) data: Rc, +} + +impl Debug for AsyncFd { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncFd").finish_non_exhaustive() + } +} + +impl Clone for AsyncFd { + fn clone(&self) -> Self { + self.data.ref_count.fetch_add(1); + Self { + engine: self.engine.clone(), + data: self.data.clone(), + } + } +} + +impl Drop for AsyncFd { + fn drop(&mut self) { + if self.data.ref_count.fetch_sub(1) == 1 { + self.engine.fds.remove(&self.data.fd.raw()); + let _ = self.data.el.remove(self.data.id); + } + } +} + +impl AsyncFd { + pub fn raw(&self) -> i32 { + self.data.fd.raw() + } + + pub fn eng(&self) -> &Rc { + &self.engine + } + + pub fn readable(&self) -> AsyncFdReadable { + AsyncFdReadable { + fd: self, + woken: Rc::new(Cell::new(None)), + } + } + + pub fn writable(&self) -> AsyncFdWritable { + AsyncFdWritable { + fd: self, + woken: Rc::new(Cell::new(None)), + } + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum FdStatus { + Ok, + Err, +} + +pub struct AsyncFdReadable<'a> { + fd: &'a AsyncFd, + woken: Rc>>, +} + +impl<'a> Future for AsyncFdReadable<'a> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let data = &self.fd.data; + data.poll(&self.woken, cx, |d| &d.read_registered, |d| &d.readers) + } +} + +pub struct AsyncFdWritable<'a> { + fd: &'a AsyncFd, + woken: Rc>>, +} + +impl<'a> Future for AsyncFdWritable<'a> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let data = &self.fd.data; + data.poll(&self.woken, cx, |d| &d.write_registered, |d| &d.writers) + } +} diff --git a/src/async_engine/ae_queue.rs b/src/async_engine/ae_queue.rs new file mode 100644 index 00000000..58c4f9ea --- /dev/null +++ b/src/async_engine/ae_queue.rs @@ -0,0 +1,115 @@ +use { + crate::{ + async_engine::{ae_task::Runnable, AsyncError, Phase, NUM_PHASES}, + event_loop::{EventLoop, EventLoopDispatcher, EventLoopId}, + utils::{array, numcell::NumCell, syncqueue::SyncQueue}, + }, + std::{ + cell::{Cell, RefCell}, + collections::VecDeque, + error::Error, + rc::Rc, + task::Waker, + }, +}; + +pub(super) struct Dispatcher { + queue: Rc, + stash: RefCell>, + yield_stash: RefCell>, +} + +impl Dispatcher { + pub fn install(el: &Rc) -> Result, AsyncError> { + let id = el.id(); + let queue = Rc::new(DispatchQueue { + id, + el: el.clone(), + dispatch_scheduled: Cell::new(false), + num_queued: Default::default(), + queues: array::from_fn(|_| Default::default()), + iteration: Default::default(), + yields: Default::default(), + }); + let slf = Rc::new(Dispatcher { + queue: queue.clone(), + stash: Default::default(), + yield_stash: Default::default(), + }); + el.insert(id, None, 0, slf)?; + Ok(queue) + } +} + +impl EventLoopDispatcher for Dispatcher { + fn dispatch(self: Rc, _fd: Option, _events: i32) -> Result<(), Box> { + let mut stash = self.stash.borrow_mut(); + let mut yield_stash = self.yield_stash.borrow_mut(); + while self.queue.num_queued.get() > 0 { + self.queue.iteration.fetch_add(1); + let mut phase = 0; + while phase < NUM_PHASES as usize { + self.queue.queues[phase].swap(&mut *stash); + if stash.is_empty() { + phase += 1; + continue; + } + self.queue.num_queued.fetch_sub(stash.len()); + for runnable in stash.drain(..) { + runnable.run(); + } + } + self.queue.yields.swap(&mut *yield_stash); + for waker in yield_stash.drain(..) { + waker.wake(); + } + } + self.queue.dispatch_scheduled.set(false); + Ok(()) + } +} + +impl Drop for Dispatcher { + fn drop(&mut self) { + let _ = self.queue.el.remove(self.queue.id); + for queue in &self.queue.queues { + queue.swap(&mut VecDeque::new()); + } + } +} + +pub(super) struct DispatchQueue { + dispatch_scheduled: Cell, + id: EventLoopId, + el: Rc, + num_queued: NumCell, + queues: [SyncQueue; NUM_PHASES], + iteration: NumCell, + yields: SyncQueue, +} + +impl DispatchQueue { + pub fn clear(&self) { + self.yields.take(); + for queue in &self.queues { + queue.take(); + } + } + + pub fn push(&self, runnable: Runnable, phase: Phase) { + self.queues[phase as usize].push(runnable); + self.num_queued.fetch_add(1); + if !self.dispatch_scheduled.get() { + let _ = self.el.schedule(self.id); + self.dispatch_scheduled.set(true); + } + } + + pub fn push_yield(&self, waker: Waker) { + self.yields.push(waker); + } + + pub fn iteration(&self) -> u64 { + self.iteration.get() + } +} diff --git a/src/async_engine/ae_task.rs b/src/async_engine/ae_task.rs new file mode 100644 index 00000000..a147a99e --- /dev/null +++ b/src/async_engine/ae_task.rs @@ -0,0 +1,248 @@ +use { + crate::{ + async_engine::{ae_queue::DispatchQueue, Phase}, + utils::{ + numcell::NumCell, + ptr_ext::{MutPtrExt, PtrExt}, + }, + }, + std::{ + cell::{Cell, UnsafeCell}, + future::Future, + mem::ManuallyDrop, + pin::Pin, + ptr, + rc::Rc, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + }, +}; + +#[must_use] +pub struct SpawnedFuture { + vtable: &'static SpawnedFutureVtable, + data: *mut u8, +} + +impl Future for SpawnedFuture { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { (self.vtable.poll)(self.data, cx) } + } +} + +impl Drop for SpawnedFuture { + fn drop(&mut self) { + unsafe { + (self.vtable.drop)(self.data); + } + } +} + +struct SpawnedFutureVTableProxy(T, F); + +impl> SpawnedFutureVTableProxy { + const VTABLE: &'static SpawnedFutureVtable = &SpawnedFutureVtable { + poll: Self::poll, + drop: Self::drop, + }; + + unsafe fn poll(data: *mut u8, ctx: &mut Context<'_>) -> Poll { + let task = (data as *const Task).deref(); + if &task.state & COMPLETED == 0 { + task.waker.set(Some(ctx.waker().clone())); + Poll::Pending + } else if &task.state & EMPTIED == 0 { + task.state.or_assign(EMPTIED); + Poll::Ready(ptr::read(&*task.data.get().deref().result)) + } else { + panic!("Future polled after it has already been emptied"); + } + } + + unsafe fn drop(data: *mut u8) { + { + let task = (data as *const Task).deref(); + task.state.or_assign(CANCELLED); + if &task.state & RUNNING == 0 { + task.drop_data(); + } + } + Task::::dec_ref_count(data as _); + } +} + +struct SpawnedFutureVtable { + poll: unsafe fn(data: *mut u8, ctx: &mut Context<'_>) -> Poll, + drop: unsafe fn(data: *mut u8), +} + +union TaskData> { + result: ManuallyDrop, + future: ManuallyDrop, +} + +const RUNNING: u32 = 1; +const RUN_AGAIN: u32 = 2; +const COMPLETED: u32 = 4; +const EMPTIED: u32 = 8; +const CANCELLED: u32 = 16; + +struct Task> { + ref_count: NumCell, + phase: Phase, + state: NumCell, + data: UnsafeCell>, + waker: Cell>, + queue: Rc, +} + +pub(super) struct Runnable { + data: *const u8, + run: unsafe fn(data: *const u8, run: bool), +} + +impl Runnable { + pub(super) fn run(self) { + let slf = ManuallyDrop::new(self); + unsafe { + (slf.run)(slf.data, true); + } + } +} + +impl Drop for Runnable { + fn drop(&mut self) { + unsafe { + (self.run)(self.data, false); + } + } +} + +impl DispatchQueue { + pub(super) fn spawn>( + self: &Rc, + phase: Phase, + f: F, + ) -> SpawnedFuture { + let f = Box::new(Task { + ref_count: NumCell::new(1), + phase, + state: NumCell::new(0), + data: UnsafeCell::new(TaskData { + future: ManuallyDrop::new(f), + }), + waker: Cell::new(None), + queue: self.clone(), + }); + unsafe { + f.schedule_run(); + } + let f = Box::into_raw(f); + SpawnedFuture { + vtable: SpawnedFutureVTableProxy::::VTABLE, + data: f as _, + } + } +} + +impl> Task { + const VTABLE: &'static RawWakerVTable = &RawWakerVTable::new( + Self::waker_clone, + Self::waker_wake, + Self::waker_wake_by_ref, + Self::waker_drop, + ); + + unsafe fn run_proxy(data: *const u8, run: bool) { + let task = data as *const Self; + if run { + task.deref().run(); + } + Self::dec_ref_count(task); + } + + unsafe fn dec_ref_count(slf: *const Self) { + if slf.deref().ref_count.fetch_sub(1) == 1 { + Box::from_raw(slf as *mut Self); + } + } + + unsafe fn inc_ref_count(&self) { + self.ref_count.fetch_add(1); + } + + unsafe fn waker_clone(data: *const ()) -> RawWaker { + let task = &mut *(data as *mut Self); + task.inc_ref_count(); + RawWaker::new(data, Self::VTABLE) + } + + unsafe fn waker_wake(data: *const ()) { + Self::waker_wake_by_ref(data); + Self::waker_drop(data); + } + + unsafe fn waker_wake_by_ref(data: *const ()) { + (data as *const Self).deref().schedule_run(); + } + + unsafe fn waker_drop(data: *const ()) { + Self::dec_ref_count(data as _) + } + + unsafe fn schedule_run(&self) { + if &self.state & (COMPLETED | CANCELLED) == 0 { + if &self.state & RUNNING == 0 { + self.state.or_assign(RUNNING); + self.inc_ref_count(); + let data = self as *const _ as _; + self.queue.push( + Runnable { + data, + run: Self::run_proxy, + }, + self.phase, + ); + } else { + self.state.or_assign(RUN_AGAIN); + } + } + } + + unsafe fn run(&self) { + if &self.state & CANCELLED == 0 { + let data = self.data.get().deref_mut(); + self.inc_ref_count(); + let raw_waker = RawWaker::new(self as *const _ as _, Self::VTABLE); + let waker = Waker::from_raw(raw_waker); + + let mut ctx = Context::from_waker(&waker); + if let Poll::Ready(d) = Pin::new_unchecked(&mut *data.future).poll(&mut ctx) { + ManuallyDrop::drop(&mut data.future); + ptr::write(&mut data.result, ManuallyDrop::new(d)); + self.state.or_assign(COMPLETED); + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + } + + self.state.and_assign(!RUNNING); + + if &self.state & CANCELLED != 0 { + self.drop_data(); + } else if &self.state & RUN_AGAIN != 0 { + self.state.and_assign(!RUN_AGAIN); + self.schedule_run() + } + } + + unsafe fn drop_data(&self) { + if &self.state & COMPLETED == 0 { + ManuallyDrop::drop(&mut self.data.get().deref_mut().future); + } else if &self.state & EMPTIED == 0 { + ManuallyDrop::drop(&mut self.data.get().deref_mut().result); + } + } +} diff --git a/src/async_engine/ae_timeout.rs b/src/async_engine/ae_timeout.rs new file mode 100644 index 00000000..bc2d94b2 --- /dev/null +++ b/src/async_engine/ae_timeout.rs @@ -0,0 +1,51 @@ +use { + crate::wheel::{Wheel, WheelDispatcher, WheelId}, + std::{ + cell::{Cell, RefCell}, + error::Error, + future::Future, + pin::Pin, + rc::Rc, + task::{Context, Poll, Waker}, + }, +}; + +pub(super) struct TimeoutData { + pub expired: Cell, + pub waker: RefCell>, +} + +impl WheelDispatcher for TimeoutData { + fn dispatch(self: Rc) -> Result<(), Box> { + self.expired.set(true); + if let Some(w) = self.waker.borrow_mut().take() { + w.wake(); + } + Ok(()) + } +} + +pub struct Timeout { + pub(super) id: WheelId, + pub(super) wheel: Rc, + pub(super) data: Rc, +} + +impl Future for Timeout { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.data.expired.get() { + Poll::Ready(()) + } else { + *self.data.waker.borrow_mut() = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +impl Drop for Timeout { + fn drop(&mut self) { + self.wheel.remove(self.id); + } +} diff --git a/src/async_engine/ae_timer.rs b/src/async_engine/ae_timer.rs new file mode 100644 index 00000000..fe72d336 --- /dev/null +++ b/src/async_engine/ae_timer.rs @@ -0,0 +1,50 @@ +use { + crate::async_engine::{AsyncEngine, AsyncError, AsyncFd}, + std::{rc::Rc, time::Duration}, + uapi::c, +}; + +#[derive(Clone)] +pub struct Timer { + fd: AsyncFd, +} + +impl Timer { + pub(super) fn new(eng: &Rc, clock_id: c::c_int) -> Result { + let fd = match uapi::timerfd_create(clock_id, c::TFD_CLOEXEC | c::TFD_NONBLOCK) { + Ok(fd) => fd, + Err(e) => return Err(AsyncError::CreateTimer(e.into())), + }; + let afd = eng.fd(&Rc::new(fd))?; + Ok(Self { fd: afd }) + } + + pub async fn expired(&self) -> Result { + self.fd.readable().await?; + let mut buf = 0u64; + if let Err(e) = uapi::read(self.fd.raw(), &mut buf) { + return Err(AsyncError::TimerReadError(e.into())); + } + Ok(buf) + } + + pub fn program( + &self, + initial: Option, + periodic: Option, + ) -> Result<(), AsyncError> { + let mut timerspec: c::itimerspec = uapi::pod_zeroed(); + if let Some(init) = initial { + timerspec.it_value.tv_sec = init.as_secs() as _; + timerspec.it_value.tv_nsec = init.subsec_nanos() as _; + if let Some(per) = periodic { + timerspec.it_interval.tv_sec = per.as_secs() as _; + timerspec.it_interval.tv_nsec = per.subsec_nanos() as _; + } + } + if let Err(e) = uapi::timerfd_settime(self.fd.raw(), 0, &timerspec) { + return Err(AsyncError::SetTimer(e.into())); + } + Ok(()) + } +} diff --git a/src/async_engine/ae_yield.rs b/src/async_engine/ae_yield.rs new file mode 100644 index 00000000..0e38affd --- /dev/null +++ b/src/async_engine/ae_yield.rs @@ -0,0 +1,27 @@ +use { + crate::async_engine::ae_queue::DispatchQueue, + std::{ + future::Future, + pin::Pin, + rc::Rc, + task::{Context, Poll}, + }, +}; + +pub struct Yield { + pub(super) iteration: u64, + pub(super) queue: Rc, +} + +impl Future for Yield { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.queue.iteration() > self.iteration { + Poll::Ready(()) + } else { + self.queue.push_yield(cx.waker().clone()); + Poll::Pending + } + } +}