async: rebase wheel on top of async engine
This commit is contained in:
parent
87a90a8ae4
commit
3875c63172
13 changed files with 218 additions and 285 deletions
363
src/wheel.rs
363
src/wheel.rs
|
|
@ -1,112 +1,174 @@
|
|||
use {
|
||||
crate::{
|
||||
event_loop::{EventLoop, EventLoopDispatcher, EventLoopError, EventLoopId},
|
||||
async_engine::{AsyncEngine, AsyncError, AsyncFd, SpawnedFuture},
|
||||
time::{Time, TimeError},
|
||||
utils::{copyhashmap::CopyHashMap, numcell::NumCell},
|
||||
utils::{
|
||||
copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, oserror::OsError,
|
||||
stack::Stack,
|
||||
},
|
||||
},
|
||||
std::{
|
||||
cell::{Cell, RefCell},
|
||||
cmp::Reverse,
|
||||
collections::BinaryHeap,
|
||||
error::Error,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
task::{Context, Poll, Waker},
|
||||
time::Duration,
|
||||
},
|
||||
thiserror::Error,
|
||||
uapi::{c, OwnedFd},
|
||||
uapi::c,
|
||||
};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum WheelError {
|
||||
#[error("Could not create the timerfd: {0}")]
|
||||
CreateFailed(crate::utils::oserror::OsError),
|
||||
#[error("Could not set the timerfd: {0}")]
|
||||
SetFailed(crate::utils::oserror::OsError),
|
||||
#[error("The timerfd is in an error state")]
|
||||
ErrorEvent,
|
||||
#[error("An event loop error occurred: {0}")]
|
||||
EventLoopError(#[from] EventLoopError),
|
||||
#[error("Cannot determine the time: {0}")]
|
||||
#[error("Could not create the timerfd")]
|
||||
CreateFailed(#[source] OsError),
|
||||
#[error("Could not set the timerfd")]
|
||||
SetFailed(#[source] OsError),
|
||||
#[error("An async error occurred")]
|
||||
AsyncError(#[from] AsyncError),
|
||||
#[error("Cannot determine the time")]
|
||||
TimeError(#[from] TimeError),
|
||||
#[error("The timer wheel is already destroyed")]
|
||||
Destroyed,
|
||||
}
|
||||
|
||||
pub trait WheelDispatcher {
|
||||
fn dispatch(self: Rc<Self>) -> Result<(), Box<dyn std::error::Error>>;
|
||||
#[error("Could not read from the timerfd")]
|
||||
Read(#[source] OsError),
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
|
||||
struct WheelEntry {
|
||||
expiration: Time,
|
||||
id: WheelId,
|
||||
id: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
|
||||
pub struct WheelId(u64);
|
||||
|
||||
pub struct Wheel {
|
||||
data: Rc<WheelData>,
|
||||
}
|
||||
|
||||
impl Drop for Wheel {
|
||||
fn drop(&mut self) {
|
||||
self.data.kill();
|
||||
}
|
||||
}
|
||||
|
||||
struct WheelTimeoutData {
|
||||
id: u64,
|
||||
expired: Cell<Option<Result<(), WheelError>>>,
|
||||
wheel: Rc<WheelData>,
|
||||
waker: Cell<Option<Waker>>,
|
||||
}
|
||||
|
||||
impl WheelTimeoutData {
|
||||
fn complete(&self, res: Result<(), WheelError>) {
|
||||
self.expired.set(Some(res));
|
||||
if let Some(waker) = self.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WheelTimeoutFuture {
|
||||
data: Rc<WheelTimeoutData>,
|
||||
}
|
||||
|
||||
impl Drop for WheelTimeoutFuture {
|
||||
fn drop(&mut self) {
|
||||
self.data.wheel.dispatchers.remove(&self.data.id);
|
||||
self.data.waker.set(None);
|
||||
if !self.data.wheel.destroyed.get() {
|
||||
self.data.expired.take();
|
||||
self.data.wheel.cached_futures.push(self.data.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for WheelTimeoutFuture {
|
||||
type Output = Result<(), WheelError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
if let Some(res) = self.data.expired.take() {
|
||||
Poll::Ready(res)
|
||||
} else {
|
||||
self.data.waker.set(Some(cx.waker().clone()));
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WheelData {
|
||||
destroyed: Cell<bool>,
|
||||
fd: OwnedFd,
|
||||
fd: AsyncFd,
|
||||
next_id: NumCell<u64>,
|
||||
start: Time,
|
||||
current_expiration: Cell<Option<Time>>,
|
||||
dispatchers: CopyHashMap<WheelId, Rc<dyn WheelDispatcher>>,
|
||||
periodic_dispatchers: CopyHashMap<WheelId, Rc<PeriodicDispatcher>>,
|
||||
dispatchers: CopyHashMap<u64, Rc<WheelTimeoutData>>,
|
||||
expirations: RefCell<BinaryHeap<Reverse<WheelEntry>>>,
|
||||
id: EventLoopId,
|
||||
el: Rc<EventLoop>,
|
||||
dispatcher: Cell<Option<SpawnedFuture<()>>>,
|
||||
cached_futures: Stack<Rc<WheelTimeoutData>>,
|
||||
}
|
||||
|
||||
impl Wheel {
|
||||
pub fn install(el: &Rc<EventLoop>) -> Result<Rc<Self>, WheelError> {
|
||||
pub fn new(eng: &Rc<AsyncEngine>) -> Result<Rc<Self>, WheelError> {
|
||||
let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) {
|
||||
Ok(fd) => fd,
|
||||
Ok(fd) => Rc::new(fd),
|
||||
Err(e) => return Err(WheelError::CreateFailed(e.into())),
|
||||
};
|
||||
let id = el.id();
|
||||
let wheel = Rc::new(Self {
|
||||
let fd = eng.fd(&fd)?;
|
||||
let data = Rc::new(WheelData {
|
||||
destroyed: Cell::new(false),
|
||||
fd,
|
||||
next_id: Default::default(),
|
||||
next_id: NumCell::new(1),
|
||||
start: Time::now()?,
|
||||
current_expiration: Cell::new(None),
|
||||
dispatchers: CopyHashMap::new(),
|
||||
periodic_dispatchers: Default::default(),
|
||||
expirations: RefCell::new(Default::default()),
|
||||
id,
|
||||
el: el.clone(),
|
||||
current_expiration: Default::default(),
|
||||
dispatchers: Default::default(),
|
||||
expirations: Default::default(),
|
||||
dispatcher: Default::default(),
|
||||
cached_futures: Default::default(),
|
||||
});
|
||||
let wrapper = Rc::new(WheelWrapper {
|
||||
wheel: wheel.clone(),
|
||||
});
|
||||
el.insert(id, Some(wheel.fd.raw()), c::EPOLLIN, wrapper)?;
|
||||
Ok(wheel)
|
||||
data.dispatcher
|
||||
.set(Some(eng.spawn(data.clone().dispatch())));
|
||||
Ok(Rc::new(Wheel { data }))
|
||||
}
|
||||
|
||||
pub fn id(&self) -> WheelId {
|
||||
WheelId(self.next_id.fetch_add(1))
|
||||
fn future(&self) -> WheelTimeoutFuture {
|
||||
let data = self.data.cached_futures.pop().unwrap_or_else(|| {
|
||||
Rc::new(WheelTimeoutData {
|
||||
id: self.data.next_id.fetch_add(1),
|
||||
expired: Cell::new(None),
|
||||
wheel: self.data.clone(),
|
||||
waker: Cell::new(None),
|
||||
})
|
||||
});
|
||||
WheelTimeoutFuture { data }
|
||||
}
|
||||
|
||||
fn check_destroyed(&self) -> Result<(), WheelError> {
|
||||
if self.destroyed.get() {
|
||||
return Err(WheelError::Destroyed);
|
||||
pub fn timeout(&self, ms: u64) -> WheelTimeoutFuture {
|
||||
if self.data.destroyed.get() {
|
||||
return WheelTimeoutFuture {
|
||||
data: Rc::new(WheelTimeoutData {
|
||||
id: 0,
|
||||
expired: Cell::new(Some(Err(WheelError::Destroyed))),
|
||||
wheel: self.data.clone(),
|
||||
waker: Default::default(),
|
||||
}),
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn timeout(
|
||||
&self,
|
||||
id: WheelId,
|
||||
ms: u64,
|
||||
dispatcher: Rc<dyn WheelDispatcher>,
|
||||
) -> Result<(), WheelError> {
|
||||
self.check_destroyed()?;
|
||||
let expiration = (Time::now()? + Duration::from_millis(ms)).round_to_ms();
|
||||
let current = self.current_expiration.get();
|
||||
if current.is_none() || expiration - self.start < current.unwrap() - self.start {
|
||||
let future = self.future();
|
||||
let now = match Time::now() {
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
future.data.expired.set(Some(Err(WheelError::TimeError(e))));
|
||||
return future;
|
||||
}
|
||||
};
|
||||
let expiration = (now + Duration::from_millis(ms)).round_to_ms();
|
||||
let current = self.data.current_expiration.get();
|
||||
if current.is_none() || expiration - self.data.start < current.unwrap() - self.data.start {
|
||||
log::info!("programming timer {}", self.data.fd.raw());
|
||||
let res = uapi::timerfd_settime(
|
||||
self.fd.raw(),
|
||||
self.data.fd.raw(),
|
||||
c::TFD_TIMER_ABSTIME,
|
||||
&c::itimerspec {
|
||||
it_interval: uapi::pod_zeroed(),
|
||||
|
|
@ -114,101 +176,82 @@ impl Wheel {
|
|||
},
|
||||
);
|
||||
if let Err(e) = res {
|
||||
return Err(WheelError::SetFailed(e.into()));
|
||||
future
|
||||
.data
|
||||
.expired
|
||||
.set(Some(Err(WheelError::SetFailed(e.into()))));
|
||||
return future;
|
||||
}
|
||||
self.current_expiration.set(Some(expiration));
|
||||
}
|
||||
self.expirations
|
||||
.borrow_mut()
|
||||
.push(Reverse(WheelEntry { expiration, id }));
|
||||
self.dispatchers.set(id, dispatcher);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn periodic(
|
||||
&self,
|
||||
id: WheelId,
|
||||
us: u64,
|
||||
dispatcher: Rc<dyn WheelDispatcher>,
|
||||
) -> Result<(), WheelError> {
|
||||
self.check_destroyed()?;
|
||||
let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) {
|
||||
Ok(fd) => fd,
|
||||
Err(e) => return Err(WheelError::CreateFailed(e.into())),
|
||||
};
|
||||
let tv_sec = (us / 1_000_000) as _;
|
||||
let tv_nsec = (us % 1_000_000 * 1_000) as _;
|
||||
let res = uapi::timerfd_settime(
|
||||
fd.raw(),
|
||||
0,
|
||||
&c::itimerspec {
|
||||
it_interval: c::timespec { tv_sec, tv_nsec },
|
||||
it_value: c::timespec { tv_sec, tv_nsec },
|
||||
},
|
||||
);
|
||||
if let Err(e) = res {
|
||||
return Err(WheelError::SetFailed(e.into()));
|
||||
}
|
||||
let el_id = self.el.id();
|
||||
let pd = Rc::new(PeriodicDispatcher {
|
||||
fd,
|
||||
id: el_id,
|
||||
el: self.el.clone(),
|
||||
dispatcher,
|
||||
});
|
||||
self.el
|
||||
.insert(el_id, Some(pd.fd.raw()), c::EPOLLIN, pd.clone())?;
|
||||
self.periodic_dispatchers.set(id, pd);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove(&self, id: WheelId) {
|
||||
// log::trace!("removing {:?} from wheel", id);
|
||||
self.dispatchers.remove(&id);
|
||||
if let Some(d) = self.periodic_dispatchers.remove(&id) {
|
||||
let _ = self.el.remove(d.id);
|
||||
self.data.current_expiration.set(Some(expiration));
|
||||
}
|
||||
self.data.expirations.borrow_mut().push(Reverse(WheelEntry {
|
||||
expiration,
|
||||
id: future.data.id,
|
||||
}));
|
||||
self.data
|
||||
.dispatchers
|
||||
.set(future.data.id, future.data.clone());
|
||||
future
|
||||
}
|
||||
}
|
||||
|
||||
struct WheelWrapper {
|
||||
wheel: Rc<Wheel>,
|
||||
}
|
||||
|
||||
impl EventLoopDispatcher for WheelWrapper {
|
||||
fn dispatch(
|
||||
self: Rc<Self>,
|
||||
_fd: Option<i32>,
|
||||
events: i32,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if events & (c::EPOLLERR | c::EPOLLHUP) != 0 {
|
||||
return Err(Box::new(WheelError::ErrorEvent));
|
||||
impl WheelData {
|
||||
fn kill(&self) {
|
||||
self.destroyed.set(true);
|
||||
self.dispatcher.set(None);
|
||||
self.cached_futures.take();
|
||||
for (_, dispatcher) in self.dispatchers.lock().drain() {
|
||||
dispatcher.complete(Err(WheelError::Destroyed));
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch(self: Rc<Self>) {
|
||||
loop {
|
||||
if let Err(e) = self.fd.readable().await {
|
||||
log::error!(
|
||||
"Could not wait for the timerfd to become readable: {}",
|
||||
ErrorFmt(e)
|
||||
);
|
||||
self.kill();
|
||||
return;
|
||||
}
|
||||
if let Err(e) = self.dispatch_once() {
|
||||
log::error!("Could not dispatch wheel expirations: {}", ErrorFmt(e));
|
||||
self.kill();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dispatch_once(&self) -> Result<(), WheelError> {
|
||||
let mut n = 0u64;
|
||||
while uapi::read(self.wheel.fd.raw(), &mut n).is_ok() {}
|
||||
let now = match Time::now() {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Err(Box::new(e)),
|
||||
};
|
||||
let dist = now - self.wheel.start;
|
||||
let mut to_dispatch = vec![];
|
||||
{
|
||||
let mut expirations = self.wheel.expirations.borrow_mut();
|
||||
while let Some(Reverse(entry)) = expirations.peek() {
|
||||
if entry.expiration - self.wheel.start > dist {
|
||||
loop {
|
||||
if let Err(e) = uapi::read(self.fd.raw(), &mut n) {
|
||||
if e.0 == c::EAGAIN {
|
||||
break;
|
||||
}
|
||||
if let Some(dispatcher) = self.wheel.dispatchers.remove(&entry.id) {
|
||||
return Err(WheelError::Read(e.into()));
|
||||
}
|
||||
}
|
||||
let now = Time::now()?;
|
||||
let dist = now - self.start;
|
||||
let mut to_dispatch = vec![];
|
||||
{
|
||||
let mut expirations = self.expirations.borrow_mut();
|
||||
while let Some(Reverse(entry)) = expirations.peek() {
|
||||
if entry.expiration - self.start > dist {
|
||||
break;
|
||||
}
|
||||
if let Some(dispatcher) = self.dispatchers.remove(&entry.id) {
|
||||
to_dispatch.push(dispatcher);
|
||||
}
|
||||
expirations.pop();
|
||||
}
|
||||
self.wheel.current_expiration.set(None);
|
||||
self.current_expiration.set(None);
|
||||
while let Some(Reverse(entry)) = expirations.peek() {
|
||||
if self.wheel.dispatchers.get(&entry.id).is_some() {
|
||||
if self.dispatchers.get(&entry.id).is_some() {
|
||||
let res = uapi::timerfd_settime(
|
||||
self.wheel.fd.raw(),
|
||||
self.fd.raw(),
|
||||
c::TFD_TIMER_ABSTIME,
|
||||
&c::itimerspec {
|
||||
it_interval: uapi::pod_zeroed(),
|
||||
|
|
@ -216,49 +259,17 @@ impl EventLoopDispatcher for WheelWrapper {
|
|||
},
|
||||
);
|
||||
if let Err(e) = res {
|
||||
return Err(Box::new(WheelError::SetFailed(e.into())));
|
||||
return Err(WheelError::SetFailed(e.into()));
|
||||
}
|
||||
self.wheel.current_expiration.set(Some(entry.expiration));
|
||||
self.current_expiration.set(Some(entry.expiration));
|
||||
break;
|
||||
}
|
||||
expirations.pop();
|
||||
}
|
||||
}
|
||||
for dispatcher in to_dispatch {
|
||||
dispatcher.dispatch()?;
|
||||
dispatcher.complete(Ok(()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WheelWrapper {
|
||||
fn drop(&mut self) {
|
||||
self.wheel.destroyed.set(true);
|
||||
self.wheel.dispatchers.clear();
|
||||
let _ = self.wheel.el.remove(self.wheel.id);
|
||||
}
|
||||
}
|
||||
|
||||
struct PeriodicDispatcher {
|
||||
fd: OwnedFd,
|
||||
id: EventLoopId,
|
||||
el: Rc<EventLoop>,
|
||||
dispatcher: Rc<dyn WheelDispatcher>,
|
||||
}
|
||||
|
||||
impl EventLoopDispatcher for PeriodicDispatcher {
|
||||
fn dispatch(self: Rc<Self>, _fd: Option<i32>, events: i32) -> Result<(), Box<dyn Error>> {
|
||||
if events & (c::EPOLLERR | c::EPOLLHUP) != 0 {
|
||||
return Err(Box::new(WheelError::ErrorEvent));
|
||||
}
|
||||
let mut n = 0u64;
|
||||
while uapi::read(self.fd.raw(), &mut n).is_ok() {}
|
||||
self.dispatcher.clone().dispatch()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PeriodicDispatcher {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.el.remove(self.id);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue