diff --git a/src/cap_portal.rs b/src/cap_portal.rs index bb15feb..bc22966 100644 --- a/src/cap_portal.rs +++ b/src/cap_portal.rs @@ -73,6 +73,7 @@ pub struct CapPortal { event_rx: Receiver, pw_thread: Option>, rt: Runtime, + pw_dropped: Arc, } /// PipeWire 捕获线程的上下文数据 @@ -149,6 +150,7 @@ impl CapPortal { event_rx, pw_thread: Some(pw_thread), rt, + pw_dropped, }) } @@ -160,6 +162,16 @@ impl CapPortal { &self.event_rx } + /// Returns the total number of PipeWire frames dropped due to channel backlog. + pub fn dropped_count(&self) -> u64 { + self.pw_dropped.load(Ordering::Relaxed) + } + + /// Returns the number of frames currently waiting in the capture channel. + pub fn capture_queue_depth(&self) -> usize { + self.frame_rx.len() + } + /// 通过 XDG Desktop Portal 建立屏幕录制会话 /// /// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。 @@ -476,7 +488,9 @@ fn pipewire_thread(ctx: PwThreadCtx) { let mainloop = match pw::main_loop::MainLoopBox::new(None) { Ok(ml) => ml, Err(e) => { - let _ = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}"))); + if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}"))) { + tracing::error!("MainLoop::new failed and error channel also failed: {e}"); + } return; } }; @@ -484,7 +498,9 @@ fn pipewire_thread(ctx: PwThreadCtx) { let context = match pw::context::ContextBox::new(mainloop.loop_(), None) { Ok(c) => c, Err(e) => { - let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}"))); + if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}"))) { + tracing::error!("Context::new failed and error channel also failed: {e}"); + } return; } }; @@ -492,7 +508,9 @@ fn pipewire_thread(ctx: PwThreadCtx) { let core = match context.connect_fd(pw_fd, None) { Ok(c) => c, Err(e) => { - let _ = event_tx.try_send(PwCtrlEvent::Error(format!("connect_fd failed: {e}"))); + if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("connect_fd failed: {e}"))) { + tracing::error!("connect_fd failed and error channel also failed: {e}"); + } return; } }; @@ -514,7 +532,9 @@ fn pipewire_thread(ctx: PwThreadCtx) { ) { Ok(s) => s, Err(e) => { - let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}"))); + if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}"))) { + tracing::error!("Stream::new failed and error channel also failed: {e}"); + } return; } }; @@ -584,12 +604,14 @@ fn pipewire_thread(ctx: PwThreadCtx) { let raw_buf = unsafe { stream.dequeue_raw_buffer() }; if raw_buf.is_null() { + tracing::trace!("process: null raw_buf"); return; } // 获取 SPA buffer 结构体,包含数据数组、元数据等 let spa_buf = unsafe { (*raw_buf).buffer }; if spa_buf.is_null() { + tracing::trace!("process: null spa_buf"); unsafe { stream.queue_raw_buffer(raw_buf) }; return; } @@ -599,6 +621,7 @@ fn pipewire_thread(ctx: PwThreadCtx) { let n_datas = unsafe { (*spa_buf).n_datas }; let datas_ptr = unsafe { (*spa_buf).datas }; if n_datas == 0 || datas_ptr.is_null() { + tracing::trace!("process: no data (n_datas={n_datas})"); unsafe { stream.queue_raw_buffer(raw_buf) }; return; } @@ -609,11 +632,13 @@ fn pipewire_thread(ctx: PwThreadCtx) { unsafe { &*(datas_ptr as *const pw::spa::buffer::Data) }; let fd = data_ref.fd(); if fd < 0 { + tracing::trace!("process: invalid fd={fd}"); unsafe { stream.queue_raw_buffer(raw_buf) }; return; } if data_ref.as_raw().chunk.is_null() { + tracing::trace!("process: null chunk"); unsafe { stream.queue_raw_buffer(raw_buf) }; return; } @@ -651,6 +676,7 @@ fn pipewire_thread(ctx: PwThreadCtx) { return; }; if width == 0 || height == 0 || format == 0 { + tracing::trace!("process: invalid dimensions {width}x{height} format={format}"); unsafe { stream.queue_raw_buffer(raw_buf) }; return; } @@ -695,7 +721,9 @@ fn pipewire_thread(ctx: PwThreadCtx) { StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS, &mut params, ) { - let _ = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}"))); + if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}"))) { + tracing::error!("stream.connect failed and error channel also failed: {e}"); + } return; } diff --git a/src/state.rs b/src/state.rs index 59ddd2d..62de184 100644 --- a/src/state.rs +++ b/src/state.rs @@ -46,6 +46,7 @@ use crate::args::Args; use crate::avhw::{AvHwDevCtx, EncState, SwEncState}; use crate::cap_wlr_screencopy::CapWlrScreencopy; use crate::fps_limit::FpsLimit; +use crate::stats::{FrameTimings, PipelineStats}; use crate::transform::{transpose_if_transform_transposed, Transform}; use crate::webrtc::WebRtcState; @@ -213,6 +214,9 @@ pub struct State { pub stage: EncConstructionStage, pub in_flight_surface: InFlightSurface, pub starting_timestamp: Option, + pub stats_start_time: Option, + pub stats_last_time: Option, + pub stats_frames: u64, pub first_frame: bool, pub args: Args, pub errored: bool, @@ -226,6 +230,7 @@ pub struct State { webrtc_rx: Option>>, webrtc_frames_sent: u64, webrtc_paused: Option>, + stats: PipelineStats, } // --------------------------------------------------------------------------- @@ -302,6 +307,9 @@ impl State { }, in_flight_surface: InFlightSurface::None, starting_timestamp: None, + stats_start_time: None, + stats_last_time: None, + stats_frames: 0, first_frame: true, fps_limit: FpsLimit::new(fps), args, @@ -315,6 +323,7 @@ impl State { webrtc_rx, webrtc_frames_sent: 0, webrtc_paused, + stats: PipelineStats::new(), }; // registry_queue_init consumes registry events internally during its @@ -492,7 +501,7 @@ impl State { // is a freshly allocated empty Video frame. let ret = unsafe { ffi::av_hwframe_get_buffer(frames_rgb_ctx, surface.as_mut_ptr(), 0) }; if ret < 0 { - tracing::error!("av_hwframe_get_buffer failed: error {}", ret); + tracing::error!("av_hwframe_get_buffer failed: {}", crate::avhw::ff_err(ret)); self.errored = true; return; } @@ -505,7 +514,7 @@ impl State { } let ret = unsafe { ffi::av_hwframe_map(map_frame.as_mut_ptr(), surface.as_ptr(), 0) }; if ret < 0 { - tracing::error!("av_hwframe_map failed: error {}", ret); + tracing::error!("av_hwframe_map failed: {}", crate::avhw::ff_err(ret)); self.errored = true; return; } @@ -530,7 +539,7 @@ impl State { // takes ownership of the fd, and the original fd is owned by map_frame. let fd_dup = unsafe { libc::dup(obj.fd) }; if fd_dup < 0 { - tracing::error!("failed to dup dma-buf fd"); + tracing::error!("failed to dup dma-buf fd: {}", std::io::Error::last_os_error()); // wayland-client does not auto-destroy params on Drop. params.destroy(); self.errored = true; @@ -574,6 +583,8 @@ impl State { where S::Frame: Default, { + self.stats.record_capture(); + let (mut surface, _drm_map, frame, buffer) = match mem::replace(&mut self.in_flight_surface, InFlightSurface::None) { InFlightSurface::CopyQueued { @@ -614,10 +625,29 @@ impl State { .is_some() }; if should_encode { + let encode_start = Instant::now(); if let Err(e) = enc.encode_frame(&surface) { tracing::error!("encode_frame failed: {}", e); self.errored = true; } + let encode_elapsed = encode_start.elapsed().as_micros() as u64; + self.stats.record_encode(&FrameTimings { + total_us: encode_elapsed, + ..Default::default() + }); + } + self.stats_frames += 1; + if let Some(last) = self.stats_last_time { + if last.elapsed() >= std::time::Duration::from_secs(10) { + let delta = self.stats_frames; + let fps = delta as f64 / last.elapsed().as_secs_f64(); + tracing::info!(frames = self.stats_frames, fps = format!("{fps:.1}"), "encoding stats"); + self.stats_last_time = Some(std::time::Instant::now()); + self.stats_frames = 0; + } + } else { + self.stats_start_time = Some(std::time::Instant::now()); + self.stats_last_time = Some(std::time::Instant::now()); } } @@ -670,13 +700,23 @@ impl State { if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) { tracing::debug!("WebRTC write frame error: {e}"); } + self.stats.record_send(0.0, None); self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1); } if count > 0 { - tracing::info!("WebRTC forwarded {count} frames from channel"); + tracing::debug!("WebRTC forwarded {count} frames from channel"); } } + if self.args.stats && self.stats.should_snapshot() { + self.stats.set_queue_depths( + 0, + self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0), + ); + let snap = self.stats.snapshot_and_reset(); + tracing::info!("stats: {snap}"); + } + Ok(()) } @@ -995,8 +1035,7 @@ impl Dispatch for State { qhandle: &QueueHandle>, ) { use wayland_client::protocol::wl_registry::Event as RegistryEvent; - tracing::debug!("Dispatch::event fired: {:?}", event); - + match event { RegistryEvent::Global { name, diff --git a/src/webrtc.rs b/src/webrtc.rs index d52a4b5..6f8ba18 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -19,11 +19,13 @@ const HTML_PAGE: &str = r#" video{max-width:90vw;max-height:80vh;border:1px solid #333} #status{margin:12px;font-size:14px;color:#aaa} #debug{position:fixed;bottom:8px;left:8px;font-size:11px;color:#666;max-width:90vw;white-space:pre-wrap} +#stats-panel{position:fixed;top:8px;right:8px;background:rgba(0,0,0,0.7);color:#0f0;font:11px monospace;padding:6px 10px;border-radius:4px;z-index:100;pointer-events:none;max-width:90vw;white-space:pre;line-height:1.5}
Connecting...

+