From 503e4dbc224da380f3ad521fdf1a0918c84d2c32 Mon Sep 17 00:00:00 2001 From: dailz Date: Sun, 7 Jun 2026 18:30:09 +0800 Subject: [PATCH] feat(portal): independent WebRTC thread + channel tuning for 60fps mouse latency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move WebRTC send to dedicated wl-webrtc-webrtc thread (was inline in main loop) - Reduce frame_rx 16→1, input_tx 2→1 (drop-on-full), webrtc_tx 32→2 - recv_timeout 10ms→2ms to reduce pipeline latency - Fix sent_gap_p95 stats bug: compute gap at actual send time in WebRTC thread instead of batch-draining at snapshot time (was always 0.0ms) - WebRTC encoder: High profile via AVCodecContext.profile, veryfast preset (was ultrafast), default bitrate factor 2→5 (2.5x) - MP4 muxer: High profile, preset ultrafast→fast, tune=zerolatency removed, max_b_frames 0→3 - Stats drain via sent_gap channel with record_send_from_thread() - Shutdown: drop input_tx → join encode → drop webrtc_tx → join webrtc --- src/avhw.rs | 22 ++--- src/cap_portal.rs | 13 +-- src/state_portal.rs | 209 ++++++++++++++++++++++++++------------------ src/stats.rs | 11 +++ 4 files changed, 156 insertions(+), 99 deletions(-) diff --git a/src/avhw.rs b/src/avhw.rs index ad5dbf6..81a38b4 100644 --- a/src/avhw.rs +++ b/src/avhw.rs @@ -1330,7 +1330,7 @@ fn create_software_h264_muxer( enc.set_bit_rate(bitrate as usize); enc.set_gop(gop_size); enc.set_time_base(ff::Rational::new(1, fps as i32)); - enc.set_max_b_frames(0); + enc.set_max_b_frames(3); // SAFETY: global headers are needed by MP4 and harmless for other common muxers. unsafe { @@ -1338,17 +1338,16 @@ fn create_software_h264_muxer( } if codec_name == "libx264" { - // SAFETY: priv_data belongs to the unopened encoder; strings live for each call. + // SAFETY: priv_data and codec context belong to the unopened encoder; + // strings live for each av_opt_set call. 
unsafe { let key = CString::new("preset").unwrap(); - let val = CString::new("ultrafast").unwrap(); - ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); - let key = CString::new("tune").unwrap(); - let val = CString::new("zerolatency").unwrap(); + let val = CString::new("fast").unwrap(); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); let key = CString::new("threads").unwrap(); let val = CString::new("6").unwrap(); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); + (*enc.as_mut_ptr()).profile = ffi::AV_PROFILE_H264_HIGH as i32; } } @@ -1458,11 +1457,11 @@ fn create_software_h264_encoder( enc.set_max_b_frames(0); if codec_name == "libx264" { - // SAFETY: priv_data belongs to the unopened encoder context; each - // CString lives for the duration of its av_opt_set call. + // SAFETY: priv_data and codec context belong to the unopened encoder; + // each CString lives for the duration of its av_opt_set call. unsafe { let key = CString::new("preset").unwrap(); - let val = CString::new("ultrafast").unwrap(); + let val = CString::new("veryfast").unwrap(); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); let key = CString::new("tune").unwrap(); let val = CString::new("zerolatency").unwrap(); @@ -1470,6 +1469,9 @@ fn create_software_h264_encoder( let key = CString::new("threads").unwrap(); let val = CString::new("6").unwrap(); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); + // High profile via AVCodecContext.profile (not x264opts — x264 rejects it there). + // High enables CABAC + 8x8dct automatically. 
+ (*enc.as_mut_ptr()).profile = ffi::AV_PROFILE_H264_HIGH as i32; let key = CString::new("x264opts").unwrap(); let val = CString::new("repeat_headers=1").unwrap(); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); @@ -1479,7 +1481,7 @@ fn create_software_h264_encoder( let opened = enc .open() .map_err(|e| anyhow::anyhow!("Failed to open {codec_name} encoder: {e}"))?; - tracing::info!("WebRTC encoder: {codec_name} {width}x{height} @ {fps}fps {bitrate}bps"); + tracing::info!("WebRTC encoder: {codec_name} {width}x{height} @ {fps}fps {bitrate}bps (profile High, preset veryfast)"); Ok(opened.0) } diff --git a/src/cap_portal.rs b/src/cap_portal.rs index 194859b..da661d5 100644 --- a/src/cap_portal.rs +++ b/src/cap_portal.rs @@ -98,7 +98,7 @@ impl CapPortal { /// 执行流程: /// 1. 创建 Tokio 运行时(用于异步 Portal 调用) /// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id - /// 3. 创建有界通道(容量 16)用于帧传递 + /// 3. 创建有界通道(容量 1)用于帧传递(最新帧优先,避免队列积压延迟) /// 4. 创建 eventfd 对,用于线程安全的关闭信号传递 /// 5. 
启动 PipeWire 捕获线程 pub fn new(args: &Args) -> Result { @@ -107,7 +107,7 @@ impl CapPortal { let no_persist = args.no_persist; let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal(no_persist).await })?; - let (frame_tx, frame_rx) = bounded(16); + let (frame_tx, frame_rx) = bounded(1); let (event_tx, event_rx) = bounded(8); let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; @@ -722,11 +722,12 @@ fn pipewire_thread(ctx: PwThreadCtx) { pts, }; - if let Err(crossbeam_channel::TrySendError::Full(_)) = frame_tx.try_send(frame) { - let prev = dropped.fetch_add(1, Ordering::Relaxed); - if prev > 0 && prev % 30 == 0 { - tracing::warn!("dropped {prev} frames total: encoder backlog"); + match frame_tx.try_send(frame) { + Ok(()) => {} + Err(crossbeam_channel::TrySendError::Full(_)) => { + dropped.fetch_add(1, Ordering::Relaxed); } + Err(crossbeam_channel::TrySendError::Disconnected(_)) => {} } unsafe { stream.queue_raw_buffer(raw_buf) }; } diff --git a/src/state_portal.rs b/src/state_portal.rs index 1f9f0f7..87147e2 100644 --- a/src/state_portal.rs +++ b/src/state_portal.rs @@ -33,6 +33,11 @@ struct EncodeThread { timing_rx: crossbeam_channel::Receiver, } +struct WebrtcThread { + handle: Option>, + sent_gap_rx: crossbeam_channel::Receiver, +} + /// 门户模式的主状态机 /// /// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。 @@ -50,9 +55,8 @@ pub struct StatePortal { start_time: Option, // 编码开始时间 stats: PipelineStats, // 管道统计(窗口化帧计时 + 每秒快照) pw_dropped_prev: u64, // 上一窗口的 PipeWire 丢弃帧数(用于增量计算) - webrtc: Option, // WebRTC 状态(仅 WebRTC 模式启用) - webrtc_rx: Option>>, - webrtc_frames_sent: u64, + webrtc: Option, + webrtc_thread: Option, webrtc_paused: Option>, last_capture_arrival: Option, // timestamp of last real frame arrival stall_start: Option, // when current stall began @@ -98,8 +102,7 @@ impl StatePortal { stats: PipelineStats::new(), pw_dropped_prev: 0, webrtc, - webrtc_rx: None, - webrtc_frames_sent: 0, + webrtc_thread: None, webrtc_paused, 
last_capture_arrival: None, stall_start: None, @@ -112,14 +115,10 @@ impl StatePortal { /// 轮询 PipeWire 事件并编码帧 /// - /// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 10ms), + /// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 2ms), /// `block=false` 时使用 try_recv 非阻塞检查。 /// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。 pub fn poll_and_encode(&mut self, block: bool) -> Result { - // 先处理 WebRTC 信令、网络轮询,并转发已编码帧 - // WebRTC: process signaling, network, and forward encoded frames - self.poll_webrtc()?; - // 检查 PipeWire 控制事件(流结束 / 错误) if let Ok(ctrl) = self.cap.event_receiver().try_recv() { match ctrl { @@ -147,8 +146,8 @@ impl StatePortal { // 根据阻塞模式选择不同的帧接收策略 let frame = if block { - // 阻塞模式:最多等待 10ms 接收帧 - match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) { + // 阻塞模式:最多等待 2ms 接收帧 + match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(2)) { Ok(frame) => frame, Err(_) => { self.record_capture_timeout(); @@ -193,7 +192,7 @@ impl StatePortal { ); // 码率:未指定时按分辨率 × 帧率动态计算 let actual_bitrate = self.args.bitrate.unwrap_or_else(|| { - 2 * (enc_width as u64) * (enc_height as u64) * (self.args.fps as u64) / 100 + 5 * (enc_width as u64) * (enc_height as u64) * (self.args.fps as u64) / 100 }); // GOP 大小:WebRTC 模式使用更小的 GOP(fps/2,最低10),MP4 模式使用 fps let actual_gop_size = self.args.gop_size.unwrap_or_else(|| { @@ -216,8 +215,8 @@ impl StatePortal { enc_height, self.args.fps, )?; - let (webrtc_tx, webrtc_rx) = crossbeam_channel::bounded(32); - let (input_tx, input_rx) = crossbeam_channel::bounded::(2); + let (webrtc_tx, webrtc_rx) = crossbeam_channel::bounded(2); + let (input_tx, input_rx) = crossbeam_channel::bounded::(1); let (timing_tx, timing_rx) = crossbeam_channel::bounded::(32); let encode = SwEncEncode::new_webrtc( enc_width, @@ -233,7 +232,18 @@ impl StatePortal { .spawn(move || encode_thread_loop(encode, input_rx, timing_tx))?; self.enc_import = Some(import); self.enc_thread = Some(EncodeThread { handle: Some(handle), input_tx, timing_rx 
}); - self.webrtc_rx = Some(webrtc_rx); + + let wrtc = self.webrtc.take() + .ok_or_else(|| anyhow::anyhow!("internal: WebRtcState missing during init"))?; + let paused = self.webrtc_paused.as_ref() + .ok_or_else(|| anyhow::anyhow!("internal: webrtc_paused missing"))? + .clone(); + let fps = self.args.fps; + let (sent_gap_tx, sent_gap_rx) = crossbeam_channel::bounded(64); + let webrtc_handle = std::thread::Builder::new() + .name("wl-webrtc-webrtc".into()) + .spawn(move || webrtc_thread_loop(wrtc, webrtc_rx, fps, paused, sent_gap_tx))?; + self.webrtc_thread = Some(WebrtcThread { handle: Some(webrtc_handle), sent_gap_rx }); } else { // MP4 模式:编码输出写入文件 let output_path = self.args.output.as_deref() @@ -265,16 +275,24 @@ impl StatePortal { } } - // 在返回前再次轮询 WebRTC,确保本帧编码后的数据及时转发 - // WebRTC: drain encoded frames produced by this poll before returning. - self.poll_webrtc()?; - // 每秒输出一次结构化管道统计(仅 --stats 启用时记录日志) if self.args.stats && self.stats.should_snapshot() { - // PipeWire 丢弃帧数:CapPortal 尚未暴露 dropped_count(),暂用占位 self.stats.set_pipewire_dropped(0, 0); - let enc_q = self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0); - self.stats.set_queue_depths(0, enc_q); + self.stats.set_queue_depths(0, 0); + if let Some(ref enc_thread) = self.enc_thread { + while let Ok(timing) = enc_thread.timing_rx.try_recv() { + self.stats.record_encode_thread( + timing.sws_us, + timing.encode_us, + timing.output_bytes, + ); + } + } + if let Some(ref webrtc_thread) = self.webrtc_thread { + while let Ok(gap_ms) = webrtc_thread.sent_gap_rx.try_recv() { + self.stats.record_send_from_thread(gap_ms); + } + } let snap = self.stats.snapshot_and_reset(); if self.filler_frames_sent > 0 { tracing::info!("stats: {snap} filler_frames_sent={}", self.filler_frames_sent); @@ -316,7 +334,7 @@ impl StatePortal { } fn maybe_send_filler_frame(&mut self) { - if self.webrtc.is_none() || self.stall_start.is_none() { + if self.webrtc_thread.is_none() || self.stall_start.is_none() { return; } let Some(cached) = 
&self.last_fillable_frame else { @@ -509,8 +527,8 @@ impl StatePortal { self.frames_encoded += 1; self.last_fillable_frame = Some(fillable_frame); } - Err(crossbeam_channel::TrySendError::Full(_frame)) => { - tracing::warn!("Encode thread input full, dropping portal frame"); + Err(crossbeam_channel::TrySendError::Full(_)) => { + tracing::debug!("Encode thread input full, dropping portal frame"); } Err(crossbeam_channel::TrySendError::Disconnected(_frame)) => { tracing::error!("Encode thread input disconnected"); @@ -529,9 +547,7 @@ impl StatePortal { /// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。 pub fn shutdown(&mut self) { self.last_fillable_frame = None; - // 先 drop receiver,使 flush() 中的 try_send() 立即返回 Disconnected - // 而非在满通道上阻塞(修复 issue #8 死锁) - self.webrtc_rx = None; + // 1. Stop encode thread (drops webrtc_tx → signals WebRTC thread to exit) if let Some(mut enc_thread) = self.enc_thread.take() { drop(enc_thread.input_tx); if let Some(handle) = enc_thread.handle.take() { @@ -541,6 +557,15 @@ impl StatePortal { } } self.enc_import = None; + // 2. Wait for WebRTC thread (exits when webrtc_tx is dropped by encode thread) + if let Some(mut webrtc_thread) = self.webrtc_thread.take() { + if let Some(handle) = webrtc_thread.handle.take() { + if handle.join().is_err() { + tracing::error!("WebRTC thread panicked during shutdown"); + } + } + } + // 3. 
Flush MP4 encoder if present if let Some(mut enc) = self.enc.take() { if let Err(e) = enc.flush() { tracing::error!("Flush error during shutdown: {e}"); @@ -565,60 +590,6 @@ impl StatePortal { pub fn is_errored(&self) -> bool { self.errored } - - /// 轮询 WebRTC 信令通道并转发编码帧 - /// - /// 处理信令交换、网络轮询,以及从编码通道中取出已编码的 H.264 数据 - /// 并通过 WebRTC 发送。 - fn poll_webrtc(&mut self) -> Result<()> { - let Some(ref mut wrtc) = self.webrtc else { return Ok(()) }; - - wrtc.handle_signaling()?; - wrtc.poll_and_feed()?; - - let connected = wrtc.is_connected(); - - if let Some(ref paused) = self.webrtc_paused { - let was_paused = paused.load(Ordering::Relaxed); - let now_paused = !connected; - if was_paused && !now_paused { - tracing::info!("WebRTC client connected, resuming encoding"); - } else if !was_paused && now_paused { - tracing::warn!("WebRTC client disconnected, pausing encoding"); - } - paused.store(now_paused, Ordering::Relaxed); - } - - if let Some(ref rx) = self.webrtc_rx { - let mut count = 0u32; - while let Ok(data) = rx.try_recv() { - if !connected { - continue; - } - count += 1; - if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) { - tracing::debug!("WebRTC write frame error: {e}"); - } - self.stats.record_send(0.0, None); - self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1); - } - if count > 0 { - tracing::debug!("WebRTC forwarded {count} frames from channel"); - } - } - - if let Some(ref enc_thread) = self.enc_thread { - while let Ok(timing) = enc_thread.timing_rx.try_recv() { - self.stats.record_encode_thread( - timing.sws_us, - timing.encode_us, - timing.output_bytes, - ); - } - } - - Ok(()) - } } fn encode_thread_loop( @@ -657,6 +628,78 @@ fn encode_thread_loop( tracing::info!("Encode thread exiting"); } +fn webrtc_thread_loop( + mut wrtc: WebRtcState, + webrtc_rx: crossbeam_channel::Receiver>, + fps: u32, + paused: Arc, + sent_gap_tx: crossbeam_channel::Sender, +) { + let mut frames_sent: u64 = 0; + let mut 
last_send: Option = None; + let timeout = Duration::from_millis(1); + + loop { + if let Err(e) = wrtc.handle_signaling() { + tracing::error!("WebRTC signaling error: {e}"); + break; + } + if let Err(e) = wrtc.poll_and_feed() { + tracing::error!("WebRTC poll error: {e}"); + break; + } + + let connected = wrtc.is_connected(); + let was_paused = paused.load(Ordering::Relaxed); + let now_paused = !connected; + if was_paused && !now_paused { + tracing::info!("WebRTC client connected, resuming encoding"); + } else if !was_paused && now_paused { + tracing::warn!("WebRTC client disconnected, pausing encoding"); + } + paused.store(now_paused, Ordering::Relaxed); + + if connected { + while let Ok(data) = webrtc_rx.try_recv() { + if let Err(e) = wrtc.write_h264_frame(&data, frames_sent, fps) { + tracing::debug!("WebRTC write frame error: {e}"); + } + frames_sent = frames_sent.saturating_add(1); + let gap_ms = last_send + .map(|l| l.elapsed().as_secs_f64() * 1000.0) + .unwrap_or(0.0); + last_send = Some(std::time::Instant::now()); + let _ = sent_gap_tx.try_send(gap_ms); + } + } else { + while webrtc_rx.try_recv().is_ok() {} + } + + match webrtc_rx.recv_timeout(timeout) { + Ok(data) => { + if wrtc.is_connected() { + if let Err(e) = wrtc.write_h264_frame(&data, frames_sent, fps) { + tracing::debug!("WebRTC write frame error: {e}"); + } + frames_sent = frames_sent.saturating_add(1); + let gap_ms = last_send + .map(|l| l.elapsed().as_secs_f64() * 1000.0) + .unwrap_or(0.0); + last_send = Some(std::time::Instant::now()); + let _ = sent_gap_tx.try_send(gap_ms); + } + } + Err(crossbeam_channel::RecvTimeoutError::Timeout) => {} + Err(crossbeam_channel::RecvTimeoutError::Disconnected) => { + tracing::info!("WebRTC channel disconnected, exiting thread"); + return; + } + } + } + + tracing::info!("WebRTC thread exiting"); +} + impl Drop for StatePortal { // 析构时自动调用 shutdown,确保编码器被刷新、资源被释放 fn drop(&mut self) { diff --git a/src/stats.rs b/src/stats.rs index 14dbbad..d10ac99 100644 --- 
a/src/stats.rs +++ b/src/stats.rs @@ -166,6 +166,17 @@ impl PipelineStats { } } + /// Record a frame sent from a background WebRTC thread. + /// `gap_ms` is the pre-computed time since the previous send (0.0 = first frame). + /// Unlike `record_send`, this does not sample `Instant::now()`, so it remains + /// accurate even when batch-drained at stats snapshot time. + pub fn record_send_from_thread(&mut self, gap_ms: f64) { + if gap_ms > 0.0 { + self.sent_gaps_ms.push(gap_ms); + } + self.sent_frames += 1; + } + /// Update PipeWire dropped counter (absolute value from AtomicU64). pub fn set_pipewire_dropped(&mut self, total_dropped: u64, prev_dropped: u64) { self.pipewire_dropped = total_dropped.saturating_sub(prev_dropped);