feat(portal): independent WebRTC thread + channel tuning for 60fps mouse latency

- Move WebRTC send to dedicated wl-webrtc-webrtc thread (was inline in main loop)
- Reduce frame_rx 16→1, input_tx 2→1 (drop-on-full), webrtc_tx 32→2
- Recv_timeout 10ms→2ms to reduce pipeline latency
- Fix sent_gap_p95 stats bug: compute gap at actual send time in WebRTC
  thread instead of batch-draining at snapshot time (was always 0.0ms)
- High profile via AVCodecContext.profile, veryfast preset, 5x bitrate
- Stats drain via sent_gap channel with record_send_from_thread()
- Shutdown: drop input_tx → join encode → drop webrtc_tx → join webrtc
This commit is contained in:
dailz
2026-06-07 18:30:09 +08:00
parent caccfec44e
commit 503e4dbc22
4 changed files with 156 additions and 99 deletions

View File

@@ -1330,7 +1330,7 @@ fn create_software_h264_muxer(
enc.set_bit_rate(bitrate as usize); enc.set_bit_rate(bitrate as usize);
enc.set_gop(gop_size); enc.set_gop(gop_size);
enc.set_time_base(ff::Rational::new(1, fps as i32)); enc.set_time_base(ff::Rational::new(1, fps as i32));
enc.set_max_b_frames(0); enc.set_max_b_frames(3);
// SAFETY: global headers are needed by MP4 and harmless for other common muxers. // SAFETY: global headers are needed by MP4 and harmless for other common muxers.
unsafe { unsafe {
@@ -1338,17 +1338,16 @@ fn create_software_h264_muxer(
} }
if codec_name == "libx264" { if codec_name == "libx264" {
// SAFETY: priv_data belongs to the unopened encoder; strings live for each call. // SAFETY: priv_data and codec context belong to the unopened encoder;
// strings live for each av_opt_set call.
unsafe { unsafe {
let key = CString::new("preset").unwrap(); let key = CString::new("preset").unwrap();
let val = CString::new("ultrafast").unwrap(); let val = CString::new("fast").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("tune").unwrap();
let val = CString::new("zerolatency").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("threads").unwrap(); let key = CString::new("threads").unwrap();
let val = CString::new("6").unwrap(); let val = CString::new("6").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
(*enc.as_mut_ptr()).profile = ffi::AV_PROFILE_H264_HIGH as i32;
} }
} }
@@ -1458,11 +1457,11 @@ fn create_software_h264_encoder(
enc.set_max_b_frames(0); enc.set_max_b_frames(0);
if codec_name == "libx264" { if codec_name == "libx264" {
// SAFETY: priv_data belongs to the unopened encoder context; each // SAFETY: priv_data and codec context belong to the unopened encoder;
// CString lives for the duration of its av_opt_set call. // each CString lives for the duration of its av_opt_set call.
unsafe { unsafe {
let key = CString::new("preset").unwrap(); let key = CString::new("preset").unwrap();
let val = CString::new("ultrafast").unwrap(); let val = CString::new("veryfast").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("tune").unwrap(); let key = CString::new("tune").unwrap();
let val = CString::new("zerolatency").unwrap(); let val = CString::new("zerolatency").unwrap();
@@ -1470,6 +1469,9 @@ fn create_software_h264_encoder(
let key = CString::new("threads").unwrap(); let key = CString::new("threads").unwrap();
let val = CString::new("6").unwrap(); let val = CString::new("6").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
// High profile via AVCodecContext.profile (not x264opts — x264 rejects it there).
// High enables CABAC + 8x8dct automatically.
(*enc.as_mut_ptr()).profile = ffi::AV_PROFILE_H264_HIGH as i32;
let key = CString::new("x264opts").unwrap(); let key = CString::new("x264opts").unwrap();
let val = CString::new("repeat_headers=1").unwrap(); let val = CString::new("repeat_headers=1").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
@@ -1479,7 +1481,7 @@ fn create_software_h264_encoder(
let opened = enc let opened = enc
.open() .open()
.map_err(|e| anyhow::anyhow!("Failed to open {codec_name} encoder: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to open {codec_name} encoder: {e}"))?;
tracing::info!("WebRTC encoder: {codec_name} {width}x{height} @ {fps}fps {bitrate}bps"); tracing::info!("WebRTC encoder: {codec_name} {width}x{height} @ {fps}fps {bitrate}bps (profile High, preset veryfast)");
Ok(opened.0) Ok(opened.0)
} }

View File

@@ -98,7 +98,7 @@ impl CapPortal {
/// 执行流程: /// 执行流程:
/// 1. 创建 Tokio 运行时(用于异步 Portal 调用) /// 1. 创建 Tokio 运行时(用于异步 Portal 调用)
/// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id /// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id
/// 3. 创建有界通道(容量 16)用于帧传递 /// 3. 创建有界通道(容量 1用于帧传递(最新帧优先,避免队列积压延迟)
/// 4. 创建 eventfd 对,用于线程安全的关闭信号传递 /// 4. 创建 eventfd 对,用于线程安全的关闭信号传递
/// 5. 启动 PipeWire 捕获线程 /// 5. 启动 PipeWire 捕获线程
pub fn new(args: &Args) -> Result<Self> { pub fn new(args: &Args) -> Result<Self> {
@@ -107,7 +107,7 @@ impl CapPortal {
let no_persist = args.no_persist; let no_persist = args.no_persist;
let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal(no_persist).await })?; let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal(no_persist).await })?;
let (frame_tx, frame_rx) = bounded(16); let (frame_tx, frame_rx) = bounded(1);
let (event_tx, event_rx) = bounded(8); let (event_tx, event_rx) = bounded(8);
let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
@@ -722,11 +722,12 @@ fn pipewire_thread(ctx: PwThreadCtx) {
pts, pts,
}; };
if let Err(crossbeam_channel::TrySendError::Full(_)) = frame_tx.try_send(frame) { match frame_tx.try_send(frame) {
let prev = dropped.fetch_add(1, Ordering::Relaxed); Ok(()) => {}
if prev > 0 && prev % 30 == 0 { Err(crossbeam_channel::TrySendError::Full(_)) => {
tracing::warn!("dropped {prev} frames total: encoder backlog"); dropped.fetch_add(1, Ordering::Relaxed);
} }
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {}
} }
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
} }

View File

@@ -33,6 +33,11 @@ struct EncodeThread {
timing_rx: crossbeam_channel::Receiver<EncodeThreadTiming>, timing_rx: crossbeam_channel::Receiver<EncodeThreadTiming>,
} }
struct WebrtcThread {
handle: Option<std::thread::JoinHandle<()>>,
sent_gap_rx: crossbeam_channel::Receiver<f64>,
}
/// 门户模式的主状态机 /// 门户模式的主状态机
/// ///
/// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。 /// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。
@@ -50,9 +55,8 @@ pub struct StatePortal {
start_time: Option<Instant>, // 编码开始时间 start_time: Option<Instant>, // 编码开始时间
stats: PipelineStats, // 管道统计(窗口化帧计时 + 每秒快照) stats: PipelineStats, // 管道统计(窗口化帧计时 + 每秒快照)
pw_dropped_prev: u64, // 上一窗口的 PipeWire 丢弃帧数(用于增量计算) pw_dropped_prev: u64, // 上一窗口的 PipeWire 丢弃帧数(用于增量计算)
webrtc: Option<WebRtcState>, // WebRTC 状态(仅 WebRTC 模式启用) webrtc: Option<WebRtcState>,
webrtc_rx: Option<crossbeam_channel::Receiver<Vec<u8>>>, webrtc_thread: Option<WebrtcThread>,
webrtc_frames_sent: u64,
webrtc_paused: Option<Arc<AtomicBool>>, webrtc_paused: Option<Arc<AtomicBool>>,
last_capture_arrival: Option<Instant>, // timestamp of last real frame arrival last_capture_arrival: Option<Instant>, // timestamp of last real frame arrival
stall_start: Option<Instant>, // when current stall began stall_start: Option<Instant>, // when current stall began
@@ -98,8 +102,7 @@ impl StatePortal {
stats: PipelineStats::new(), stats: PipelineStats::new(),
pw_dropped_prev: 0, pw_dropped_prev: 0,
webrtc, webrtc,
webrtc_rx: None, webrtc_thread: None,
webrtc_frames_sent: 0,
webrtc_paused, webrtc_paused,
last_capture_arrival: None, last_capture_arrival: None,
stall_start: None, stall_start: None,
@@ -112,14 +115,10 @@ impl StatePortal {
/// 轮询 PipeWire 事件并编码帧 /// 轮询 PipeWire 事件并编码帧
/// ///
/// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 10ms /// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 2ms
/// `block=false` 时使用 try_recv 非阻塞检查。 /// `block=false` 时使用 try_recv 非阻塞检查。
/// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。 /// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。
pub fn poll_and_encode(&mut self, block: bool) -> Result<bool> { pub fn poll_and_encode(&mut self, block: bool) -> Result<bool> {
// 先处理 WebRTC 信令、网络轮询,并转发已编码帧
// WebRTC: process signaling, network, and forward encoded frames
self.poll_webrtc()?;
// 检查 PipeWire 控制事件(流结束 / 错误) // 检查 PipeWire 控制事件(流结束 / 错误)
if let Ok(ctrl) = self.cap.event_receiver().try_recv() { if let Ok(ctrl) = self.cap.event_receiver().try_recv() {
match ctrl { match ctrl {
@@ -147,8 +146,8 @@ impl StatePortal {
// 根据阻塞模式选择不同的帧接收策略 // 根据阻塞模式选择不同的帧接收策略
let frame = if block { let frame = if block {
// 阻塞模式:最多等待 10ms 接收帧 // 阻塞模式:最多等待 2ms 接收帧
match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) { match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(2)) {
Ok(frame) => frame, Ok(frame) => frame,
Err(_) => { Err(_) => {
self.record_capture_timeout(); self.record_capture_timeout();
@@ -193,7 +192,7 @@ impl StatePortal {
); );
// 码率:未指定时按分辨率 × 帧率动态计算 // 码率:未指定时按分辨率 × 帧率动态计算
let actual_bitrate = self.args.bitrate.unwrap_or_else(|| { let actual_bitrate = self.args.bitrate.unwrap_or_else(|| {
2 * (enc_width as u64) * (enc_height as u64) * (self.args.fps as u64) / 100 5 * (enc_width as u64) * (enc_height as u64) * (self.args.fps as u64) / 100
}); });
// GOP 大小WebRTC 模式使用更小的 GOPfps/2最低10MP4 模式使用 fps // GOP 大小WebRTC 模式使用更小的 GOPfps/2最低10MP4 模式使用 fps
let actual_gop_size = self.args.gop_size.unwrap_or_else(|| { let actual_gop_size = self.args.gop_size.unwrap_or_else(|| {
@@ -216,8 +215,8 @@ impl StatePortal {
enc_height, enc_height,
self.args.fps, self.args.fps,
)?; )?;
let (webrtc_tx, webrtc_rx) = crossbeam_channel::bounded(32); let (webrtc_tx, webrtc_rx) = crossbeam_channel::bounded(2);
let (input_tx, input_rx) = crossbeam_channel::bounded::<CpuNv12Frame>(2); let (input_tx, input_rx) = crossbeam_channel::bounded::<CpuNv12Frame>(1);
let (timing_tx, timing_rx) = crossbeam_channel::bounded::<EncodeThreadTiming>(32); let (timing_tx, timing_rx) = crossbeam_channel::bounded::<EncodeThreadTiming>(32);
let encode = SwEncEncode::new_webrtc( let encode = SwEncEncode::new_webrtc(
enc_width, enc_width,
@@ -233,7 +232,18 @@ impl StatePortal {
.spawn(move || encode_thread_loop(encode, input_rx, timing_tx))?; .spawn(move || encode_thread_loop(encode, input_rx, timing_tx))?;
self.enc_import = Some(import); self.enc_import = Some(import);
self.enc_thread = Some(EncodeThread { handle: Some(handle), input_tx, timing_rx }); self.enc_thread = Some(EncodeThread { handle: Some(handle), input_tx, timing_rx });
self.webrtc_rx = Some(webrtc_rx);
let wrtc = self.webrtc.take()
.ok_or_else(|| anyhow::anyhow!("internal: WebRtcState missing during init"))?;
let paused = self.webrtc_paused.as_ref()
.ok_or_else(|| anyhow::anyhow!("internal: webrtc_paused missing"))?
.clone();
let fps = self.args.fps;
let (sent_gap_tx, sent_gap_rx) = crossbeam_channel::bounded(64);
let webrtc_handle = std::thread::Builder::new()
.name("wl-webrtc-webrtc".into())
.spawn(move || webrtc_thread_loop(wrtc, webrtc_rx, fps, paused, sent_gap_tx))?;
self.webrtc_thread = Some(WebrtcThread { handle: Some(webrtc_handle), sent_gap_rx });
} else { } else {
// MP4 模式:编码输出写入文件 // MP4 模式:编码输出写入文件
let output_path = self.args.output.as_deref() let output_path = self.args.output.as_deref()
@@ -265,16 +275,24 @@ impl StatePortal {
} }
} }
// 在返回前再次轮询 WebRTC确保本帧编码后的数据及时转发
// WebRTC: drain encoded frames produced by this poll before returning.
self.poll_webrtc()?;
// 每秒输出一次结构化管道统计(仅 --stats 启用时记录日志) // 每秒输出一次结构化管道统计(仅 --stats 启用时记录日志)
if self.args.stats && self.stats.should_snapshot() { if self.args.stats && self.stats.should_snapshot() {
// PipeWire 丢弃帧数CapPortal 尚未暴露 dropped_count(),暂用占位
self.stats.set_pipewire_dropped(0, 0); self.stats.set_pipewire_dropped(0, 0);
let enc_q = self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0); self.stats.set_queue_depths(0, 0);
self.stats.set_queue_depths(0, enc_q); if let Some(ref enc_thread) = self.enc_thread {
while let Ok(timing) = enc_thread.timing_rx.try_recv() {
self.stats.record_encode_thread(
timing.sws_us,
timing.encode_us,
timing.output_bytes,
);
}
}
if let Some(ref webrtc_thread) = self.webrtc_thread {
while let Ok(gap_ms) = webrtc_thread.sent_gap_rx.try_recv() {
self.stats.record_send_from_thread(gap_ms);
}
}
let snap = self.stats.snapshot_and_reset(); let snap = self.stats.snapshot_and_reset();
if self.filler_frames_sent > 0 { if self.filler_frames_sent > 0 {
tracing::info!("stats: {snap} filler_frames_sent={}", self.filler_frames_sent); tracing::info!("stats: {snap} filler_frames_sent={}", self.filler_frames_sent);
@@ -316,7 +334,7 @@ impl StatePortal {
} }
fn maybe_send_filler_frame(&mut self) { fn maybe_send_filler_frame(&mut self) {
if self.webrtc.is_none() || self.stall_start.is_none() { if self.webrtc_thread.is_none() || self.stall_start.is_none() {
return; return;
} }
let Some(cached) = &self.last_fillable_frame else { let Some(cached) = &self.last_fillable_frame else {
@@ -509,8 +527,8 @@ impl StatePortal {
self.frames_encoded += 1; self.frames_encoded += 1;
self.last_fillable_frame = Some(fillable_frame); self.last_fillable_frame = Some(fillable_frame);
} }
Err(crossbeam_channel::TrySendError::Full(_frame)) => { Err(crossbeam_channel::TrySendError::Full(_)) => {
tracing::warn!("Encode thread input full, dropping portal frame"); tracing::debug!("Encode thread input full, dropping portal frame");
} }
Err(crossbeam_channel::TrySendError::Disconnected(_frame)) => { Err(crossbeam_channel::TrySendError::Disconnected(_frame)) => {
tracing::error!("Encode thread input disconnected"); tracing::error!("Encode thread input disconnected");
@@ -529,9 +547,7 @@ impl StatePortal {
/// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。 /// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。
pub fn shutdown(&mut self) { pub fn shutdown(&mut self) {
self.last_fillable_frame = None; self.last_fillable_frame = None;
// 先 drop receiver使 flush() 中的 try_send() 立即返回 Disconnected // 1. Stop encode thread (drops webrtc_tx → signals WebRTC thread to exit)
// 而非在满通道上阻塞(修复 issue #8 死锁)
self.webrtc_rx = None;
if let Some(mut enc_thread) = self.enc_thread.take() { if let Some(mut enc_thread) = self.enc_thread.take() {
drop(enc_thread.input_tx); drop(enc_thread.input_tx);
if let Some(handle) = enc_thread.handle.take() { if let Some(handle) = enc_thread.handle.take() {
@@ -541,6 +557,15 @@ impl StatePortal {
} }
} }
self.enc_import = None; self.enc_import = None;
// 2. Wait for WebRTC thread (exits when webrtc_tx is dropped by encode thread)
if let Some(mut webrtc_thread) = self.webrtc_thread.take() {
if let Some(handle) = webrtc_thread.handle.take() {
if handle.join().is_err() {
tracing::error!("WebRTC thread panicked during shutdown");
}
}
}
// 3. Flush MP4 encoder if present
if let Some(mut enc) = self.enc.take() { if let Some(mut enc) = self.enc.take() {
if let Err(e) = enc.flush() { if let Err(e) = enc.flush() {
tracing::error!("Flush error during shutdown: {e}"); tracing::error!("Flush error during shutdown: {e}");
@@ -565,60 +590,6 @@ impl StatePortal {
pub fn is_errored(&self) -> bool { pub fn is_errored(&self) -> bool {
self.errored self.errored
} }
/// 轮询 WebRTC 信令通道并转发编码帧
///
/// 处理信令交换、网络轮询,以及从编码通道中取出已编码的 H.264 数据
/// 并通过 WebRTC 发送。
fn poll_webrtc(&mut self) -> Result<()> {
let Some(ref mut wrtc) = self.webrtc else { return Ok(()) };
wrtc.handle_signaling()?;
wrtc.poll_and_feed()?;
let connected = wrtc.is_connected();
if let Some(ref paused) = self.webrtc_paused {
let was_paused = paused.load(Ordering::Relaxed);
let now_paused = !connected;
if was_paused && !now_paused {
tracing::info!("WebRTC client connected, resuming encoding");
} else if !was_paused && now_paused {
tracing::warn!("WebRTC client disconnected, pausing encoding");
}
paused.store(now_paused, Ordering::Relaxed);
}
if let Some(ref rx) = self.webrtc_rx {
let mut count = 0u32;
while let Ok(data) = rx.try_recv() {
if !connected {
continue;
}
count += 1;
if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
self.stats.record_send(0.0, None);
self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1);
}
if count > 0 {
tracing::debug!("WebRTC forwarded {count} frames from channel");
}
}
if let Some(ref enc_thread) = self.enc_thread {
while let Ok(timing) = enc_thread.timing_rx.try_recv() {
self.stats.record_encode_thread(
timing.sws_us,
timing.encode_us,
timing.output_bytes,
);
}
}
Ok(())
}
} }
fn encode_thread_loop( fn encode_thread_loop(
@@ -657,6 +628,78 @@ fn encode_thread_loop(
tracing::info!("Encode thread exiting"); tracing::info!("Encode thread exiting");
} }
fn webrtc_thread_loop(
mut wrtc: WebRtcState,
webrtc_rx: crossbeam_channel::Receiver<Vec<u8>>,
fps: u32,
paused: Arc<AtomicBool>,
sent_gap_tx: crossbeam_channel::Sender<f64>,
) {
let mut frames_sent: u64 = 0;
let mut last_send: Option<std::time::Instant> = None;
let timeout = Duration::from_millis(1);
loop {
if let Err(e) = wrtc.handle_signaling() {
tracing::error!("WebRTC signaling error: {e}");
break;
}
if let Err(e) = wrtc.poll_and_feed() {
tracing::error!("WebRTC poll error: {e}");
break;
}
let connected = wrtc.is_connected();
let was_paused = paused.load(Ordering::Relaxed);
let now_paused = !connected;
if was_paused && !now_paused {
tracing::info!("WebRTC client connected, resuming encoding");
} else if !was_paused && now_paused {
tracing::warn!("WebRTC client disconnected, pausing encoding");
}
paused.store(now_paused, Ordering::Relaxed);
if connected {
while let Ok(data) = webrtc_rx.try_recv() {
if let Err(e) = wrtc.write_h264_frame(&data, frames_sent, fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
frames_sent = frames_sent.saturating_add(1);
let gap_ms = last_send
.map(|l| l.elapsed().as_secs_f64() * 1000.0)
.unwrap_or(0.0);
last_send = Some(std::time::Instant::now());
let _ = sent_gap_tx.try_send(gap_ms);
}
} else {
while webrtc_rx.try_recv().is_ok() {}
}
match webrtc_rx.recv_timeout(timeout) {
Ok(data) => {
if wrtc.is_connected() {
if let Err(e) = wrtc.write_h264_frame(&data, frames_sent, fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
frames_sent = frames_sent.saturating_add(1);
let gap_ms = last_send
.map(|l| l.elapsed().as_secs_f64() * 1000.0)
.unwrap_or(0.0);
last_send = Some(std::time::Instant::now());
let _ = sent_gap_tx.try_send(gap_ms);
}
}
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
tracing::info!("WebRTC channel disconnected, exiting thread");
return;
}
}
}
tracing::info!("WebRTC thread exiting");
}
impl Drop for StatePortal { impl Drop for StatePortal {
// 析构时自动调用 shutdown确保编码器被刷新、资源被释放 // 析构时自动调用 shutdown确保编码器被刷新、资源被释放
fn drop(&mut self) { fn drop(&mut self) {

View File

@@ -166,6 +166,17 @@ impl PipelineStats {
} }
} }
/// Record a frame sent from a background WebRTC thread.
/// `gap_ms` is the pre-computed time since the previous send (0.0 = first frame).
/// Unlike `record_send`, this does not sample `Instant::now()`, so it remains
/// accurate even when batch-drained at stats snapshot time.
pub fn record_send_from_thread(&mut self, gap_ms: f64) {
if gap_ms > 0.0 {
self.sent_gaps_ms.push(gap_ms);
}
self.sent_frames += 1;
}
/// Update PipeWire dropped counter (absolute value from AtomicU64). /// Update PipeWire dropped counter (absolute value from AtomicU64).
pub fn set_pipewire_dropped(&mut self, total_dropped: u64, prev_dropped: u64) { pub fn set_pipewire_dropped(&mut self, total_dropped: u64, prev_dropped: u64) {
self.pipewire_dropped = total_dropped.saturating_sub(prev_dropped); self.pipewire_dropped = total_dropped.saturating_sub(prev_dropped);