diff --git a/src/pipewire/pw_ifs/pw_client_node.rs b/src/pipewire/pw_ifs/pw_client_node.rs index 7b0dd4be..f4e1c1c1 100644 --- a/src/pipewire/pw_ifs/pw_client_node.rs +++ b/src/pipewire/pw_ifs/pw_client_node.rs @@ -24,10 +24,11 @@ use { SPA_PARAM_EnumFormat, SPA_PARAM_Format, SPA_PARAM_META_size, SPA_PARAM_META_type, SPA_PARAM_Meta, SpaDataFlags, SpaDataType, SpaDirection, SpaIoType, SpaMediaSubtype, SpaMediaType, SpaMetaType, SpaNodeBuffersFlags, SpaNodeCommand, - SpaParamType, SpaVideoFormat, SPA_DATA_FLAG_READABLE, SPA_DIRECTION_INPUT, - SPA_DIRECTION_OUTPUT, SPA_NODE_BUFFERS_FLAG_ALLOC, SPA_PARAM_INFO, - SPA_PARAM_INFO_READ, SPA_PARAM_INFO_SERIAL, SPA_PORT_FLAG, - SPA_PORT_FLAG_CAN_ALLOC_BUFFERS, + SpaParamType, SpaVideoFormat, PW_NODE_ACTIVATION_FINISHED, + PW_NODE_ACTIVATION_NOT_TRIGGERED, PW_NODE_ACTIVATION_TRIGGERED, + SPA_DATA_FLAG_READABLE, SPA_DIRECTION_INPUT, SPA_DIRECTION_OUTPUT, + SPA_NODE_BUFFERS_FLAG_ALLOC, SPA_PARAM_INFO, SPA_PARAM_INFO_READ, + SPA_PARAM_INFO_SERIAL, SPA_PORT_FLAG, SPA_PORT_FLAG_CAN_ALLOC_BUFFERS, }, }, utils::{ @@ -41,6 +42,7 @@ use { mem, ops::Deref, rc::Rc, + sync::atomic::Ordering::{Relaxed, Release}, }, thiserror::Error, uapi::OwnedFd, @@ -181,8 +183,8 @@ pub struct PwClientNode { } pub struct PwNodeActivation { - pub _activation: Rc>, - pub _fd: Rc, + pub activation: Rc>, + pub fd: Rc, } // pub struct PwNodeBuffer { @@ -766,8 +768,8 @@ impl PwClientNode { self.activations.set( node, Rc::new(PwNodeActivation { - _activation: typed, - _fd: signalfd, + activation: typed, + fd: signalfd, }), ); } else { @@ -812,27 +814,29 @@ impl PwClientNode { ) { let mut buf = TypedBuf::::new(); loop { - // unsafe { - // log::info!("transport = {:#?}", activation.read()); - // } - // log::info!("transport in"); - // for port in self.ports.lock().values() { - // for io in port.io_buffers.lock().values() { - // unsafe { - // log::info!("status = {:?}", io.read().status); - // } - // } - // } - // unsafe { - // log::info!("state = {:#?}", activation.read().state[0]); - // } if let Err(e) = self.con.ring.read(&fd, buf.buf()).await { log::error!("Could not read from eventfd: {}", ErrorFmt(e)); return; } - let n = buf.t(); - if n > 1 { - log::warn!("Missed {} transport changes", n - 1); + if let Some(activation) = self.activation.get() { + let activation = unsafe { activation.read() }; + activation + .status + .store(PW_NODE_ACTIVATION_FINISHED.0, Relaxed); + } + } + } + + pub fn drive(&self) { + for activation in self.activations.lock().values() { + let a = unsafe { activation.activation.read() }; + let required = a.state[0].required.load(Relaxed); + a.state[0].pending.store(required - 1, Relaxed); + if required == 1 { + a.status.store(PW_NODE_ACTIVATION_TRIGGERED.0, Release); + let _ = uapi::eventfd_write(activation.fd.raw(), 1); + } else { + a.status.store(PW_NODE_ACTIVATION_NOT_TRIGGERED.0, Release); } } } diff --git a/src/pipewire/pw_pod.rs b/src/pipewire/pw_pod.rs index a67d5613..bd720436 100644 --- a/src/pipewire/pw_pod.rs +++ b/src/pipewire/pw_pod.rs @@ -7,7 +7,7 @@ use { bstr::BStr, std::{ fmt::{Debug, Formatter}, - sync::atomic::AtomicU32, + sync::atomic::{AtomicI32, AtomicU32}, }, uapi::{c, Pod}, }; @@ -1344,8 +1344,8 @@ pub struct spa_io_position { #[derive(Debug)] pub struct pw_node_activation_state { pub status: c::c_int, - pub required: i32, - pub pending: i32, + pub required: AtomicI32, + pub pending: AtomicI32, } ty! { @@ -1368,7 +1368,7 @@ ty! { #[repr(C)] #[derive(Debug)] pub struct pw_node_activation { - pub status: PW_NODE_ACTIVATION, + pub status: AtomicU32, pub flags: c::c_uint, diff --git a/src/portal/ptl_screencast.rs b/src/portal/ptl_screencast.rs index 6b096cf1..78f158c0 100644 --- a/src/portal/ptl_screencast.rs +++ b/src/portal/ptl_screencast.rs @@ -454,9 +454,7 @@ impl UsrJayScreencastOwner for StartedScreencast { } io.buffer_id.store(ev.idx, Relaxed); io.status.store(SPA_STATUS_HAVE_DATA.0, Release); - if let Some(wfd) = self.port.node.transport_out.get() { - let _ = uapi::eventfd_write(wfd.raw(), 1); - } + self.port.node.drive(); } fn destroyed(&self) {