fix(portal): replace blocking send with try_send in PW process callback
PipeWire .process callback called frame_tx.send() on a bounded(3) channel. If the encoder stalled, this blocked the PipeWire data loop, delaying buffer recycling and potentially causing XRUNs. Replace with try_send + AtomicU64 drop counter. Frames are silently dropped when the channel is full (preferred for screen capture: latest frame wins). A warning is logged every 30 dropped frames. Fixes #2 from BUGS.md.
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
|
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::thread::{self, JoinHandle};
|
use std::thread::{self, JoinHandle};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
@@ -33,6 +34,7 @@ pub struct CapPortal {
|
|||||||
|
|
||||||
struct PwThreadCtx {
|
struct PwThreadCtx {
|
||||||
frame_tx: Sender<PwEvent>,
|
frame_tx: Sender<PwEvent>,
|
||||||
|
dropped: AtomicU64,
|
||||||
shutdown_read: OwnedFd,
|
shutdown_read: OwnedFd,
|
||||||
pw_fd: OwnedFd,
|
pw_fd: OwnedFd,
|
||||||
node_id: u32,
|
node_id: u32,
|
||||||
@@ -69,6 +71,7 @@ impl CapPortal {
|
|||||||
|
|
||||||
let ctx = PwThreadCtx {
|
let ctx = PwThreadCtx {
|
||||||
frame_tx,
|
frame_tx,
|
||||||
|
dropped: AtomicU64::new(0),
|
||||||
shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) },
|
shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) },
|
||||||
pw_fd,
|
pw_fd,
|
||||||
node_id,
|
node_id,
|
||||||
@@ -178,6 +181,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
|
|||||||
|
|
||||||
let PwThreadCtx {
|
let PwThreadCtx {
|
||||||
frame_tx,
|
frame_tx,
|
||||||
|
dropped,
|
||||||
shutdown_read,
|
shutdown_read,
|
||||||
pw_fd,
|
pw_fd,
|
||||||
node_id,
|
node_id,
|
||||||
@@ -269,6 +273,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
|
|||||||
.process({
|
.process({
|
||||||
let format_info = format_info.clone();
|
let format_info = format_info.clone();
|
||||||
let frame_tx = frame_tx.clone();
|
let frame_tx = frame_tx.clone();
|
||||||
|
let dropped = dropped;
|
||||||
move |stream, _| {
|
move |stream, _| {
|
||||||
let raw_buf = unsafe { stream.dequeue_raw_buffer() };
|
let raw_buf = unsafe { stream.dequeue_raw_buffer() };
|
||||||
if raw_buf.is_null() {
|
if raw_buf.is_null() {
|
||||||
@@ -341,7 +346,14 @@ fn pipewire_thread(ctx: PwThreadCtx) {
|
|||||||
pts,
|
pts,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = frame_tx.send(PwEvent::Frame(frame));
|
if let Err(crossbeam_channel::TrySendError::Full(_)) =
|
||||||
|
frame_tx.try_send(PwEvent::Frame(frame))
|
||||||
|
{
|
||||||
|
let prev = dropped.fetch_add(1, Ordering::Relaxed);
|
||||||
|
if prev > 0 && prev % 30 == 0 {
|
||||||
|
tracing::warn!("dropped {prev} frames total: encoder backlog");
|
||||||
|
}
|
||||||
|
}
|
||||||
unsafe { stream.queue_raw_buffer(raw_buf) };
|
unsafe { stream.queue_raw_buffer(raw_buf) };
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user