1
0
Fork 0
forked from wry/wry

autocommit 2022-03-02 14:24:07 CET

This commit is contained in:
Julian Orth 2022-03-02 14:24:07 +01:00
parent 0e9afcbfa5
commit aa0cb94143
30 changed files with 1059 additions and 123 deletions

View file

@ -3,7 +3,7 @@ use crate::event_loop::{EventLoop, EventLoopError};
use crate::utils::copyhashmap::CopyHashMap;
use crate::utils::numcell::NumCell;
use crate::wheel::{Wheel, WheelError};
pub use fd::AsyncFd;
pub use fd::{AsyncFd, FdStatus};
use fd::AsyncFdData;
use queue::{DispatchQueue, Dispatcher};
use std::cell::{Cell, RefCell};
@ -21,6 +21,8 @@ pub enum AsyncError {
WheelError(#[from] WheelError),
#[error("The event loop caused an error: {0}")]
EventLoopError(#[from] EventLoopError),
#[error("The file descriptor is in an error state")]
FdError,
}
#[derive(Copy, Clone, Eq, PartialEq)]
@ -90,7 +92,6 @@ impl AsyncEngine {
read_registered: Cell::new(false),
readers: RefCell::new(vec![]),
writers: RefCell::new(vec![]),
erroneous: Cell::new(false),
});
self.el.insert(id, Some(fd.raw()), 0, afd.clone())?;
afd
@ -198,6 +199,7 @@ mod task {
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
#[must_use]
pub struct SpawnedFuture<T: 'static> {
vtable: &'static SpawnedFutureVtable<T>,
data: *mut u8,
@ -535,7 +537,7 @@ mod fd {
use std::task::{Context, Poll, Waker};
use uapi::{c, OwnedFd};
type Queue = RefCell<Vec<(Waker, Rc<Cell<bool>>)>>;
type Queue = RefCell<Vec<(Waker, Rc<Cell<Option<FdStatus>>>)>>;
pub(super) struct AsyncFdData {
pub(super) ref_count: NumCell<u64>,
@ -546,7 +548,6 @@ mod fd {
pub(super) read_registered: Cell<bool>,
pub(super) readers: Queue,
pub(super) writers: Queue,
pub(super) erroneous: Cell<bool>,
}
impl AsyncFdData {
@ -560,21 +561,23 @@ mod fd {
}
let res = self.el.modify(self.id, events);
if res.is_err() {
self.erroneous.set(true);
let _ = self.el.remove(self.id);
if let Err(e) = self.el.remove(self.id) {
log::error!("Fatal error: Cannot remove file descriptor from event loop: {:?}", e);
self.el.stop();
}
}
res
}
fn poll(
&self,
woken: &Rc<Cell<bool>>,
woken: &Rc<Cell<Option<FdStatus>>>,
cx: &mut Context<'_>,
registered: impl Fn(&AsyncFdData) -> &Cell<bool>,
queue: impl Fn(&AsyncFdData) -> &Queue,
) -> Poll<Result<(), AsyncError>> {
if woken.get() || self.erroneous.get() {
return Poll::Ready(Ok(()));
) -> Poll<Result<FdStatus, AsyncError>> {
if let Some(status) = woken.get() {
return Poll::Ready(Ok(status));
}
if !registered(self).get() {
registered(self).set(true);
@ -591,30 +594,31 @@ mod fd {
impl EventLoopDispatcher for AsyncFdData {
fn dispatch(self: Rc<Self>, events: i32) -> Result<(), Box<dyn Error>> {
let mut status = FdStatus::Ok;
if events & (c::EPOLLERR | c::EPOLLHUP) != 0 {
self.erroneous.set(true);
status = FdStatus::Err;
if let Err(e) = self.el.remove(self.id) {
return Err(Box::new(e));
}
}
let mut woke_any = false;
if events & c::EPOLLIN != 0 || self.erroneous.get() {
if events & c::EPOLLIN != 0 || status == FdStatus::Err {
let mut readers = self.readers.borrow_mut();
woke_any |= !readers.is_empty();
for (waker, woken) in readers.drain(..) {
woken.set(true);
woken.set(Some(status));
waker.wake();
}
}
if events & c::EPOLLOUT != 0 || self.erroneous.get() {
if events & c::EPOLLOUT != 0 || status == FdStatus::Err {
let mut writers = self.writers.borrow_mut();
woke_any |= !writers.is_empty();
for (waker, woken) in writers.drain(..) {
woken.set(true);
woken.set(Some(status));
waker.wake();
}
}
if !woke_any && !self.erroneous.get() {
if !woke_any && status == FdStatus::Ok {
self.read_registered.set(false);
self.write_registered.set(false);
if let Err(e) = self.update_interests() {
@ -666,25 +670,31 @@ mod fd {
pub fn readable(&self) -> AsyncFdReadable {
AsyncFdReadable {
fd: self,
woken: Rc::new(Cell::new(false)),
woken: Rc::new(Cell::new(None)),
}
}
pub fn writable(&self) -> AsyncFdWritable {
AsyncFdWritable {
fd: self,
woken: Rc::new(Cell::new(false)),
woken: Rc::new(Cell::new(None)),
}
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum FdStatus {
Ok,
Err,
}
pub struct AsyncFdReadable<'a> {
fd: &'a AsyncFd,
woken: Rc<Cell<bool>>,
woken: Rc<Cell<Option<FdStatus>>>,
}
impl<'a> Future for AsyncFdReadable<'a> {
type Output = Result<(), AsyncError>;
type Output = Result<FdStatus, AsyncError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let data = &self.fd.data;
@ -694,11 +704,11 @@ mod fd {
pub struct AsyncFdWritable<'a> {
fd: &'a AsyncFd,
woken: Rc<Cell<bool>>,
woken: Rc<Cell<Option<FdStatus>>>,
}
impl<'a> Future for AsyncFdWritable<'a> {
type Output = Result<(), AsyncError>;
type Output = Result<FdStatus, AsyncError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let data = &self.fd.data;