autocommit 2022-01-03 18:56:52 CET
This commit is contained in:
parent
fc887b339e
commit
30376c595c
39 changed files with 3157 additions and 309 deletions
106
src/wheel.rs
106
src/wheel.rs
|
|
@ -1,11 +1,11 @@
|
|||
use crate::event_loop::{EventLoopDispatcher, EventLoopError, EventLoopId, EventLoopRef};
|
||||
use crate::event_loop::{EventLoop, EventLoopDispatcher, EventLoopError};
|
||||
use crate::time::{Time, TimeError};
|
||||
use crate::utils::copyhashmap::CopyHashMap;
|
||||
use crate::utils::numcell::NumCell;
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::rc::{Rc, Weak};
|
||||
use std::rc::Rc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use uapi::{c, OwnedFd};
|
||||
|
|
@ -30,11 +30,6 @@ pub trait WheelDispatcher {
|
|||
fn dispatch(self: Rc<Self>) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WheelRef {
|
||||
data: Weak<WheelData>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
|
||||
struct WheelEntry {
|
||||
expiration: Time,
|
||||
|
|
@ -44,10 +39,9 @@ struct WheelEntry {
|
|||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
|
||||
pub struct WheelId(u64);
|
||||
|
||||
struct WheelData {
|
||||
id: EventLoopId,
|
||||
pub struct Wheel {
|
||||
destroyed: Cell<bool>,
|
||||
fd: OwnedFd,
|
||||
el: EventLoopRef,
|
||||
next_id: NumCell<u64>,
|
||||
start: Time,
|
||||
current_expiration: Cell<Option<Time>>,
|
||||
|
|
@ -55,37 +49,47 @@ struct WheelData {
|
|||
expirations: RefCell<BinaryHeap<Reverse<WheelEntry>>>,
|
||||
}
|
||||
|
||||
impl WheelData {
|
||||
fn new(el: &EventLoopRef) -> Result<Rc<Self>, WheelError> {
|
||||
impl Wheel {
|
||||
pub fn install(el: &Rc<EventLoop>) -> Result<Rc<Self>, WheelError> {
|
||||
let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) {
|
||||
Ok(fd) => fd,
|
||||
Err(e) => return Err(WheelError::CreateFailed(e.into())),
|
||||
};
|
||||
let id = el.id()?;
|
||||
let id = el.id();
|
||||
let wheel = Rc::new(Self {
|
||||
id,
|
||||
destroyed: Cell::new(false),
|
||||
fd,
|
||||
el: el.clone(),
|
||||
next_id: Default::default(),
|
||||
start: Time::now()?,
|
||||
current_expiration: Cell::new(None),
|
||||
dispatchers: CopyHashMap::new(),
|
||||
expirations: RefCell::new(Default::default()),
|
||||
});
|
||||
el.insert(id, Some(wheel.fd.raw()), c::EPOLLIN, wheel.clone())?;
|
||||
let wrapper = Rc::new(WheelWrapper {
|
||||
wheel: wheel.clone(),
|
||||
});
|
||||
el.insert(id, Some(wheel.fd.raw()), c::EPOLLIN, wrapper)?;
|
||||
Ok(wheel)
|
||||
}
|
||||
|
||||
fn id(&self) -> WheelId {
|
||||
pub fn id(&self) -> WheelId {
|
||||
WheelId(self.next_id.fetch_add(1))
|
||||
}
|
||||
|
||||
fn timeout(
|
||||
fn check_destroyed(&self) -> Result<(), WheelError> {
|
||||
if self.destroyed.get() {
|
||||
return Err(WheelError::Destroyed);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn timeout(
|
||||
&self,
|
||||
id: WheelId,
|
||||
ms: u64,
|
||||
dispatcher: Rc<dyn WheelDispatcher>,
|
||||
) -> Result<(), WheelError> {
|
||||
self.check_destroyed()?;
|
||||
let expiration = (Time::now()? + Duration::from_millis(ms)).round_to_ms();
|
||||
let current = self.current_expiration.get();
|
||||
if current.is_none() || expiration - self.start < current.unwrap() - self.start {
|
||||
|
|
@ -110,41 +114,45 @@ impl WheelData {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn remove(&self, id: WheelId) {
|
||||
log::trace!("removing {:?} from wheel", id);
|
||||
pub fn remove(&self, id: WheelId) {
|
||||
// log::trace!("removing {:?} from wheel", id);
|
||||
self.dispatchers.remove(&id.0);
|
||||
}
|
||||
}
|
||||
|
||||
impl EventLoopDispatcher for WheelData {
|
||||
struct WheelWrapper {
|
||||
wheel: Rc<Wheel>,
|
||||
}
|
||||
|
||||
impl EventLoopDispatcher for WheelWrapper {
|
||||
fn dispatch(&self, events: i32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
if events & (c::EPOLLERR | c::EPOLLHUP) != 0 {
|
||||
return Err(Box::new(WheelError::ErrorEvent));
|
||||
}
|
||||
let mut n = 0u64;
|
||||
while uapi::read(self.fd.raw(), &mut n).is_ok() {}
|
||||
while uapi::read(self.wheel.fd.raw(), &mut n).is_ok() {}
|
||||
let now = match Time::now() {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Err(Box::new(e)),
|
||||
};
|
||||
let dist = now - self.start;
|
||||
let dist = now - self.wheel.start;
|
||||
let mut to_dispatch = vec![];
|
||||
{
|
||||
let mut expirations = self.expirations.borrow_mut();
|
||||
let mut expirations = self.wheel.expirations.borrow_mut();
|
||||
while let Some(Reverse(entry)) = expirations.peek() {
|
||||
if entry.expiration - self.start > dist {
|
||||
if entry.expiration - self.wheel.start > dist {
|
||||
break;
|
||||
}
|
||||
if let Some(dispatcher) = self.dispatchers.remove(&entry.id) {
|
||||
if let Some(dispatcher) = self.wheel.dispatchers.remove(&entry.id) {
|
||||
to_dispatch.push(dispatcher);
|
||||
}
|
||||
expirations.pop();
|
||||
}
|
||||
self.current_expiration.set(None);
|
||||
self.wheel.current_expiration.set(None);
|
||||
while let Some(Reverse(entry)) = expirations.peek() {
|
||||
if self.dispatchers.get(&entry.id).is_some() {
|
||||
if self.wheel.dispatchers.get(&entry.id).is_some() {
|
||||
let res = uapi::timerfd_settime(
|
||||
self.fd.raw(),
|
||||
self.wheel.fd.raw(),
|
||||
c::TFD_TIMER_ABSTIME,
|
||||
&c::itimerspec {
|
||||
it_interval: uapi::pod_zeroed(),
|
||||
|
|
@ -154,7 +162,7 @@ impl EventLoopDispatcher for WheelData {
|
|||
if let Err(e) = res {
|
||||
return Err(Box::new(WheelError::SetFailed(e.into())));
|
||||
}
|
||||
self.current_expiration.set(Some(entry.expiration));
|
||||
self.wheel.current_expiration.set(Some(entry.expiration));
|
||||
break;
|
||||
}
|
||||
expirations.pop();
|
||||
|
|
@ -167,41 +175,9 @@ impl EventLoopDispatcher for WheelData {
|
|||
}
|
||||
}
|
||||
|
||||
impl Drop for WheelData {
|
||||
impl Drop for WheelWrapper {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.el.remove(self.id);
|
||||
}
|
||||
}
|
||||
|
||||
impl WheelRef {
|
||||
pub fn new(el: &EventLoopRef) -> Result<Self, WheelError> {
|
||||
Ok(Self {
|
||||
data: Rc::downgrade(&WheelData::new(el)?),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn id(&self) -> Result<WheelId, WheelError> {
|
||||
match self.data.upgrade() {
|
||||
Some(d) => Ok(d.id()),
|
||||
_ => Err(WheelError::Destroyed),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timeout(
|
||||
&self,
|
||||
id: WheelId,
|
||||
ms: u64,
|
||||
dispatcher: Rc<dyn WheelDispatcher>,
|
||||
) -> Result<(), WheelError> {
|
||||
match self.data.upgrade() {
|
||||
Some(d) => d.timeout(id, ms, dispatcher),
|
||||
_ => Err(WheelError::Destroyed),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&self, id: WheelId) {
|
||||
if let Some(wheel) = self.data.upgrade() {
|
||||
wheel.remove(id);
|
||||
}
|
||||
self.wheel.destroyed.set(true);
|
||||
self.wheel.dispatchers.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue