fix(portal): compositor stall detection + filler frames + PipeWire state logging

P0: Detect compositor frame delivery stalls (>100ms no frames) and log
    stall/resume events with duration. Rate-limited to 1 warn/sec.

P1: Insert duplicate raw CpuNv12Frame filler during stalls at target fps.
    Keeps WebRTC stream smooth (sent_fps 20-40 instead of 3-5 during
    compositor pauses). Stops after 2s max stale. WebRTC mode only.

P2: Replace silent _ => {} in PipeWire state_changed callback with
    explicit Paused/Streaming/Connecting log messages.

P4: Add PwCtrlEvent::FormatChanged for mid-stream dimension changes.
    param_changed detects resolution renegotiation (skips first call).
    Logs warning in poll_and_encode; full encoder reinit deferred.

Verified: cargo check 0 errors, 70/70 tests, release build, --stats live.
This commit is contained in:
dailz
2026-06-07 17:20:54 +08:00
parent 826f544569
commit caccfec44e
4 changed files with 163 additions and 5 deletions

View File

@@ -66,6 +66,7 @@ fn receive_first_frame(cap: &CapPortal) -> Result<wl_webrtc::cap_portal::PwDmaBu
if let Ok(ctrl) = cap.event_receiver().try_recv() {
match ctrl {
PwCtrlEvent::StreamEnded => bail!("PipeWire stream ended before first frame"),
PwCtrlEvent::FormatChanged { .. } => {}
PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"),
}
}
@@ -328,6 +329,7 @@ fn main() -> Result<()> {
eprintln!("PipeWire error after {} frames: {}", frames_encoded, e);
break;
}
PwCtrlEvent::FormatChanged { .. } => {}
}
}

View File

@@ -138,6 +138,7 @@ fn receive_first_frame(cap: &CapPortal) -> Result<wl_webrtc::cap_portal::PwDmaBu
if let Ok(ctrl) = cap.event_receiver().try_recv() {
match ctrl {
PwCtrlEvent::StreamEnded => bail!("PipeWire stream ended before first frame"),
PwCtrlEvent::FormatChanged { .. } => {}
PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"),
}
}
@@ -519,6 +520,7 @@ fn run_cpu_pipeline(
"PipeWire error after {} CPU frames: {e}",
stats.frames_encoded
),
PwCtrlEvent::FormatChanged { .. } => {}
}
}
@@ -659,6 +661,7 @@ fn run_gpu_pipeline(
"PipeWire error after {} GPU frames: {e}",
stats.frames_encoded
),
PwCtrlEvent::FormatChanged { .. } => {}
}
}

View File

@@ -54,6 +54,8 @@ pub struct PwDmaBufFrame {
pub enum PwCtrlEvent {
/// 流已结束PipeWire 流断开连接或进入错误状态)
StreamEnded,
/// Format/dimensions changed mid-stream
FormatChanged { width: u32, height: u32 },
/// 发生错误,包含错误描述信息
Error(String),
}
@@ -554,7 +556,13 @@ fn pipewire_thread(ctx: PwThreadCtx) {
pw::stream::StreamState::Unconnected => {
let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded);
}
_ => {}
pw::stream::StreamState::Paused => {
tracing::warn!("PipeWire stream paused (compositor may be switching content)");
}
pw::stream::StreamState::Streaming => {
tracing::info!("PipeWire stream (re)started");
}
pw::stream::StreamState::Connecting => {}
}
})
// 参数变化回调(格式协商)
@@ -562,6 +570,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
// id 为参数类型param 包含具体的格式参数(分辨率、像素格式等)
.param_changed({
let format_info = format_info.clone();
let event_tx = event_tx.clone();
move |_, _, id, param| {
// 仅处理 Format 类型的参数变化
let Some(param) = param else { return };
@@ -583,7 +592,18 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let framerate = info.framerate();
let max_framerate = info.max_framerate();
// 保存协商后的格式信息,供 process 回调读取
let previous_format = format_info.get();
format_info.set(Some((width, height, drm_format, modifier)));
if let Some((previous_width, previous_height, _, _)) = previous_format {
if width != previous_width || height != previous_height {
tracing::warn!(
"PipeWire dimensions changed: {}x{} (format renegotiation)",
width,
height
);
let _ = event_tx.try_send(PwCtrlEvent::FormatChanged { width, height });
}
}
tracing::info!(
"PipeWire format negotiated: {width}x{height}, \
drm_format={drm_format:#010x}, modifier={modifier:#x}, \

View File

@@ -3,7 +3,7 @@ use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use anyhow::{bail, Result}; // 错误处理工具
@@ -54,6 +54,12 @@ pub struct StatePortal {
webrtc_rx: Option<crossbeam_channel::Receiver<Vec<u8>>>,
webrtc_frames_sent: u64,
webrtc_paused: Option<Arc<AtomicBool>>,
last_capture_arrival: Option<Instant>, // timestamp of last real frame arrival
stall_start: Option<Instant>, // when current stall began
last_stall_log: Option<Instant>, // rate-limiting for stall warnings
last_fillable_frame: Option<CpuNv12Frame>, // cached last frame for filler duplication
next_filler_at: Option<Instant>, // when to send next filler frame
filler_frames_sent: u64,
}
impl StatePortal {
@@ -95,6 +101,12 @@ impl StatePortal {
webrtc_rx: None,
webrtc_frames_sent: 0,
webrtc_paused,
last_capture_arrival: None,
stall_start: None,
last_stall_log: None,
last_fillable_frame: None,
next_filler_at: None,
filler_frames_sent: 0,
})
}
@@ -121,6 +133,15 @@ impl StatePortal {
self.errored = true;
return Ok(true);
}
PwCtrlEvent::FormatChanged { width, height } => {
tracing::warn!(
"PipeWire format renegotiation: new dimensions {}x{} — encoder output remains at original resolution",
width,
height
);
// No action yet — VAAPI import/scale handles the conversion.
// Full encoder reinit is a future enhancement.
}
}
}
@@ -129,15 +150,22 @@ impl StatePortal {
// 阻塞模式:最多等待 10ms 接收帧
match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) {
Ok(frame) => frame,
Err(_) => return Ok(false),
Err(_) => {
self.record_capture_timeout();
return Ok(false);
}
}
} else {
// 非阻塞模式:立即尝试接收,无数据则返回
match self.cap.frame_receiver().try_recv() {
Ok(frame) => frame,
Err(_) => return Ok(false),
Err(_) => {
self.record_capture_timeout();
return Ok(false);
}
}
};
self.record_frame_arrival();
match self.stage {
PortalStage::WaitingForFormat => {
@@ -231,6 +259,7 @@ impl StatePortal {
PortalStage::Streaming => {
// 记录采集帧到达(用于 capture gap 和 capture_fps 统计)
self.stats.record_capture();
self.last_capture_arrival = Some(Instant::now());
// 流式编码阶段:直接处理帧
self.handle_pw_frame(frame)?;
}
@@ -247,12 +276,107 @@ impl StatePortal {
let enc_q = self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0);
self.stats.set_queue_depths(0, enc_q);
let snap = self.stats.snapshot_and_reset();
tracing::info!("stats: {snap}");
if self.filler_frames_sent > 0 {
tracing::info!("stats: {snap} filler_frames_sent={}", self.filler_frames_sent);
} else {
tracing::info!("stats: {snap}");
}
}
Ok(true)
}
fn record_capture_timeout(&mut self) {
let Some(last_capture_arrival) = self.last_capture_arrival else {
return;
};
let now = Instant::now();
let frame_interval = Duration::from_secs_f64(1.0 / f64::from(self.args.fps.max(1)));
let stall_threshold = Duration::from_millis(100).max(frame_interval * 3);
if now.duration_since(last_capture_arrival) <= stall_threshold {
return;
}
if self.stall_start.is_none() {
self.stall_start = Some(now);
self.last_stall_log = Some(now);
tracing::warn!("compositor frame delivery stalled");
} else {
let should_log = self
.last_stall_log
.map_or(true, |last_log| now.duration_since(last_log) >= Duration::from_secs(1));
if should_log {
self.last_stall_log = Some(now);
tracing::warn!("compositor frame delivery stalled");
}
}
self.maybe_send_filler_frame();
}
fn maybe_send_filler_frame(&mut self) {
if self.webrtc.is_none() || self.stall_start.is_none() {
return;
}
let Some(cached) = &self.last_fillable_frame else {
return;
};
const MAX_FILLER_DURATION: Duration = Duration::from_secs(2);
if let Some(stall_start) = self.stall_start {
if stall_start.elapsed() > MAX_FILLER_DURATION {
return;
}
}
let now = Instant::now();
let frame_interval = Duration::from_secs_f64(1.0 / f64::from(self.args.fps.max(1)));
let Some(next) = self.next_filler_at else {
self.next_filler_at = Some(now + frame_interval);
return;
};
if now < next {
return;
}
let filler = CpuNv12Frame {
y_data: cached.y_data.clone(),
uv_data: cached.uv_data.clone(),
y_stride: cached.y_stride,
uv_stride: cached.uv_stride,
pts: self.frames_encoded as i64,
};
if let Some(enc_thread) = &self.enc_thread {
match enc_thread.input_tx.try_send(filler) {
Ok(()) => {
self.frames_encoded += 1;
self.filler_frames_sent += 1;
self.next_filler_at = Some(next + frame_interval);
}
Err(crossbeam_channel::TrySendError::Full(_)) => {}
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
tracing::error!("Encode thread disconnected during filler");
self.errored = true;
}
}
}
}
fn record_frame_arrival(&mut self) {
if let Some(stall_start) = self.stall_start.take() {
tracing::info!(
"compositor frame delivery resumed after {:.0}ms",
stall_start.elapsed().as_secs_f64() * 1000.0
);
self.last_stall_log = None;
}
self.last_capture_arrival = Some(Instant::now());
self.next_filler_at = None;
}
/// 为当前帧解析可用的 DRM 渲染设备
///
/// 如果用户已通过 `--drm-device` 指定设备,直接返回;
@@ -373,9 +497,17 @@ impl StatePortal {
let enc_thread = self.enc_thread.as_ref()
.ok_or_else(|| anyhow::anyhow!("internal invariant broken: encode thread missing while async import is active"))?;
let fillable_frame = CpuNv12Frame {
y_data: cpu_nv12.y_data.clone(),
uv_data: cpu_nv12.uv_data.clone(),
y_stride: cpu_nv12.y_stride,
uv_stride: cpu_nv12.uv_stride,
pts: 0,
};
match enc_thread.input_tx.try_send(cpu_nv12) {
Ok(()) => {
self.frames_encoded += 1;
self.last_fillable_frame = Some(fillable_frame);
}
Err(crossbeam_channel::TrySendError::Full(_frame)) => {
tracing::warn!("Encode thread input full, dropping portal frame");
@@ -396,6 +528,7 @@ impl StatePortal {
///
/// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。
pub fn shutdown(&mut self) {
self.last_fillable_frame = None;
// 先 drop receiver使 flush() 中的 try_send() 立即返回 Disconnected
// 而非在满通道上阻塞(修复 issue #8 死锁)
self.webrtc_rx = None;