1
0
Fork 0
forked from wry/wry

io-uring: add readable/writable

This commit is contained in:
Julian Orth 2022-05-12 20:33:58 +02:00
parent 25d817b722
commit dcdd91c0b0
31 changed files with 285 additions and 189 deletions

View file

@ -4,7 +4,7 @@ mod video;
use { use {
crate::{ crate::{
async_engine::{AsyncError, AsyncFd, SpawnedFuture}, async_engine::SpawnedFuture,
backend::{ backend::{
Backend, InputDevice, InputDeviceAccelProfile, InputDeviceCapability, InputDeviceId, Backend, InputDevice, InputDeviceAccelProfile, InputDeviceCapability, InputDeviceId,
InputEvent, KeyState, TransformMatrix, InputEvent, KeyState, TransformMatrix,
@ -103,8 +103,6 @@ pub enum MetalError {
CreateEncoder(#[source] DrmError), CreateEncoder(#[source] DrmError),
#[error(transparent)] #[error(transparent)]
DrmError(#[from] DrmError), DrmError(#[from] DrmError),
#[error("Could not create an async fd")]
CreateAsyncFd(#[source] AsyncError),
#[error("Could not create device-paused signal handler")] #[error("Could not create device-paused signal handler")]
DevicePauseSignalHandler(#[source] DbusError), DevicePauseSignalHandler(#[source] DbusError),
#[error("Could not create device-resumed signal handler")] #[error("Could not create device-resumed signal handler")]
@ -115,9 +113,9 @@ pub struct MetalBackend {
state: Rc<State>, state: Rc<State>,
udev: Rc<Udev>, udev: Rc<Udev>,
monitor: Rc<UdevMonitor>, monitor: Rc<UdevMonitor>,
monitor_fd: AsyncFd, monitor_fd: Rc<OwnedFd>,
libinput: Rc<LibInput>, libinput: Rc<LibInput>,
libinput_fd: AsyncFd, libinput_fd: Rc<OwnedFd>,
device_holder: Rc<DeviceHolder>, device_holder: Rc<DeviceHolder>,
session: Session, session: Session,
pause_handler: Cell<Option<SignalHandler>>, pause_handler: Cell<Option<SignalHandler>>,
@ -204,12 +202,9 @@ impl Backend for MetalBackend {
} }
} }
fn dup_async_fd(state: &Rc<State>, fd: c::c_int) -> Result<AsyncFd, MetalError> { fn dup_fd(fd: c::c_int) -> Result<Rc<OwnedFd>, MetalError> {
match uapi::fcntl_dupfd_cloexec(fd, 0) { match uapi::fcntl_dupfd_cloexec(fd, 0) {
Ok(m) => match state.eng.fd(&Rc::new(m)) { Ok(m) => Ok(Rc::new(m)),
Ok(fd) => Ok(fd),
Err(e) => Err(MetalError::CreateAsyncFd(e)),
},
Err(e) => Err(MetalError::Dup(e.into())), Err(e) => Err(MetalError::Dup(e.into())),
} }
} }
@ -238,8 +233,8 @@ pub async fn create(state: &Rc<State>) -> Result<Rc<MetalBackend>, MetalError> {
monitor.add_match_subsystem_devtype(Some("drm"), None)?; monitor.add_match_subsystem_devtype(Some("drm"), None)?;
monitor.enable_receiving()?; monitor.enable_receiving()?;
let libinput = Rc::new(LibInput::new(device_holder.clone())?); let libinput = Rc::new(LibInput::new(device_holder.clone())?);
let monitor_fd = dup_async_fd(&state, monitor.fd())?; let monitor_fd = dup_fd(monitor.fd())?;
let libinput_fd = dup_async_fd(&state, libinput.fd())?; let libinput_fd = dup_fd(libinput.fd())?;
let metal = Rc::new(MetalBackend { let metal = Rc::new(MetalBackend {
state: state.clone(), state: state.clone(),
udev, udev,

View file

@ -1,6 +1,5 @@
use { use {
crate::{ crate::{
async_engine::FdStatus,
backend::{AxisSource, InputEvent, KeyState, ScrollAxis}, backend::{AxisSource, InputEvent, KeyState, ScrollAxis},
backends::metal::MetalBackend, backends::metal::MetalBackend,
fixed::Fixed, fixed::Fixed,
@ -12,9 +11,10 @@ use {
}, },
event::LibInputEvent, event::LibInputEvent,
}, },
utils::errorfmt::ErrorFmt, utils::{bitflags::BitflagsExt, errorfmt::ErrorFmt},
}, },
std::rc::Rc, std::rc::Rc,
uapi::c,
}; };
macro_rules! unpack { macro_rules! unpack {
@ -49,7 +49,7 @@ macro_rules! unpack {
impl MetalBackend { impl MetalBackend {
pub async fn handle_libinput_events(self: Rc<Self>) { pub async fn handle_libinput_events(self: Rc<Self>) {
loop { loop {
match self.libinput_fd.readable().await { match self.state.ring.readable(&self.libinput_fd).await {
Err(e) => { Err(e) => {
log::error!( log::error!(
"Cannot wait for libinput fd to become readable: {}", "Cannot wait for libinput fd to become readable: {}",
@ -57,7 +57,7 @@ impl MetalBackend {
); );
break; break;
} }
Ok(FdStatus::Err) => { Ok(n) if n.intersects(c::POLLERR | c::POLLHUP) => {
log::error!("libinput fd fd is in an error state"); log::error!("libinput fd fd is in an error state");
break; break;
} }

View file

@ -1,6 +1,5 @@
use { use {
crate::{ crate::{
async_engine::FdStatus,
backend::BackendEvent, backend::BackendEvent,
backends::metal::{ backends::metal::{
video::{MetalDrmDevice, PendingDrmDevice}, video::{MetalDrmDevice, PendingDrmDevice},
@ -8,7 +7,7 @@ use {
}, },
dbus::TRUE, dbus::TRUE,
udev::UdevDevice, udev::UdevDevice,
utils::{errorfmt::ErrorFmt, nonblock::set_nonblock}, utils::{bitflags::BitflagsExt, errorfmt::ErrorFmt, nonblock::set_nonblock},
video::drm::DrmMaster, video::drm::DrmMaster,
wire_dbus::org::freedesktop::login1::session::{PauseDevice, ResumeDevice}, wire_dbus::org::freedesktop::login1::session::{PauseDevice, ResumeDevice},
}, },
@ -33,7 +32,7 @@ fn is_primary_node(n: &[u8]) -> bool {
impl MetalBackend { impl MetalBackend {
pub async fn monitor_devices(self: Rc<Self>) { pub async fn monitor_devices(self: Rc<Self>) {
loop { loop {
match self.monitor_fd.readable().await { match self.state.ring.readable(&self.monitor_fd).await {
Err(e) => { Err(e) => {
log::error!( log::error!(
"Cannot wait for udev_monitor to become readable: {}", "Cannot wait for udev_monitor to become readable: {}",
@ -41,7 +40,7 @@ impl MetalBackend {
); );
break; break;
} }
Ok(FdStatus::Err) => { Ok(n) if n.intersects(c::POLLERR | c::POLLHUP) => {
log::error!("udev_monitor fd is in an error state"); log::error!("udev_monitor fd is in an error state");
break; break;
} }

View file

@ -1,6 +1,6 @@
use { use {
crate::{ crate::{
async_engine::{AsyncFd, Phase, SpawnedFuture}, async_engine::{Phase, SpawnedFuture},
backend::{ backend::{
BackendDrmDevice, BackendEvent, Connector, ConnectorEvent, ConnectorId, BackendDrmDevice, BackendEvent, Connector, ConnectorEvent, ConnectorId,
ConnectorKernelId, DrmDeviceId, MonitorInfo, ConnectorKernelId, DrmDeviceId, MonitorInfo,
@ -64,7 +64,6 @@ pub struct MetalDrmDeviceStatic {
pub min_height: u32, pub min_height: u32,
pub max_height: u32, pub max_height: u32,
pub gbm: GbmDevice, pub gbm: GbmDevice,
pub async_fd: AsyncFd,
pub handle_events: HandleEvents, pub handle_events: HandleEvents,
} }
@ -749,10 +748,6 @@ impl MetalBackend {
Ok(g) => g, Ok(g) => g,
Err(e) => return Err(MetalError::GbmDevice(e)), Err(e) => return Err(MetalError::GbmDevice(e)),
}; };
let async_fd = match self.state.eng.fd(master.fd()) {
Ok(f) => f,
Err(e) => return Err(MetalError::CreateAsyncFd(e)),
};
let dev = Rc::new(MetalDrmDeviceStatic { let dev = Rc::new(MetalDrmDeviceStatic {
id: pending.id, id: pending.id,
@ -767,7 +762,6 @@ impl MetalBackend {
min_height: resources.min_height, min_height: resources.min_height,
max_height: resources.max_height, max_height: resources.max_height,
gbm, gbm,
async_fd,
handle_events: HandleEvents { handle_events: HandleEvents {
handle_events: Cell::new(None), handle_events: Cell::new(None),
}, },
@ -883,7 +877,7 @@ impl MetalBackend {
async fn handle_drm_events(self: Rc<Self>, dev: Rc<MetalDrmDevice>) { async fn handle_drm_events(self: Rc<Self>, dev: Rc<MetalDrmDevice>) {
loop { loop {
if let Err(e) = dev.dev.async_fd.readable().await { if let Err(e) = self.state.ring.readable(dev.dev.master.fd()).await {
log::error!("Could not register the DRM fd for reading: {}", ErrorFmt(e)); log::error!("Could not register the DRM fd for reading: {}", ErrorFmt(e));
break; break;
} }

View file

@ -116,7 +116,7 @@ pub enum XBackendError {
} }
pub async fn create(state: &Rc<State>) -> Result<Rc<XBackend>, XBackendError> { pub async fn create(state: &Rc<State>) -> Result<Rc<XBackend>, XBackendError> {
let c = match Xcon::connect(state.eng.clone()).await { let c = match Xcon::connect(state).await {
Ok(c) => c, Ok(c) => c,
Err(e) => return Err(XBackendError::CannotConnect(e)), Err(e) => return Err(XBackendError::CannotConnect(e)),
}; };

View file

@ -1,7 +1,7 @@
pub use error::{ClientError, MethodError, ObjectError}; pub use error::{ClientError, MethodError, ObjectError};
use { use {
crate::{ crate::{
async_engine::{AsyncFd, SpawnedFuture}, async_engine::SpawnedFuture,
client::{error::LookupError, objects::Objects}, client::{error::LookupError, objects::Objects},
ifs::{wl_display::WlDisplay, wl_registry::WlRegistry}, ifs::{wl_display::WlDisplay, wl_registry::WlRegistry},
leaks::Tracker, leaks::Tracker,
@ -130,7 +130,7 @@ impl Clients {
id, id,
state: global.clone(), state: global.clone(),
checking_queue_size: Cell::new(false), checking_queue_size: Cell::new(false),
socket: global.eng.fd(&Rc::new(socket))?, socket: Rc::new(socket),
objects: Objects::new(), objects: Objects::new(),
swapchain: Default::default(), swapchain: Default::default(),
flush_request: Default::default(), flush_request: Default::default(),
@ -236,7 +236,7 @@ pub struct Client {
pub id: ClientId, pub id: ClientId,
pub state: Rc<State>, pub state: Rc<State>,
checking_queue_size: Cell<bool>, checking_queue_size: Cell<bool>,
socket: AsyncFd, socket: Rc<OwnedFd>,
pub objects: Objects, pub objects: Objects,
swapchain: Rc<RefCell<OutBufferSwapchain>>, swapchain: Rc<RefCell<OutBufferSwapchain>>,
flush_request: AsyncEvent, flush_request: AsyncEvent,

View file

@ -37,7 +37,7 @@ pub async fn client(data: Rc<Client>) {
async fn receive(data: Rc<Client>) { async fn receive(data: Rc<Client>) {
let display = data.display().unwrap(); let display = data.display().unwrap();
let recv = async { let recv = async {
let mut buf = BufFdIn::new(data.socket.clone()); let mut buf = BufFdIn::new(&data.socket, &data.state.ring);
let mut data_buf = Vec::<u32>::new(); let mut data_buf = Vec::<u32>::new();
loop { loop {
let mut hdr = [0u32, 0]; let mut hdr = [0u32, 0];
@ -100,7 +100,7 @@ async fn receive(data: Rc<Client>) {
async fn send(data: Rc<Client>) { async fn send(data: Rc<Client>) {
let send = async { let send = async {
let mut out = BufFdOut::new(data.socket.clone(), &data.state.wheel); let mut out = BufFdOut::new(&data.socket, &data.state.ring, &data.state.wheel);
let mut buffers = VecDeque::new(); let mut buffers = VecDeque::new();
loop { loop {
data.flush_request.triggered().await; data.flush_request.triggered().await;

View file

@ -119,8 +119,8 @@ fn start_compositor2(
let xkb_ctx = XkbContext::new().unwrap(); let xkb_ctx = XkbContext::new().unwrap();
let xkb_keymap = xkb_ctx.keymap_from_str(include_str!("keymap.xkb")).unwrap(); let xkb_keymap = xkb_ctx.keymap_from_str(include_str!("keymap.xkb")).unwrap();
let engine = AsyncEngine::install(&el)?; let engine = AsyncEngine::install(&el)?;
let wheel = Wheel::new(&engine)?; let ring = IoUring::new(&engine, 32)?;
let io_uring = IoUring::new(&engine, 32)?; let wheel = Wheel::new(&engine, &ring)?;
let (_run_toplevel_future, run_toplevel) = RunToplevel::install(&engine); let (_run_toplevel_future, run_toplevel) = RunToplevel::install(&engine);
let node_ids = NodeIds::default(); let node_ids = NodeIds::default();
let state = Rc::new(State { let state = Rc::new(State {
@ -156,7 +156,7 @@ fn start_compositor2(
pending_container_render_data: Default::default(), pending_container_render_data: Default::default(),
pending_float_layout: Default::default(), pending_float_layout: Default::default(),
pending_float_titles: Default::default(), pending_float_titles: Default::default(),
dbus: Dbus::new(&engine, &run_toplevel), dbus: Dbus::new(&engine, &ring, &run_toplevel),
fdcloser: FdCloser::new(), fdcloser: FdCloser::new(),
logger, logger,
connectors: Default::default(), connectors: Default::default(),
@ -186,7 +186,7 @@ fn start_compositor2(
tracker: Default::default(), tracker: Default::default(),
data_offer_ids: Default::default(), data_offer_ids: Default::default(),
drm_dev_ids: Default::default(), drm_dev_ids: Default::default(),
io_uring, ring,
}); });
state.tracker.register(ClientId::from_raw(0)); state.tracker.register(ClientId::from_raw(0));
create_dummy_output(&state); create_dummy_output(&state);

View file

@ -1,16 +1,18 @@
pub use types::*; pub use types::*;
use { use {
crate::{ crate::{
async_engine::{AsyncEngine, AsyncError, AsyncFd, SpawnedFuture}, async_engine::{AsyncEngine, SpawnedFuture},
dbus::{ dbus::{
property::GetReply, property::GetReply,
types::{ObjectPath, Signature, Variant}, types::{ObjectPath, Signature, Variant},
}, },
io_uring::{IoUring, IoUringError},
utils::{ utils::{
bufio::{BufIo, BufIoError}, bufio::{BufIo, BufIoError},
clonecell::CloneCell, clonecell::CloneCell,
copyhashmap::CopyHashMap, copyhashmap::CopyHashMap,
numcell::NumCell, numcell::NumCell,
oserror::OsError,
run_toplevel::RunToplevel, run_toplevel::RunToplevel,
vecstorage::VecStorage, vecstorage::VecStorage,
xrd::{xrd, XRD}, xrd::{xrd, XRD},
@ -78,15 +80,15 @@ pub enum DbusError {
#[error("Variant has an invalid type")] #[error("Variant has an invalid type")]
InvalidVariantType, InvalidVariantType,
#[error("Could not create a socket")] #[error("Could not create a socket")]
Socket(#[source] crate::utils::oserror::OsError), Socket(#[source] OsError),
#[error("Could not connect")] #[error("Could not connect")]
Connect(#[source] crate::utils::oserror::OsError), Connect(#[source] OsError),
#[error("Could not write to the dbus socket")] #[error("Could not write to the dbus socket")]
WriteError(#[source] crate::utils::oserror::OsError), WriteError(#[source] OsError),
#[error("Could not read from the dbus socket")] #[error("Could not read from the dbus socket")]
ReadError(#[source] crate::utils::oserror::OsError), ReadError(#[source] OsError),
#[error("timeout")] #[error("timeout")]
AsyncError(#[source] Box<AsyncError>), IoUringError(#[source] Box<IoUringError>),
#[error("Server did not accept our authentication")] #[error("Server did not accept our authentication")]
Auth, Auth,
#[error("Array length is not a multiple of the element size")] #[error("Array length is not a multiple of the element size")]
@ -126,17 +128,18 @@ pub enum DbusError {
#[error(transparent)] #[error(transparent)]
DbusError(Rc<DbusError>), DbusError(Rc<DbusError>),
} }
efrom!(DbusError, AsyncError); efrom!(DbusError, IoUringError);
pub struct Dbus { pub struct Dbus {
eng: Rc<AsyncEngine>, eng: Rc<AsyncEngine>,
ring: Rc<IoUring>,
system: Rc<DbusHolder>, system: Rc<DbusHolder>,
session: Rc<DbusHolder>, session: Rc<DbusHolder>,
user_path: Option<String>, user_path: Option<String>,
} }
impl Dbus { impl Dbus {
pub fn new(eng: &Rc<AsyncEngine>, run_toplevel: &Rc<RunToplevel>) -> Self { pub fn new(eng: &Rc<AsyncEngine>, ring: &Rc<IoUring>, run_toplevel: &Rc<RunToplevel>) -> Self {
let user_path = match xrd() { let user_path = match xrd() {
Some(path) => Some(format!("{}/bus", path)), Some(path) => Some(format!("{}/bus", path)),
_ => { _ => {
@ -147,6 +150,7 @@ impl Dbus {
log::info!("dbus path = {:?}", user_path); log::info!("dbus path = {:?}", user_path);
Self { Self {
eng: eng.clone(), eng: eng.clone(),
ring: ring.clone(),
system: Rc::new(DbusHolder::new(run_toplevel)), system: Rc::new(DbusHolder::new(run_toplevel)),
session: Rc::new(DbusHolder::new(run_toplevel)), session: Rc::new(DbusHolder::new(run_toplevel)),
user_path, user_path,
@ -159,8 +163,12 @@ impl Dbus {
} }
pub fn system(&self) -> Result<Rc<DbusSocket>, DbusError> { pub fn system(&self) -> Result<Rc<DbusSocket>, DbusError> {
self.system self.system.get(
.get(&self.eng, "/var/run/dbus/system_bus_socket", "System bus") &self.eng,
&self.ring,
"/var/run/dbus/system_bus_socket",
"System bus",
)
} }
pub fn session(&self) -> Result<Rc<DbusSocket>, DbusError> { pub fn session(&self) -> Result<Rc<DbusSocket>, DbusError> {
@ -168,7 +176,7 @@ impl Dbus {
None => return Err(DbusError::SessionBusAddressNotSet), None => return Err(DbusError::SessionBusAddressNotSet),
Some(sba) => sba, Some(sba) => sba,
}; };
self.session.get(&self.eng, sba, "Session bus") self.session.get(&self.eng, &self.ring, sba, "Session bus")
} }
} }
@ -186,7 +194,8 @@ unsafe trait ReplyHandler {
pub struct DbusSocket { pub struct DbusSocket {
bus_name: &'static str, bus_name: &'static str,
fd: AsyncFd, fd: Rc<OwnedFd>,
ring: Rc<IoUring>,
bufio: Rc<BufIo>, bufio: Rc<BufIo>,
eng: Rc<AsyncEngine>, eng: Rc<AsyncEngine>,
next_serial: NumCell<u32>, next_serial: NumCell<u32>,

View file

@ -86,7 +86,7 @@ impl Auth {
match uapi::read(self.socket.fd.raw(), &mut self.buf[..]) { match uapi::read(self.socket.fd.raw(), &mut self.buf[..]) {
Ok(n) => self.buf_stop = n.len(), Ok(n) => self.buf_stop = n.len(),
Err(Errno(c::EAGAIN)) => { Err(Errno(c::EAGAIN)) => {
self.socket.fd.readable().await?; self.socket.ring.readable(&self.socket.fd).await?;
} }
Err(e) => return Err(DbusError::ReadError(e.into())), Err(e) => return Err(DbusError::ReadError(e.into())),
} }
@ -99,7 +99,7 @@ impl Auth {
match uapi::write(self.socket.fd.raw(), &buf[start..]) { match uapi::write(self.socket.fd.raw(), &buf[start..]) {
Ok(n) => start += n, Ok(n) => start += n,
Err(Errno(c::EAGAIN)) => { Err(Errno(c::EAGAIN)) => {
self.socket.fd.writable().await?; self.socket.ring.writable(&self.socket.fd).await?;
} }
Err(e) => return Err(DbusError::WriteError(e.into())), Err(e) => return Err(DbusError::WriteError(e.into())),
} }

View file

@ -2,6 +2,7 @@ use {
crate::{ crate::{
async_engine::AsyncEngine, async_engine::AsyncEngine,
dbus::{auth::handle_auth, DbusError, DbusHolder, DbusSocket}, dbus::{auth::handle_auth, DbusError, DbusHolder, DbusSocket},
io_uring::IoUring,
utils::{bufio::BufIo, errorfmt::ErrorFmt, numcell::NumCell, run_toplevel::RunToplevel}, utils::{bufio::BufIo, errorfmt::ErrorFmt, numcell::NumCell, run_toplevel::RunToplevel},
wire_dbus::org, wire_dbus::org,
}, },
@ -13,6 +14,7 @@ impl DbusHolder {
pub(super) fn get( pub(super) fn get(
self: &Rc<Self>, self: &Rc<Self>,
eng: &Rc<AsyncEngine>, eng: &Rc<AsyncEngine>,
ring: &Rc<IoUring>,
addr: &str, addr: &str,
name: &'static str, name: &'static str,
) -> Result<Rc<DbusSocket>, DbusError> { ) -> Result<Rc<DbusSocket>, DbusError> {
@ -23,7 +25,7 @@ impl DbusHolder {
return Ok(c); return Ok(c);
} }
} }
let socket = connect(eng, addr, name, &self.run_toplevel)?; let socket = connect(eng, ring, addr, name, &self.run_toplevel)?;
self.socket.set(Some(socket.clone())); self.socket.set(Some(socket.clone()));
Ok(socket) Ok(socket)
} }
@ -31,6 +33,7 @@ impl DbusHolder {
fn connect( fn connect(
eng: &Rc<AsyncEngine>, eng: &Rc<AsyncEngine>,
ring: &Rc<IoUring>,
addr: &str, addr: &str,
name: &'static str, name: &'static str,
run_toplevel: &Rc<RunToplevel>, run_toplevel: &Rc<RunToplevel>,
@ -50,11 +53,12 @@ fn connect(
if let Err(e) = uapi::connect(socket.raw(), &sadr) { if let Err(e) = uapi::connect(socket.raw(), &sadr) {
return Err(DbusError::Connect(e.into())); return Err(DbusError::Connect(e.into()));
} }
let fd = eng.fd(&Rc::new(socket))?; let fd = Rc::new(socket);
let socket = Rc::new(DbusSocket { let socket = Rc::new(DbusSocket {
bus_name: name, bus_name: name,
fd: fd.clone(), fd: fd.clone(),
bufio: Rc::new(BufIo::new(fd)), ring: ring.clone(),
bufio: Rc::new(BufIo::new(&fd, ring)),
eng: eng.clone(), eng: eng.clone(),
next_serial: NumCell::new(1), next_serial: NumCell::new(1),
unique_name: Default::default(), unique_name: Default::default(),

View file

@ -3,13 +3,14 @@ mod io;
use { use {
crate::{ crate::{
async_engine::{AsyncEngine, AsyncFd, SpawnedFuture}, async_engine::{AsyncEngine, SpawnedFuture},
compositor::{DISPLAY, WAYLAND_DISPLAY}, compositor::{DISPLAY, WAYLAND_DISPLAY},
event_loop::EventLoop, event_loop::EventLoop,
forker::{ forker::{
clone3::{fork_with_pidfd, Forked}, clone3::{fork_with_pidfd, Forked},
io::{IoIn, IoOut}, io::{IoIn, IoOut},
}, },
io_uring::IoUring,
state::State, state::State,
utils::{ utils::{
buffd::BufFdError, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, buffd::BufFdError, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell,
@ -112,17 +113,13 @@ impl ForkerProxy {
pub fn install(self: &Rc<Self>, state: &Rc<State>) { pub fn install(self: &Rc<Self>, state: &Rc<State>) {
state.forker.set(Some(self.clone())); state.forker.set(Some(self.clone()));
let socket = state.eng.fd(&self.socket).unwrap();
self.task_proc.set(Some( self.task_proc.set(Some(
state.eng.spawn(self.clone().check_process(state.clone())), state.eng.spawn(self.clone().check_process(state.clone())),
)); ));
self.task_in self.task_in
.set(Some(state.eng.spawn(self.clone().incoming(socket.clone())))); .set(Some(state.eng.spawn(self.clone().incoming(state.clone()))));
self.task_out.set(Some( self.task_out
state .set(Some(state.eng.spawn(self.clone().outgoing(state.clone()))));
.eng
.spawn(self.clone().outgoing(state.clone(), socket.clone())),
));
} }
pub fn setenv(&self, key: &[u8], val: &[u8]) { pub fn setenv(&self, key: &[u8], val: &[u8]) {
@ -191,8 +188,8 @@ impl ForkerProxy {
}) })
} }
async fn incoming(self: Rc<Self>, socket: AsyncFd) { async fn incoming(self: Rc<Self>, state: Rc<State>) {
let mut io = IoIn::new(socket); let mut io = IoIn::new(&self.socket, &state.ring);
loop { loop {
let msg = match io.read_msg().await { let msg = match io.read_msg().await {
Ok(msg) => msg, Ok(msg) => msg,
@ -240,8 +237,8 @@ impl ForkerProxy {
log::log!(level, "{}", msg); log::log!(level, "{}", msg);
} }
async fn outgoing(self: Rc<Self>, state: Rc<State>, socket: AsyncFd) { async fn outgoing(self: Rc<Self>, state: Rc<State>) {
let mut io = IoOut::new(socket, &state.wheel); let mut io = IoOut::new(&self.socket, &state.ring, &state.wheel);
loop { loop {
let msg = self.outgoing.pop().await; let msg = self.outgoing.pop().await;
for fd in self.fds.borrow_mut().drain(..) { for fd in self.fds.borrow_mut().drain(..) {
@ -257,8 +254,7 @@ impl ForkerProxy {
} }
async fn check_process(self: Rc<Self>, state: Rc<State>) { async fn check_process(self: Rc<Self>, state: Rc<State>) {
let pidfd = state.eng.fd(&self.pidfd).unwrap(); if let Err(e) = state.ring.readable(&self.pidfd).await {
if let Err(e) = pidfd.readable().await {
log::error!( log::error!(
"Cannot wait for the forker pidfd to become readable: {}", "Cannot wait for the forker pidfd to become readable: {}",
ErrorFmt(e) ErrorFmt(e)
@ -303,8 +299,9 @@ enum ForkerMessage {
} }
struct Forker { struct Forker {
socket: AsyncFd, socket: Rc<OwnedFd>,
ae: Rc<AsyncEngine>, ae: Rc<AsyncEngine>,
ring: Rc<IoUring>,
wheel: Rc<Wheel>, wheel: Rc<Wheel>,
fds: RefCell<Vec<Rc<OwnedFd>>>, fds: RefCell<Vec<Rc<OwnedFd>>>,
outgoing: AsyncQueue<ForkerMessage>, outgoing: AsyncQueue<ForkerMessage>,
@ -333,10 +330,12 @@ impl Forker {
}); });
let el = EventLoop::new().unwrap(); let el = EventLoop::new().unwrap();
let ae = AsyncEngine::install(&el).unwrap(); let ae = AsyncEngine::install(&el).unwrap();
let wheel = Wheel::new(&ae).unwrap(); let ring = IoUring::new(&ae, 32).unwrap();
let wheel = Wheel::new(&ae, &ring).unwrap();
let forker = Rc::new(Forker { let forker = Rc::new(Forker {
socket: ae.fd(&socket).unwrap(), socket,
ae: ae.clone(), ae: ae.clone(),
ring,
wheel, wheel,
fds: RefCell::new(vec![]), fds: RefCell::new(vec![]),
outgoing: Default::default(), outgoing: Default::default(),
@ -349,7 +348,7 @@ impl Forker {
} }
async fn outgoing(self: Rc<Self>) { async fn outgoing(self: Rc<Self>) {
let mut io = IoOut::new(self.socket.clone(), &self.wheel); let mut io = IoOut::new(&self.socket, &self.ring, &self.wheel);
loop { loop {
let msg = self.outgoing.pop().await; let msg = self.outgoing.pop().await;
for fd in self.fds.borrow_mut().drain(..) { for fd in self.fds.borrow_mut().drain(..) {
@ -360,7 +359,7 @@ impl Forker {
} }
async fn incoming(self: Rc<Self>) { async fn incoming(self: Rc<Self>) {
let mut io = IoIn::new(self.socket.clone()); let mut io = IoIn::new(&self.socket, &self.ring);
loop { loop {
let msg = io.read_msg().await.unwrap(); let msg = io.read_msg().await.unwrap();
self.handle_msg(msg, &mut io); self.handle_msg(msg, &mut io);
@ -456,8 +455,8 @@ impl Forker {
drop(write); drop(write);
let slf = self.clone(); let slf = self.clone();
let spawn = self.ae.spawn(async move { let spawn = self.ae.spawn(async move {
let read = slf.ae.fd(&Rc::new(read)).unwrap(); let read = Rc::new(read);
if let Err(e) = read.readable().await { if let Err(e) = slf.ring.readable(&read).await {
log::error!( log::error!(
"Cannot wait for the child fd to become readable: {}", "Cannot wait for the child fd to become readable: {}",
ErrorFmt(e) ErrorFmt(e)

View file

@ -5,8 +5,8 @@ use {
use { use {
crate::{ crate::{
async_engine::AsyncFd,
forker::ForkerError, forker::ForkerError,
io_uring::IoUring,
utils::{ utils::{
buffd::{BufFdIn, BufFdOut}, buffd::{BufFdIn, BufFdOut},
vec_ext::VecExt, vec_ext::VecExt,
@ -23,9 +23,9 @@ pub struct IoIn {
} }
impl IoIn { impl IoIn {
pub fn new(fd: AsyncFd) -> Self { pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>) -> Self {
Self { Self {
incoming: BufFdIn::new(fd), incoming: BufFdIn::new(fd, ring),
scratch: vec![], scratch: vec![],
} }
} }
@ -63,9 +63,9 @@ pub struct IoOut {
} }
impl IoOut { impl IoOut {
pub fn new(fd: AsyncFd, wheel: &Rc<Wheel>) -> Self { pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>, wheel: &Rc<Wheel>) -> Self {
Self { Self {
outgoing: BufFdOut::new(fd, wheel), outgoing: BufFdOut::new(fd, ring, wheel),
scratch: vec![], scratch: vec![],
fds: vec![], fds: vec![],
} }

View file

@ -2,7 +2,7 @@ use {
crate::{ crate::{
async_engine::{AsyncEngine, AsyncError, AsyncFd, FdStatus, Phase, SpawnedFuture}, async_engine::{AsyncEngine, AsyncError, AsyncFd, FdStatus, Phase, SpawnedFuture},
io_uring::{ io_uring::{
ops::{async_cancel::AsyncCancelTask, write::WriteTask}, ops::{async_cancel::AsyncCancelTask, poll::PollTask, write::WriteTask},
pending_result::PendingResults, pending_result::PendingResults,
sys::{ sys::{
io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe, io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe,
@ -46,6 +46,7 @@ macro_rules! map_err {
} }
}}; }};
} }
pub use ops::TaskResultExt;
mod ops; mod ops;
mod pending_result; mod pending_result;
@ -82,7 +83,7 @@ impl Drop for IoUring {
} }
impl IoUring { impl IoUring {
pub fn new(eng: &Rc<AsyncEngine>, entries: u32) -> Result<Self, IoUringError> { pub fn new(eng: &Rc<AsyncEngine>, entries: u32) -> Result<Rc<Self>, IoUringError> {
let mut params = io_uring_params::default(); let mut params = io_uring_params::default();
let fd = match io_uring_setup(entries, &mut params) { let fd = match io_uring_setup(entries, &mut params) {
Ok(f) => Rc::new(f), Ok(f) => Rc::new(f),
@ -196,6 +197,7 @@ impl IoUring {
pending_results: Default::default(), pending_results: Default::default(),
cached_writes: Default::default(), cached_writes: Default::default(),
cached_cancels: Default::default(), cached_cancels: Default::default(),
cached_polls: Default::default(),
reader: Cell::new(None), reader: Cell::new(None),
submitter: Cell::new(None), submitter: Cell::new(None),
}); });
@ -203,7 +205,7 @@ impl IoUring {
let reader = eng.spawn(data.clone().reader()); let reader = eng.spawn(data.clone().reader());
data.reader.set(Some(reader)); data.reader.set(Some(reader));
data.submitter.set(Some(submitter)); data.submitter.set(Some(submitter));
Ok(Self { ring: data }) Ok(Rc::new(Self { ring: data }))
} }
} }
@ -237,6 +239,7 @@ struct IoUringData {
pending_results: PendingResults, pending_results: PendingResults,
cached_writes: Stack<Box<WriteTask>>, cached_writes: Stack<Box<WriteTask>>,
cached_cancels: Stack<Box<AsyncCancelTask>>, cached_cancels: Stack<Box<AsyncCancelTask>>,
cached_polls: Stack<Box<PollTask>>,
reader: Cell<Option<SpawnedFuture<()>>>, reader: Cell<Option<SpawnedFuture<()>>>,
submitter: Cell<Option<SpawnedFuture<()>>>, submitter: Cell<Option<SpawnedFuture<()>>>,
@ -313,6 +316,7 @@ impl IoUringData {
let idx = (tail & self.sqmask) as usize; let idx = (tail & self.sqmask) as usize;
let mut sqe = self.sqesmap.deref()[idx].get().deref_mut(); let mut sqe = self.sqesmap.deref()[idx].get().deref_mut();
self.sqmap.deref()[idx].set(idx as _); self.sqmap.deref()[idx].set(idx as _);
*sqe = Default::default();
sqe.user_data = id; sqe.user_data = id;
task.encode(sqe); task.encode(sqe);
tail = tail.wrapping_add(1); tail = tail.wrapping_add(1);

View file

@ -1,2 +1,21 @@
use crate::{io_uring::IoUringError, utils::oserror::OsError};
pub mod async_cancel; pub mod async_cancel;
pub mod poll;
pub mod write; pub mod write;
pub type TaskResult<T> = Result<Result<T, OsError>, IoUringError>;
pub trait TaskResultExt<T> {
fn merge(self) -> Result<T, IoUringError>;
}
impl<T> TaskResultExt<T> for TaskResult<T> {
fn merge(self) -> Result<T, IoUringError> {
match self {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(IoUringError::OsError(e)),
Err(e) => Err(e),
}
}
}

74
src/io_uring/ops/poll.rs Normal file
View file

@ -0,0 +1,74 @@
use {
crate::io_uring::{
ops::TaskResult,
pending_result::PendingResult,
sys::{io_uring_sqe, IORING_OP_POLL_ADD},
IoUring, IoUringData, IoUringError, Task, TaskResultExt,
},
std::{
cell::{Cell, RefCell},
rc::Rc,
},
uapi::{c, OwnedFd},
};
impl IoUring {
pub async fn poll(&self, fd: &Rc<OwnedFd>, events: c::c_short) -> TaskResult<c::c_short> {
self.ring.check_destroyed()?;
let id = self.ring.id();
let pr = self.ring.pending_results.acquire();
{
let pw = self.ring.cached_polls.pop().unwrap_or_default();
pw.id.set(id.id);
*pw.data.borrow_mut() = Some(Data {
pr: pr.clone(),
fd: fd.clone(),
events: events as _,
});
self.ring.schedule(pw);
}
Ok(pr.await.map(|v| v as c::c_short))
}
pub async fn readable(&self, fd: &Rc<OwnedFd>) -> Result<c::c_short, IoUringError> {
self.poll(fd, c::POLLIN).await.merge()
}
pub async fn writable(&self, fd: &Rc<OwnedFd>) -> Result<c::c_short, IoUringError> {
self.poll(fd, c::POLLOUT).await.merge()
}
}
struct Data {
pr: PendingResult,
fd: Rc<OwnedFd>,
events: u16,
}
#[derive(Default)]
pub struct PollTask {
id: Cell<u64>,
data: RefCell<Option<Data>>,
}
unsafe impl Task for PollTask {
fn id(&self) -> u64 {
self.id.get()
}
fn complete(self: Box<Self>, ring: &IoUringData, res: i32) {
let data = self.data.borrow_mut().take();
if let Some(data) = data {
data.pr.complete(res);
}
ring.cached_polls.push(self);
}
fn encode(&self, sqe: &mut io_uring_sqe) {
let data = self.data.borrow_mut();
let data = data.as_ref().unwrap();
sqe.opcode = IORING_OP_POLL_ADD;
sqe.fd = data.fd.raw();
sqe.u3.poll_events = data.events;
}
}

View file

@ -1,8 +1,9 @@
use { use {
crate::io_uring::{ crate::io_uring::{
ops::TaskResult,
pending_result::PendingResult, pending_result::PendingResult,
sys::{io_uring_sqe, IORING_OP_WRITE}, sys::{io_uring_sqe, IORING_OP_WRITE},
IoUring, IoUringData, IoUringError, Task, IoUring, IoUringData, Task,
}, },
std::{ std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
@ -19,7 +20,7 @@ impl IoUring {
buf: &Rc<Vec<u8>>, buf: &Rc<Vec<u8>>,
offset: usize, offset: usize,
n: usize, n: usize,
) -> Result<usize, IoUringError> { ) -> TaskResult<usize> {
self.ring.check_destroyed()?; self.ring.check_destroyed()?;
let id = self.ring.id(); let id = self.ring.id();
let pr = self.ring.pending_results.acquire(); let pr = self.ring.pending_results.acquire();
@ -39,7 +40,7 @@ impl IoUring {
}); });
self.ring.schedule(pw); self.ring.schedule(pw);
} }
Ok(pr.await? as usize) Ok(pr.await.map(|v| v as usize))
} }
} }

View file

@ -1,8 +1,5 @@
use { use {
crate::{ crate::utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack},
io_uring::IoUringError,
utils::{numcell::NumCell, oserror::OsError, ptr_ext::PtrExt, stack::Stack},
},
std::{ std::{
cell::Cell, cell::Cell,
future::Future, future::Future,
@ -104,13 +101,13 @@ impl Clone for PendingResult {
} }
impl Future for PendingResult { impl Future for PendingResult {
type Output = Result<i32, IoUringError>; type Output = Result<i32, OsError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pr = unsafe { self.pr.deref() }; let pr = unsafe { self.pr.deref() };
if let Some(res) = pr.res.take() { if let Some(res) = pr.res.take() {
let res = if res < 0 { let res = if res < 0 {
Err(IoUringError::OsError(OsError::from(-res as c::c_int))) Err(OsError::from(-res as c::c_int))
} else { } else {
Ok(res) Ok(res)
}; };

View file

@ -2,6 +2,7 @@
use { use {
crate::utils::oserror::OsError, crate::utils::oserror::OsError,
std::mem::MaybeUninit,
uapi::{c, OwnedFd}, uapi::{c, OwnedFd},
}; };
@ -23,6 +24,12 @@ pub struct io_uring_sqe {
pub __pad2: [u64; 2], pub __pad2: [u64; 2],
} }
impl Default for io_uring_sqe {
fn default() -> Self {
unsafe { MaybeUninit::zeroed().assume_init() }
}
}
#[repr(C)] #[repr(C)]
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub union io_uring_sqe_union1 { pub union io_uring_sqe_union1 {

View file

@ -1,6 +1,6 @@
use { use {
crate::{ crate::{
async_engine::{AsyncFd, SpawnedFuture}, async_engine::SpawnedFuture,
client::{Client, ClientId, EventFormatter}, client::{Client, ClientId, EventFormatter},
it::{ it::{
test_error::{StdError, TestError}, test_error::{StdError, TestError},
@ -28,11 +28,12 @@ use {
rc::Rc, rc::Rc,
task::Poll, task::Poll,
}, },
uapi::OwnedFd,
}; };
pub struct TestTransport { pub struct TestTransport {
pub run: Rc<TestRun>, pub run: Rc<TestRun>,
pub fd: AsyncFd, pub socket: Rc<OwnedFd>,
pub client_id: Cell<ClientId>, pub client_id: Cell<ClientId>,
pub bufs: Stack<Vec<u32>>, pub bufs: Stack<Vec<u32>>,
pub swapchain: Rc<RefCell<OutBufferSwapchain>>, pub swapchain: Rc<RefCell<OutBufferSwapchain>>,
@ -132,7 +133,7 @@ impl TestTransport {
self.run.state.eng.spawn( self.run.state.eng.spawn(
Incoming { Incoming {
tc: self.clone(), tc: self.clone(),
buf: BufFdIn::new(self.fd.clone()), buf: BufFdIn::new(&self.socket, &self.run.state.ring),
} }
.run(), .run(),
), ),
@ -141,7 +142,7 @@ impl TestTransport {
self.run.state.eng.spawn( self.run.state.eng.spawn(
Outgoing { Outgoing {
tc: self.clone(), tc: self.clone(),
buf: BufFdOut::new(self.fd.clone(), &self.run.state.wheel), buf: BufFdOut::new(&self.socket, &self.run.state.ring, &self.run.state.wheel),
buffers: Default::default(), buffers: Default::default(),
} }
.run(), .run(),

View file

@ -51,17 +51,12 @@ impl TestRun {
uapi::connect(socket.raw(), &self.server_addr) uapi::connect(socket.raw(), &self.server_addr)
.to_os_error() .to_os_error()
.with_context(|| "Could not connect to the compositor")?; .with_context(|| "Could not connect to the compositor")?;
let fd = self
.state
.eng
.fd(&socket)
.with_context(|| "Could not create an async fd")?;
let mut obj_ids = Bitfield::default(); let mut obj_ids = Bitfield::default();
obj_ids.take(0); obj_ids.take(0);
obj_ids.take(1); obj_ids.take(1);
let tran = Rc::new(TestTransport { let tran = Rc::new(TestTransport {
run: self.clone(), run: self.clone(),
fd, socket,
client_id: Cell::new(ClientId::from_raw(0)), client_id: Cell::new(ClientId::from_raw(0)),
bufs: Default::default(), bufs: Default::default(),
swapchain: Default::default(), swapchain: Default::default(),

View file

@ -109,7 +109,7 @@ pub struct State {
pub config_file_id: NumCell<u64>, pub config_file_id: NumCell<u64>,
pub tracker: Tracker<Self>, pub tracker: Tracker<Self>,
pub data_offer_ids: NumCell<u64>, pub data_offer_ids: NumCell<u64>,
pub io_uring: IoUring, pub ring: Rc<IoUring>,
} }
// impl Drop for State { // impl Drop for State {

View file

@ -4,6 +4,7 @@ use {
client::{EventFormatter, RequestParser}, client::{EventFormatter, RequestParser},
compositor::WAYLAND_DISPLAY, compositor::WAYLAND_DISPLAY,
event_loop::{EventLoop, EventLoopError}, event_loop::{EventLoop, EventLoopError},
io_uring::{IoUring, IoUringError},
logger::Logger, logger::Logger,
object::{ObjectId, WL_DISPLAY_ID}, object::{ObjectId, WL_DISPLAY_ID},
utils::{ utils::{
@ -49,6 +50,8 @@ pub enum ToolClientError {
CreateWheel(#[source] WheelError), CreateWheel(#[source] WheelError),
#[error("Could not create an async engine")] #[error("Could not create an async engine")]
CreateEngine(#[source] AsyncError), CreateEngine(#[source] AsyncError),
#[error("Could not create an io-uring")]
CreateRing(#[source] IoUringError),
#[error("XDG_RUNTIME_DIR is not set")] #[error("XDG_RUNTIME_DIR is not set")]
XrdNotSet, XrdNotSet,
#[error("WAYLAND_DISPLAY is not set")] #[error("WAYLAND_DISPLAY is not set")]
@ -59,8 +62,6 @@ pub enum ToolClientError {
SocketPathTooLong, SocketPathTooLong,
#[error("Could not connect to the compositor")] #[error("Could not connect to the compositor")]
Connect(#[source] OsError), Connect(#[source] OsError),
#[error("Could not create an async fd")]
AsyncFd(#[source] AsyncError),
#[error("The message length is smaller than 8 bytes")] #[error("The message length is smaller than 8 bytes")]
MsgLenTooSmall, MsgLenTooSmall,
#[error("The size of the message is not a multiple of 4")] #[error("The size of the message is not a multiple of 4")]
@ -78,6 +79,7 @@ pub enum ToolClientError {
pub struct ToolClient { pub struct ToolClient {
pub logger: Arc<Logger>, pub logger: Arc<Logger>,
pub el: Rc<EventLoop>, pub el: Rc<EventLoop>,
pub ring: Rc<IoUring>,
pub wheel: Rc<Wheel>, pub wheel: Rc<Wheel>,
pub eng: Rc<AsyncEngine>, pub eng: Rc<AsyncEngine>,
obj_ids: RefCell<Bitfield>, obj_ids: RefCell<Bitfield>,
@ -131,7 +133,11 @@ impl ToolClient {
Ok(e) => e, Ok(e) => e,
Err(e) => return Err(ToolClientError::CreateEngine(e)), Err(e) => return Err(ToolClientError::CreateEngine(e)),
}; };
let wheel = match Wheel::new(&eng) { let ring = match IoUring::new(&eng, 32) {
Ok(e) => e,
Err(e) => return Err(ToolClientError::CreateRing(e)),
};
let wheel = match Wheel::new(&eng, &ring) {
Ok(w) => w, Ok(w) => w,
Err(e) => return Err(ToolClientError::CreateWheel(e)), Err(e) => return Err(ToolClientError::CreateWheel(e)),
}; };
@ -163,16 +169,13 @@ impl ToolClient {
if let Err(e) = uapi::connect(socket.raw(), &addr) { if let Err(e) = uapi::connect(socket.raw(), &addr) {
return Err(ToolClientError::Connect(e.into())); return Err(ToolClientError::Connect(e.into()));
} }
let fd = match eng.fd(&socket) {
Ok(fd) => fd,
Err(e) => return Err(ToolClientError::AsyncFd(e)),
};
let mut obj_ids = Bitfield::default(); let mut obj_ids = Bitfield::default();
obj_ids.take(0); obj_ids.take(0);
obj_ids.take(1); obj_ids.take(1);
let slf = Rc::new(Self { let slf = Rc::new(Self {
logger, logger,
el, el,
ring,
wheel, wheel,
eng, eng,
obj_ids: RefCell::new(obj_ids), obj_ids: RefCell::new(obj_ids),
@ -197,7 +200,7 @@ impl ToolClient {
slf.eng.spawn( slf.eng.spawn(
Incoming { Incoming {
tc: slf.clone(), tc: slf.clone(),
buf: BufFdIn::new(fd.clone()), buf: BufFdIn::new(&socket, &slf.ring),
} }
.run(), .run(),
), ),
@ -206,7 +209,7 @@ impl ToolClient {
slf.eng.spawn( slf.eng.spawn(
Outgoing { Outgoing {
tc: slf.clone(), tc: slf.clone(),
buf: BufFdOut::new(fd.clone(), &slf.wheel), buf: BufFdOut::new(&socket, &slf.ring, &slf.wheel),
buffers: Default::default(), buffers: Default::default(),
} }
.run(), .run(),

View file

@ -1,4 +1,7 @@
use {crate::async_engine::AsyncError, thiserror::Error}; use {
crate::{io_uring::IoUringError, utils::oserror::OsError},
thiserror::Error,
};
pub use { pub use {
buf_in::BufFdIn, buf_in::BufFdIn,
buf_out::{BufFdOut, OutBuffer, OutBufferSwapchain}, buf_out::{BufFdOut, OutBuffer, OutBufferSwapchain},
@ -14,9 +17,9 @@ mod parser;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum BufFdError { pub enum BufFdError {
#[error("An IO error occurred")] #[error("An IO error occurred")]
Io(#[source] crate::utils::oserror::OsError), Io(#[source] OsError),
#[error("An async error occurred")] #[error("An io-uring error occurred")]
Async(#[from] AsyncError), Ring(#[from] IoUringError),
#[error("The peer did not send a file descriptor")] #[error("The peer did not send a file descriptor")]
NoFd, NoFd,
#[error("The peer sent too many file descriptors")] #[error("The peer sent too many file descriptors")]

View file

@ -1,14 +1,15 @@
use { use {
crate::{ crate::{
async_engine::AsyncFd, io_uring::IoUring,
utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE, MAX_IN_FD}, utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE, MAX_IN_FD},
}, },
std::{collections::VecDeque, mem::MaybeUninit}, std::{collections::VecDeque, mem::MaybeUninit, rc::Rc},
uapi::{c, Errno, OwnedFd, Pod}, uapi::{c, Errno, OwnedFd, Pod},
}; };
pub struct BufFdIn { pub struct BufFdIn {
fd: AsyncFd, fd: Rc<OwnedFd>,
ring: Rc<IoUring>,
in_fd: VecDeque<OwnedFd>, in_fd: VecDeque<OwnedFd>,
@ -19,9 +20,10 @@ pub struct BufFdIn {
} }
impl BufFdIn { impl BufFdIn {
pub fn new(fd: AsyncFd) -> Self { pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>) -> Self {
Self { Self {
fd, fd: fd.clone(),
ring: ring.clone(),
in_fd: Default::default(), in_fd: Default::default(),
in_buf: Box::new([MaybeUninit::uninit(); BUF_SIZE]), in_buf: Box::new([MaybeUninit::uninit(); BUF_SIZE]),
in_cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]), in_cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]),
@ -35,7 +37,7 @@ impl BufFdIn {
let mut offset = 0; let mut offset = 0;
while offset < bytes.len() { while offset < bytes.len() {
if self.read_full_(bytes, &mut offset)? { if self.read_full_(bytes, &mut offset)? {
self.fd.readable().await?; self.ring.readable(&self.fd).await?;
} }
} }
Ok(()) Ok(())

View file

@ -1,6 +1,6 @@
use { use {
crate::{ crate::{
async_engine::AsyncFd, io_uring::IoUring,
utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE}, utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE},
wheel::{Wheel, WheelTimeoutFuture}, wheel::{Wheel, WheelTimeoutFuture},
}, },
@ -79,16 +79,18 @@ impl OutBufferSwapchain {
} }
pub struct BufFdOut { pub struct BufFdOut {
fd: AsyncFd, fd: Rc<OwnedFd>,
ring: Rc<IoUring>,
wheel: Rc<Wheel>, wheel: Rc<Wheel>,
cmsg_buf: Box<[MaybeUninit<u8>; CMSG_BUF_SIZE]>, cmsg_buf: Box<[MaybeUninit<u8>; CMSG_BUF_SIZE]>,
fd_ids: Vec<i32>, fd_ids: Vec<i32>,
} }
impl BufFdOut { impl BufFdOut {
pub fn new(fd: AsyncFd, wheel: &Rc<Wheel>) -> Self { pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>, wheel: &Rc<Wheel>) -> Self {
Self { Self {
fd, fd: fd.clone(),
ring: ring.clone(),
wheel: wheel.clone(), wheel: wheel.clone(),
cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]), cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]),
fd_ids: vec![], fd_ids: vec![],
@ -109,7 +111,7 @@ impl BufFdOut {
_ = timeout.as_mut().unwrap() => { _ = timeout.as_mut().unwrap() => {
return Err(BufFdError::Timeout); return Err(BufFdError::Timeout);
}, },
res = self.fd.writable().fuse() => { res = self.ring.writable(&self.fd).fuse() => {
res?; res?;
}, },
} }
@ -123,7 +125,7 @@ impl BufFdOut {
pub async fn flush_no_timeout(&mut self, buf: &mut OutBuffer) -> Result<(), BufFdError> { pub async fn flush_no_timeout(&mut self, buf: &mut OutBuffer) -> Result<(), BufFdError> {
while buf.read_pos < buf.write_pos { while buf.read_pos < buf.write_pos {
if self.flush_sync(buf)? { if self.flush_sync(buf)? {
self.fd.writable().await?; let _ = self.ring.writable(&self.fd).await?;
} }
} }
buf.read_pos = 0; buf.read_pos = 0;
@ -186,7 +188,7 @@ impl BufFdOut {
let mut read_pos = 0; let mut read_pos = 0;
while read_pos < buf.len() { while read_pos < buf.len() {
if self.flush_sync2(&mut read_pos, buf, fds)? { if self.flush_sync2(&mut read_pos, buf, fds)? {
self.fd.writable().await?; self.ring.writable(&self.fd).await?;
} }
} }
Ok(()) Ok(())

View file

@ -1,6 +1,6 @@
use { use {
crate::{ crate::{
async_engine::{AsyncError, AsyncFd}, io_uring::{IoUring, IoUringError},
utils::{ utils::{
oserror::OsError, oserror::OsError,
queue::AsyncQueue, queue::AsyncQueue,
@ -26,9 +26,9 @@ pub enum BufIoError {
#[error("Could not read from the socket")] #[error("Could not read from the socket")]
ReadError(#[source] OsError), ReadError(#[source] OsError),
#[error("Cannot wait for fd to become writable")] #[error("Cannot wait for fd to become writable")]
Writable(#[source] AsyncError), Writable(#[source] IoUringError),
#[error("Cannot wait for fd to become readable")] #[error("Cannot wait for fd to become readable")]
Readable(#[source] AsyncError), Readable(#[source] IoUringError),
#[error("The socket is closed")] #[error("The socket is closed")]
Closed, Closed,
} }
@ -44,7 +44,8 @@ struct MessageOffset {
} }
pub struct BufIo { pub struct BufIo {
fd: AsyncFd, fd: Rc<OwnedFd>,
ring: Rc<IoUring>,
bufs: Stack<Vec<u8>>, bufs: Stack<Vec<u8>>,
outgoing: AsyncQueue<BufIoMessage>, outgoing: AsyncQueue<BufIoMessage>,
} }
@ -69,9 +70,10 @@ struct Outgoing {
} }
impl BufIo { impl BufIo {
pub fn new(fd: AsyncFd) -> Self { pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>) -> Self {
Self { Self {
fd, fd: fd.clone(),
ring: ring.clone(),
bufs: Default::default(), bufs: Default::default(),
outgoing: Default::default(), outgoing: Default::default(),
} }
@ -130,7 +132,7 @@ impl BufIoIncoming {
if e.0 != c::EAGAIN { if e.0 != c::EAGAIN {
return Err(BufIoError::ReadError(e.into())); return Err(BufIoError::ReadError(e.into()));
} }
if let Err(e) = self.bufio.fd.readable().await { if let Err(e) = self.bufio.ring.readable(&self.bufio.fd).await {
return Err(BufIoError::Readable(e)); return Err(BufIoError::Readable(e));
} }
} }
@ -184,7 +186,7 @@ impl Outgoing {
if e != Errno(c::EAGAIN) { if e != Errno(c::EAGAIN) {
return Err(BufIoError::FlushError(e.into())); return Err(BufIoError::FlushError(e.into()));
} }
if let Err(e) = self.bufio.fd.writable().await { if let Err(e) = self.bufio.ring.writable(&self.bufio.fd).await {
return Err(BufIoError::Writable(e)); return Err(BufIoError::Writable(e));
} }
} }

View file

@ -1,6 +1,7 @@
use { use {
crate::{ crate::{
async_engine::{AsyncEngine, AsyncError, AsyncFd, SpawnedFuture}, async_engine::{AsyncEngine, AsyncError, SpawnedFuture},
io_uring::IoUring,
time::{Time, TimeError}, time::{Time, TimeError},
utils::{ utils::{
copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, oserror::OsError, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell, oserror::OsError,
@ -18,7 +19,7 @@ use {
time::Duration, time::Duration,
}, },
thiserror::Error, thiserror::Error,
uapi::c, uapi::{c, OwnedFd},
}; };
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -99,7 +100,8 @@ impl Future for WheelTimeoutFuture {
pub struct WheelData { pub struct WheelData {
destroyed: Cell<bool>, destroyed: Cell<bool>,
fd: AsyncFd, ring: Rc<IoUring>,
fd: Rc<OwnedFd>,
next_id: NumCell<u64>, next_id: NumCell<u64>,
start: Time, start: Time,
current_expiration: Cell<Option<Time>>, current_expiration: Cell<Option<Time>>,
@ -110,14 +112,14 @@ pub struct WheelData {
} }
impl Wheel { impl Wheel {
pub fn new(eng: &Rc<AsyncEngine>) -> Result<Rc<Self>, WheelError> { pub fn new(eng: &Rc<AsyncEngine>, ring: &Rc<IoUring>) -> Result<Rc<Self>, WheelError> {
let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) { let fd = match uapi::timerfd_create(c::CLOCK_MONOTONIC, c::TFD_CLOEXEC | c::TFD_NONBLOCK) {
Ok(fd) => Rc::new(fd), Ok(fd) => Rc::new(fd),
Err(e) => return Err(WheelError::CreateFailed(e.into())), Err(e) => return Err(WheelError::CreateFailed(e.into())),
}; };
let fd = eng.fd(&fd)?;
let data = Rc::new(WheelData { let data = Rc::new(WheelData {
destroyed: Cell::new(false), destroyed: Cell::new(false),
ring: ring.clone(),
fd, fd,
next_id: NumCell::new(1), next_id: NumCell::new(1),
start: Time::now()?, start: Time::now()?,
@ -207,7 +209,7 @@ impl WheelData {
async fn dispatch(self: Rc<Self>) { async fn dispatch(self: Rc<Self>) {
loop { loop {
if let Err(e) = self.fd.readable().await { if let Err(e) = self.ring.readable(&self.fd).await {
log::error!( log::error!(
"Could not wait for the timerfd to become readable: {}", "Could not wait for the timerfd to become readable: {}",
ErrorFmt(e) ErrorFmt(e)

View file

@ -5,8 +5,9 @@ pub use crate::xcon::{
}; };
use { use {
crate::{ crate::{
async_engine::{AsyncEngine, AsyncError, Phase, SpawnedFuture}, async_engine::{AsyncError, Phase, SpawnedFuture},
compositor::DISPLAY, compositor::DISPLAY,
state::State,
utils::{ utils::{
bufio::{BufIo, BufIoError, BufIoMessage}, bufio::{BufIo, BufIoError, BufIoMessage},
clonecell::CloneCell, clonecell::CloneCell,
@ -385,7 +386,7 @@ impl Xcon {
Ok(id) Ok(id)
} }
pub async fn connect(eng: Rc<AsyncEngine>) -> Result<Rc<Self>, XconError> { pub async fn connect(state: &Rc<State>) -> Result<Rc<Self>, XconError> {
let authority = match XAuthority::load() { let authority = match XAuthority::load() {
Ok(a) => a, Ok(a) => a,
Err(e) => { Err(e) => {
@ -433,18 +434,17 @@ impl Xcon {
} }
(&[], &[]) (&[], &[])
}; };
Self::connect_to_fd(&eng, &fd, auth_method, auth_value).await Self::connect_to_fd(state, &fd, auth_method, auth_value).await
} }
pub async fn connect_to_fd( pub async fn connect_to_fd(
eng: &Rc<AsyncEngine>, state: &Rc<State>,
fd: &Rc<OwnedFd>, fd: &Rc<OwnedFd>,
auth_method: &[u8], auth_method: &[u8],
auth_value: &[u8], auth_value: &[u8],
) -> Result<Rc<Self>, XconError> { ) -> Result<Rc<Self>, XconError> {
let fd = eng.fd(fd)?;
let data = Rc::new(XconData { let data = Rc::new(XconData {
bufio: Rc::new(BufIo::new(fd)), bufio: Rc::new(BufIo::new(fd, &state.ring)),
next_serial: NumCell::new(1), next_serial: NumCell::new(1),
last_recv_serial: Cell::new(0), last_recv_serial: Cell::new(0),
reply_handlers: Default::default(), reply_handlers: Default::default(),
@ -454,7 +454,9 @@ impl Xcon {
xorg: CloneCell::new(Weak::new()), xorg: CloneCell::new(Weak::new()),
events: Default::default(), events: Default::default(),
}); });
let outgoing = eng.spawn2(Phase::PostLayout, handle_outgoing(data.clone())); let outgoing = state
.eng
.spawn2(Phase::PostLayout, handle_outgoing(data.clone()));
let mut buf = data.bufio.buf(); let mut buf = data.bufio.buf();
let mut fds = vec![]; let mut fds = vec![];
{ {
@ -497,7 +499,7 @@ impl Xcon {
return Err(XconError::Authenticate(reason.to_owned())); return Err(XconError::Authenticate(reason.to_owned()));
} }
let setup = Setup::deserialize(&mut parser)?; let setup = Setup::deserialize(&mut parser)?;
let incoming = eng.spawn(handle_incoming(data.clone(), incoming)); let incoming = state.eng.spawn(handle_incoming(data.clone(), incoming));
let slf = Rc::new(Self { let slf = Rc::new(Self {
extensions: data.fetch_extension_data().await?, extensions: data.fetch_extension_data().await?,
outgoing: Cell::new(Some(outgoing)), outgoing: Cell::new(Some(outgoing)),

View file

@ -3,7 +3,6 @@ mod xwm;
use { use {
crate::{ crate::{
async_engine::AsyncError,
client::ClientError, client::ClientError,
compositor::DISPLAY, compositor::DISPLAY,
forker::{ForkerError, ForkerProxy}, forker::{ForkerError, ForkerProxy},
@ -19,6 +18,7 @@ use {
WlSurface, WlSurface,
}, },
}, },
io_uring::IoUringError,
state::State, state::State,
user_session::import_environment, user_session::import_environment,
utils::{errorfmt::ErrorFmt, oserror::OsError, tri::Try}, utils::{errorfmt::ErrorFmt, oserror::OsError, tri::Try},
@ -59,8 +59,8 @@ enum XWaylandError {
BindFailed(#[source] OsError), BindFailed(#[source] OsError),
#[error("All X displays in the range 0..1000 are already in use")] #[error("All X displays in the range 0..1000 are already in use")]
AddressesInUse, AddressesInUse,
#[error("The async engine returned an error")] #[error("The io-uring returned an error")]
AsyncError(#[from] AsyncError), RingError(#[from] IoUringError),
#[error("pipe(2) failed")] #[error("pipe(2) failed")]
Pipe(#[source] OsError), Pipe(#[source] OsError),
#[error("socketpair(2) failed")] #[error("socketpair(2) failed")]
@ -120,12 +120,7 @@ pub async fn manage(state: Rc<State>) {
if state.backend.get().import_environment() { if state.backend.get().import_environment() {
import_environment(&state, DISPLAY, &display); import_environment(&state, DISPLAY, &display);
} }
let res = XWaylandError::tria(async { if let Err(e) = state.ring.readable(&socket).await {
state.eng.fd(&socket)?.readable().await?;
Ok(())
})
.await;
if let Err(e) = res {
log::error!("{}", ErrorFmt(e)); log::error!("{}", ErrorFmt(e));
return; return;
} }
@ -192,7 +187,7 @@ async fn run(
Ok(c) => c, Ok(c) => c,
Err(e) => return Err(XWaylandError::SpawnClient(e)), Err(e) => return Err(XWaylandError::SpawnClient(e)),
}; };
state.eng.fd(&Rc::new(dfdread))?.readable().await?; state.ring.readable(&Rc::new(dfdread)).await?;
state.xwayland.queue.clear(); state.xwayland.queue.clear();
{ {
let shared = Rc::new(XwmShared::default()); let shared = Rc::new(XwmShared::default());
@ -201,7 +196,7 @@ async fn run(
Err(e) => return Err(XWaylandError::CreateWm(Box::new(e))), Err(e) => return Err(XWaylandError::CreateWm(Box::new(e))),
}; };
let _wm = state.eng.spawn(wm.run()); let _wm = state.eng.spawn(wm.run());
state.eng.fd(&Rc::new(pidfd))?.readable().await?; state.ring.readable(&Rc::new(pidfd)).await?;
} }
state.xwayland.queue.clear(); state.xwayland.queue.clear();
stderr_read.await; stderr_read.await;
@ -226,6 +221,7 @@ pub fn build_args(fds: &[OwnedFd]) -> (String, Vec<String>) {
} }
async fn log_xwayland(state: Rc<State>, stderr: OwnedFd) { async fn log_xwayland(state: Rc<State>, stderr: OwnedFd) {
let stderr = Rc::new(stderr);
let res = Errno::tri(|| { let res = Errno::tri(|| {
uapi::fcntl_setfl( uapi::fcntl_setfl(
stderr.raw(), stderr.raw(),
@ -237,21 +233,11 @@ async fn log_xwayland(state: Rc<State>, stderr: OwnedFd) {
log::error!("Could not set stderr fd to nonblock: {}", ErrorFmt(e)); log::error!("Could not set stderr fd to nonblock: {}", ErrorFmt(e));
return; return;
} }
let afd = match state.eng.fd(&Rc::new(stderr)) {
Ok(f) => f,
Err(e) => {
log::error!(
"Could not turn the stderr fd into an async fd: {}",
ErrorFmt(e)
);
return;
}
};
let mut buf = vec![]; let mut buf = vec![];
let mut buf2 = [0; 128]; let mut buf2 = [0; 128];
let mut done = false; let mut done = false;
while !done { while !done {
if let Err(e) = afd.readable().await { if let Err(e) = state.ring.readable(&stderr).await {
log::error!( log::error!(
"Cannot wait for the xwayland stderr to become readable: {}", "Cannot wait for the xwayland stderr to become readable: {}",
ErrorFmt(e) ErrorFmt(e)
@ -259,7 +245,7 @@ async fn log_xwayland(state: Rc<State>, stderr: OwnedFd) {
return; return;
} }
loop { loop {
match uapi::read(afd.raw(), &mut buf2[..]) { match uapi::read(stderr.raw(), &mut buf2[..]) {
Ok(buf2) if buf2.len() > 0 => { Ok(buf2) if buf2.len() > 0 => {
buf.extend_from_slice(buf2); buf.extend_from_slice(buf2);
} }

View file

@ -1,6 +1,6 @@
use { use {
crate::{ crate::{
async_engine::{AsyncFd, SpawnedFuture}, async_engine::SpawnedFuture,
client::Client, client::Client,
ifs::{ ifs::{
ipc::{ ipc::{
@ -18,6 +18,7 @@ use {
WlSurface, WlSurface,
}, },
}, },
io_uring::{IoUring, TaskResultExt},
rect::Rect, rect::Rect,
state::State, state::State,
tree::ToplevelNode, tree::ToplevelNode,
@ -282,7 +283,7 @@ impl Wm {
socket: OwnedFd, socket: OwnedFd,
shared: &Rc<XwmShared>, shared: &Rc<XwmShared>,
) -> Result<Self, XWaylandError> { ) -> Result<Self, XWaylandError> {
let c = match Xcon::connect_to_fd(&state.eng, &Rc::new(socket), &[], &[]).await { let c = match Xcon::connect_to_fd(&state, &Rc::new(socket), &[], &[]).await {
Ok(c) => c, Ok(c) => c,
Err(e) => return Err(XWaylandError::Connect(e)), Err(e) => return Err(XWaylandError::Connect(e)),
}; };
@ -1576,19 +1577,13 @@ impl Wm {
log::error!("Could not make pipe nonblocking: {}", e); log::error!("Could not make pipe nonblocking: {}", e);
break 'convert; break 'convert;
} }
let fd = match self.state.eng.fd(&Rc::new(rx)) {
Ok(afd) => afd,
Err(e) => {
log::error!("Could not create an async fd: {}", ErrorFmt(e));
break 'convert;
}
};
success = None; success = None;
receive_data_offer::<T>(&offer.offer, &mt, Rc::new(tx)); receive_data_offer::<T>(&offer.offer, &mt, Rc::new(tx));
let id = self.transfer_ids.fetch_add(1); let id = self.transfer_ids.fetch_add(1);
let wtx = WaylandToXTransfer { let wtx = WaylandToXTransfer {
id, id,
fd, fd: Rc::new(rx),
ring: self.state.ring.clone(),
c: self.c.clone(), c: self.c.clone(),
window: event.requestor, window: event.requestor,
time: event.time, time: event.time,
@ -2391,11 +2386,11 @@ impl XToWaylandTransfer {
while pos < self.data.len() { while pos < self.data.len() {
let f1 = self let f1 = self
.state .state
.io_uring .ring
.write(&self.fd, &self.data, pos, self.data.len() - pos); .write(&self.fd, &self.data, pos, self.data.len() - pos);
pin_mut!(f1); pin_mut!(f1);
match future::select(f1, &mut timeout).await { match future::select(f1, &mut timeout).await {
Either::Left((res, _)) => match res { Either::Left((res, _)) => match res.merge() {
Ok(n) => pos += n, Ok(n) => pos += n,
Err(e) => { Err(e) => {
log::error!("Could not write to wayland client: {}", ErrorFmt(e)); log::error!("Could not write to wayland client: {}", ErrorFmt(e));
@ -2414,7 +2409,8 @@ impl XToWaylandTransfer {
struct WaylandToXTransfer { struct WaylandToXTransfer {
id: u64, id: u64,
fd: AsyncFd, fd: Rc<OwnedFd>,
ring: Rc<IoUring>,
c: Rc<Xcon>, c: Rc<Xcon>,
window: u32, window: u32,
time: u32, time: u32,
@ -2449,7 +2445,7 @@ impl WaylandToXTransfer {
} }
} }
Err(Errno(c::EAGAIN)) => { Err(Errno(c::EAGAIN)) => {
if let Err(e) = self.fd.readable().await { if let Err(e) = self.ring.readable(&self.fd).await {
log::error!("Could not wait for fd to become readable: {}", ErrorFmt(e)); log::error!("Could not wait for fd to become readable: {}", ErrorFmt(e));
break; break;
} }