//! Tools for IO operations. use { crate::_private::PollableId, futures_util::{AsyncWrite, io::AsyncRead}, std::{ future::poll_fn, io::{self, ErrorKind, IoSlice, IoSliceMut, Read, Write}, os::fd::{AsFd, AsRawFd}, pin::Pin, task::{Context, Poll, ready}, }, thiserror::Error, uapi::c, }; #[derive(Debug, Error)] enum AsyncError { #[error("Could not retrieve the file description flags")] GetFl(#[source] io::Error), #[error("Could not set the file description flags")] SetFl(#[source] io::Error), #[error("This configuration has already been destroyed")] Destroyed, #[error("The compositor could not create the necessary data structures: {0}")] CompositorSetup(String), #[error("Could not poll the file description: {0}")] Poll(String), } impl From for io::Error { fn from(value: AsyncError) -> Self { io::Error::other(value) } } /// An async adapter for types implementing [`AsFd`]. pub struct Async { id: PollableIdWrapper, t: Option, } impl Unpin for Async {} struct PollableIdWrapper { id: PollableId, } impl Drop for PollableIdWrapper { fn drop(&mut self) { get!().remove_pollable(self.id); } } impl Async where T: AsFd, { /// Creates a new async adapter. /// /// This takes ownership of the file description and duplicates the file descriptor. /// You should not modify the file description while this object is in use, otherwise /// the behavior is undefined. pub fn new(t: T) -> Result { Ok(Self::new_(t)?) } fn new_(t: T) -> Result { let fd = t.as_fd(); let fl = uapi::fcntl_getfl(fd.as_raw_fd()) .map_err(|e| AsyncError::GetFl(io::Error::from_raw_os_error(e.0)))?; uapi::fcntl_setfl(fd.as_raw_fd(), fl | c::O_NONBLOCK) .map_err(|e| AsyncError::SetFl(io::Error::from_raw_os_error(e.0)))?; let id = get!(Err(AsyncError::Destroyed)) .create_pollable(fd.as_raw_fd()) .map_err(AsyncError::CompositorSetup)?; Ok(Self { id: PollableIdWrapper { id }, t: Some(t), }) } } impl Async { /// Unwraps the underlying object. /// /// Note that the underlying object is still non-blocking at this point. pub fn unwrap(self) -> T { self.t.unwrap() } fn poll_(&self, writable: bool, cx: &mut Context<'_>) -> Poll> { get!(Poll::Ready(Err(AsyncError::Destroyed))) .poll_io(self.id.id, writable, cx) .map_err(AsyncError::Poll) } async fn poll(&self, writable: bool) -> Result<(), io::Error> { poll_fn(|cx| self.poll_(writable, cx)).await?; Ok(()) } /// Waits for the file description to become readable. pub async fn readable(&self) -> Result<(), io::Error> { self.poll(false).await } /// Waits for the file description to become writable. pub async fn writable(&self) -> Result<(), io::Error> { self.poll(true).await } } impl AsRef for Async { fn as_ref(&self) -> &T { self.t.as_ref().unwrap() } } impl AsMut for Async { fn as_mut(&mut self) -> &mut T { self.t.as_mut().unwrap() } } fn poll_io( slf: &mut Async, writable: bool, cx: &mut Context<'_>, mut f: impl FnMut(&mut Async) -> io::Result, ) -> Poll> { loop { ready!(slf.poll_(writable, cx))?; match f(slf) { Err(e) if e.kind() == ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } } } impl AsyncRead for Async where T: Read, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { poll_io(self.get_mut(), false, cx, |slf| slf.as_mut().read(buf)) } fn poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { poll_io(self.get_mut(), false, cx, |slf| { slf.as_mut().read_vectored(bufs) }) } } impl AsyncWrite for Async where T: Write, { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().write(buf)) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { poll_io(self.get_mut(), true, cx, |slf| { slf.as_mut().write_vectored(bufs) }) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { poll_io(self.get_mut(), true, cx, |slf| slf.as_mut().flush()) } fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { self.get_mut().t.take(); Poll::Ready(Ok(())) } }