fix(webrtc): SO_SNDBUF 2MB + VBV rate limiting + stats integration

P0 - UDP send buffer: set SO_SNDBUF=2MB to prevent EAGAIN on large IDR
frames (218KB/256KB keyframes caused 18+ EAGAIN bursts). Actual Linux
buffer 4096KB confirmed.

P1 - VBV rate limiting: cap rc_max_rate=bitrate and rc_buffer_size=
bitrate/4 for WebRTC encode path, preventing oversized IDR frames.

Stats: integrate PipelineStats into cap_portal (dropped_count), state.rs
(wlroots path), webrtc.rs (browser getStats enhancement + stats panel).
This commit is contained in:
dailz
2026-06-07 16:55:07 +08:00
parent 029fe13e37
commit aae030f309
3 changed files with 231 additions and 39 deletions

View File

@@ -73,6 +73,7 @@ pub struct CapPortal {
event_rx: Receiver<PwCtrlEvent>,
pw_thread: Option<JoinHandle<()>>,
rt: Runtime,
pw_dropped: Arc<AtomicU64>,
}
/// PipeWire 捕获线程的上下文数据
@@ -149,6 +150,7 @@ impl CapPortal {
event_rx,
pw_thread: Some(pw_thread),
rt,
pw_dropped,
})
}
@@ -160,6 +162,16 @@ impl CapPortal {
&self.event_rx
}
/// Returns the total number of PipeWire frames dropped due to channel backlog.
pub fn dropped_count(&self) -> u64 {
self.pw_dropped.load(Ordering::Relaxed)
}
/// Returns the number of frames currently waiting in the capture channel.
pub fn capture_queue_depth(&self) -> usize {
self.frame_rx.len()
}
/// 通过 XDG Desktop Portal 建立屏幕录制会话
///
/// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。
@@ -476,7 +488,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let mainloop = match pw::main_loop::MainLoopBox::new(None) {
Ok(ml) => ml,
Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}")));
if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}"))) {
tracing::error!("MainLoop::new failed and error channel also failed: {e}");
}
return;
}
};
@@ -484,7 +498,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let context = match pw::context::ContextBox::new(mainloop.loop_(), None) {
Ok(c) => c,
Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}")));
if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}"))) {
tracing::error!("Context::new failed and error channel also failed: {e}");
}
return;
}
};
@@ -492,7 +508,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let core = match context.connect_fd(pw_fd, None) {
Ok(c) => c,
Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("connect_fd failed: {e}")));
if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("connect_fd failed: {e}"))) {
tracing::error!("connect_fd failed and error channel also failed: {e}");
}
return;
}
};
@@ -514,7 +532,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
) {
Ok(s) => s,
Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}")));
if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}"))) {
tracing::error!("Stream::new failed and error channel also failed: {e}");
}
return;
}
};
@@ -584,12 +604,14 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let raw_buf = unsafe { stream.dequeue_raw_buffer() };
if raw_buf.is_null() {
tracing::trace!("process: null raw_buf");
return;
}
// 获取 SPA buffer 结构体,包含数据数组、元数据等
let spa_buf = unsafe { (*raw_buf).buffer };
if spa_buf.is_null() {
tracing::trace!("process: null spa_buf");
unsafe { stream.queue_raw_buffer(raw_buf) };
return;
}
@@ -599,6 +621,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let n_datas = unsafe { (*spa_buf).n_datas };
let datas_ptr = unsafe { (*spa_buf).datas };
if n_datas == 0 || datas_ptr.is_null() {
tracing::trace!("process: no data (n_datas={n_datas})");
unsafe { stream.queue_raw_buffer(raw_buf) };
return;
}
@@ -609,11 +632,13 @@ fn pipewire_thread(ctx: PwThreadCtx) {
unsafe { &*(datas_ptr as *const pw::spa::buffer::Data) };
let fd = data_ref.fd();
if fd < 0 {
tracing::trace!("process: invalid fd={fd}");
unsafe { stream.queue_raw_buffer(raw_buf) };
return;
}
if data_ref.as_raw().chunk.is_null() {
tracing::trace!("process: null chunk");
unsafe { stream.queue_raw_buffer(raw_buf) };
return;
}
@@ -651,6 +676,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
return;
};
if width == 0 || height == 0 || format == 0 {
tracing::trace!("process: invalid dimensions {width}x{height} format={format}");
unsafe { stream.queue_raw_buffer(raw_buf) };
return;
}
@@ -695,7 +721,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
&mut params,
) {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}")));
if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}"))) {
tracing::error!("stream.connect failed and error channel also failed: {e}");
}
return;
}

View File

@@ -46,6 +46,7 @@ use crate::args::Args;
use crate::avhw::{AvHwDevCtx, EncState, SwEncState};
use crate::cap_wlr_screencopy::CapWlrScreencopy;
use crate::fps_limit::FpsLimit;
use crate::stats::{FrameTimings, PipelineStats};
use crate::transform::{transpose_if_transform_transposed, Transform};
use crate::webrtc::WebRtcState;
@@ -213,6 +214,9 @@ pub struct State<S: CaptureSource> {
pub stage: EncConstructionStage<S>,
pub in_flight_surface: InFlightSurface<S>,
pub starting_timestamp: Option<i64>,
pub stats_start_time: Option<Instant>,
pub stats_last_time: Option<Instant>,
pub stats_frames: u64,
pub first_frame: bool,
pub args: Args,
pub errored: bool,
@@ -226,6 +230,7 @@ pub struct State<S: CaptureSource> {
webrtc_rx: Option<crossbeam_channel::Receiver<Vec<u8>>>,
webrtc_frames_sent: u64,
webrtc_paused: Option<Arc<AtomicBool>>,
stats: PipelineStats,
}
// ---------------------------------------------------------------------------
@@ -302,6 +307,9 @@ impl<S: CaptureSource> State<S> {
},
in_flight_surface: InFlightSurface::None,
starting_timestamp: None,
stats_start_time: None,
stats_last_time: None,
stats_frames: 0,
first_frame: true,
fps_limit: FpsLimit::new(fps),
args,
@@ -315,6 +323,7 @@ impl<S: CaptureSource> State<S> {
webrtc_rx,
webrtc_frames_sent: 0,
webrtc_paused,
stats: PipelineStats::new(),
};
// registry_queue_init consumes registry events internally during its
@@ -492,7 +501,7 @@ impl<S: CaptureSource> State<S> {
// is a freshly allocated empty Video frame.
let ret = unsafe { ffi::av_hwframe_get_buffer(frames_rgb_ctx, surface.as_mut_ptr(), 0) };
if ret < 0 {
tracing::error!("av_hwframe_get_buffer failed: error {}", ret);
tracing::error!("av_hwframe_get_buffer failed: {}", crate::avhw::ff_err(ret));
self.errored = true;
return;
}
@@ -505,7 +514,7 @@ impl<S: CaptureSource> State<S> {
}
let ret = unsafe { ffi::av_hwframe_map(map_frame.as_mut_ptr(), surface.as_ptr(), 0) };
if ret < 0 {
tracing::error!("av_hwframe_map failed: error {}", ret);
tracing::error!("av_hwframe_map failed: {}", crate::avhw::ff_err(ret));
self.errored = true;
return;
}
@@ -530,7 +539,7 @@ impl<S: CaptureSource> State<S> {
// takes ownership of the fd, and the original fd is owned by map_frame.
let fd_dup = unsafe { libc::dup(obj.fd) };
if fd_dup < 0 {
tracing::error!("failed to dup dma-buf fd");
tracing::error!("failed to dup dma-buf fd: {}", std::io::Error::last_os_error());
// wayland-client does not auto-destroy params on Drop.
params.destroy();
self.errored = true;
@@ -574,6 +583,8 @@ impl<S: CaptureSource> State<S> {
where
S::Frame: Default,
{
self.stats.record_capture();
let (mut surface, _drm_map, frame, buffer) =
match mem::replace(&mut self.in_flight_surface, InFlightSurface::None) {
InFlightSurface::CopyQueued {
@@ -614,10 +625,29 @@ impl<S: CaptureSource> State<S> {
.is_some()
};
if should_encode {
let encode_start = Instant::now();
if let Err(e) = enc.encode_frame(&surface) {
tracing::error!("encode_frame failed: {}", e);
self.errored = true;
}
let encode_elapsed = encode_start.elapsed().as_micros() as u64;
self.stats.record_encode(&FrameTimings {
total_us: encode_elapsed,
..Default::default()
});
}
self.stats_frames += 1;
if let Some(last) = self.stats_last_time {
if last.elapsed() >= std::time::Duration::from_secs(10) {
let delta = self.stats_frames;
let fps = delta as f64 / last.elapsed().as_secs_f64();
tracing::info!(frames = self.stats_frames, fps = format!("{fps:.1}"), "encoding stats");
self.stats_last_time = Some(std::time::Instant::now());
self.stats_frames = 0;
}
} else {
self.stats_start_time = Some(std::time::Instant::now());
self.stats_last_time = Some(std::time::Instant::now());
}
}
@@ -670,13 +700,23 @@ impl<S: CaptureSource> State<S> {
if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
self.stats.record_send(0.0, None);
self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1);
}
if count > 0 {
tracing::info!("WebRTC forwarded {count} frames from channel");
tracing::debug!("WebRTC forwarded {count} frames from channel");
}
}
if self.args.stats && self.stats.should_snapshot() {
self.stats.set_queue_depths(
0,
self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0),
);
let snap = self.stats.snapshot_and_reset();
tracing::info!("stats: {snap}");
}
Ok(())
}
@@ -995,8 +1035,7 @@ impl<S: CaptureSource> Dispatch<WlRegistry, GlobalListContents> for State<S> {
qhandle: &QueueHandle<State<S>>,
) {
use wayland_client::protocol::wl_registry::Event as RegistryEvent;
tracing::debug!("Dispatch<WlRegistry>::event fired: {:?}", event);
match event {
RegistryEvent::Global {
name,

View File

@@ -19,11 +19,13 @@ const HTML_PAGE: &str = r#"<!DOCTYPE html>
video{max-width:90vw;max-height:80vh;border:1px solid #333}
#status{margin:12px;font-size:14px;color:#aaa}
#debug{position:fixed;bottom:8px;left:8px;font-size:11px;color:#666;max-width:90vw;white-space:pre-wrap}
#stats-panel{position:fixed;top:8px;right:8px;background:rgba(0,0,0,0.7);color:#0f0;font:11px monospace;padding:6px 10px;border-radius:4px;z-index:100;pointer-events:none;max-width:90vw;white-space:pre;line-height:1.5}
</style></head>
<body>
<div id="status">Connecting...</div>
<video id="video" autoplay playsinline muted></video>
<pre id="debug"></pre>
<div id="stats-panel"></div>
<script>
const status = document.getElementById('status');
const video = document.getElementById('video');
@@ -51,25 +53,92 @@ function preferH264(sdp) {
}
function installStatsLogger(peer) {
const panel = document.getElementById('stats-panel');
let prev = null;
const intervalSecs = 1;
setInterval(() => {
if (peer !== pc) return;
const v = video;
log(`video: readyState=${v.readyState} currentTime=${v.currentTime.toFixed(2)} ` +
`paused=${v.paused} width=${v.videoWidth} height=${v.videoHeight} ` +
`srcObject=${v.srcObject ? 'yes' : 'no'}`);
peer.getStats().then(stats => {
let rtp = null, rtt = null, codecStr = '';
let freezeCount = null, totalFreezesDuration = null;
stats.forEach(report => {
if (report.type === 'inbound-rtp' && report.kind === 'video') {
log(`RTP-in: packetsReceived=${report.packetsReceived} packetsLost=${report.packetsLost} ` +
`bytesReceived=${report.bytesReceived} framesDecoded=${report.framesDecoded} ` +
`framesDropped=${report.framesDropped} codecId=${report.codecId}`);
}
if (report.type === 'codec' && report.mimeType && report.mimeType.includes('H264')) {
log(`Codec: ${report.mimeType} ${report.payloadType} sdpFmtpLine=${report.sdpFmtpLine}`);
if (report.type === 'inbound-rtp' && report.kind === 'video') rtp = report;
if (report.type === 'codec' && report.mimeType && report.mimeType.includes('H264'))
codecStr = report.mimeType + ' ' + (report.payloadType || '');
// candidate-pair: feature-detect 'selected' property
if (report.type === 'candidate-pair') {
const isSel = ('selected' in report) ? report.selected : report.state === 'succeeded';
if (isSel && typeof report.currentRoundTripTime === 'number') rtt = report.currentRoundTripTime;
}
});
// Freeze stats (feature-detect)
if (rtp && typeof rtp.freezeCount !== 'undefined') {
freezeCount = rtp.freezeCount;
totalFreezesDuration = rtp.totalFreezesDuration;
}
if (!rtp) return;
const cur = {
framesDecoded: rtp.framesDecoded || 0,
framesDropped: rtp.framesDropped || 0,
framesPerSecond: rtp.framesPerSecond || 0,
packetsLost: rtp.packetsLost || 0,
jitter: rtp.jitter || 0,
bytesReceived: rtp.bytesReceived || 0,
totalDecodeTime: rtp.totalDecodeTime || 0,
jitterBufferDelay: rtp.jitterBufferDelay || 0,
jitterBufferEmittedCount: rtp.jitterBufferEmittedCount || 0,
freezeCount: freezeCount,
totalFreezesDuration: totalFreezesDuration,
rtt: rtt,
};
// Raw log to debug element (backward compat)
log('RTP-in: decoded=' + cur.framesDecoded + ' lost=' + cur.packetsLost +
' bytes=' + cur.bytesReceived + ' fps=' + cur.framesPerSecond +
(codecStr ? ' codec=' + codecStr : ''));
if (!prev) { prev = cur; return; }
// Compute deltas
const dFrames = cur.framesDecoded - prev.framesDecoded;
const dDropped = cur.framesDropped - prev.framesDropped;
const dLost = cur.packetsLost - prev.packetsLost;
const dBytes = cur.bytesReceived - prev.bytesReceived;
const dDecodeTime = cur.totalDecodeTime - prev.totalDecodeTime;
const dJitterBufDelay = cur.jitterBufferDelay - prev.jitterBufferDelay;
const dJitterBufCount = cur.jitterBufferEmittedCount - prev.jitterBufferEmittedCount;
const kbps = Math.round(dBytes * 8 / intervalSecs / 1000);
const decodeMs = dFrames > 0 ? (dDecodeTime / dFrames * 1000).toFixed(1) : '—';
const jitterBufMs = dJitterBufCount > 0 ? (dJitterBufDelay / dJitterBufCount * 1000).toFixed(1) : '—';
const jitterMs = (cur.jitter * 1000).toFixed(1);
const rttMs = cur.rtt !== null ? (cur.rtt * 1000).toFixed(1) : null;
let line = 'FPS:' + cur.framesPerSecond +
' Decoded:' + cur.framesDecoded + '(+' + dFrames + ')' +
' Dropped:' + cur.framesDropped + (dDropped > 0 ? '(+' + dDropped + ')' : '') +
' Lost:' + dLost +
' Jitter:' + jitterMs + 'ms' +
(rttMs !== null ? ' RTT:' + rttMs + 'ms' : '') +
' Decode:' + decodeMs + 'ms' +
' JBuf:' + jitterBufMs + 'ms';
if (freezeCount !== null) {
const dFreeze = cur.freezeCount - (prev.freezeCount || 0);
if (cur.freezeCount > 0 || dFreeze > 0)
line += ' Freeze:' + cur.freezeCount + '(+' + dFreeze + ')';
}
line += ' ' + kbps + 'kbps';
panel.textContent = line;
prev = cur;
}).catch(() => {});
}, 2000);
}, intervalSecs * 1000);
}
function connect() {
@@ -178,12 +247,16 @@ impl WebRtcState {
HTML_PAGE.len(),
HTML_PAGE
);
let _ = stream.write_all(resp.as_bytes());
if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
} else if req_str.starts_with("POST /sdp") {
let body = extract_body(&req_str);
if body.is_empty() {
let resp = "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\nempty body";
let _ = stream.write_all(resp.as_bytes());
if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
continue;
}
@@ -206,17 +279,23 @@ impl WebRtcState {
answer_json.len(),
answer_json
);
let _ = stream.write_all(resp.as_bytes());
if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
}
Err(e) => {
tracing::error!("SDP offer handling failed: {e}");
let resp = "HTTP/1.1 500 Internal Server Error\r\nConnection: close\r\n\r\n";
let _ = stream.write_all(resp.as_bytes());
if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
}
}
} else {
let resp = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n";
let _ = stream.write_all(resp.as_bytes());
if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
}
}
Ok(handled)
@@ -225,7 +304,7 @@ impl WebRtcState {
pub fn poll_rtc(&mut self) -> Result<()> {
if let Some(inner) = self.inner.as_mut() {
if inner.poll_rtc()? {
tracing::warn!("WebRTC connection closed/failed; clearing connection state");
tracing::info!("WebRTC connection closed; clearing connection state");
self.inner = None;
}
}
@@ -252,7 +331,7 @@ impl WebRtcState {
false
};
if should_destroy {
tracing::warn!("WebRTC connection failed during write; clearing connection state");
tracing::info!("WebRTC connection failed during write; clearing connection state");
self.inner = None;
}
Ok(())
@@ -270,10 +349,49 @@ impl WebRtcInner {
let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?;
// Increase UDP send buffer to absorb IDR frame bursts (256KB IDR → ~145 RTP
// packets in a single poll_rtc loop). Default Linux wmem is ~208KB which
// causes EAGAIN on large keyframes. 2MB comfortably buffers several IDRs.
const SND_BUF_REQ: usize = 2 * 1024 * 1024;
// SAFETY: fd is a valid UDP socket; setsockopt/getsockopt with SOL_SOCKET +
// SO_SNDBUF are safe on Linux. We check the return value and log the actual
// kernel-assigned buffer (Linux may cap at wmem_max and/or double the value).
unsafe {
let fd = std::os::unix::io::AsRawFd::as_raw_fd(&socket);
let val: libc::c_int = SND_BUF_REQ as libc::c_int;
let ret = libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_SNDBUF,
&val as *const libc::c_int as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
);
if ret < 0 {
tracing::warn!("setsockopt SO_SNDBUF failed (errno {})", std::io::Error::last_os_error());
}
let mut actual: libc::c_int = 0;
let mut actual_len: libc::socklen_t = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
let gret = libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_SNDBUF,
&mut actual as *mut libc::c_int as *mut libc::c_void,
&mut actual_len,
);
if gret == 0 {
tracing::info!(
"UDP send buffer: requested {}KB, actual {}KB",
SND_BUF_REQ / 1024,
actual / 1024,
);
}
}
let local_addr = socket.local_addr()?;
let lan_ip = local_ip().unwrap_or_else(|| {
tracing::warn!("Failed to detect LAN IP, falling back to 127.0.0.1");
tracing::debug!("Failed to detect LAN IP, falling back to 127.0.0.1");
"127.0.0.1".to_string()
});
let candidate_addr: SocketAddr = format!("{lan_ip}:{}", local_addr.port()).parse()?;
@@ -320,7 +438,7 @@ impl WebRtcInner {
let mid = match self.video_mid {
Some(m) => m,
None => {
tracing::warn!("discover_video_params: no video_mid yet");
tracing::debug!("discover_video_params: no video_mid yet");
return;
}
};
@@ -344,13 +462,20 @@ impl WebRtcInner {
loop {
match self.rtc.poll_output() {
Ok(Output::Transmit(t)) => {
tracing::info!("TX {} bytes -> {}", t.contents.len(), t.destination);
tracing::trace!("TX {} bytes -> {}", t.contents.len(), t.destination);
if let Err(e) = self.socket.send_to(&t.contents, t.destination) {
tracing::warn!("UDP send error: {e}");
if e.kind() == std::io::ErrorKind::WouldBlock {
tracing::debug!(
"UDP send WouldBlock ({} bytes) — send buffer full",
t.contents.len(),
);
} else {
tracing::warn!("UDP send error to {}: {e}", t.destination);
}
}
}
Ok(Output::Event(e)) => {
tracing::info!("RTC event: {e:?}");
tracing::debug!("RTC event: {e:?}");
match &e {
Event::Connected => {
tracing::info!("WebRTC connected!");
@@ -400,7 +525,7 @@ impl WebRtcInner {
Ok((n, source)) => {
recv_count += 1;
if recv_count <= 5 {
tracing::info!("UDP recv {} bytes from {}", n, source);
tracing::trace!("UDP recv {} bytes from {}", n, source);
}
let input = Input::Receive(
Instant::now(),
@@ -438,14 +563,14 @@ impl WebRtcInner {
let mid = match self.video_mid {
Some(m) => m,
None => {
tracing::warn!("write_h264: no video_mid");
tracing::debug!("write_h264: no video_mid");
return Ok(false);
}
};
let pt = match self.video_pt {
Some(p) => p,
None => {
tracing::warn!("write_h264: no video_pt");
tracing::debug!("write_h264: no video_pt");
return Ok(false);
}
};
@@ -474,7 +599,7 @@ impl WebRtcInner {
let writer = match self.rtc.writer(mid) {
Some(w) => w,
None => {
tracing::warn!("write_h264: no writer for mid={mid}");
tracing::debug!("write_h264: no writer for mid={mid}");
return Ok(false);
}
};