pub use crate::async_engine::yield_::Yield; use crate::event_loop::{EventLoop, EventLoopError}; use crate::utils::copyhashmap::CopyHashMap; use crate::utils::numcell::NumCell; use crate::wheel::{Wheel, WheelError}; pub use fd::AsyncFd; use fd::AsyncFdData; use queue::{DispatchQueue, Dispatcher}; use std::cell::{Cell, RefCell}; use std::future::Future; use std::rc::Rc; pub use task::SpawnedFuture; use thiserror::Error; pub use timeout::Timeout; use timeout::TimeoutData; use uapi::OwnedFd; #[derive(Debug, Error)] pub enum AsyncError { #[error("The timer wheel returned an error: {0}")] WheelError(#[from] WheelError), #[error("The event loop caused an error: {0}")] EventLoopError(#[from] EventLoopError), } pub struct AsyncEngine { wheel: Rc, el: Rc, queue: Rc, fds: CopyHashMap>, } impl AsyncEngine { pub fn install(el: &Rc, wheel: &Rc) -> Result, AsyncError> { let queue = Dispatcher::install(el)?; Ok(Rc::new(Self { wheel: wheel.clone(), el: el.clone(), queue, fds: CopyHashMap::new(), })) } pub fn timeout(&self, ms: u64) -> Result { let data = Rc::new(TimeoutData { expired: Cell::new(false), waker: RefCell::new(None), }); let id = self.wheel.id(); self.wheel.timeout(id, ms, data.clone())?; Ok(Timeout { id, wheel: self.wheel.clone(), data, }) } pub fn spawn + 'static>(&self, f: F) -> SpawnedFuture { self.queue.spawn(f) } pub fn fd(self: &Rc, fd: &Rc) -> Result { let data = if let Some(afd) = self.fds.get(&fd.raw()) { afd.ref_count.fetch_add(1); afd } else { let id = self.el.id(); let afd = Rc::new(AsyncFdData { ref_count: NumCell::new(1), fd: fd.clone(), id, el: self.el.clone(), write_registered: Cell::new(false), read_registered: Cell::new(false), readers: RefCell::new(vec![]), writers: RefCell::new(vec![]), erroneous: Cell::new(false), }); self.el.insert(id, Some(fd.raw()), 0, afd.clone())?; afd }; Ok(AsyncFd { engine: self.clone(), data, }) } pub fn yield_now(&self) -> Yield { Yield { iteration: self.queue.iteration(), queue: self.queue.clone(), } } } mod yield_ { use crate::async_engine::queue::DispatchQueue; use std::future::Future; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; pub struct Yield { pub(super) iteration: u64, pub(super) queue: Rc, } impl Future for Yield { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.queue.iteration() > self.iteration { Poll::Ready(()) } else { cx.waker().wake_by_ref(); Poll::Pending } } } } mod timeout { use crate::wheel::{Wheel, WheelDispatcher, WheelId}; use std::cell::{Cell, RefCell}; use std::error::Error; use std::future::Future; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll, Waker}; pub(super) struct TimeoutData { pub expired: Cell, pub waker: RefCell>, } impl WheelDispatcher for TimeoutData { fn dispatch(self: Rc) -> Result<(), Box> { self.expired.set(true); if let Some(w) = self.waker.borrow_mut().take() { w.wake(); } Ok(()) } } pub struct Timeout { pub(super) id: WheelId, pub(super) wheel: Rc, pub(super) data: Rc, } impl Future for Timeout { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.data.expired.get() { Poll::Ready(()) } else { *self.data.waker.borrow_mut() = Some(cx.waker().clone()); Poll::Pending } } } impl Drop for Timeout { fn drop(&mut self) { self.wheel.remove(self.id); } } } mod task { use crate::async_engine::queue::DispatchQueue; use crate::utils::ptr_ext::{MutPtrExt, PtrExt}; use crate::NumCell; use std::cell::{Cell, UnsafeCell}; use std::future::Future; use std::mem::ManuallyDrop; use std::pin::Pin; use std::ptr; use std::rc::Rc; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; pub struct SpawnedFuture { vtable: &'static SpawnedFutureVtable, data: *mut u8, } impl Future for SpawnedFuture { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { unsafe { (self.vtable.poll)(self.data, cx) } } } impl Drop for SpawnedFuture { fn drop(&mut self) { unsafe { (self.vtable.drop)(self.data); } } } struct SpawnedFutureVTableProxy(T, F); impl> SpawnedFutureVTableProxy { const VTABLE: &'static SpawnedFutureVtable = &SpawnedFutureVtable { poll: Self::poll, drop: Self::drop, }; unsafe fn poll(data: *mut u8, ctx: &mut Context<'_>) -> Poll { let task = (data as *const Task).deref(); if &task.state & COMPLETED == 0 { task.waker.set(Some(ctx.waker().clone())); Poll::Pending } else if &task.state & EMPTIED == 0 { task.state.or_assign(EMPTIED); Poll::Ready(ptr::read(&*task.data.get().deref().result)) } else { panic!("Future polled after it has already been emptied"); } } unsafe fn drop(data: *mut u8) { { let task = (data as *const Task).deref(); task.state.or_assign(CANCELLED); if &task.state & RUNNING == 0 { task.drop_data(); } } Task::::dec_ref_count(data as _); } } struct SpawnedFutureVtable { poll: unsafe fn(data: *mut u8, ctx: &mut Context<'_>) -> Poll, drop: unsafe fn(data: *mut u8), } union TaskData> { result: ManuallyDrop, future: ManuallyDrop, } const RUNNING: u32 = 1; const RUN_AGAIN: u32 = 2; const COMPLETED: u32 = 4; const EMPTIED: u32 = 8; const CANCELLED: u32 = 16; struct Task> { ref_count: NumCell, state: NumCell, data: UnsafeCell>, waker: Cell>, queue: Rc, } pub(super) struct Runnable { data: *const u8, run: unsafe fn(data: *const u8, run: bool), } impl Runnable { pub(super) fn run(self) { let slf = ManuallyDrop::new(self); unsafe { (slf.run)(slf.data, true); } } } impl Drop for Runnable { fn drop(&mut self) { unsafe { (self.run)(self.data, false); } } } impl DispatchQueue { pub(super) fn spawn>(self: &Rc, f: F) -> SpawnedFuture { let f = Box::new(Task { ref_count: NumCell::new(1), state: NumCell::new(0), data: UnsafeCell::new(TaskData { future: ManuallyDrop::new(f), }), waker: Cell::new(None), queue: self.clone(), }); unsafe { f.schedule_run(); } let f = Box::into_raw(f); SpawnedFuture { vtable: SpawnedFutureVTableProxy::::VTABLE, data: f as _, } } } impl> Task { const VTABLE: &'static RawWakerVTable = &RawWakerVTable::new( Self::waker_clone, Self::waker_wake, Self::waker_wake_by_ref, Self::waker_drop, ); unsafe fn run_proxy(data: *const u8, run: bool) { let task = data as *const Self; if run { task.deref().run(); } Self::dec_ref_count(task); } unsafe fn dec_ref_count(slf: *const Self) { if slf.deref().ref_count.fetch_sub(1) == 1 { Box::from_raw(slf as *mut Self); } } unsafe fn inc_ref_count(&self) { self.ref_count.fetch_add(1); } unsafe fn waker_clone(data: *const ()) -> RawWaker { let task = &mut *(data as *mut Self); task.inc_ref_count(); RawWaker::new(data, Self::VTABLE) } unsafe fn waker_wake(data: *const ()) { Self::waker_wake_by_ref(data); Self::waker_drop(data); } unsafe fn waker_wake_by_ref(data: *const ()) { (data as *const Self).deref().schedule_run(); } unsafe fn waker_drop(data: *const ()) { Self::dec_ref_count(data as _) } unsafe fn schedule_run(&self) { if &self.state & (COMPLETED | CANCELLED) == 0 { if &self.state & RUNNING == 0 { self.state.or_assign(RUNNING); self.inc_ref_count(); let data = self as *const _ as _; self.queue.push(Runnable { data, run: Self::run_proxy, }); } else { self.state.or_assign(RUN_AGAIN); } } } unsafe fn run(&self) { if &self.state & CANCELLED == 0 { let data = self.data.get().deref_mut(); self.inc_ref_count(); let raw_waker = RawWaker::new(self as *const _ as _, Self::VTABLE); let waker = Waker::from_raw(raw_waker); let mut ctx = Context::from_waker(&waker); if let Poll::Ready(d) = Pin::new_unchecked(&mut *data.future).poll(&mut ctx) { ManuallyDrop::drop(&mut data.future); ptr::write(&mut data.result, ManuallyDrop::new(d)); self.state.or_assign(COMPLETED); if let Some(waker) = self.waker.take() { waker.wake(); } } } self.state.and_assign(!RUNNING); if &self.state & CANCELLED != 0 { self.drop_data(); } else if &self.state & RUN_AGAIN != 0 { self.state.and_assign(!RUN_AGAIN); self.schedule_run() } } unsafe fn drop_data(&self) { if &self.state & COMPLETED == 0 { ManuallyDrop::drop(&mut self.data.get().deref_mut().future); } else if &self.state & EMPTIED == 0 { ManuallyDrop::drop(&mut self.data.get().deref_mut().result); } } } } mod queue { use crate::async_engine::task::Runnable; use crate::async_engine::AsyncError; use crate::event_loop::{EventLoop, EventLoopDispatcher, EventLoopId}; use crate::utils::numcell::NumCell; use std::cell::{Cell, RefCell}; use std::collections::VecDeque; use std::error::Error; use std::mem; use std::rc::Rc; pub(super) struct Dispatcher { queue: Rc, stash: RefCell>, } impl Dispatcher { pub fn install(el: &Rc) -> Result, AsyncError> { let id = el.id(); let queue = Rc::new(DispatchQueue { id, el: el.clone(), dispatch_scheduled: Cell::new(false), queue: RefCell::new(Default::default()), iteration: Default::default(), }); let slf = Rc::new(Dispatcher { queue: queue.clone(), stash: RefCell::new(Default::default()), }); el.insert(id, None, 0, slf)?; Ok(queue) } } impl EventLoopDispatcher for Dispatcher { fn dispatch(self: Rc, _events: i32) -> Result<(), Box> { loop { self.queue.iteration.fetch_add(1); let mut stash = self.stash.borrow_mut(); mem::swap(&mut *stash, &mut *self.queue.queue.borrow_mut()); if stash.is_empty() { break; } for runnable in stash.drain(..) { runnable.run(); } } self.queue.dispatch_scheduled.set(false); Ok(()) } } impl Drop for Dispatcher { fn drop(&mut self) { let _ = self.queue.el.remove(self.queue.id); mem::take(&mut *self.queue.queue.borrow_mut()); } } pub(super) struct DispatchQueue { dispatch_scheduled: Cell, id: EventLoopId, el: Rc, queue: RefCell>, iteration: NumCell, } impl DispatchQueue { pub fn push(&self, runnable: Runnable) { self.queue.borrow_mut().push_back(runnable); if !self.dispatch_scheduled.get() { let _ = self.el.schedule(self.id); self.dispatch_scheduled.set(true); } } pub fn iteration(&self) -> u64 { self.iteration.load() } } } mod fd { use crate::async_engine::{AsyncEngine, AsyncError}; use crate::event_loop::{EventLoop, EventLoopDispatcher, EventLoopError, EventLoopId}; use crate::utils::numcell::NumCell; use std::cell::{Cell, RefCell}; use std::error::Error; use std::future::Future; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll, Waker}; use uapi::{c, OwnedFd}; type Queue = RefCell>)>>; pub(super) struct AsyncFdData { pub(super) ref_count: NumCell, pub(super) fd: Rc, pub(super) id: EventLoopId, pub(super) el: Rc, pub(super) write_registered: Cell, pub(super) read_registered: Cell, pub(super) readers: Queue, pub(super) writers: Queue, pub(super) erroneous: Cell, } impl AsyncFdData { fn update_interests(&self) -> Result<(), EventLoopError> { let mut events = 0; if self.write_registered.get() { events |= c::EPOLLOUT; } if self.read_registered.get() { events |= c::EPOLLIN; } let res = self.el.modify(self.id, events); if res.is_err() { self.erroneous.set(true); let _ = self.el.remove(self.id); } res } fn poll( &self, woken: &Rc>, cx: &mut Context<'_>, registered: impl Fn(&AsyncFdData) -> &Cell, queue: impl Fn(&AsyncFdData) -> &Queue, ) -> Poll> { if woken.get() || self.erroneous.get() { return Poll::Ready(Ok(())); } if !registered(self).get() { registered(self).set(true); if let Err(e) = self.update_interests() { return Poll::Ready(Err(AsyncError::EventLoopError(e))); } } queue(self) .borrow_mut() .push((cx.waker().clone(), woken.clone())); Poll::Pending } } impl EventLoopDispatcher for AsyncFdData { fn dispatch(self: Rc, events: i32) -> Result<(), Box> { if events & (c::EPOLLERR | c::EPOLLHUP) != 0 { self.erroneous.set(true); if let Err(e) = self.el.remove(self.id) { return Err(Box::new(e)); } } let mut woke_any = false; if events & c::EPOLLIN != 0 || self.erroneous.get() { let mut readers = self.readers.borrow_mut(); woke_any |= !readers.is_empty(); for (waker, woken) in readers.drain(..) { woken.set(true); waker.wake(); } } if events & c::EPOLLOUT != 0 || self.erroneous.get() { let mut writers = self.writers.borrow_mut(); woke_any |= !writers.is_empty(); for (waker, woken) in writers.drain(..) { woken.set(true); waker.wake(); } } if !woke_any && !self.erroneous.get() { self.read_registered.set(false); self.write_registered.set(false); if let Err(e) = self.update_interests() { return Err(Box::new(e)); } } Ok(()) } } impl Drop for AsyncFdData { fn drop(&mut self) { let _ = self.el.remove(self.id); } } pub struct AsyncFd { pub(super) engine: Rc, pub(super) data: Rc, } impl Clone for AsyncFd { fn clone(&self) -> Self { self.data.ref_count.fetch_add(1); Self { engine: self.engine.clone(), data: self.data.clone(), } } } impl Drop for AsyncFd { fn drop(&mut self) { if self.data.ref_count.fetch_sub(1) == 1 { self.engine.fds.remove(&self.data.fd.raw()); } } } impl AsyncFd { pub fn raw(&self) -> i32 { self.data.fd.raw() } pub fn eng(&self) -> &Rc { &self.engine } pub fn readable(&self) -> AsyncFdReadable { AsyncFdReadable { fd: self, woken: Rc::new(Cell::new(false)), } } pub fn writable(&self) -> AsyncFdWritable { AsyncFdWritable { fd: self, woken: Rc::new(Cell::new(false)), } } } pub struct AsyncFdReadable<'a> { fd: &'a AsyncFd, woken: Rc>, } impl<'a> Future for AsyncFdReadable<'a> { type Output = Result<(), AsyncError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let data = &self.fd.data; data.poll(&self.woken, cx, |d| &d.read_registered, |d| &d.readers) } } pub struct AsyncFdWritable<'a> { fd: &'a AsyncFd, woken: Rc>, } impl<'a> Future for AsyncFdWritable<'a> { type Output = Result<(), AsyncError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let data = &self.fd.data; data.poll(&self.woken, cx, |d| &d.write_registered, |d| &d.writers) } } }