Merge pull request #124 from mahkoh/jorth/config-fds
config: add async tasks and polling infrastructure
This commit is contained in:
commit
37fc28c749
14 changed files with 654 additions and 18 deletions
28
Cargo.lock
generated
28
Cargo.lock
generated
|
|
@ -350,6 +350,12 @@ version = "0.3.30"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.30"
|
||||
|
|
@ -374,8 +380,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"slab",
|
||||
|
|
@ -531,9 +539,13 @@ dependencies = [
|
|||
name = "jay-config"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bincode",
|
||||
"futures-util",
|
||||
"log",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"uapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -691,18 +703,18 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.4"
|
||||
version = "1.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0"
|
||||
checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
|
||||
dependencies = [
|
||||
"pin-project-internal",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.1.4"
|
||||
version = "1.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690"
|
||||
checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
|
@ -969,18 +981,18 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.56"
|
||||
version = "1.0.57"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad"
|
||||
checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.56"
|
||||
version = "1.0.57"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471"
|
||||
checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
|
|
|||
|
|
@ -9,3 +9,7 @@ description = "Configuration crate for the Jay compositor"
|
|||
bincode = "1.3.3"
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
log = "0.4.14"
|
||||
futures-util = { version = "0.3.30", features = ["io"] }
|
||||
uapi = "0.2.10"
|
||||
thiserror = "1.0.57"
|
||||
backtrace = "0.3.69"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
pub mod client;
|
||||
pub mod ipc;
|
||||
mod logging;
|
||||
pub(crate) mod string_error;
|
||||
|
||||
use {
|
||||
crate::video::Mode,
|
||||
|
|
@ -58,3 +59,6 @@ impl WireMode {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
|
||||
pub struct PollableId(pub u64);
|
||||
|
|
|
|||
|
|
@ -5,12 +5,13 @@ use {
|
|||
_private::{
|
||||
bincode_ops,
|
||||
ipc::{ClientMessage, InitMessage, Response, ServerMessage},
|
||||
logging, Config, ConfigEntry, ConfigEntryGen, WireMode, VERSION,
|
||||
logging, Config, ConfigEntry, ConfigEntryGen, PollableId, WireMode, VERSION,
|
||||
},
|
||||
exec::Command,
|
||||
input::{acceleration::AccelProfile, capability::Capability, InputDevice, Seat},
|
||||
keyboard::Keymap,
|
||||
logging::LogLevel,
|
||||
tasks::JoinSlot,
|
||||
theme::{colors::Colorable, sized::Resizable, Color},
|
||||
timer::Timer,
|
||||
video::{
|
||||
|
|
@ -20,13 +21,22 @@ use {
|
|||
Axis, Direction, ModifiedKeySym, PciId, Workspace,
|
||||
},
|
||||
bincode::Options,
|
||||
futures_util::task::ArcWake,
|
||||
std::{
|
||||
cell::{Cell, RefCell},
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
collections::{hash_map::Entry, HashMap, VecDeque},
|
||||
future::Future,
|
||||
mem,
|
||||
ops::Deref,
|
||||
pin::Pin,
|
||||
ptr,
|
||||
rc::Rc,
|
||||
slice,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering::Relaxed},
|
||||
Arc, Mutex,
|
||||
},
|
||||
task::{Context, Poll, Waker},
|
||||
time::Duration,
|
||||
},
|
||||
};
|
||||
|
|
@ -50,6 +60,40 @@ pub(crate) struct Client {
|
|||
on_idle: RefCell<Option<Rc<dyn Fn()>>>,
|
||||
bufs: RefCell<Vec<Vec<u8>>>,
|
||||
reload: Cell<bool>,
|
||||
read_interests: RefCell<HashMap<PollableId, Interest>>,
|
||||
write_interests: RefCell<HashMap<PollableId, Interest>>,
|
||||
tasks: Tasks,
|
||||
}
|
||||
|
||||
struct Interest {
|
||||
result: Option<Result<(), String>>,
|
||||
waker: Option<Waker>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Tasks {
|
||||
last_id: Cell<u64>,
|
||||
ready_front: RefCell<VecDeque<u64>>,
|
||||
ready_back: Arc<TasksBackBuffer>,
|
||||
tasks: RefCell<HashMap<u64, Rc<RefCell<Task>>>>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TasksBackBuffer {
|
||||
any: AtomicBool,
|
||||
tasks: Mutex<VecDeque<u64>>,
|
||||
}
|
||||
|
||||
impl TasksBackBuffer {
|
||||
fn append(&self, task: u64) {
|
||||
self.tasks.lock().unwrap().push_back(task);
|
||||
self.any.store(true, Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
struct Task {
|
||||
task: Pin<Box<dyn Future<Output = ()>>>,
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
impl Drop for Client {
|
||||
|
|
@ -138,6 +182,9 @@ pub unsafe extern "C" fn init(
|
|||
on_idle: Default::default(),
|
||||
bufs: Default::default(),
|
||||
reload: Cell::new(false),
|
||||
read_interests: Default::default(),
|
||||
write_interests: Default::default(),
|
||||
tasks: Default::default(),
|
||||
});
|
||||
let init = slice::from_raw_parts(init, size);
|
||||
client.handle_init_msg(init);
|
||||
|
|
@ -729,7 +776,122 @@ impl Client {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn create_pollable(&self, fd: i32) -> Result<PollableId, String> {
|
||||
let res = self.send_with_response(&ClientMessage::AddPollable { fd });
|
||||
get_response!(
|
||||
res,
|
||||
Err("Compositor did not send a response".to_string()),
|
||||
AddPollable { id }
|
||||
);
|
||||
id
|
||||
}
|
||||
|
||||
pub fn remove_pollable(&self, id: PollableId) {
|
||||
self.send(&ClientMessage::RemovePollable { id });
|
||||
self.write_interests.borrow_mut().remove(&id);
|
||||
self.read_interests.borrow_mut().remove(&id);
|
||||
}
|
||||
|
||||
pub fn poll_io(
|
||||
&self,
|
||||
pollable: PollableId,
|
||||
writable: bool,
|
||||
ctx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), String>> {
|
||||
let interests = match writable {
|
||||
true => &self.write_interests,
|
||||
false => &self.read_interests,
|
||||
};
|
||||
let mut interests = interests.borrow_mut();
|
||||
match interests.entry(pollable) {
|
||||
Entry::Occupied(mut o) => {
|
||||
let interest = o.get_mut();
|
||||
if interest.result.is_some() {
|
||||
Poll::Ready(o.remove().result.unwrap())
|
||||
} else {
|
||||
interest.waker = Some(ctx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
self.send(&ClientMessage::AddInterest { pollable, writable });
|
||||
v.insert(Interest {
|
||||
result: None,
|
||||
waker: Some(ctx.waker().clone()),
|
||||
});
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_msg(&self, msg: &[u8]) {
|
||||
self.handle_msg2(msg);
|
||||
self.dispatch_futures();
|
||||
}
|
||||
|
||||
fn dispatch_futures(&self) {
|
||||
let futures = &self.tasks;
|
||||
if !futures.ready_back.any.load(Relaxed) {
|
||||
return;
|
||||
}
|
||||
let mut ready = futures.ready_front.borrow_mut();
|
||||
loop {
|
||||
mem::swap(&mut *ready, &mut *futures.ready_back.tasks.lock().unwrap());
|
||||
futures.ready_back.any.store(false, Relaxed);
|
||||
while let Some(id) = ready.pop_front() {
|
||||
let fut = futures.tasks.borrow_mut().get(&id).cloned();
|
||||
if let Some(fut) = fut {
|
||||
let mut fut = fut.borrow_mut();
|
||||
let fut = &mut *fut;
|
||||
if let Poll::Ready(()) =
|
||||
fut.task.as_mut().poll(&mut Context::from_waker(&fut.waker))
|
||||
{
|
||||
futures.tasks.borrow_mut().remove(&id);
|
||||
}
|
||||
}
|
||||
}
|
||||
if !futures.ready_back.any.load(Relaxed) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_task<T: 'static>(&self, f: impl Future<Output = T> + 'static) -> Rc<JoinSlot<T>> {
|
||||
struct Waker(Arc<TasksBackBuffer>, u64);
|
||||
impl ArcWake for Waker {
|
||||
fn wake_by_ref(arc_self: &Arc<Self>) {
|
||||
arc_self.0.append(arc_self.1);
|
||||
}
|
||||
}
|
||||
let tasks = &self.tasks;
|
||||
let id = tasks.last_id.get() + 1;
|
||||
tasks.last_id.set(id);
|
||||
let waker = futures_util::task::waker(Arc::new(Waker(tasks.ready_back.clone(), id)));
|
||||
tasks.ready_back.append(id);
|
||||
let slot = Rc::new(JoinSlot {
|
||||
task_id: id,
|
||||
slot: Cell::new(None),
|
||||
waker: Cell::new(None),
|
||||
});
|
||||
let slot2 = slot.clone();
|
||||
let task = Rc::new(RefCell::new(Task {
|
||||
task: Box::pin(async move {
|
||||
slot2.slot.set(Some(f.await));
|
||||
if let Some(waker) = slot2.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}),
|
||||
waker,
|
||||
}));
|
||||
tasks.tasks.borrow_mut().insert(id, task);
|
||||
slot
|
||||
}
|
||||
|
||||
pub fn abort_task(&self, id: u64) {
|
||||
self.tasks.tasks.borrow_mut().remove(&id);
|
||||
}
|
||||
|
||||
fn handle_msg2(&self, msg: &[u8]) {
|
||||
let res = bincode_ops().deserialize::<ServerMessage>(msg);
|
||||
let msg = match res {
|
||||
Ok(msg) => msg,
|
||||
|
|
@ -813,6 +975,19 @@ impl Client {
|
|||
handler();
|
||||
}
|
||||
}
|
||||
ServerMessage::InterestReady { id, writable, res } => {
|
||||
let interests = match writable {
|
||||
true => &self.write_interests,
|
||||
false => &self.read_interests,
|
||||
};
|
||||
let mut interests = interests.borrow_mut();
|
||||
if let Some(interest) = interests.get_mut(&id) {
|
||||
interest.result = Some(res);
|
||||
if let Some(waker) = interest.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ use {
|
|||
timer::Timer,
|
||||
video::{connector_type::ConnectorType, Connector, DrmDevice, GfxApi, Transform},
|
||||
Axis, Direction, PciId, Workspace,
|
||||
_private::WireMode,
|
||||
_private::{PollableId, WireMode},
|
||||
},
|
||||
serde::{Deserialize, Serialize},
|
||||
std::time::Duration,
|
||||
|
|
@ -57,6 +57,11 @@ pub enum ServerMessage {
|
|||
},
|
||||
Idle,
|
||||
DevicesEnumerated,
|
||||
InterestReady {
|
||||
id: PollableId,
|
||||
writable: bool,
|
||||
res: Result<(), String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
|
@ -360,6 +365,16 @@ pub enum ClientMessage<'a> {
|
|||
connector: Connector,
|
||||
mode: WireMode,
|
||||
},
|
||||
AddPollable {
|
||||
fd: i32,
|
||||
},
|
||||
RemovePollable {
|
||||
id: PollableId,
|
||||
},
|
||||
AddInterest {
|
||||
pollable: PollableId,
|
||||
writable: bool,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
|
@ -465,6 +480,9 @@ pub enum Response {
|
|||
ConnectorModes {
|
||||
modes: Vec<WireMode>,
|
||||
},
|
||||
AddPollable {
|
||||
id: Result<PollableId, String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
|
|
|||
|
|
@ -1,11 +1,31 @@
|
|||
use {
|
||||
crate::logging::LogLevel,
|
||||
backtrace::Backtrace,
|
||||
log::{Level, LevelFilter, Log, Metadata, Record},
|
||||
};
|
||||
|
||||
pub fn init() {
|
||||
let _ = log::set_logger(&Logger);
|
||||
log::set_max_level(LevelFilter::Trace);
|
||||
std::panic::set_hook(Box::new(|p| {
|
||||
if let Some(loc) = p.location() {
|
||||
log::error!(
|
||||
"Panic at {} line {} column {}",
|
||||
loc.file(),
|
||||
loc.line(),
|
||||
loc.column()
|
||||
);
|
||||
} else {
|
||||
log::error!("Panic at unknown location");
|
||||
}
|
||||
if let Some(msg) = p.payload().downcast_ref::<&str>() {
|
||||
log::error!("Message: {}", msg);
|
||||
}
|
||||
if let Some(msg) = p.payload().downcast_ref::<String>() {
|
||||
log::error!("Message: {}", msg);
|
||||
}
|
||||
log::error!("Backtrace:\n{:?}", Backtrace::new());
|
||||
}));
|
||||
}
|
||||
|
||||
struct Logger;
|
||||
|
|
|
|||
15
jay-config/src/_private/string_error.rs
Normal file
15
jay-config/src/_private/string_error.rs
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
use std::{
|
||||
error::Error,
|
||||
fmt::{Display, Formatter},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StringError(pub String);
|
||||
|
||||
impl Display for StringError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for StringError {}
|
||||
194
jay-config/src/io.rs
Normal file
194
jay-config/src/io.rs
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
//! Tools for IO operations.
|
||||
|
||||
use {
|
||||
crate::_private::PollableId,
|
||||
futures_util::{io::AsyncRead, AsyncWrite},
|
||||
std::{
|
||||
future::poll_fn,
|
||||
io::{self, ErrorKind, IoSlice, IoSliceMut, Read, Write},
|
||||
os::fd::{AsFd, AsRawFd},
|
||||
pin::Pin,
|
||||
task::{ready, Context, Poll},
|
||||
},
|
||||
thiserror::Error,
|
||||
uapi::c,
|
||||
};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
enum AsyncError {
|
||||
#[error("Could not retrieve the file description flags")]
|
||||
GetFl(#[source] io::Error),
|
||||
#[error("Could not set the file description flags")]
|
||||
SetFl(#[source] io::Error),
|
||||
#[error("This configuration has already been destroyed")]
|
||||
Destroyed,
|
||||
#[error("The compositor could not create the necessary data structures: {0}")]
|
||||
CompositorSetup(String),
|
||||
#[error("Could not poll the file description: {0}")]
|
||||
Poll(String),
|
||||
}
|
||||
|
||||
impl From<AsyncError> for io::Error {
|
||||
fn from(value: AsyncError) -> Self {
|
||||
io::Error::new(ErrorKind::Other, value)
|
||||
}
|
||||
}
|
||||
|
||||
/// An async adapter for types implementing [`AsFd`].
|
||||
pub struct Async<T> {
|
||||
id: PollableIdWrapper,
|
||||
t: Option<T>,
|
||||
}
|
||||
|
||||
impl<T> Unpin for Async<T> {}
|
||||
|
||||
struct PollableIdWrapper {
|
||||
id: PollableId,
|
||||
}
|
||||
|
||||
impl Drop for PollableIdWrapper {
|
||||
fn drop(&mut self) {
|
||||
get!().remove_pollable(self.id);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Async<T>
|
||||
where
|
||||
T: AsFd,
|
||||
{
|
||||
/// Creates a new async adapter.
|
||||
///
|
||||
/// This takes ownership of the file description and duplicates the file descriptor.
|
||||
/// You should not modify the file description while this object is in use, otherwise
|
||||
/// the behavior is undefined.
|
||||
pub fn new(t: T) -> Result<Self, io::Error> {
|
||||
Ok(Self::new_(t)?)
|
||||
}
|
||||
|
||||
fn new_(t: T) -> Result<Self, AsyncError> {
|
||||
let fd = t.as_fd();
|
||||
let fl = uapi::fcntl_getfl(fd.as_raw_fd())
|
||||
.map_err(|e| AsyncError::GetFl(io::Error::from_raw_os_error(e.0)))?;
|
||||
uapi::fcntl_setfl(fd.as_raw_fd(), fl | c::O_NONBLOCK)
|
||||
.map_err(|e| AsyncError::SetFl(io::Error::from_raw_os_error(e.0)))?;
|
||||
let id = get!(Err(AsyncError::Destroyed))
|
||||
.create_pollable(fd.as_raw_fd())
|
||||
.map_err(AsyncError::CompositorSetup)?;
|
||||
Ok(Self {
|
||||
id: PollableIdWrapper { id },
|
||||
t: Some(t),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Async<T> {
|
||||
/// Unwraps the underlying object.
|
||||
///
|
||||
/// Note that the underlying object is still non-blocking at this point.
|
||||
pub fn unwrap(self) -> T {
|
||||
self.t.unwrap()
|
||||
}
|
||||
|
||||
fn poll_(&self, writable: bool, cx: &mut Context<'_>) -> Poll<Result<(), AsyncError>> {
|
||||
get!(Poll::Ready(Err(AsyncError::Destroyed)))
|
||||
.poll_io(self.id.id, writable, cx)
|
||||
.map_err(AsyncError::Poll)
|
||||
}
|
||||
|
||||
async fn poll(&self, writable: bool) -> Result<(), io::Error> {
|
||||
poll_fn(|cx| self.poll_(writable, cx)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Waits for the file description to become readable.
|
||||
pub async fn readable(&self) -> Result<(), io::Error> {
|
||||
self.poll(false).await
|
||||
}
|
||||
|
||||
/// Waits for the file description to become writable.
|
||||
pub async fn writable(&self) -> Result<(), io::Error> {
|
||||
self.poll(true).await
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsRef<T> for Async<T> {
|
||||
fn as_ref(&self) -> &T {
|
||||
self.t.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsMut<T> for Async<T> {
|
||||
fn as_mut(&mut self) -> &mut T {
|
||||
self.t.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_io<T, R>(
|
||||
slf: &mut Async<T>,
|
||||
writable: bool,
|
||||
cx: &mut Context<'_>,
|
||||
mut f: impl FnMut(&mut Async<T>) -> io::Result<R>,
|
||||
) -> Poll<io::Result<R>> {
|
||||
loop {
|
||||
ready!(slf.poll_(writable, cx))?;
|
||||
match f(slf) {
|
||||
Err(e) if e.kind() == ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsyncRead for Async<T>
|
||||
where
|
||||
T: Read,
|
||||
{
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
poll_io(self.get_mut(), false, cx, |slf| slf.as_mut().read(buf))
|
||||
}
|
||||
|
||||
fn poll_read_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &mut [IoSliceMut<'_>],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
poll_io(self.get_mut(), false, cx, |slf| {
|
||||
slf.as_mut().read_vectored(bufs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsyncWrite for Async<T>
|
||||
where
|
||||
T: Write,
|
||||
{
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().write(buf))
|
||||
}
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[IoSlice<'_>],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
poll_io(self.get_mut(), true, cx, |slf| {
|
||||
slf.as_mut().write_vectored(bufs)
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().flush())
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.get_mut().t.take();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
|
@ -53,9 +53,11 @@ pub mod _private;
|
|||
pub mod embedded;
|
||||
pub mod exec;
|
||||
pub mod input;
|
||||
pub mod io;
|
||||
pub mod keyboard;
|
||||
pub mod logging;
|
||||
pub mod status;
|
||||
pub mod tasks;
|
||||
pub mod theme;
|
||||
pub mod timer;
|
||||
pub mod video;
|
||||
|
|
|
|||
|
|
@ -16,6 +16,20 @@ macro_rules! config {
|
|||
};
|
||||
}
|
||||
|
||||
macro_rules! try_get {
|
||||
() => {{
|
||||
#[allow(unused_unsafe)]
|
||||
unsafe {
|
||||
let client = crate::_private::client::CLIENT.with(|client| client.get());
|
||||
if client.is_null() {
|
||||
None
|
||||
} else {
|
||||
Some(&*client)
|
||||
}
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! get {
|
||||
() => {{
|
||||
get!(Default::default())
|
||||
|
|
|
|||
69
jay-config/src/tasks.rs
Normal file
69
jay-config/src/tasks.rs
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
//! Tools for async task management.
|
||||
|
||||
use std::{
|
||||
cell::Cell,
|
||||
fmt::{Debug, Formatter},
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
task::{Context, Poll, Waker},
|
||||
};
|
||||
|
||||
/// Spawns an asynchronous task that will run in the background.
|
||||
pub fn spawn<T, F>(f: F) -> JoinHandle<T>
|
||||
where
|
||||
T: 'static,
|
||||
F: Future<Output = T> + 'static,
|
||||
{
|
||||
let slot = match try_get!() {
|
||||
None => Rc::new(JoinSlot {
|
||||
task_id: 0,
|
||||
slot: Cell::new(None),
|
||||
waker: Cell::new(None),
|
||||
}),
|
||||
Some(c) => c.spawn_task(f),
|
||||
};
|
||||
JoinHandle { slot }
|
||||
}
|
||||
|
||||
pub(crate) struct JoinSlot<T> {
|
||||
pub task_id: u64,
|
||||
pub slot: Cell<Option<T>>,
|
||||
pub waker: Cell<Option<Waker>>,
|
||||
}
|
||||
|
||||
/// A handle to join or abort a spawned task.
|
||||
///
|
||||
/// When the handle is dropped, the task continues to run in the background.
|
||||
pub struct JoinHandle<T> {
|
||||
slot: Rc<JoinSlot<T>>,
|
||||
}
|
||||
|
||||
impl<T> Debug for JoinHandle<T> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("JoinHandle")
|
||||
.field("task_id", &self.slot.task_id)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Unpin for JoinHandle<T> {}
|
||||
|
||||
impl<T> JoinHandle<T> {
|
||||
/// Aborts the task immediately.
|
||||
pub fn abort(self) {
|
||||
get!().abort_task(self.slot.task_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for JoinHandle<T> {
|
||||
type Output = T;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
if let Some(t) = self.slot.slot.take() {
|
||||
return Poll::Ready(t);
|
||||
}
|
||||
self.slot.waker.set(Some(cx.waker().clone()));
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
|
@ -156,9 +156,15 @@ unsafe extern "C" fn default_client_init(
|
|||
}
|
||||
|
||||
impl ConfigProxy {
|
||||
fn new(lib: Option<Library>, entry: &ConfigEntry, state: &Rc<State>) -> Self {
|
||||
fn new(
|
||||
lib: Option<Library>,
|
||||
entry: &ConfigEntry,
|
||||
state: &Rc<State>,
|
||||
path: Option<String>,
|
||||
) -> Self {
|
||||
let version = entry.version.min(VERSION);
|
||||
let data = Rc::new(ConfigProxyHandler {
|
||||
path,
|
||||
client_data: Cell::new(ptr::null()),
|
||||
dropped: Cell::new(false),
|
||||
_lib: lib,
|
||||
|
|
@ -175,6 +181,8 @@ impl ConfigProxy {
|
|||
timer_ids: NumCell::new(1),
|
||||
timers_by_name: Default::default(),
|
||||
timers_by_id: Default::default(),
|
||||
pollable_id: Default::default(),
|
||||
pollables: Default::default(),
|
||||
});
|
||||
let init_msg = bincode_ops()
|
||||
.serialize(&InitMessage::V1(V1InitMessage {}))
|
||||
|
|
@ -205,12 +213,12 @@ impl ConfigProxy {
|
|||
unref: jay_config::_private::client::unref,
|
||||
handle_msg: jay_config::_private::client::handle_msg,
|
||||
};
|
||||
Self::new(None, &entry, state)
|
||||
Self::new(None, &entry, state, None)
|
||||
}
|
||||
|
||||
#[cfg(feature = "it")]
|
||||
pub fn for_test(state: &Rc<State>) -> Self {
|
||||
Self::new(None, &TEST_CONFIG_ENTRY, state)
|
||||
Self::new(None, &TEST_CONFIG_ENTRY, state, None)
|
||||
}
|
||||
|
||||
pub fn from_config_dir(state: &Rc<State>) -> Result<Self, ConfigError> {
|
||||
|
|
@ -249,7 +257,7 @@ impl ConfigProxy {
|
|||
if let Err(e) = std::fs::copy(path, ©) {
|
||||
return Err(ConfigError::CopyConfigFile(e));
|
||||
}
|
||||
let _unlink = UnlinkOnDrop(©);
|
||||
let unlink = UnlinkOnDrop(©);
|
||||
let lib = match Library::new(©) {
|
||||
Ok(l) => l,
|
||||
Err(e) => return Err(ConfigError::CouldNotLoadLibrary(e)),
|
||||
|
|
@ -259,7 +267,8 @@ impl ConfigProxy {
|
|||
Ok(e) => *e,
|
||||
Err(e) => return Err(ConfigError::LibraryDoesNotContainEntry(e)),
|
||||
};
|
||||
Ok(Self::new(Some(lib), entry, state))
|
||||
mem::forget(unlink);
|
||||
Ok(Self::new(Some(lib), entry, state, Some(copy)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,15 +8,18 @@ use {
|
|||
compositor::MAX_EXTENTS,
|
||||
config::ConfigProxy,
|
||||
ifs::wl_seat::{SeatId, WlSeatGlobal},
|
||||
io_uring::TaskResultExt,
|
||||
scale::Scale,
|
||||
state::{ConnectorData, DeviceHandlerData, DrmDevData, OutputData, State},
|
||||
theme::{Color, ThemeSized, DEFAULT_FONT},
|
||||
tree::{ContainerNode, ContainerSplit, FloatNode, Node, NodeVisitorBase, OutputNode},
|
||||
utils::{
|
||||
asyncevent::AsyncEvent,
|
||||
copyhashmap::CopyHashMap,
|
||||
debug_fn::debug_fn,
|
||||
errorfmt::ErrorFmt,
|
||||
numcell::NumCell,
|
||||
oserror::OsError,
|
||||
stack::Stack,
|
||||
timer::{TimerError, TimerFd},
|
||||
},
|
||||
|
|
@ -27,7 +30,7 @@ use {
|
|||
_private::{
|
||||
bincode_ops,
|
||||
ipc::{ClientMessage, Response, ServerMessage},
|
||||
WireMode,
|
||||
PollableId, WireMode,
|
||||
},
|
||||
input::{
|
||||
acceleration::{AccelProfile, ACCEL_PROFILE_ADAPTIVE, ACCEL_PROFILE_FLAT},
|
||||
|
|
@ -48,10 +51,11 @@ use {
|
|||
log::Level,
|
||||
std::{cell::Cell, ops::Deref, rc::Rc, time::Duration},
|
||||
thiserror::Error,
|
||||
uapi::c,
|
||||
uapi::{c, fcntl_dupfd_cloexec},
|
||||
};
|
||||
|
||||
pub(super) struct ConfigProxyHandler {
|
||||
pub path: Option<String>,
|
||||
pub client_data: Cell<*const u8>,
|
||||
pub dropped: Cell<bool>,
|
||||
pub _lib: Option<Library>,
|
||||
|
|
@ -70,6 +74,16 @@ pub(super) struct ConfigProxyHandler {
|
|||
pub timer_ids: NumCell<u64>,
|
||||
pub timers_by_name: CopyHashMap<Rc<String>, Rc<TimerData>>,
|
||||
pub timers_by_id: CopyHashMap<u64, Rc<TimerData>>,
|
||||
|
||||
pub pollable_id: NumCell<u64>,
|
||||
pub pollables: CopyHashMap<PollableId, Rc<Pollable>>,
|
||||
}
|
||||
|
||||
pub struct Pollable {
|
||||
write_trigger: Rc<AsyncEvent>,
|
||||
_write_future: SpawnedFuture<()>,
|
||||
read_trigger: Rc<AsyncEvent>,
|
||||
_read_future: SpawnedFuture<()>,
|
||||
}
|
||||
|
||||
pub(super) struct TimerData {
|
||||
|
|
@ -85,6 +99,14 @@ impl ConfigProxyHandler {
|
|||
|
||||
self.timers_by_name.clear();
|
||||
self.timers_by_id.clear();
|
||||
|
||||
self.pollables.clear();
|
||||
|
||||
if let Some(path) = &self.path {
|
||||
if let Err(e) = uapi::unlink(path.as_str()) {
|
||||
log::error!("Could not unlink {}: {}", path, ErrorFmt(OsError(e.0)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: &ServerMessage) {
|
||||
|
|
@ -1027,6 +1049,74 @@ impl ConfigProxyHandler {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_add_pollable(self: &Rc<Self>, fd: i32) -> Result<(), CphError> {
|
||||
let fd = match fcntl_dupfd_cloexec(fd, 0) {
|
||||
Ok(fd) => Rc::new(fd),
|
||||
Err(e) => {
|
||||
let err = format!(
|
||||
"Could not invoke F_DUPFD_CLOEXEC: {}",
|
||||
ErrorFmt(OsError::from(e))
|
||||
);
|
||||
log::error!("{}", err);
|
||||
self.respond(Response::AddPollable { id: Err(err) });
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let id = self.pollable_id.fetch_add(1);
|
||||
let id = PollableId(id);
|
||||
let create = |writable: bool, events: c::c_short| {
|
||||
let event = Rc::new(AsyncEvent::default());
|
||||
let slf = self.clone();
|
||||
let trigger = event.clone();
|
||||
let fd = fd.clone();
|
||||
let future = self.state.eng.spawn(async move {
|
||||
loop {
|
||||
trigger.triggered().await;
|
||||
let res = slf.state.ring.poll(&fd, events).await.merge();
|
||||
if let Err(e) = &res {
|
||||
log::warn!("Could not poll fd: {}", ErrorFmt(e));
|
||||
}
|
||||
let res = res.map_err(|e| ErrorFmt(e).to_string()).map(drop);
|
||||
slf.send(&ServerMessage::InterestReady { id, writable, res });
|
||||
}
|
||||
});
|
||||
(event, future)
|
||||
};
|
||||
let (read_trigger, _read_future) = create(false, c::POLLIN);
|
||||
let (write_trigger, _write_future) = create(true, c::POLLOUT);
|
||||
self.pollables.set(
|
||||
id,
|
||||
Rc::new(Pollable {
|
||||
write_trigger,
|
||||
_write_future,
|
||||
read_trigger,
|
||||
_read_future,
|
||||
}),
|
||||
);
|
||||
self.respond(Response::AddPollable { id: Ok(id) });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_remove_pollable(self: &Rc<Self>, id: PollableId) {
|
||||
self.pollables.remove(&id);
|
||||
}
|
||||
|
||||
fn handle_add_interest(
|
||||
self: &Rc<Self>,
|
||||
id: PollableId,
|
||||
writable: bool,
|
||||
) -> Result<(), CphError> {
|
||||
let Some(pollable) = self.pollables.get(&id) else {
|
||||
return Err(CphError::PollableDoesNotExist);
|
||||
};
|
||||
let trigger = match writable {
|
||||
true => &pollable.write_trigger,
|
||||
false => &pollable.read_trigger,
|
||||
};
|
||||
trigger.trigger();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spaces_change(&self) {
|
||||
struct V;
|
||||
impl NodeVisitorBase for V {
|
||||
|
|
@ -1408,6 +1498,13 @@ impl ConfigProxyHandler {
|
|||
ClientMessage::ConnectorSetMode { connector, mode } => self
|
||||
.handle_connector_set_mode(connector, mode)
|
||||
.wrn("connector_set_mode")?,
|
||||
ClientMessage::AddPollable { fd } => {
|
||||
self.handle_add_pollable(fd).wrn("add_pollable")?
|
||||
}
|
||||
ClientMessage::RemovePollable { id } => self.handle_remove_pollable(id),
|
||||
ClientMessage::AddInterest { pollable, writable } => self
|
||||
.handle_add_interest(pollable, writable)
|
||||
.wrn("add_interest")?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -1465,6 +1562,8 @@ enum CphError {
|
|||
ScaleTooLarge(f64),
|
||||
#[error("Tried to set a negative cursor size")]
|
||||
NegativeCursorSize,
|
||||
#[error("Config referred to a pollable that does not exist")]
|
||||
PollableDoesNotExist,
|
||||
}
|
||||
|
||||
trait WithRequestName {
|
||||
|
|
|
|||
|
|
@ -106,6 +106,7 @@ unsafe extern "C" fn handle_msg(data: *const u8, msg: *const u8, size: usize) {
|
|||
ServerMessage::DelDrmDev { .. } => {}
|
||||
ServerMessage::Idle => {}
|
||||
ServerMessage::DevicesEnumerated => {}
|
||||
ServerMessage::InterestReady { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue