From caccfec44e43696703bc1a3a07c959afdb646890 Mon Sep 17 00:00:00 2001 From: dailz Date: Sun, 7 Jun 2026 17:20:54 +0800 Subject: [PATCH] fix(portal): compositor stall detection + filler frames + PipeWire state logging P0: Detect compositor frame delivery stalls (>100ms no frames) and log stall/resume events with duration. Rate-limited to 1 warn/sec. P1: Insert duplicate raw CpuNv12Frame filler during stalls at target fps. Keeps WebRTC stream smooth (sent_fps 20-40 instead of 3-5 during compositor pauses). Stops after 2s max stale. WebRTC mode only. P2: Replace silent _ => {} in PipeWire state_changed callback with explicit Paused/Streaming/Connecting log messages. P4: Add PwCtrlEvent::FormatChanged for mid-stream dimension changes. param_changed detects resolution renegotiation (skips first call). Logs warning in poll_and_encode; full encoder reinit deferred. Verified: cargo check 0 errors, 70/70 tests, release build, --stats live. --- src/bin/sw_encode_bench.rs | 2 + src/bin/vaapi_import_bench.rs | 3 + src/cap_portal.rs | 22 +++++- src/state_portal.rs | 141 +++++++++++++++++++++++++++++++++- 4 files changed, 163 insertions(+), 5 deletions(-) diff --git a/src/bin/sw_encode_bench.rs b/src/bin/sw_encode_bench.rs index 46d010d..bb44fc4 100644 --- a/src/bin/sw_encode_bench.rs +++ b/src/bin/sw_encode_bench.rs @@ -66,6 +66,7 @@ fn receive_first_frame(cap: &CapPortal) -> Result bail!("PipeWire stream ended before first frame"), + PwCtrlEvent::FormatChanged { .. } => {} PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"), } } @@ -328,6 +329,7 @@ fn main() -> Result<()> { eprintln!("PipeWire error after {} frames: {}", frames_encoded, e); break; } + PwCtrlEvent::FormatChanged { .. } => {} } } diff --git a/src/bin/vaapi_import_bench.rs b/src/bin/vaapi_import_bench.rs index 91272bb..aac77a5 100644 --- a/src/bin/vaapi_import_bench.rs +++ b/src/bin/vaapi_import_bench.rs @@ -138,6 +138,7 @@ fn receive_first_frame(cap: &CapPortal) -> Result bail!("PipeWire stream ended before first frame"), + PwCtrlEvent::FormatChanged { .. } => {} PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"), } } @@ -519,6 +520,7 @@ fn run_cpu_pipeline( "PipeWire error after {} CPU frames: {e}", stats.frames_encoded ), + PwCtrlEvent::FormatChanged { .. } => {} } } @@ -659,6 +661,7 @@ fn run_gpu_pipeline( "PipeWire error after {} GPU frames: {e}", stats.frames_encoded ), + PwCtrlEvent::FormatChanged { .. } => {} } } diff --git a/src/cap_portal.rs b/src/cap_portal.rs index bc22966..194859b 100644 --- a/src/cap_portal.rs +++ b/src/cap_portal.rs @@ -54,6 +54,8 @@ pub struct PwDmaBufFrame { pub enum PwCtrlEvent { /// 流已结束(PipeWire 流断开连接或进入错误状态) StreamEnded, + /// Format/dimensions changed mid-stream + FormatChanged { width: u32, height: u32 }, /// 发生错误,包含错误描述信息 Error(String), } @@ -554,7 +556,13 @@ fn pipewire_thread(ctx: PwThreadCtx) { pw::stream::StreamState::Unconnected => { let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded); } - _ => {} + pw::stream::StreamState::Paused => { + tracing::warn!("PipeWire stream paused (compositor may be switching content)"); + } + pw::stream::StreamState::Streaming => { + tracing::info!("PipeWire stream (re)started"); + } + pw::stream::StreamState::Connecting => {} } }) // 参数变化回调(格式协商) @@ -562,6 +570,7 @@ fn pipewire_thread(ctx: PwThreadCtx) { // id 为参数类型,param 包含具体的格式参数(分辨率、像素格式等) .param_changed({ let format_info = format_info.clone(); + let event_tx = event_tx.clone(); move |_, _, id, param| { // 仅处理 Format 类型的参数变化 let Some(param) = param else { return }; @@ -583,7 +592,18 @@ fn pipewire_thread(ctx: PwThreadCtx) { let framerate = info.framerate(); let max_framerate = info.max_framerate(); // 保存协商后的格式信息,供 process 回调读取 + let previous_format = format_info.get(); format_info.set(Some((width, height, drm_format, modifier))); + if let Some((previous_width, previous_height, _, _)) = previous_format { + if width != previous_width || height != previous_height { + tracing::warn!( + "PipeWire dimensions changed: {}x{} (format renegotiation)", + width, + height + ); + let _ = event_tx.try_send(PwCtrlEvent::FormatChanged { width, height }); + } + } tracing::info!( "PipeWire format negotiated: {width}x{height}, \ drm_format={drm_format:#010x}, modifier={modifier:#x}, \ diff --git a/src/state_portal.rs b/src/state_portal.rs index 409ab37..1f9f0f7 100644 --- a/src/state_portal.rs +++ b/src/state_portal.rs @@ -3,7 +3,7 @@ use std::os::fd::AsRawFd; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::{bail, Result}; // 错误处理工具 @@ -54,6 +54,12 @@ pub struct StatePortal { webrtc_rx: Option>>, webrtc_frames_sent: u64, webrtc_paused: Option>, + last_capture_arrival: Option, // timestamp of last real frame arrival + stall_start: Option, // when current stall began + last_stall_log: Option, // rate-limiting for stall warnings + last_fillable_frame: Option, // cached last frame for filler duplication + next_filler_at: Option, // when to send next filler frame + filler_frames_sent: u64, } impl StatePortal { @@ -95,6 +101,12 @@ impl StatePortal { webrtc_rx: None, webrtc_frames_sent: 0, webrtc_paused, + last_capture_arrival: None, + stall_start: None, + last_stall_log: None, + last_fillable_frame: None, + next_filler_at: None, + filler_frames_sent: 0, }) } @@ -121,6 +133,15 @@ impl StatePortal { self.errored = true; return Ok(true); } + PwCtrlEvent::FormatChanged { width, height } => { + tracing::warn!( + "PipeWire format renegotiation: new dimensions {}x{} — encoder output remains at original resolution", + width, + height + ); + // No action yet — VAAPI import/scale handles the conversion. + // Full encoder reinit is a future enhancement. + } } } @@ -129,15 +150,22 @@ impl StatePortal { // 阻塞模式:最多等待 10ms 接收帧 match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) { Ok(frame) => frame, - Err(_) => return Ok(false), + Err(_) => { + self.record_capture_timeout(); + return Ok(false); + } } } else { // 非阻塞模式:立即尝试接收,无数据则返回 match self.cap.frame_receiver().try_recv() { Ok(frame) => frame, - Err(_) => return Ok(false), + Err(_) => { + self.record_capture_timeout(); + return Ok(false); + } } }; + self.record_frame_arrival(); match self.stage { PortalStage::WaitingForFormat => { @@ -231,6 +259,7 @@ impl StatePortal { PortalStage::Streaming => { // 记录采集帧到达(用于 capture gap 和 capture_fps 统计) self.stats.record_capture(); + self.last_capture_arrival = Some(Instant::now()); // 流式编码阶段:直接处理帧 self.handle_pw_frame(frame)?; } @@ -247,12 +276,107 @@ impl StatePortal { let enc_q = self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0); self.stats.set_queue_depths(0, enc_q); let snap = self.stats.snapshot_and_reset(); - tracing::info!("stats: {snap}"); + if self.filler_frames_sent > 0 { + tracing::info!("stats: {snap} filler_frames_sent={}", self.filler_frames_sent); + } else { + tracing::info!("stats: {snap}"); + } } Ok(true) } + fn record_capture_timeout(&mut self) { + let Some(last_capture_arrival) = self.last_capture_arrival else { + return; + }; + + let now = Instant::now(); + let frame_interval = Duration::from_secs_f64(1.0 / f64::from(self.args.fps.max(1))); + let stall_threshold = Duration::from_millis(100).max(frame_interval * 3); + if now.duration_since(last_capture_arrival) <= stall_threshold { + return; + } + + if self.stall_start.is_none() { + self.stall_start = Some(now); + self.last_stall_log = Some(now); + tracing::warn!("compositor frame delivery stalled"); + } else { + let should_log = self + .last_stall_log + .map_or(true, |last_log| now.duration_since(last_log) >= Duration::from_secs(1)); + if should_log { + self.last_stall_log = Some(now); + tracing::warn!("compositor frame delivery stalled"); + } + } + + self.maybe_send_filler_frame(); + } + + fn maybe_send_filler_frame(&mut self) { + if self.webrtc.is_none() || self.stall_start.is_none() { + return; + } + let Some(cached) = &self.last_fillable_frame else { + return; + }; + + const MAX_FILLER_DURATION: Duration = Duration::from_secs(2); + if let Some(stall_start) = self.stall_start { + if stall_start.elapsed() > MAX_FILLER_DURATION { + return; + } + } + + let now = Instant::now(); + let frame_interval = Duration::from_secs_f64(1.0 / f64::from(self.args.fps.max(1))); + + let Some(next) = self.next_filler_at else { + self.next_filler_at = Some(now + frame_interval); + return; + }; + if now < next { + return; + } + + let filler = CpuNv12Frame { + y_data: cached.y_data.clone(), + uv_data: cached.uv_data.clone(), + y_stride: cached.y_stride, + uv_stride: cached.uv_stride, + pts: self.frames_encoded as i64, + }; + + if let Some(enc_thread) = &self.enc_thread { + match enc_thread.input_tx.try_send(filler) { + Ok(()) => { + self.frames_encoded += 1; + self.filler_frames_sent += 1; + self.next_filler_at = Some(next + frame_interval); + } + Err(crossbeam_channel::TrySendError::Full(_)) => {} + Err(crossbeam_channel::TrySendError::Disconnected(_)) => { + tracing::error!("Encode thread disconnected during filler"); + self.errored = true; + } + } + } + } + + fn record_frame_arrival(&mut self) { + if let Some(stall_start) = self.stall_start.take() { + tracing::info!( + "compositor frame delivery resumed after {:.0}ms", + stall_start.elapsed().as_secs_f64() * 1000.0 + ); + self.last_stall_log = None; + } + self.last_capture_arrival = Some(Instant::now()); + self.next_filler_at = None; + } + /// 为当前帧解析可用的 DRM 渲染设备 /// /// 如果用户已通过 `--drm-device` 指定设备,直接返回; @@ -373,9 +497,17 @@ impl StatePortal { let enc_thread = self.enc_thread.as_ref() .ok_or_else(|| anyhow::anyhow!("internal invariant broken: encode thread missing while async import is active"))?; + let fillable_frame = CpuNv12Frame { + y_data: cpu_nv12.y_data.clone(), + uv_data: cpu_nv12.uv_data.clone(), + y_stride: cpu_nv12.y_stride, + uv_stride: cpu_nv12.uv_stride, + pts: 0, + }; match enc_thread.input_tx.try_send(cpu_nv12) { Ok(()) => { self.frames_encoded += 1; + self.last_fillable_frame = Some(fillable_frame); } Err(crossbeam_channel::TrySendError::Full(_frame)) => { tracing::warn!("Encode thread input full, dropping portal frame"); @@ -396,6 +528,7 @@ impl StatePortal { /// /// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。 pub fn shutdown(&mut self) { + self.last_fillable_frame = None; // 先 drop receiver,使 flush() 中的 try_send() 立即返回 Disconnected // 而非在满通道上阻塞(修复 issue #8 死锁) self.webrtc_rx = None;