diff --git a/.gitignore b/.gitignore index 3d10387..e95279e 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,7 @@ Thumbs.db # Sisyphus orchestration artifacts .sisyphus/ +.omo/ +.playwright-mcp/ +wl-webrtc.log +webrtc-p0-success.png diff --git a/src/args.rs b/src/args.rs index 57275b7..404369b 100644 --- a/src/args.rs +++ b/src/args.rs @@ -50,4 +50,8 @@ pub struct Args { /// Force re-authorization dialog (ignore saved portal restore token) #[arg(long)] pub no_persist: bool, + + /// Enable per-second pipeline statistics output for stutter diagnosis + #[arg(long)] + pub stats: bool, } diff --git a/src/lib.rs b/src/lib.rs index 22d024d..26513e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod backend_detect; pub mod cap_portal; pub mod cap_wlr_screencopy; pub mod fps_limit; +pub mod stats; pub mod state; pub mod state_portal; pub mod transform; diff --git a/src/stats.rs b/src/stats.rs new file mode 100644 index 0000000..14dbbad --- /dev/null +++ b/src/stats.rs @@ -0,0 +1,489 @@ +// stats.rs — Lightweight windowed pipeline statistics for stutter diagnosis +// +// Tracks per-second snapshots of capture/encode/send pipeline metrics. +// Designed for low overhead: only counters and timing samples are collected, +// with one structured log line emitted per second when `--stats` is enabled. + +use std::time::Instant; + +/// Per-stage timing for a single encode pipeline frame. +/// +/// All values are in microseconds. The caller records timestamps around +/// each stage and passes the deltas to [`PipelineStats::record_encode`]. 
/// Per-stage timing for a single encode pipeline frame.
///
/// All durations are in microseconds. The caller records timestamps around
/// each stage and passes the deltas to [`PipelineStats::record_encode`].
#[derive(Debug, Clone, Default)]
pub struct FrameTimings {
    /// DMA-BUF import (av_hwframe_map)
    pub import_us: u64,
    /// GPU scale (scale_vaapi filter)
    pub scale_us: u64,
    /// GPU→CPU transfer (av_hwframe_transfer_data)
    pub transfer_us: u64,
    /// sws_scale NV12→YUV420P
    pub sws_us: u64,
    /// H.264 encode (avcodec_send_frame + receive_packet)
    pub encode_us: u64,
    /// Wall-clock total for this frame (import through encode output)
    pub total_us: u64,
    /// Encoded output size in bytes
    pub output_bytes: usize,
}

/// Windowed statistics aggregator for the encode/send pipeline.
///
/// Collects counters and timing samples within one window (nominally one
/// second, see [`PipelineStats::should_snapshot`]) and reduces them to
/// avg/p95/max values when [`PipelineStats::snapshot_and_reset`] is called.
#[derive(Debug)]
pub struct PipelineStats {
    // --- counters (reset each window) ---
    capture_frames: u64,
    encoded_frames: u64,
    sent_frames: u64,
    pipewire_dropped: u64,
    over_budget_count: u64,

    // --- queue depth at last observation ---
    capture_queue_depth: usize,
    encoded_queue_depth: usize,

    // --- timing samples (milliseconds) ---
    capture_gaps_ms: Vec<f64>,
    encoded_gaps_ms: Vec<f64>,
    sent_gaps_ms: Vec<f64>,
    frame_age_ms: Vec<f64>,
    send_wait_ms: Vec<f64>,

    // --- per-stage timing (microseconds) ---
    import_us: Vec<u64>,
    scale_us: Vec<u64>,
    transfer_us: Vec<u64>,
    sws_us: Vec<u64>,
    encode_us: Vec<u64>,
    total_us: Vec<u64>,
    output_bytes: Vec<usize>,

    // --- timing state (survives window resets so gaps span windows) ---
    last_capture_time: Option<Instant>,
    last_encode_time: Option<Instant>,
    last_send_time: Option<Instant>,
    window_start: Instant,
}

impl Default for PipelineStats {
    fn default() -> Self {
        Self::new()
    }
}

impl PipelineStats {
    /// Creates an empty aggregator whose first window starts now.
    pub fn new() -> Self {
        Self {
            capture_frames: 0,
            encoded_frames: 0,
            sent_frames: 0,
            pipewire_dropped: 0,
            over_budget_count: 0,
            capture_queue_depth: 0,
            encoded_queue_depth: 0,
            capture_gaps_ms: Vec::new(),
            encoded_gaps_ms: Vec::new(),
            sent_gaps_ms: Vec::new(),
            frame_age_ms: Vec::new(),
            send_wait_ms: Vec::new(),
            import_us: Vec::new(),
            scale_us: Vec::new(),
            transfer_us: Vec::new(),
            sws_us: Vec::new(),
            encode_us: Vec::new(),
            total_us: Vec::new(),
            output_bytes: Vec::new(),
            last_capture_time: None,
            last_encode_time: None,
            last_send_time: None,
            window_start: Instant::now(),
        }
    }

    /// Record that a capture frame was received from PipeWire.
    pub fn record_capture(&mut self) {
        note_event(
            &mut self.last_capture_time,
            &mut self.capture_gaps_ms,
            Instant::now(),
        );
        self.capture_frames += 1;
    }

    /// Record that a frame completed encoding with the given timings.
    ///
    /// Every field of `timings` is appended to its per-stage sample buffer.
    pub fn record_encode(&mut self, timings: &FrameTimings) {
        note_event(
            &mut self.last_encode_time,
            &mut self.encoded_gaps_ms,
            Instant::now(),
        );
        self.encoded_frames += 1;

        self.import_us.push(timings.import_us);
        self.scale_us.push(timings.scale_us);
        self.transfer_us.push(timings.transfer_us);
        self.sws_us.push(timings.sws_us);
        self.encode_us.push(timings.encode_us);
        self.total_us.push(timings.total_us);
        self.output_bytes.push(timings.output_bytes);
    }

    /// Record only a DMA-BUF import duration (microseconds).
    ///
    /// NOTE(review): `record_encode` also pushes an import sample, so this is
    /// presumably meant to be paired with `record_encode_thread`; mixing it
    /// with `record_encode` would count imports twice — confirm with callers.
    pub fn record_import(&mut self, import_us: u64) {
        self.import_us.push(import_us);
    }

    /// Record an encoded frame when only the sws and encode stages were timed.
    ///
    /// The frame total is recorded as `sws_us + encode_us` (saturating); no
    /// import, scale or transfer samples are added.
    pub fn record_encode_thread(&mut self, sws_us: u64, encode_us: u64, output_bytes: usize) {
        note_event(
            &mut self.last_encode_time,
            &mut self.encoded_gaps_ms,
            Instant::now(),
        );
        self.encoded_frames += 1;

        self.sws_us.push(sws_us);
        self.encode_us.push(encode_us);
        self.total_us.push(sws_us.saturating_add(encode_us));
        self.output_bytes.push(output_bytes);
    }

    /// Record that a frame was sent via WebRTC.
    ///
    /// `wait_ms` is time spent blocked waiting to send into the channel; only
    /// strictly positive waits are sampled. `capture_time` is when the frame
    /// was originally captured (used to compute frame age).
    pub fn record_send(&mut self, wait_ms: f64, capture_time: Option<Instant>) {
        let now = Instant::now();
        note_event(&mut self.last_send_time, &mut self.sent_gaps_ms, now);
        self.sent_frames += 1;

        if wait_ms > 0.0 {
            self.send_wait_ms.push(wait_ms);
        }
        if let Some(captured_at) = capture_time {
            // Measured against the same `now` as the send gap so both numbers
            // describe one instant.
            self.frame_age_ms
                .push(duration_ms(now.saturating_duration_since(captured_at)));
        }
    }

    /// Update the PipeWire dropped counter for this window.
    ///
    /// Stores `total_dropped - prev_dropped`, clamped at zero.
    pub fn set_pipewire_dropped(&mut self, total_dropped: u64, prev_dropped: u64) {
        self.pipewire_dropped = total_dropped.saturating_sub(prev_dropped);
    }

    /// Update queue depth observations (the latest values win).
    pub fn set_queue_depths(&mut self, capture: usize, encoded: usize) {
        self.capture_queue_depth = capture;
        self.encoded_queue_depth = encoded;
    }

    /// Record that a frame exceeded its time budget.
    pub fn record_over_budget(&mut self) {
        self.over_budget_count += 1;
    }

    /// Returns true if at least 1 second has elapsed since the last snapshot
    /// (or since creation). If true, call `snapshot_and_reset` to get the stats.
    pub fn should_snapshot(&self) -> bool {
        self.window_start.elapsed().as_secs() >= 1
    }

    /// Compute a snapshot of the current window and reset all counters.
    ///
    /// Rates are divided by the real window length; a zero-length window
    /// reports `0.0` instead of NaN/inf. Sample buffers are cleared (their
    /// capacity is kept) and a new window starts at the time of the call.
    /// The `last_*_time` markers are left alone so the first gap of the next
    /// window is still measured against the previous event.
    pub fn snapshot_and_reset(&mut self) -> StatsSnapshot {
        let elapsed = self.window_start.elapsed().as_secs_f64();
        let snap = StatsSnapshot {
            elapsed_secs: elapsed,
            capture_fps: per_second(self.capture_frames as f64, elapsed),
            encoded_fps: per_second(self.encoded_frames as f64, elapsed),
            sent_fps: per_second(self.sent_frames as f64, elapsed),
            capture_frames: self.capture_frames,
            encoded_frames: self.encoded_frames,
            sent_frames: self.sent_frames,
            pipewire_dropped: self.pipewire_dropped,
            over_budget_count: self.over_budget_count,
            capture_queue_depth: self.capture_queue_depth,
            encoded_queue_depth: self.encoded_queue_depth,
            capture_gap_avg_ms: avg_f64(&self.capture_gaps_ms),
            capture_gap_p95_ms: p95_f64(&self.capture_gaps_ms),
            capture_gap_max_ms: max_f64(&self.capture_gaps_ms),
            encoded_gap_avg_ms: avg_f64(&self.encoded_gaps_ms),
            encoded_gap_p95_ms: p95_f64(&self.encoded_gaps_ms),
            encoded_gap_max_ms: max_f64(&self.encoded_gaps_ms),
            sent_gap_avg_ms: avg_f64(&self.sent_gaps_ms),
            sent_gap_p95_ms: p95_f64(&self.sent_gaps_ms),
            sent_gap_max_ms: max_f64(&self.sent_gaps_ms),
            frame_age_avg_ms: avg_f64(&self.frame_age_ms),
            frame_age_p95_ms: p95_f64(&self.frame_age_ms),
            frame_age_max_ms: max_f64(&self.frame_age_ms),
            send_wait_p95_ms: p95_f64(&self.send_wait_ms),
            import_avg_ms: avg_ms(&self.import_us),
            import_p95_ms: p95_ms(&self.import_us),
            scale_avg_ms: avg_ms(&self.scale_us),
            scale_p95_ms: p95_ms(&self.scale_us),
            transfer_avg_ms: avg_ms(&self.transfer_us),
            transfer_p95_ms: p95_ms(&self.transfer_us),
            sws_avg_ms: avg_ms(&self.sws_us),
            sws_p95_ms: p95_ms(&self.sws_us),
            encode_avg_ms: avg_ms(&self.encode_us),
            encode_p95_ms: p95_ms(&self.encode_us),
            total_avg_ms: avg_ms(&self.total_us),
            total_p95_ms: p95_ms(&self.total_us),
            output_bytes_per_sec: per_second(sum_usize(&self.output_bytes) as f64, elapsed),
            output_frame_bytes_p95: p95_usize(&self.output_bytes),
            output_frame_bytes_max: max_usize(&self.output_bytes),
        };

        // Reset all counters and sample buffers
        self.capture_frames = 0;
        self.encoded_frames = 0;
        self.sent_frames = 0;
        self.pipewire_dropped = 0;
        self.over_budget_count = 0;
        self.capture_queue_depth = 0;
        self.encoded_queue_depth = 0;
        self.capture_gaps_ms.clear();
        self.encoded_gaps_ms.clear();
        self.sent_gaps_ms.clear();
        self.frame_age_ms.clear();
        self.send_wait_ms.clear();
        self.import_us.clear();
        self.scale_us.clear();
        self.transfer_us.clear();
        self.sws_us.clear();
        self.encode_us.clear();
        self.total_us.clear();
        self.output_bytes.clear();
        self.window_start = Instant::now();

        snap
    }
}

/// A one-second snapshot of pipeline statistics.
#[derive(Debug, Clone)]
pub struct StatsSnapshot {
    /// Actual length of the window this snapshot covers, in seconds.
    pub elapsed_secs: f64,
    // FPS
    pub capture_fps: f64,
    pub encoded_fps: f64,
    pub sent_fps: f64,
    // Counters
    pub capture_frames: u64,
    pub encoded_frames: u64,
    pub sent_frames: u64,
    pub pipewire_dropped: u64,
    pub over_budget_count: u64,
    // Queue depths
    pub capture_queue_depth: usize,
    pub encoded_queue_depth: usize,
    // Gap timing (ms)
    pub capture_gap_avg_ms: f64,
    pub capture_gap_p95_ms: f64,
    pub capture_gap_max_ms: f64,
    pub encoded_gap_avg_ms: f64,
    pub encoded_gap_p95_ms: f64,
    pub encoded_gap_max_ms: f64,
    pub sent_gap_avg_ms: f64,
    pub sent_gap_p95_ms: f64,
    pub sent_gap_max_ms: f64,
    // Frame age (capture → send)
    pub frame_age_avg_ms: f64,
    pub frame_age_p95_ms: f64,
    pub frame_age_max_ms: f64,
    // Send wait
    pub send_wait_p95_ms: f64,
    // Per-stage encode timing (ms)
    pub import_avg_ms: f64,
    pub import_p95_ms: f64,
    pub scale_avg_ms: f64,
    pub scale_p95_ms: f64,
    pub transfer_avg_ms: f64,
    pub transfer_p95_ms: f64,
    pub sws_avg_ms: f64,
    pub sws_p95_ms: f64,
    pub encode_avg_ms: f64,
    pub encode_p95_ms: f64,
    pub total_avg_ms: f64,
    pub total_p95_ms: f64,
    // Output size
    pub output_bytes_per_sec: f64,
    pub output_frame_bytes_p95: usize,
    pub output_frame_bytes_max: usize,
}

/// Renders the snapshot as one `key=value` log line.
///
/// Only FPS, counters, queue depths and p95/max figures are printed; the
/// averages are available on the struct but deliberately left out of the line.
/// `output_bps` is `output_bytes_per_sec`, i.e. bytes (not bits) per second.
impl std::fmt::Display for StatsSnapshot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "capture_fps={:.1} encoded_fps={:.1} sent_fps={:.1} \
             pw_dropped={} over_budget={} \
             cap_q={} enc_q={} \
             cap_gap_p95={:.1}ms cap_gap_max={:.1}ms \
             enc_gap_p95={:.1}ms enc_gap_max={:.1}ms \
             sent_gap_p95={:.1}ms sent_gap_max={:.1}ms \
             frame_age_p95={:.1}ms frame_age_max={:.1}ms \
             send_wait_p95={:.1}ms \
             import_p95={:.1}ms scale_p95={:.1}ms transfer_p95={:.1}ms \
             sws_p95={:.1}ms encode_p95={:.1}ms total_p95={:.1}ms \
             output_bps={:.0} frame_bytes_max={}",
            self.capture_fps,
            self.encoded_fps,
            self.sent_fps,
            self.pipewire_dropped,
            self.over_budget_count,
            self.capture_queue_depth,
            self.encoded_queue_depth,
            self.capture_gap_p95_ms,
            self.capture_gap_max_ms,
            self.encoded_gap_p95_ms,
            self.encoded_gap_max_ms,
            self.sent_gap_p95_ms,
            self.sent_gap_max_ms,
            self.frame_age_p95_ms,
            self.frame_age_max_ms,
            self.send_wait_p95_ms,
            self.import_p95_ms,
            self.scale_p95_ms,
            self.transfer_p95_ms,
            self.sws_p95_ms,
            self.encode_p95_ms,
            self.total_p95_ms,
            self.output_bytes_per_sec,
            self.output_frame_bytes_max,
        )
    }
}

// ---------------------------------------------------------------------------
// Statistics helpers
// ---------------------------------------------------------------------------

/// Converts a duration to fractional milliseconds.
fn duration_ms(d: std::time::Duration) -> f64 {
    d.as_secs_f64() * 1000.0
}

/// Stores `now` as the latest event time and, if there was a previous event,
/// appends the gap between the two (in milliseconds) to `gaps_ms`.
///
/// The gap is measured against the same `now` that gets stored, so
/// consecutive gaps add up exactly to the wall-clock span they cover.
fn note_event(last: &mut Option<Instant>, gaps_ms: &mut Vec<f64>, now: Instant) {
    if let Some(prev) = last.replace(now) {
        gaps_ms.push(duration_ms(now.saturating_duration_since(prev)));
    }
}

/// Divides `total` by the window length, returning `0.0` for an empty window
/// so rates never become NaN or infinite.
fn per_second(total: f64, elapsed_secs: f64) -> f64 {
    if elapsed_secs > 0.0 {
        total / elapsed_secs
    } else {
        0.0
    }
}

/// Index of the 95th-percentile element in a sorted slice of `len` items:
/// `floor(len * 0.95)`, clamped to the last valid index. Integer arithmetic
/// keeps the result exact. Returns 0 for `len == 0`; callers check emptiness.
fn p95_index(len: usize) -> usize {
    (len.saturating_mul(95) / 100).min(len.saturating_sub(1))
}

/// Arithmetic mean, or `0.0` for an empty slice.
fn avg_f64(data: &[f64]) -> f64 {
    if data.is_empty() {
        return 0.0;
    }
    data.iter().sum::<f64>() / data.len() as f64
}

/// 95th percentile, or `0.0` for an empty slice.
fn p95_f64(data: &[f64]) -> f64 {
    if data.is_empty() {
        return 0.0;
    }
    let mut sorted = data.to_vec();
    // `total_cmp` is a total order, so the sort stays well-defined even if a
    // NaN sample ever slips in.
    sorted.sort_unstable_by(f64::total_cmp);
    sorted[p95_index(sorted.len())]
}

/// Largest sample, with a floor of `0.0` (also the result for an empty slice).
fn max_f64(data: &[f64]) -> f64 {
    data.iter().copied().fold(0.0_f64, f64::max)
}

/// Mean of microsecond samples, expressed in milliseconds (`0.0` if empty).
fn avg_ms(data: &[u64]) -> f64 {
    if data.is_empty() {
        return 0.0;
    }
    data.iter().sum::<u64>() as f64 / data.len() as f64 / 1000.0
}

/// 95th percentile of microsecond samples, in milliseconds (`0.0` if empty).
fn p95_ms(data: &[u64]) -> f64 {
    if data.is_empty() {
        return 0.0;
    }
    let mut sorted = data.to_vec();
    sorted.sort_unstable();
    sorted[p95_index(sorted.len())] as f64 / 1000.0
}

/// Sum of all samples.
fn sum_usize(data: &[usize]) -> usize {
    data.iter().sum()
}

/// 95th percentile, or `0` for an empty slice.
fn p95_usize(data: &[usize]) -> usize {
    if data.is_empty() {
        return 0;
    }
    let mut sorted = data.to_vec();
    sorted.sort_unstable();
    sorted[p95_index(sorted.len())]
}

/// Largest sample, or `0` for an empty slice.
fn max_usize(data: &[usize]) -> usize {
    data.iter().copied().max().unwrap_or(0)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_stats_snapshot() {
        let mut stats = PipelineStats::new();
        let snap = stats.snapshot_and_reset();
        assert_eq!(snap.capture_frames, 0);
        assert_eq!(snap.encoded_frames, 0);
        assert_eq!(snap.sent_frames, 0);
    }

    #[test]
    fn record_and_snapshot_counts() {
        let mut stats = PipelineStats::new();
        stats.record_capture();
        stats.record_capture();
        stats.record_encode(&FrameTimings {
            total_us: 5000,
            output_bytes: 1000,
            ..Default::default()
        });
        stats.record_send(0.1, None);

        let snap = stats.snapshot_and_reset();
        assert_eq!(snap.capture_frames, 2);
        assert_eq!(snap.encoded_frames, 1);
        assert_eq!(snap.sent_frames, 1);
    }

    #[test]
    fn p95_computation() {
        // 100 values: 0.0 through 99.0
        let data: Vec<f64> = (0..100).map(|i| i as f64).collect();
        let result = p95_f64(&data);
        assert!((result - 95.0).abs() < 1.0, "p95 of 0..100 should be ~95, got {result}");
    }

    #[test]
    fn p95_ms_microseconds() {
        let data: Vec<u64> = (0..100).map(|i| i * 1000).collect(); // 0ms..99ms
        let result = p95_ms(&data);
        assert!((result - 95.0).abs() < 1.0, "p95_ms should be ~95ms, got {result}");
    }

    #[test]
    fn snapshot_resets_counters() {
        let mut stats = PipelineStats::new();
        stats.record_capture();
        let _ = stats.snapshot_and_reset();
        let snap = stats.snapshot_and_reset();
        assert_eq!(snap.capture_frames, 0);
    }

    #[test]
    fn display_format_contains_key_fields() {
        let mut stats = PipelineStats::new();
        stats.record_capture();
        stats.record_encode(&FrameTimings {
            total_us: 10000,
            output_bytes: 5000,
            ..Default::default()
        });
        stats.record_send(0.5, None);
        let snap = stats.snapshot_and_reset();
        let text = format!("{snap}");
        assert!(text.contains("capture_fps="));
        assert!(text.contains("encoded_fps="));
        assert!(text.contains("sent_fps="));
        assert!(text.contains("total_p95="));
    }
}