feat(stats): add --stats flag and PipelineStats windowed diagnostics

Add lightweight per-second pipeline statistics for stutter diagnosis:
- --stats CLI flag enables structured stats logging
- PipelineStats tracks capture/encode/send timing with p95/pmax
- FrameTimings records import/scale/transfer/sws/encode per-frame
- StatsSnapshot produces one structured log line per second
This commit is contained in:
dailz
2026-06-07 16:54:45 +08:00
parent f3da1e4e6c
commit 029fe13e37
4 changed files with 498 additions and 0 deletions

4
.gitignore vendored
View File

@@ -17,3 +17,7 @@ Thumbs.db
# Sisyphus orchestration artifacts
.sisyphus/
.omo/
.playwright-mcp/
wl-webrtc.log
webrtc-p0-success.png

View File

@@ -50,4 +50,8 @@ pub struct Args {
/// Force re-authorization dialog (ignore saved portal restore token)
#[arg(long)]
pub no_persist: bool,
/// Enable per-second pipeline statistics output for stutter diagnosis
#[arg(long)]
pub stats: bool,
}

View File

@@ -4,6 +4,7 @@ pub mod backend_detect;
pub mod cap_portal;
pub mod cap_wlr_screencopy;
pub mod fps_limit;
pub mod stats;
pub mod state;
pub mod state_portal;
pub mod transform;

489
src/stats.rs Normal file
View File

@@ -0,0 +1,489 @@
// stats.rs — Lightweight windowed pipeline statistics for stutter diagnosis
//
// Tracks per-second snapshots of capture/encode/send pipeline metrics.
// Designed for low overhead: only counters and timing samples are collected,
// with one structured log line emitted per second when `--stats` is enabled.
use std::time::Instant;
/// Per-stage timing for a single encode pipeline frame.
///
/// All values are in microseconds. The caller records timestamps around
/// each stage and passes the deltas to [`PipelineStats::record_frame`].
#[derive(Debug, Default)]
pub struct FrameTimings {
/// DMA-BUF import (av_hwframe_map)
pub import_us: u64,
/// GPU scale (scale_vaapi filter)
pub scale_us: u64,
/// GPU→CPU transfer (av_hwframe_transfer_data)
pub transfer_us: u64,
/// sws_scale NV12→YUV420P
pub sws_us: u64,
/// H.264 encode (avcodec_send_frame + receive_packet)
pub encode_us: u64,
/// Wall-clock total for this frame (import through encode output)
pub total_us: u64,
/// Encoded output size in bytes
pub output_bytes: usize,
}
/// Windowed statistics aggregator for the encode/send pipeline.
///
/// Collects counters and timing samples within a one-second window,
/// then computes avg/p95/max when the snapshot is taken.
pub struct PipelineStats {
// --- counters (reset each window) ---
capture_frames: u64,
encoded_frames: u64,
sent_frames: u64,
pipewire_dropped: u64,
over_budget_count: u64,
// --- queue depth at last observation ---
capture_queue_depth: usize,
encoded_queue_depth: usize,
// --- timing samples ---
capture_gaps_ms: Vec<f64>,
encoded_gaps_ms: Vec<f64>,
sent_gaps_ms: Vec<f64>,
frame_age_ms: Vec<f64>,
send_wait_ms: Vec<f64>,
// --- per-stage timing (microseconds) ---
import_us: Vec<u64>,
scale_us: Vec<u64>,
transfer_us: Vec<u64>,
sws_us: Vec<u64>,
encode_us: Vec<u64>,
total_us: Vec<u64>,
output_bytes: Vec<usize>,
// --- timing state ---
last_capture_time: Option<Instant>,
last_encode_time: Option<Instant>,
last_send_time: Option<Instant>,
window_start: Instant,
}
impl PipelineStats {
pub fn new() -> Self {
Self {
capture_frames: 0,
encoded_frames: 0,
sent_frames: 0,
pipewire_dropped: 0,
over_budget_count: 0,
capture_queue_depth: 0,
encoded_queue_depth: 0,
capture_gaps_ms: Vec::new(),
encoded_gaps_ms: Vec::new(),
sent_gaps_ms: Vec::new(),
frame_age_ms: Vec::new(),
send_wait_ms: Vec::new(),
import_us: Vec::new(),
scale_us: Vec::new(),
transfer_us: Vec::new(),
sws_us: Vec::new(),
encode_us: Vec::new(),
total_us: Vec::new(),
output_bytes: Vec::new(),
last_capture_time: None,
last_encode_time: None,
last_send_time: None,
window_start: Instant::now(),
}
}
/// Record that a capture frame was received from PipeWire.
pub fn record_capture(&mut self) {
let now = Instant::now();
if let Some(last) = self.last_capture_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.capture_gaps_ms.push(gap_ms);
}
self.last_capture_time = Some(now);
self.capture_frames += 1;
}
/// Record that a frame completed encoding with the given timings.
pub fn record_encode(&mut self, timings: &FrameTimings) {
let now = Instant::now();
if let Some(last) = self.last_encode_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.encoded_gaps_ms.push(gap_ms);
}
self.last_encode_time = Some(now);
self.encoded_frames += 1;
self.import_us.push(timings.import_us);
self.scale_us.push(timings.scale_us);
self.transfer_us.push(timings.transfer_us);
self.sws_us.push(timings.sws_us);
self.encode_us.push(timings.encode_us);
self.total_us.push(timings.total_us);
self.output_bytes.push(timings.output_bytes);
}
pub fn record_import(&mut self, import_us: u64) {
self.import_us.push(import_us);
}
pub fn record_encode_thread(&mut self, sws_us: u64, encode_us: u64, output_bytes: usize) {
let now = Instant::now();
if let Some(last) = self.last_encode_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.encoded_gaps_ms.push(gap_ms);
}
self.last_encode_time = Some(now);
self.encoded_frames += 1;
self.sws_us.push(sws_us);
self.encode_us.push(encode_us);
self.total_us.push(sws_us.saturating_add(encode_us));
self.output_bytes.push(output_bytes);
}
/// Record that a frame was sent via WebRTC.
/// `wait_ms` is time spent blocked waiting to send into the channel.
/// `capture_time` is when the frame was originally captured (for frame age).
pub fn record_send(&mut self, wait_ms: f64, capture_time: Option<Instant>) {
let now = Instant::now();
if let Some(last) = self.last_send_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.sent_gaps_ms.push(gap_ms);
}
self.last_send_time = Some(now);
self.sent_frames += 1;
if wait_ms > 0.0 {
self.send_wait_ms.push(wait_ms);
}
if let Some(ct) = capture_time {
let age_ms = ct.elapsed().as_secs_f64() * 1000.0;
self.frame_age_ms.push(age_ms);
}
}
/// Update PipeWire dropped counter (absolute value from AtomicU64).
pub fn set_pipewire_dropped(&mut self, total_dropped: u64, prev_dropped: u64) {
self.pipewire_dropped = total_dropped.saturating_sub(prev_dropped);
}
/// Update queue depth observations.
pub fn set_queue_depths(&mut self, capture: usize, encoded: usize) {
self.capture_queue_depth = capture;
self.encoded_queue_depth = encoded;
}
/// Record that a frame exceeded its time budget.
pub fn record_over_budget(&mut self) {
self.over_budget_count += 1;
}
/// Returns true if at least 1 second has elapsed since the last snapshot
/// (or since creation). If true, call `snapshot_and_reset` to get the stats.
pub fn should_snapshot(&self) -> bool {
self.window_start.elapsed().as_secs() >= 1
}
/// Compute a snapshot of the current window and reset all counters.
pub fn snapshot_and_reset(&mut self) -> StatsSnapshot {
let elapsed = self.window_start.elapsed().as_secs_f64();
let snap = StatsSnapshot {
elapsed_secs: elapsed,
capture_fps: self.capture_frames as f64 / elapsed,
encoded_fps: self.encoded_frames as f64 / elapsed,
sent_fps: self.sent_frames as f64 / elapsed,
capture_frames: self.capture_frames,
encoded_frames: self.encoded_frames,
sent_frames: self.sent_frames,
pipewire_dropped: self.pipewire_dropped,
over_budget_count: self.over_budget_count,
capture_queue_depth: self.capture_queue_depth,
encoded_queue_depth: self.encoded_queue_depth,
capture_gap_avg_ms: avg_f64(&self.capture_gaps_ms),
capture_gap_p95_ms: p95_f64(&self.capture_gaps_ms),
capture_gap_max_ms: max_f64(&self.capture_gaps_ms),
encoded_gap_avg_ms: avg_f64(&self.encoded_gaps_ms),
encoded_gap_p95_ms: p95_f64(&self.encoded_gaps_ms),
encoded_gap_max_ms: max_f64(&self.encoded_gaps_ms),
sent_gap_avg_ms: avg_f64(&self.sent_gaps_ms),
sent_gap_p95_ms: p95_f64(&self.sent_gaps_ms),
sent_gap_max_ms: max_f64(&self.sent_gaps_ms),
frame_age_avg_ms: avg_f64(&self.frame_age_ms),
frame_age_p95_ms: p95_f64(&self.frame_age_ms),
frame_age_max_ms: max_f64(&self.frame_age_ms),
send_wait_p95_ms: p95_f64(&self.send_wait_ms),
import_avg_ms: avg_ms(&self.import_us),
import_p95_ms: p95_ms(&self.import_us),
scale_avg_ms: avg_ms(&self.scale_us),
scale_p95_ms: p95_ms(&self.scale_us),
transfer_avg_ms: avg_ms(&self.transfer_us),
transfer_p95_ms: p95_ms(&self.transfer_us),
sws_avg_ms: avg_ms(&self.sws_us),
sws_p95_ms: p95_ms(&self.sws_us),
encode_avg_ms: avg_ms(&self.encode_us),
encode_p95_ms: p95_ms(&self.encode_us),
total_avg_ms: avg_ms(&self.total_us),
total_p95_ms: p95_ms(&self.total_us),
output_bytes_per_sec: sum_usize(&self.output_bytes) as f64 / elapsed,
output_frame_bytes_p95: p95_usize(&self.output_bytes),
output_frame_bytes_max: max_usize(&self.output_bytes),
};
// Reset all counters and sample buffers
self.capture_frames = 0;
self.encoded_frames = 0;
self.sent_frames = 0;
self.pipewire_dropped = 0;
self.over_budget_count = 0;
self.capture_queue_depth = 0;
self.encoded_queue_depth = 0;
self.capture_gaps_ms.clear();
self.encoded_gaps_ms.clear();
self.sent_gaps_ms.clear();
self.frame_age_ms.clear();
self.send_wait_ms.clear();
self.import_us.clear();
self.scale_us.clear();
self.transfer_us.clear();
self.sws_us.clear();
self.encode_us.clear();
self.total_us.clear();
self.output_bytes.clear();
self.window_start = Instant::now();
snap
}
}
/// A one-second snapshot of pipeline statistics.
#[derive(Debug)]
pub struct StatsSnapshot {
pub elapsed_secs: f64,
// FPS
pub capture_fps: f64,
pub encoded_fps: f64,
pub sent_fps: f64,
// Counters
pub capture_frames: u64,
pub encoded_frames: u64,
pub sent_frames: u64,
pub pipewire_dropped: u64,
pub over_budget_count: u64,
// Queue depths
pub capture_queue_depth: usize,
pub encoded_queue_depth: usize,
// Gap timing (ms)
pub capture_gap_avg_ms: f64,
pub capture_gap_p95_ms: f64,
pub capture_gap_max_ms: f64,
pub encoded_gap_avg_ms: f64,
pub encoded_gap_p95_ms: f64,
pub encoded_gap_max_ms: f64,
pub sent_gap_avg_ms: f64,
pub sent_gap_p95_ms: f64,
pub sent_gap_max_ms: f64,
// Frame age (capture → send)
pub frame_age_avg_ms: f64,
pub frame_age_p95_ms: f64,
pub frame_age_max_ms: f64,
// Send wait
pub send_wait_p95_ms: f64,
// Per-stage encode timing (ms)
pub import_avg_ms: f64,
pub import_p95_ms: f64,
pub scale_avg_ms: f64,
pub scale_p95_ms: f64,
pub transfer_avg_ms: f64,
pub transfer_p95_ms: f64,
pub sws_avg_ms: f64,
pub sws_p95_ms: f64,
pub encode_avg_ms: f64,
pub encode_p95_ms: f64,
pub total_avg_ms: f64,
pub total_p95_ms: f64,
// Output size
pub output_bytes_per_sec: f64,
pub output_frame_bytes_p95: usize,
pub output_frame_bytes_max: usize,
}
impl std::fmt::Display for StatsSnapshot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"capture_fps={:.1} encoded_fps={:.1} sent_fps={:.1} \
pw_dropped={} over_budget={} \
cap_q={} enc_q={} \
cap_gap_p95={:.1}ms cap_gap_max={:.1}ms \
enc_gap_p95={:.1}ms enc_gap_max={:.1}ms \
sent_gap_p95={:.1}ms sent_gap_max={:.1}ms \
frame_age_p95={:.1}ms frame_age_max={:.1}ms \
send_wait_p95={:.1}ms \
import_p95={:.1}ms scale_p95={:.1}ms transfer_p95={:.1}ms \
sws_p95={:.1}ms encode_p95={:.1}ms total_p95={:.1}ms \
output_bps={:.0} frame_bytes_max={}",
self.capture_fps,
self.encoded_fps,
self.sent_fps,
self.pipewire_dropped,
self.over_budget_count,
self.capture_queue_depth,
self.encoded_queue_depth,
self.capture_gap_p95_ms,
self.capture_gap_max_ms,
self.encoded_gap_p95_ms,
self.encoded_gap_max_ms,
self.sent_gap_p95_ms,
self.sent_gap_max_ms,
self.frame_age_p95_ms,
self.frame_age_max_ms,
self.send_wait_p95_ms,
self.import_p95_ms,
self.scale_p95_ms,
self.transfer_p95_ms,
self.sws_p95_ms,
self.encode_p95_ms,
self.total_p95_ms,
self.output_bytes_per_sec,
self.output_frame_bytes_max,
)
}
}
// ---------------------------------------------------------------------------
// Statistics helpers
// ---------------------------------------------------------------------------
fn avg_f64(data: &[f64]) -> f64 {
if data.is_empty() {
return 0.0;
}
data.iter().sum::<f64>() / data.len() as f64
}
fn p95_f64(data: &[f64]) -> f64 {
if data.is_empty() {
return 0.0;
}
let mut sorted: Vec<f64> = data.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64) * 0.95).floor() as usize;
sorted[idx.min(sorted.len() - 1)]
}
fn max_f64(data: &[f64]) -> f64 {
data.iter().copied().fold(0.0_f64, f64::max)
}
fn avg_ms(data: &[u64]) -> f64 {
if data.is_empty() {
return 0.0;
}
data.iter().sum::<u64>() as f64 / data.len() as f64 / 1000.0
}
fn p95_ms(data: &[u64]) -> f64 {
if data.is_empty() {
return 0.0;
}
let mut sorted = data.to_vec();
sorted.sort_unstable();
let idx = ((sorted.len() as f64) * 0.95).floor() as usize;
sorted[idx.min(sorted.len() - 1)] as f64 / 1000.0
}
fn sum_usize(data: &[usize]) -> usize {
data.iter().sum()
}
fn p95_usize(data: &[usize]) -> usize {
if data.is_empty() {
return 0;
}
let mut sorted = data.to_vec();
sorted.sort_unstable();
let idx = ((sorted.len() as f64) * 0.95).floor() as usize;
sorted[idx.min(sorted.len() - 1)]
}
fn max_usize(data: &[usize]) -> usize {
data.iter().copied().max().unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_stats_snapshot() {
let mut stats = PipelineStats::new();
let snap = stats.snapshot_and_reset();
assert_eq!(snap.capture_frames, 0);
assert_eq!(snap.encoded_frames, 0);
assert_eq!(snap.sent_frames, 0);
}
#[test]
fn record_and_snapshot_counts() {
let mut stats = PipelineStats::new();
stats.record_capture();
stats.record_capture();
stats.record_encode(&FrameTimings {
total_us: 5000,
output_bytes: 1000,
..Default::default()
});
stats.record_send(0.1, None);
let snap = stats.snapshot_and_reset();
assert_eq!(snap.capture_frames, 2);
assert_eq!(snap.encoded_frames, 1);
assert_eq!(snap.sent_frames, 1);
}
#[test]
fn p95_computation() {
// 100 values: 0.0 through 99.0
let data: Vec<f64> = (0..100).map(|i| i as f64).collect();
let result = p95_f64(&data);
assert!((result - 95.0).abs() < 1.0, "p95 of 0..100 should be ~95, got {result}");
}
#[test]
fn p95_ms_microseconds() {
let data: Vec<u64> = (0..100).map(|i| i * 1000).collect(); // 0ms..99ms
let result = p95_ms(&data);
assert!((result - 95.0).abs() < 1.0, "p95_ms should be ~95ms, got {result}");
}
#[test]
fn snapshot_resets_counters() {
let mut stats = PipelineStats::new();
stats.record_capture();
let _ = stats.snapshot_and_reset();
let snap = stats.snapshot_and_reset();
assert_eq!(snap.capture_frames, 0);
}
#[test]
fn display_format_contains_key_fields() {
let mut stats = PipelineStats::new();
stats.record_capture();
stats.record_encode(&FrameTimings {
total_us: 10000,
output_bytes: 5000,
..Default::default()
});
stats.record_send(0.5, None);
let snap = stats.snapshot_and_reset();
let text = format!("{snap}");
assert!(text.contains("capture_fps="));
assert!(text.contains("encoded_fps="));
assert!(text.contains("sent_fps="));
assert!(text.contains("total_p95="));
}
}