1
0
Fork 0
forked from wry/wry

io-uring: add sendmsg

This commit is contained in:
Julian Orth 2022-05-13 17:37:20 +02:00
parent 9416efeabe
commit e4f97287bc
17 changed files with 493 additions and 191 deletions

View file

@ -3,6 +3,7 @@ use {
async_engine::Phase,
client::{Client, ClientError},
object::ObjectId,
time::Time,
utils::{
buffd::{BufFdIn, BufFdOut, MsgParser},
errorfmt::ErrorFmt,
@ -100,7 +101,7 @@ async fn receive(data: Rc<Client>) {
async fn send(data: Rc<Client>) {
let send = async {
let mut out = BufFdOut::new(&data.socket, &data.state.ring, &data.state.wheel);
let mut out = BufFdOut::new(&data.socket, &data.state.ring);
let mut buffers = VecDeque::new();
loop {
data.flush_request.triggered().await;
@ -109,9 +110,9 @@ async fn send(data: Rc<Client>) {
swapchain.commit();
mem::swap(&mut swapchain.pending, &mut buffers);
}
let mut timeout = None;
let timeout = Time::in_ms(5000).unwrap();
while let Some(mut cur) = buffers.pop_front() {
out.flush(&mut cur, &mut timeout).await?;
out.flush(&mut cur, timeout).await?;
data.swapchain.borrow_mut().free.push(cur);
}
}
@ -122,9 +123,9 @@ async fn send(data: Rc<Client>) {
log::info!("Client {} terminated the connection", data.id.0);
} else {
log::error!(
"An error occurred while sending data to client {}: {:#}",
"An error occurred while sending data to client {}: {}",
data.id.0,
e
ErrorFmt(e)
);
}
}

View file

@ -15,7 +15,6 @@ use {
buffd::BufFdError, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, numcell::NumCell,
queue::AsyncQueue,
},
wheel::Wheel,
xwayland,
},
bincode::{
@ -237,7 +236,7 @@ impl ForkerProxy {
}
async fn outgoing(self: Rc<Self>, state: Rc<State>) {
let mut io = IoOut::new(&self.socket, &state.ring, &state.wheel);
let mut io = IoOut::new(&self.socket, &state.ring);
loop {
let msg = self.outgoing.pop().await;
for fd in self.fds.borrow_mut().drain(..) {
@ -301,7 +300,6 @@ struct Forker {
socket: Rc<OwnedFd>,
ae: Rc<AsyncEngine>,
ring: Rc<IoUring>,
wheel: Rc<Wheel>,
fds: RefCell<Vec<Rc<OwnedFd>>>,
outgoing: AsyncQueue<ForkerMessage>,
pending_spawns: CopyHashMap<c::pid_t, SpawnedFuture<()>>,
@ -329,12 +327,10 @@ impl Forker {
});
let ae = AsyncEngine::new();
let ring = IoUring::new(&ae, 32).unwrap();
let wheel = Wheel::new(&ae, &ring).unwrap();
let forker = Rc::new(Forker {
socket,
ae: ae.clone(),
ring: ring.clone(),
wheel,
fds: RefCell::new(vec![]),
outgoing: Default::default(),
pending_spawns: Default::default(),
@ -346,7 +342,7 @@ impl Forker {
}
async fn outgoing(self: Rc<Self>) {
let mut io = IoOut::new(&self.socket, &self.ring, &self.wheel);
let mut io = IoOut::new(&self.socket, &self.ring);
loop {
let msg = self.outgoing.pop().await;
for fd in self.fds.borrow_mut().drain(..) {

View file

@ -11,7 +11,6 @@ use {
buffd::{BufFdIn, BufFdOut},
vec_ext::VecExt,
},
wheel::Wheel,
},
jay_config::_private::bincode_ops,
uapi::OwnedFd,
@ -63,9 +62,9 @@ pub struct IoOut {
}
impl IoOut {
pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>, wheel: &Rc<Wheel>) -> Self {
pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>) -> Self {
Self {
outgoing: BufFdOut::new(fd, ring, wheel),
outgoing: BufFdOut::new(fd, ring),
scratch: vec![],
fds: vec![],
}

View file

@ -1,8 +1,12 @@
pub use ops::TaskResultExt;
use {
crate::{
async_engine::AsyncEngine,
io_uring::{
ops::{async_cancel::AsyncCancelTask, poll::PollTask, write::WriteTask},
ops::{
async_cancel::AsyncCancelTask, poll::PollTask, sendmsg::SendmsgTask,
timeout::TimeoutTask, write::WriteTask,
},
pending_result::PendingResults,
sys::{
io_uring_cqe, io_uring_enter, io_uring_params, io_uring_setup, io_uring_sqe,
@ -24,7 +28,7 @@ use {
},
},
std::{
cell::{Cell, UnsafeCell},
cell::{Cell, RefCell, UnsafeCell},
mem::{self},
rc::Rc,
sync::atomic::{
@ -49,7 +53,6 @@ macro_rules! map_err {
}
}};
}
pub use ops::TaskResultExt;
mod ops;
mod pending_result;
@ -58,7 +61,7 @@ mod sys;
#[derive(Debug, Error)]
pub enum IoUringError {
#[error(transparent)]
OsError(OsError),
OsError(#[from] OsError),
#[error("Could not create an io-uring")]
CreateUring(#[source] OsError),
#[error("The kernel does not support the IORING_FEAT_NODROP feature")]
@ -201,6 +204,9 @@ impl IoUring {
cached_writes: Default::default(),
cached_cancels: Default::default(),
cached_polls: Default::default(),
cached_sendmsg: Default::default(),
cached_timeouts: Default::default(),
fd_ids_scratch: Default::default(),
});
Ok(Rc::new(Self { ring: data }))
}
@ -248,6 +254,10 @@ struct IoUringData {
cached_writes: Stack<Box<WriteTask>>,
cached_cancels: Stack<Box<AsyncCancelTask>>,
cached_polls: Stack<Box<PollTask>>,
cached_sendmsg: Stack<Box<SendmsgTask>>,
cached_timeouts: Stack<Box<TimeoutTask>>,
fd_ids_scratch: RefCell<Vec<c::c_int>>,
}
unsafe trait Task {

View file

@ -2,6 +2,8 @@ use crate::{io_uring::IoUringError, utils::oserror::OsError};
pub mod async_cancel;
pub mod poll;
pub mod sendmsg;
pub mod timeout;
pub mod write;
pub type TaskResult<T> = Result<Result<T, OsError>, IoUringError>;

130
src/io_uring/ops/sendmsg.rs Normal file
View file

@ -0,0 +1,130 @@
use {
crate::{
io_uring::{
pending_result::PendingResult,
sys::{io_uring_sqe, IORING_OP_SENDMSG, IOSQE_IO_LINK},
IoUring, IoUringData, IoUringError, Task,
},
time::Time,
utils::{buf::Buf, vec_ext::UninitVecExt},
},
std::{
mem::{self, MaybeUninit},
ptr,
rc::Rc,
},
uapi::{c, OwnedFd},
};
impl IoUring {
pub async fn sendmsg(
&self,
fd: &Rc<OwnedFd>,
buf: Buf,
fds: Vec<Rc<OwnedFd>>,
timeout: Option<Time>,
) -> Result<usize, IoUringError> {
self.ring.check_destroyed()?;
let id = self.ring.id();
let pr = self.ring.pending_results.acquire();
{
let mut st = self.ring.cached_sendmsg.pop().unwrap_or_default();
st.fds = fds;
if st.fds.len() > 0 {
let mut fd_ids = self.ring.fd_ids_scratch.borrow_mut();
fd_ids.clear();
fd_ids.extend(st.fds.iter().map(|f| f.raw()));
let space = uapi::cmsg_space(mem::size_of_val(&fd_ids[..]));
st.cmsg.clear();
st.cmsg.reserve(space);
st.cmsg.set_len_safe(space);
let hdr = c::cmsghdr {
cmsg_len: 0,
cmsg_level: c::SOL_SOCKET,
cmsg_type: c::SCM_RIGHTS,
};
uapi::cmsg_write(&mut &mut st.cmsg[..], hdr, &fd_ids[..]).unwrap();
st.msghdr.msg_control = st.cmsg.as_ptr() as _;
st.msghdr.msg_controllen = st.cmsg.len() as _;
} else {
st.msghdr.msg_control = ptr::null_mut();
st.msghdr.msg_controllen = 0;
}
st.id = id.id;
st.fd = fd.raw();
st.iovec.iov_base = buf.as_ptr() as _;
st.iovec.iov_len = buf.len() as _;
st.msghdr.msg_iov = &st.iovec as *const _ as _;
st.msghdr.msg_iovlen = 1;
st.data = Some(SendmsgTaskData {
_fd: fd.clone(),
_buf: buf,
res: pr.clone(),
});
st.has_timeout = timeout.is_some();
self.ring.schedule(st);
if let Some(timeout) = timeout {
self.schedule_timeout(timeout);
}
}
Ok(pr.await? as _)
}
}
struct SendmsgTaskData {
_fd: Rc<OwnedFd>,
_buf: Buf,
res: PendingResult,
}
pub struct SendmsgTask {
id: u64,
iovec: c::iovec,
msghdr: c::msghdr,
fd: i32,
has_timeout: bool,
fds: Vec<Rc<OwnedFd>>,
cmsg: Vec<MaybeUninit<u8>>,
data: Option<SendmsgTaskData>,
}
impl Default for SendmsgTask {
fn default() -> Self {
unsafe {
SendmsgTask {
id: 0,
iovec: MaybeUninit::zeroed().assume_init(),
msghdr: MaybeUninit::zeroed().assume_init(),
fd: 0,
has_timeout: false,
fds: vec![],
cmsg: vec![],
data: None,
}
}
}
}
unsafe impl Task for SendmsgTask {
fn id(&self) -> u64 {
self.id
}
fn complete(mut self: Box<Self>, ring: &IoUringData, res: i32) {
self.fds.clear();
if let Some(data) = self.data.take() {
data.res.complete(res);
}
ring.cached_sendmsg.push(self);
}
fn encode(&self, sqe: &mut io_uring_sqe) {
sqe.opcode = IORING_OP_SENDMSG;
sqe.fd = self.fd;
sqe.u2.addr = &self.msghdr as *const _ as _;
sqe.u3.msg_flags = c::MSG_NOSIGNAL as _;
if self.has_timeout {
sqe.flags = IOSQE_IO_LINK;
}
}
}

View file

@ -0,0 +1,54 @@
use {
crate::{
io_uring::{
sys::{io_uring_sqe, IORING_OP_LINK_TIMEOUT, IORING_TIMEOUT_ABS},
IoUring, IoUringData, Task,
},
time::Time,
},
uapi::c,
};
#[allow(non_camel_case_types)]
#[repr(C)]
#[derive(Default)]
struct timespec64 {
tv_sec: i64,
tv_nsec: c::c_long,
}
#[derive(Default)]
pub struct TimeoutTask {
id: u64,
timespec: timespec64,
}
impl IoUring {
pub(super) fn schedule_timeout(&self, timeout: Time) {
let id = self.ring.id_raw();
{
let mut to = self.ring.cached_timeouts.pop().unwrap_or_default();
to.id = id;
to.timespec.tv_sec = timeout.0.tv_sec as _;
to.timespec.tv_nsec = timeout.0.tv_nsec as _;
self.ring.schedule(to);
}
}
}
unsafe impl Task for TimeoutTask {
fn id(&self) -> u64 {
self.id
}
fn complete(self: Box<Self>, ring: &IoUringData, _res: i32) {
ring.cached_timeouts.push(self);
}
fn encode(&self, sqe: &mut io_uring_sqe) {
sqe.opcode = IORING_OP_LINK_TIMEOUT;
sqe.u2.addr = &self.timespec as *const _ as _;
sqe.len = 1;
sqe.u3.timeout_flags = IORING_TIMEOUT_ABS;
}
}

View file

@ -1,26 +1,22 @@
use {
crate::io_uring::{
ops::TaskResult,
pending_result::PendingResult,
sys::{io_uring_sqe, IORING_OP_WRITE},
IoUring, IoUringData, Task,
crate::{
io_uring::{
ops::TaskResult,
pending_result::PendingResult,
sys::{io_uring_sqe, IORING_OP_WRITE},
IoUring, IoUringData, Task,
},
utils::buf::Buf,
},
std::{
cell::{Cell, RefCell},
ops::Range,
rc::Rc,
},
uapi::OwnedFd,
};
impl IoUring {
pub async fn write(
&self,
fd: &Rc<OwnedFd>,
buf: &Rc<Vec<u8>>,
offset: usize,
n: usize,
) -> TaskResult<usize> {
pub async fn write(&self, fd: &Rc<OwnedFd>, buf: Buf) -> TaskResult<usize> {
self.ring.check_destroyed()?;
let id = self.ring.id();
let pr = self.ring.pending_results.acquire();
@ -34,8 +30,7 @@ impl IoUring {
pw.id.set(id.id);
*pw.data.borrow_mut() = Some(WriteTaskData {
fd: fd.clone(),
buf: buf.clone(),
range: offset..offset + n,
buf,
res: pr.clone(),
});
self.ring.schedule(pw);
@ -46,8 +41,7 @@ impl IoUring {
struct WriteTaskData {
fd: Rc<OwnedFd>,
buf: Rc<Vec<u8>>,
range: Range<usize>,
buf: Buf,
res: PendingResult,
}
@ -74,8 +68,8 @@ unsafe impl Task for WriteTask {
sqe.opcode = IORING_OP_WRITE;
sqe.fd = data.fd.raw();
sqe.u1.off = !0;
sqe.u2.addr = data.buf[data.range.clone()].as_ptr() as _;
sqe.u2.addr = data.buf.as_ptr() as _;
sqe.u3.rw_flags = 0;
sqe.len = data.range.len() as _;
sqe.len = data.buf.len() as _;
}
}

View file

@ -142,7 +142,7 @@ impl TestTransport {
self.run.state.eng.spawn(
Outgoing {
tc: self.clone(),
buf: BufFdOut::new(&self.socket, &self.run.state.ring, &self.run.state.wheel),
buf: BufFdOut::new(&self.socket, &self.run.state.ring),
buffers: Default::default(),
}
.run(),

View file

@ -36,6 +36,11 @@ impl Time {
Ok(Self(time))
}
pub fn in_ms(ms: u64) -> Result<Time, TimeError> {
let now = Self::now()?;
Ok(now + Duration::from_millis(ms))
}
#[allow(dead_code)]
pub fn now_unchecked() -> Time {
let mut time = uapi::pod_zeroed();

View file

@ -195,7 +195,7 @@ impl ToolClient {
slf.eng.spawn(
Outgoing {
tc: slf.clone(),
buf: BufFdOut::new(&socket, &slf.ring, &slf.wheel),
buf: BufFdOut::new(&socket, &slf.ring),
buffers: Default::default(),
}
.run(),

View file

@ -2,6 +2,7 @@ pub mod array;
pub mod asyncevent;
pub mod bitfield;
pub mod bitflags;
pub mod buf;
pub mod buffd;
pub mod bufio;
pub mod clonecell;

155
src/utils/buf.rs Normal file
View file

@ -0,0 +1,155 @@
use {
crate::utils::{numcell::NumCell, ptr_ext::PtrExt},
std::{
alloc::Layout,
collections::Bound,
ops::{Deref, DerefMut, Range, RangeBounds},
ptr::NonNull,
slice,
},
};
const METADATA_SIZE: u32 = 8;
const METADATA_ALIGN: usize = 4;
const RC_OFF: u32 = 4;
const RC_OFF_INV: u32 = METADATA_SIZE - RC_OFF;
pub struct Buf {
storage: NonNull<u8>,
range: Range<u32>,
}
impl Buf {
pub fn from_slice(vec: &[u8]) -> Buf {
let len = vec.len();
assert!(len <= (u32::MAX - METADATA_SIZE) as usize);
let len = len as u32;
let size = len + METADATA_SIZE;
let layout = Layout::from_size_align(size as _, METADATA_ALIGN).unwrap();
let ptr = unsafe { std::alloc::alloc(layout) };
if ptr.is_null() {
std::alloc::handle_alloc_error(layout);
}
unsafe {
*ptr.cast::<u32>() = size;
*ptr.add(RC_OFF as _).cast::<u32>() = 1;
let mut buf = Buf {
storage: NonNull::new_unchecked(ptr.add(METADATA_SIZE as _)),
range: Range { start: 0, end: len },
};
buf[..].copy_from_slice(vec);
buf
}
}
pub fn new(len: usize) -> Buf {
assert!(len <= (u32::MAX - METADATA_SIZE) as usize);
let len = len as u32;
let size = len + METADATA_SIZE;
let layout = Layout::from_size_align(size as _, METADATA_ALIGN).unwrap();
let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
if ptr.is_null() {
std::alloc::handle_alloc_error(layout);
}
unsafe {
*ptr.cast::<u32>() = size;
*ptr.add(RC_OFF as _).cast::<u32>() = 1;
Buf {
storage: NonNull::new_unchecked(ptr.add(METADATA_SIZE as _)),
range: Range { start: 0, end: len },
}
}
}
pub fn clone(&mut self) -> Buf {
self.rc().fetch_add(1);
Buf {
storage: self.storage,
range: self.range.clone(),
}
}
pub fn slice(&mut self, range: impl RangeBounds<usize>) -> Buf {
let start = match range.start_bound() {
Bound::Included(&n) => n,
Bound::Excluded(&n) => n.wrapping_add(1),
Bound::Unbounded => 0,
};
let end = match range.end_bound() {
Bound::Included(&n) => n.wrapping_add(1),
Bound::Excluded(&n) => n,
Bound::Unbounded => self.len(),
};
self.slice_(start as _, end as _)
}
fn slice_(&mut self, start: u32, end: u32) -> Buf {
assert!(start <= end);
assert!(end <= self.len32());
self.rc().fetch_add(1);
Buf {
storage: self.storage,
range: Range {
start: self.range.start + start,
end: self.range.start + end,
},
}
}
fn rc(&self) -> &NumCell<u32> {
unsafe {
self.storage
.as_ptr()
.sub(RC_OFF_INV as _)
.cast::<NumCell<u32>>()
.deref()
}
}
fn assert_unique(&self) {
assert_eq!(self.rc().get(), 1);
}
pub fn len32(&self) -> u32 {
self.range.end - self.range.start
}
pub fn len(&self) -> usize {
self.len32() as _
}
pub fn as_ptr(&self) -> *mut u8 {
unsafe { self.storage.as_ptr().add(self.range.start as _) }
}
}
impl Deref for Buf {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.assert_unique();
unsafe { slice::from_raw_parts(self.as_ptr(), self.len()) }
}
}
impl DerefMut for Buf {
fn deref_mut(&mut self) -> &mut Self::Target {
self.assert_unique();
unsafe { slice::from_raw_parts_mut(self.as_ptr(), self.len()) }
}
}
impl Drop for Buf {
fn drop(&mut self) {
unsafe {
let prev = self.rc().fetch_sub(1);
if prev != 1 {
return;
}
let ptr = self.storage.as_ptr().sub(METADATA_SIZE as _).cast::<u32>();
let size = *ptr as _;
let layout = Layout::from_size_align_unchecked(size, METADATA_ALIGN);
std::alloc::dealloc(ptr as _, layout);
}
}
}

View file

@ -1,10 +1,13 @@
use {
crate::{
io_uring::IoUring,
utils::buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE},
wheel::{Wheel, WheelTimeoutFuture},
io_uring::{IoUring, IoUringError},
time::Time,
utils::{
buf::Buf,
buffd::{BufFdError, BUF_SIZE, CMSG_BUF_SIZE},
oserror::OsError,
},
},
futures_util::{future::Fuse, select, FutureExt},
std::{
collections::VecDeque,
mem::{self, MaybeUninit},
@ -21,37 +24,33 @@ pub(super) struct MsgFds {
pub(super) fds: Vec<Rc<OwnedFd>>,
}
pub struct OutBuffer {
pub(super) struct OutBufferMeta {
pub(super) read_pos: usize,
pub(super) write_pos: usize,
pub(super) buf: *mut [MaybeUninit<u8>; OUT_BUF_SIZE],
pub(super) fds: VecDeque<MsgFds>,
}
pub struct OutBuffer {
pub(super) meta: OutBufferMeta,
pub(super) buf: Buf,
}
impl Default for OutBuffer {
fn default() -> Self {
Self {
read_pos: 0,
write_pos: 0,
buf: Box::into_raw(Box::new([MaybeUninit::<u32>::uninit(); OUT_BUF_SIZE / 4])) as _,
fds: Default::default(),
meta: OutBufferMeta {
read_pos: 0,
write_pos: 0,
fds: Default::default(),
},
buf: Buf::new(OUT_BUF_SIZE),
}
}
}
impl OutBuffer {
pub fn write(&mut self, bytes: &[MaybeUninit<u8>]) {
if bytes.len() > OUT_BUF_SIZE - self.write_pos {
panic!("Out buffer overflow");
}
unsafe {
(*self.buf)[self.write_pos..self.write_pos + bytes.len()].copy_from_slice(bytes);
}
self.write_pos += bytes.len();
}
pub fn is_full(&self) -> bool {
self.write_pos > BUF_SIZE
self.meta.write_pos > BUF_SIZE
}
}
@ -70,7 +69,7 @@ impl OutBufferSwapchain {
}
pub fn commit(&mut self) {
if self.cur.write_pos > 0 {
if self.cur.meta.write_pos > 0 {
let new = self.free.pop().unwrap_or_default();
let old = mem::replace(&mut self.cur, new);
self.pending.push_back(old);
@ -81,103 +80,66 @@ impl OutBufferSwapchain {
pub struct BufFdOut {
fd: Rc<OwnedFd>,
ring: Rc<IoUring>,
wheel: Rc<Wheel>,
cmsg_buf: Box<[MaybeUninit<u8>; CMSG_BUF_SIZE]>,
fd_ids: Vec<i32>,
}
impl BufFdOut {
pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>, wheel: &Rc<Wheel>) -> Self {
pub fn new(fd: &Rc<OwnedFd>, ring: &Rc<IoUring>) -> Self {
Self {
fd: fd.clone(),
ring: ring.clone(),
wheel: wheel.clone(),
cmsg_buf: Box::new([MaybeUninit::uninit(); CMSG_BUF_SIZE]),
fd_ids: vec![],
}
}
pub async fn flush(
&mut self,
buf: &mut OutBuffer,
timeout: &mut Option<Fuse<WheelTimeoutFuture>>,
) -> Result<(), BufFdError> {
while buf.read_pos < buf.write_pos {
if self.flush_sync(buf)? {
if timeout.is_none() {
*timeout = Some(self.wheel.timeout(5000).fuse());
}
select! {
_ = timeout.as_mut().unwrap() => {
return Err(BufFdError::Timeout);
},
res = self.ring.writable(&self.fd).fuse() => {
res?;
},
}
}
pub async fn flush(&mut self, buf: &mut OutBuffer, timeout: Time) -> Result<(), BufFdError> {
while buf.meta.read_pos < buf.meta.write_pos {
self.flush_buffer(buf, Some(timeout)).await?;
}
buf.read_pos = 0;
buf.write_pos = 0;
buf.meta.read_pos = 0;
buf.meta.write_pos = 0;
Ok(())
}
pub async fn flush_no_timeout(&mut self, buf: &mut OutBuffer) -> Result<(), BufFdError> {
while buf.read_pos < buf.write_pos {
if self.flush_sync(buf)? {
let _ = self.ring.writable(&self.fd).await?;
}
while buf.meta.read_pos < buf.meta.write_pos {
self.flush_buffer(buf, None).await?;
}
buf.read_pos = 0;
buf.write_pos = 0;
buf.meta.read_pos = 0;
buf.meta.write_pos = 0;
Ok(())
}
fn flush_sync(&mut self, buffer: &mut OutBuffer) -> Result<bool, BufFdError> {
while buffer.read_pos < buffer.write_pos {
let mut buf = unsafe { &(*buffer.buf)[buffer.read_pos..buffer.write_pos] };
let mut cmsg_len = 0;
let mut fds_opt = None;
{
let mut f = buffer.fds.front().map(|f| f.pos);
if f == Some(buffer.read_pos) {
let fds = buffer.fds.pop_front().unwrap();
self.fd_ids.clear();
self.fd_ids.extend(fds.fds.iter().map(|f| f.raw()));
let hdr = c::cmsghdr {
cmsg_len: 0,
cmsg_level: c::SOL_SOCKET,
cmsg_type: c::SCM_RIGHTS,
};
let mut cmsg_buf = &mut self.cmsg_buf[..];
cmsg_len = uapi::cmsg_write(&mut cmsg_buf, hdr, &self.fd_ids[..]).unwrap();
fds_opt = Some(fds);
f = buffer.fds.front().map(|f| f.pos)
}
if let Some(next_pos) = f {
buf = &buf[..next_pos - buffer.read_pos];
}
async fn flush_buffer(
&mut self,
buffer: &mut OutBuffer,
timeout: Option<Time>,
) -> Result<(), BufFdError> {
let mut buf = buffer
.buf
.slice(buffer.meta.read_pos..buffer.meta.write_pos);
let mut fds = vec![];
{
let mut f = buffer.meta.fds.front().map(|f| f.pos);
if f == Some(buffer.meta.read_pos) {
fds = buffer.meta.fds.pop_front().unwrap().fds;
f = buffer.meta.fds.front().map(|f| f.pos)
}
if let Some(next_pos) = f {
buf = buffer.buf.slice(buffer.meta.read_pos..next_pos);
}
let hdr = uapi::Msghdr {
iov: slice::from_ref(&buf),
control: Some(&self.cmsg_buf[..cmsg_len]),
name: uapi::sockaddr_none_ref(),
};
let bytes_sent =
match uapi::sendmsg(self.fd.raw(), &hdr, c::MSG_DONTWAIT | c::MSG_NOSIGNAL) {
Ok(b) => b,
Err(Errno(c::EAGAIN)) => {
if let Some(fds) = fds_opt {
buffer.fds.push_front(fds);
}
return Ok(true);
}
Err(Errno(c::ECONNRESET)) => return Err(BufFdError::Closed),
Err(e) => return Err(BufFdError::Io(e.into())),
};
buffer.read_pos += bytes_sent;
}
Ok(false)
match self.ring.sendmsg(&self.fd, buf, fds, timeout).await {
Ok(n) => {
buffer.meta.read_pos += n;
Ok(())
}
Err(IoUringError::OsError(OsError(c::ECONNRESET))) => return Err(BufFdError::Closed),
Err(IoUringError::OsError(OsError(c::ETIME))) => return Err(BufFdError::Timeout),
Err(e) => return Err(BufFdError::Ring(e)),
}
}
pub async fn flush2(
@ -238,11 +200,3 @@ impl BufFdOut {
Ok(false)
}
}
impl Drop for OutBuffer {
fn drop(&mut self) {
unsafe {
Box::from_raw(self.buf as *mut [MaybeUninit<u32>; OUT_BUF_SIZE / 4]);
}
}
}

View file

@ -2,14 +2,15 @@ use {
crate::{
fixed::Fixed,
object::ObjectId,
utils::buffd::buf_out::{MsgFds, OutBuffer},
utils::buffd::buf_out::{MsgFds, OutBuffer, OutBufferMeta, OUT_BUF_SIZE},
},
std::{mem, mem::MaybeUninit, rc::Rc},
uapi::OwnedFd,
std::{mem, rc::Rc},
uapi::{OwnedFd, Packed},
};
pub struct MsgFormatter<'a> {
buf: &'a mut OutBuffer,
buf: &'a mut [u8],
meta: &'a mut OutBufferMeta,
pos: usize,
fds: &'a mut Vec<Rc<OwnedFd>>,
}
@ -17,24 +18,33 @@ pub struct MsgFormatter<'a> {
impl<'a> MsgFormatter<'a> {
pub fn new(buf: &'a mut OutBuffer, fds: &'a mut Vec<Rc<OwnedFd>>) -> Self {
Self {
pos: buf.write_pos,
buf,
pos: buf.meta.write_pos,
buf: &mut buf.buf[..],
fds,
meta: &mut buf.meta,
}
}
fn write(&mut self, bytes: &[u8]) {
if bytes.len() > OUT_BUF_SIZE - self.meta.write_pos {
panic!("Out buffer overflow");
}
self.buf[self.meta.write_pos..self.meta.write_pos + bytes.len()].copy_from_slice(bytes);
self.meta.write_pos += bytes.len();
}
pub fn int(&mut self, int: i32) -> &mut Self {
self.buf.write(uapi::as_maybe_uninit_bytes(&int));
self.write(uapi::as_bytes(&int));
self
}
pub fn uint(&mut self, int: u32) -> &mut Self {
self.buf.write(uapi::as_maybe_uninit_bytes(&int));
self.write(uapi::as_bytes(&int));
self
}
pub fn fixed(&mut self, fixed: Fixed) -> &mut Self {
self.buf.write(uapi::as_maybe_uninit_bytes(&fixed));
self.write(uapi::as_bytes(&fixed.0));
self
}
@ -50,9 +60,9 @@ impl<'a> MsgFormatter<'a> {
let len = s.len() + 1;
let cap = (len + 3) & !3;
self.uint(len as u32);
self.buf.write(uapi::as_maybe_uninit_bytes(s));
let none = [MaybeUninit::new(0); 4];
self.buf.write(&none[..cap - len + 1]);
self.write(uapi::as_bytes(s));
let none = [0; 4];
self.write(&none[..cap - len + 1]);
self
}
@ -71,46 +81,43 @@ impl<'a> MsgFormatter<'a> {
#[allow(dead_code)]
pub fn array<F: FnOnce(&mut MsgFormatter<'_>)>(&mut self, f: F) -> &mut Self {
let pos = self.buf.write_pos;
let pos = self.meta.write_pos;
self.uint(0);
let len = {
let mut fmt = MsgFormatter {
buf: self.buf,
meta: self.meta,
pos,
fds: self.fds,
};
f(&mut fmt);
let len = self.buf.write_pos - pos - 4;
let none = [MaybeUninit::new(0); 4];
self.buf
.write(&none[..self.buf.write_pos.wrapping_neg() & 3]);
let len = self.meta.write_pos - pos - 4;
let none = [0; 4];
self.write(&none[..self.meta.write_pos.wrapping_neg() & 3]);
len as u32
};
unsafe {
(*self.buf.buf)[pos..pos + 4].copy_from_slice(uapi::as_maybe_uninit_bytes(&len));
}
self.buf[pos..pos + 4].copy_from_slice(uapi::as_bytes(&len));
self
}
pub fn binary<T: ?Sized>(&mut self, t: &T) -> &mut Self {
pub fn binary<T: ?Sized + Packed>(&mut self, t: &T) -> &mut Self {
self.uint(mem::size_of_val(t) as u32);
self.buf.write(uapi::as_maybe_uninit_bytes(t));
let none = [MaybeUninit::new(0); 4];
self.buf
.write(&none[..self.buf.write_pos.wrapping_neg() & 3]);
self.write(uapi::as_bytes(t));
let none = [0; 4];
self.write(&none[..self.meta.write_pos.wrapping_neg() & 3]);
self
}
pub fn write_len(self) {
assert!(self.buf.write_pos - self.pos >= 8);
assert!(self.meta.write_pos - self.pos >= 8);
assert_eq!(self.pos % 4, 0);
unsafe {
let second_ptr = (self.buf.buf as *mut u8).add(self.pos + 4) as *mut u32;
let len = ((self.buf.write_pos - self.pos) as u32) << 16;
let second_ptr = self.buf.as_ptr().add(self.pos + 4) as *mut u32;
let len = ((self.meta.write_pos - self.pos) as u32) << 16;
*second_ptr |= len;
}
if self.fds.len() > 0 {
self.buf.fds.push_back(MsgFds {
self.meta.fds.push_back(MsgFds {
pos: self.pos,
fds: mem::take(self.fds),
})

View file

@ -53,7 +53,7 @@ impl Drop for Wheel {
}
struct WheelTimeoutData {
id: u64,
id: Cell<u64>,
expired: Cell<Option<Result<(), WheelError>>>,
wheel: Rc<WheelData>,
waker: Cell<Option<Waker>>,
@ -74,7 +74,7 @@ pub struct WheelTimeoutFuture {
impl Drop for WheelTimeoutFuture {
fn drop(&mut self) {
self.data.wheel.dispatchers.remove(&self.data.id);
self.data.wheel.dispatchers.remove(&self.data.id.get());
self.data.waker.set(None);
if !self.data.wheel.destroyed.get() {
self.data.expired.take();
@ -135,12 +135,13 @@ impl Wheel {
fn future(&self) -> WheelTimeoutFuture {
let data = self.data.cached_futures.pop().unwrap_or_else(|| {
Rc::new(WheelTimeoutData {
id: self.data.next_id.fetch_add(1),
id: Cell::new(0),
expired: Cell::new(None),
wheel: self.data.clone(),
waker: Cell::new(None),
})
});
data.id.set(self.data.next_id.fetch_add(1));
WheelTimeoutFuture { data }
}
@ -148,7 +149,7 @@ impl Wheel {
if self.data.destroyed.get() {
return WheelTimeoutFuture {
data: Rc::new(WheelTimeoutData {
id: 0,
id: Cell::new(0),
expired: Cell::new(Some(Err(WheelError::Destroyed))),
wheel: self.data.clone(),
waker: Default::default(),
@ -186,11 +187,11 @@ impl Wheel {
}
self.data.expirations.borrow_mut().push(Reverse(WheelEntry {
expiration,
id: future.data.id,
id: future.data.id.get(),
}));
self.data
.dispatchers
.set(future.data.id, future.data.clone());
.set(future.data.id.get(), future.data.clone());
future
}
}
@ -235,7 +236,6 @@ impl WheelData {
}
let now = Time::now()?;
let dist = now - self.start;
let mut to_dispatch = vec![];
{
let mut expirations = self.expirations.borrow_mut();
while let Some(Reverse(entry)) = expirations.peek() {
@ -243,7 +243,7 @@ impl WheelData {
break;
}
if let Some(dispatcher) = self.dispatchers.remove(&entry.id) {
to_dispatch.push(dispatcher);
dispatcher.complete(Ok(()));
}
expirations.pop();
}
@ -267,9 +267,6 @@ impl WheelData {
expirations.pop();
}
}
for dispatcher in to_dispatch {
dispatcher.complete(Ok(()));
}
Ok(())
}
}

View file

@ -23,7 +23,7 @@ use {
state::State,
tree::ToplevelNode,
utils::{
bitflags::BitflagsExt, clonecell::CloneCell, copyhashmap::CopyHashMap,
bitflags::BitflagsExt, buf::Buf, clonecell::CloneCell, copyhashmap::CopyHashMap,
errorfmt::ErrorFmt, linkedlist::LinkedList, numcell::NumCell, oserror::OsError,
rc_eq::rc_eq, tri::Try,
},
@ -1678,7 +1678,7 @@ impl Wm {
log::error!("Could not get converted property: {}", e);
return Ok(());
}
let data = Rc::new(data);
let mut data = Buf::from_slice(&data);
for transfer in transfers {
if event.target != transfer.mime_type {
log::error!("Conversion yielded an incompatible mime type");
@ -1687,7 +1687,7 @@ impl Wm {
let id = self.transfer_ids.fetch_add(1);
let transfer = XToWaylandTransfer {
id,
data: data.clone(),
data: data.slice(..),
fd: transfer.fd,
state: self.state.clone(),
shared: self.shared.clone(),
@ -2372,22 +2372,19 @@ impl Wm {
struct XToWaylandTransfer {
id: u64,
data: Rc<Vec<u8>>,
data: Buf,
fd: Rc<OwnedFd>,
state: Rc<State>,
shared: Rc<XwmShared>,
}
impl XToWaylandTransfer {
async fn run(self) {
async fn run(mut self) {
let timeout = self.state.wheel.timeout(5000);
pin_mut!(timeout);
let mut pos = 0;
while pos < self.data.len() {
let f1 = self
.state
.ring
.write(&self.fd, &self.data, pos, self.data.len() - pos);
let f1 = self.state.ring.write(&self.fd, self.data.slice(pos..));
pin_mut!(f1);
match future::select(f1, &mut timeout).await {
Either::Left((res, _)) => match res.merge() {