1
0
Fork 0
forked from wry/wry

config: add async tasks and polling infrastructure

This commit is contained in:
Julian Orth 2024-03-05 18:28:15 +01:00
parent 7fb68561e8
commit d4d76c0ef3
13 changed files with 613 additions and 13 deletions

View file

@ -1,6 +1,7 @@
pub mod client;
pub mod ipc;
mod logging;
pub(crate) mod string_error;
use {
crate::video::Mode,
@ -58,3 +59,6 @@ impl WireMode {
}
}
}
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct PollableId(pub u64);

View file

@ -5,12 +5,13 @@ use {
_private::{
bincode_ops,
ipc::{ClientMessage, InitMessage, Response, ServerMessage},
logging, Config, ConfigEntry, ConfigEntryGen, WireMode, VERSION,
logging, Config, ConfigEntry, ConfigEntryGen, PollableId, WireMode, VERSION,
},
exec::Command,
input::{acceleration::AccelProfile, capability::Capability, InputDevice, Seat},
keyboard::Keymap,
logging::LogLevel,
tasks::JoinSlot,
theme::{colors::Colorable, sized::Resizable, Color},
timer::Timer,
video::{
@ -20,13 +21,22 @@ use {
Axis, Direction, ModifiedKeySym, PciId, Workspace,
},
bincode::Options,
futures_util::task::ArcWake,
std::{
cell::{Cell, RefCell},
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, VecDeque},
future::Future,
mem,
ops::Deref,
pin::Pin,
ptr,
rc::Rc,
slice,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex,
},
task::{Context, Poll, Waker},
time::Duration,
},
};
@ -50,6 +60,40 @@ pub(crate) struct Client {
on_idle: RefCell<Option<Rc<dyn Fn()>>>,
bufs: RefCell<Vec<Vec<u8>>>,
reload: Cell<bool>,
read_interests: RefCell<HashMap<PollableId, Interest>>,
write_interests: RefCell<HashMap<PollableId, Interest>>,
tasks: Tasks,
}
struct Interest {
result: Option<Result<(), String>>,
waker: Option<Waker>,
}
#[derive(Default)]
struct Tasks {
last_id: Cell<u64>,
ready_front: RefCell<VecDeque<u64>>,
ready_back: Arc<TasksBackBuffer>,
tasks: RefCell<HashMap<u64, Rc<RefCell<Task>>>>,
}
#[derive(Default)]
struct TasksBackBuffer {
any: AtomicBool,
tasks: Mutex<VecDeque<u64>>,
}
impl TasksBackBuffer {
fn append(&self, task: u64) {
self.tasks.lock().unwrap().push_back(task);
self.any.store(true, Relaxed);
}
}
struct Task {
task: Pin<Box<dyn Future<Output = ()>>>,
waker: Waker,
}
impl Drop for Client {
@ -138,6 +182,9 @@ pub unsafe extern "C" fn init(
on_idle: Default::default(),
bufs: Default::default(),
reload: Cell::new(false),
read_interests: Default::default(),
write_interests: Default::default(),
tasks: Default::default(),
});
let init = slice::from_raw_parts(init, size);
client.handle_init_msg(init);
@ -729,7 +776,122 @@ impl Client {
})
}
pub fn create_pollable(&self, fd: i32) -> Result<PollableId, String> {
let res = self.send_with_response(&ClientMessage::AddPollable { fd });
get_response!(
res,
Err("Compositor did not send a response".to_string()),
AddPollable { id }
);
id
}
pub fn remove_pollable(&self, id: PollableId) {
self.send(&ClientMessage::RemovePollable { id });
self.write_interests.borrow_mut().remove(&id);
self.read_interests.borrow_mut().remove(&id);
}
pub fn poll_io(
&self,
pollable: PollableId,
writable: bool,
ctx: &mut Context<'_>,
) -> Poll<Result<(), String>> {
let interests = match writable {
true => &self.write_interests,
false => &self.read_interests,
};
let mut interests = interests.borrow_mut();
match interests.entry(pollable) {
Entry::Occupied(mut o) => {
let interest = o.get_mut();
if interest.result.is_some() {
Poll::Ready(o.remove().result.unwrap())
} else {
interest.waker = Some(ctx.waker().clone());
Poll::Pending
}
}
Entry::Vacant(v) => {
self.send(&ClientMessage::AddInterest { pollable, writable });
v.insert(Interest {
result: None,
waker: Some(ctx.waker().clone()),
});
Poll::Pending
}
}
}
fn handle_msg(&self, msg: &[u8]) {
self.handle_msg2(msg);
self.dispatch_futures();
}
fn dispatch_futures(&self) {
let futures = &self.tasks;
if !futures.ready_back.any.load(Relaxed) {
return;
}
let mut ready = futures.ready_front.borrow_mut();
loop {
mem::swap(&mut *ready, &mut *futures.ready_back.tasks.lock().unwrap());
futures.ready_back.any.store(false, Relaxed);
while let Some(id) = ready.pop_front() {
let fut = futures.tasks.borrow_mut().get(&id).cloned();
if let Some(fut) = fut {
let mut fut = fut.borrow_mut();
let fut = &mut *fut;
if let Poll::Ready(()) =
fut.task.as_mut().poll(&mut Context::from_waker(&fut.waker))
{
futures.tasks.borrow_mut().remove(&id);
}
}
}
if !futures.ready_back.any.load(Relaxed) {
return;
}
}
}
pub fn spawn_task<T: 'static>(&self, f: impl Future<Output = T> + 'static) -> Rc<JoinSlot<T>> {
struct Waker(Arc<TasksBackBuffer>, u64);
impl ArcWake for Waker {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.0.append(arc_self.1);
}
}
let tasks = &self.tasks;
let id = tasks.last_id.get() + 1;
tasks.last_id.set(id);
let waker = futures_util::task::waker(Arc::new(Waker(tasks.ready_back.clone(), id)));
tasks.ready_back.append(id);
let slot = Rc::new(JoinSlot {
task_id: id,
slot: Cell::new(None),
waker: Cell::new(None),
});
let slot2 = slot.clone();
let task = Rc::new(RefCell::new(Task {
task: Box::pin(async move {
slot2.slot.set(Some(f.await));
if let Some(waker) = slot2.waker.take() {
waker.wake();
}
}),
waker,
}));
tasks.tasks.borrow_mut().insert(id, task);
slot
}
pub fn abort_task(&self, id: u64) {
self.tasks.tasks.borrow_mut().remove(&id);
}
fn handle_msg2(&self, msg: &[u8]) {
let res = bincode_ops().deserialize::<ServerMessage>(msg);
let msg = match res {
Ok(msg) => msg,
@ -813,6 +975,19 @@ impl Client {
handler();
}
}
ServerMessage::InterestReady { id, writable, res } => {
let interests = match writable {
true => &self.write_interests,
false => &self.read_interests,
};
let mut interests = interests.borrow_mut();
if let Some(interest) = interests.get_mut(&id) {
interest.result = Some(res);
if let Some(waker) = interest.waker.take() {
waker.wake();
}
}
}
}
}

View file

@ -7,7 +7,7 @@ use {
timer::Timer,
video::{connector_type::ConnectorType, Connector, DrmDevice, GfxApi, Transform},
Axis, Direction, PciId, Workspace,
_private::WireMode,
_private::{PollableId, WireMode},
},
serde::{Deserialize, Serialize},
std::time::Duration,
@ -57,6 +57,11 @@ pub enum ServerMessage {
},
Idle,
DevicesEnumerated,
InterestReady {
id: PollableId,
writable: bool,
res: Result<(), String>,
},
}
#[derive(Serialize, Deserialize, Debug)]
@ -360,6 +365,16 @@ pub enum ClientMessage<'a> {
connector: Connector,
mode: WireMode,
},
AddPollable {
fd: i32,
},
RemovePollable {
id: PollableId,
},
AddInterest {
pollable: PollableId,
writable: bool,
},
}
#[derive(Serialize, Deserialize, Debug)]
@ -465,6 +480,9 @@ pub enum Response {
ConnectorModes {
modes: Vec<WireMode>,
},
AddPollable {
id: Result<PollableId, String>,
},
}
#[derive(Serialize, Deserialize, Debug)]

View file

@ -0,0 +1,15 @@
use std::{
error::Error,
fmt::{Display, Formatter},
};
#[derive(Debug)]
pub struct StringError(pub String);
impl Display for StringError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl Error for StringError {}

194
jay-config/src/io.rs Normal file
View file

@ -0,0 +1,194 @@
//! Tools for IO operations.
use {
crate::_private::PollableId,
futures_util::{io::AsyncRead, AsyncWrite},
std::{
future::poll_fn,
io::{self, ErrorKind, IoSlice, IoSliceMut, Read, Write},
os::fd::{AsFd, AsRawFd},
pin::Pin,
task::{ready, Context, Poll},
},
thiserror::Error,
uapi::c,
};
#[derive(Debug, Error)]
enum AsyncError {
#[error("Could not retrieve the file description flags")]
GetFl(#[source] io::Error),
#[error("Could not set the file description flags")]
SetFl(#[source] io::Error),
#[error("This configuration has already been destroyed")]
Destroyed,
#[error("The compositor could not create the necessary data structures: {0}")]
CompositorSetup(String),
#[error("Could not poll the file description: {0}")]
Poll(String),
}
impl From<AsyncError> for io::Error {
fn from(value: AsyncError) -> Self {
io::Error::new(ErrorKind::Other, value)
}
}
/// An async adapter for types implementing [`AsFd`].
pub struct Async<T> {
id: PollableIdWrapper,
t: Option<T>,
}
impl<T> Unpin for Async<T> {}
struct PollableIdWrapper {
id: PollableId,
}
impl Drop for PollableIdWrapper {
fn drop(&mut self) {
get!().remove_pollable(self.id);
}
}
impl<T> Async<T>
where
T: AsFd,
{
/// Creates a new async adapter.
///
/// This takes ownership of the file description and duplicates the file descriptor.
/// You should not modify the file description while this object is in use, otherwise
/// the behavior is undefined.
pub fn new(t: T) -> Result<Self, io::Error> {
Ok(Self::new_(t)?)
}
fn new_(t: T) -> Result<Self, AsyncError> {
let fd = t.as_fd();
let fl = uapi::fcntl_getfl(fd.as_raw_fd())
.map_err(|e| AsyncError::GetFl(io::Error::from_raw_os_error(e.0)))?;
uapi::fcntl_setfl(fd.as_raw_fd(), fl | c::O_NONBLOCK)
.map_err(|e| AsyncError::SetFl(io::Error::from_raw_os_error(e.0)))?;
let id = get!(Err(AsyncError::Destroyed))
.create_pollable(fd.as_raw_fd())
.map_err(AsyncError::CompositorSetup)?;
Ok(Self {
id: PollableIdWrapper { id },
t: Some(t),
})
}
}
impl<T> Async<T> {
/// Unwraps the underlying object.
///
/// Note that the underlying object is still non-blocking at this point.
pub fn unwrap(self) -> T {
self.t.unwrap()
}
fn poll_(&self, writable: bool, cx: &mut Context<'_>) -> Poll<Result<(), AsyncError>> {
get!(Poll::Ready(Err(AsyncError::Destroyed)))
.poll_io(self.id.id, writable, cx)
.map_err(AsyncError::Poll)
}
async fn poll(&self, writable: bool) -> Result<(), io::Error> {
poll_fn(|cx| self.poll_(writable, cx)).await?;
Ok(())
}
/// Waits for the file description to become readable.
pub async fn readable(&self) -> Result<(), io::Error> {
self.poll(false).await
}
/// Waits for the file description to become writable.
pub async fn writable(&self) -> Result<(), io::Error> {
self.poll(true).await
}
}
impl<T> AsRef<T> for Async<T> {
fn as_ref(&self) -> &T {
self.t.as_ref().unwrap()
}
}
impl<T> AsMut<T> for Async<T> {
fn as_mut(&mut self) -> &mut T {
self.t.as_mut().unwrap()
}
}
fn poll_io<T, R>(
slf: &mut Async<T>,
writable: bool,
cx: &mut Context<'_>,
mut f: impl FnMut(&mut Async<T>) -> io::Result<R>,
) -> Poll<io::Result<R>> {
loop {
ready!(slf.poll_(writable, cx))?;
match f(slf) {
Err(e) if e.kind() == ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
}
}
impl<T> AsyncRead for Async<T>
where
T: Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
poll_io(self.get_mut(), false, cx, |slf| slf.as_mut().read(buf))
}
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
poll_io(self.get_mut(), false, cx, |slf| {
slf.as_mut().read_vectored(bufs)
})
}
}
impl<T> AsyncWrite for Async<T>
where
T: Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().write(buf))
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
poll_io(self.get_mut(), true, cx, |slf| {
slf.as_mut().write_vectored(bufs)
})
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().flush())
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_mut().t.take();
Poll::Ready(Ok(()))
}
}

View file

@ -53,9 +53,11 @@ pub mod _private;
pub mod embedded;
pub mod exec;
pub mod input;
pub mod io;
pub mod keyboard;
pub mod logging;
pub mod status;
pub mod tasks;
pub mod theme;
pub mod timer;
pub mod video;

View file

@ -16,6 +16,20 @@ macro_rules! config {
};
}
macro_rules! try_get {
() => {{
#[allow(unused_unsafe)]
unsafe {
let client = crate::_private::client::CLIENT.with(|client| client.get());
if client.is_null() {
None
} else {
Some(&*client)
}
}
}};
}
macro_rules! get {
() => {{
get!(Default::default())

69
jay-config/src/tasks.rs Normal file
View file

@ -0,0 +1,69 @@
//! Tools for async task management.
use std::{
cell::Cell,
fmt::{Debug, Formatter},
future::Future,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};
/// Spawns an asynchronous task that will run in the background.
pub fn spawn<T, F>(f: F) -> JoinHandle<T>
where
T: 'static,
F: Future<Output = T> + 'static,
{
let slot = match try_get!() {
None => Rc::new(JoinSlot {
task_id: 0,
slot: Cell::new(None),
waker: Cell::new(None),
}),
Some(c) => c.spawn_task(f),
};
JoinHandle { slot }
}
pub(crate) struct JoinSlot<T> {
pub task_id: u64,
pub slot: Cell<Option<T>>,
pub waker: Cell<Option<Waker>>,
}
/// A handle to join or abort a spawned task.
///
/// When the handle is dropped, the task continues to run in the background.
pub struct JoinHandle<T> {
slot: Rc<JoinSlot<T>>,
}
impl<T> Debug for JoinHandle<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JoinHandle")
.field("task_id", &self.slot.task_id)
.finish_non_exhaustive()
}
}
impl<T> Unpin for JoinHandle<T> {}
impl<T> JoinHandle<T> {
/// Aborts the task immediately.
pub fn abort(self) {
get!().abort_task(self.slot.task_id);
}
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(t) = self.slot.slot.take() {
return Poll::Ready(t);
}
self.slot.waker.set(Some(cx.waker().clone()));
Poll::Pending
}
}