From 826f544569494477bd222deb8f0d4a8b68f64552 Mon Sep 17 00:00:00 2001 From: dailz Date: Sun, 7 Jun 2026 16:55:28 +0800 Subject: [PATCH] feat(portal): async encode pipeline - decouple capture from encoding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split synchronous encode pipeline so sws_scale + libx264 runs on a dedicated thread, leaving only VAAPI import + GPU scale + GPU→CPU transfer on the main capture thread. Problem: encode_p95 occasionally hit 74ms, blocking the entire capture pipeline and causing capture_gap_max=356ms stutter. Solution: - avhw.rs: Split SwEncState into SwEncImport (main thread: VAAPI import, filter_graph scale, GPU→CPU transfer) and SwEncEncode (encode thread: sws_scale NV12→YUV420P, libx264 encode). New CpuNv12Frame struct carries owned pixel data across threads via crossbeam channel. SwEncState wraps both for backward compat (MP4/sync path untouched). - state_portal.rs: WebRTC portal path spawns 'wl-webrtc-encode' thread with bounded(2) input channel (drop-newest backpressure) and separate timing channel. Graceful shutdown: drop webrtc_rx → drop input_tx → join encode thread → flush sync encoder. - stats.rs: Add record_import() + record_encode_thread() for async timing. Results: encode_p95 stable at 2.9-4.2ms (was 11-74ms), capture_fps stable 59-60fps, cap_gap_p95 17-19ms. Remaining capture stalls traced to PipeWire compositor frame delivery (external, not our code). --- Cargo.lock | 13 + Cargo.toml | 2 +- src/avhw.rs | 496 +++++++++++++++++++++++----------- src/backend_detect.rs | 1 + src/bin/sw_encode_bench.rs | 1 + src/bin/vaapi_import_bench.rs | 1 + src/main.rs | 24 +- src/state_portal.rs | 259 +++++++++++++----- 8 files changed, 561 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc87f7b..83adb4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1126,6 +1126,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.8.0" @@ -2025,10 +2034,14 @@ version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex-automata", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index d96c203..de356ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ signal-hook = "0.3" signal-hook-mio = { version = "0.2", features = ["support-v1_0"] } clap = { version = "4", features = ["derive"] } tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } anyhow = "1" drm = "0.12" drm-fourcc = "2" diff --git a/src/avhw.rs b/src/avhw.rs index aebc5b8..ad5dbf6 100644 --- a/src/avhw.rs +++ b/src/avhw.rs @@ -3,6 +3,7 @@ use std::mem; use std::os::fd::{AsRawFd, RawFd}; use std::os::raw::c_void; use std::path::Path; +use std::slice; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::ptr; @@ -45,8 +46,9 @@ impl AvHwDevCtx { }; if ret < 0 { bail!( - "Failed to create VAAPI device context from {}: error {ret}", - drm_device.display() + "Failed to create VAAPI device context from {}: {}", + drm_device.display(), + ff_err(ret) ); } Ok(Self { ptr: p }) @@ -108,7 +110,7 @@ impl AvHwFrameCtx { if ret < 0 { // SAFETY: p is valid but init failed; clean up. unsafe { ffi::av_buffer_unref(&mut p) }; - bail!("av_hwframe_ctx_init failed: error {ret}"); + bail!("av_hwframe_ctx_init failed: {}", ff_err(ret)); } Ok(Self { ptr: p }) } @@ -244,8 +246,7 @@ pub unsafe fn import_dma_buf_to_vaapi( ) }; if ret < 0 { - let err_str = av_err_to_string(ret); - bail!("av_hwframe_map failed: error {ret} ({err_str})"); + bail!("av_hwframe_map failed: {}", ff_err(ret)); } Ok(dst) @@ -259,7 +260,8 @@ unsafe extern "C" fn cleanup_drm_descriptor(_opaque: *mut c_void, data: *mut u8) let _ = Box::from_raw(data as *mut ffi::AVDRMFrameDescriptor); } -fn av_err_to_string(err: i32) -> String { +/// Convert an FFmpeg error code to a human-readable string. +pub(crate) fn av_err_to_string(err: i32) -> String { let mut buf = vec![0u8; 128]; // SAFETY: buf points to 128 writable bytes and lives for the duration of // av_strerror. @@ -270,6 +272,12 @@ fn av_err_to_string(err: i32) -> String { .trim_end_matches('\0') .to_string() } + +/// Format an FFmpeg error code with both numeric value and description. +/// Example output: "error -22 (Invalid argument)" +pub(crate) fn ff_err(ret: i32) -> String { + format!("error {ret} ({})", av_err_to_string(ret)) +} // --------------------------------------------------------------------------- // EncState // --------------------------------------------------------------------------- @@ -326,7 +334,7 @@ impl EncState { transform, )?; - let mut sink_ctx = video_filter.get("out").unwrap(); + let mut sink_ctx = video_filter.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?; // SAFETY: sink_ctx is a live buffersink; the returned hw_frames_ctx is // borrowed, so av_buffer_ref creates an owned reference. let sink_hw_frames = unsafe { @@ -366,10 +374,19 @@ impl EncState { enc.set_width(enc_width); enc.set_height(enc_height); enc.set_format(ff::format::Pixel::VAAPI); - enc.set_bit_rate(bitrate as usize); - enc.set_gop(gop_size); - enc.set_time_base(ff::Rational::new(1, fps as i32)); - enc.set_max_b_frames(0); + enc.set_bit_rate(bitrate as usize); + enc.set_gop(gop_size); + enc.set_time_base(ff::Rational::new(1, fps as i32)); + enc.set_max_b_frames(0); + + // VBV rate limiting: caps IDR burst size for WebRTC. Without this a 4K + // scene change can produce a 256KB keyframe that overflows the UDP send + // buffer. bufsize=bitrate/4 ≈ 250ms of video at the target bitrate. + unsafe { + let ctx_ptr = enc.as_mut_ptr(); + (*ctx_ptr).rc_max_rate = bitrate as i64; + (*ctx_ptr).rc_buffer_size = (bitrate / 4) as i32; + } // SAFETY: AV_CODEC_FLAG_GLOBAL_HEADER must be set BEFORE opening the encoder. // It triggers SPS/PPS extradata generation needed by the muxer for @@ -395,7 +412,7 @@ impl EncState { ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0) }; if ret < 0 { - tracing::warn!("av_opt_set repeat_pps failed (error {ret}), likely FFmpeg < 7.0; continuing without per-frame PPS"); + tracing::warn!("av_opt_set repeat_pps failed ({}), likely FFmpeg < 7.0; continuing without per-frame PPS", ff_err(ret)); } } @@ -420,7 +437,7 @@ impl EncState { ) }; if ret < 0 || fmt_ctx_ptr.is_null() { - bail!("Failed to allocate output format context: error {ret}"); + bail!("Failed to allocate output format context: {}", ff_err(ret)); } // SAFETY: avformat_query_codec checks codec+format compatibility. @@ -444,7 +461,7 @@ impl EncState { ffi::avcodec_parameters_from_context((*stream_ptr).codecpar, enc_video.as_ptr()) }; if ret < 0 { - bail!("Failed to copy encoder parameters to stream: error {ret}"); + bail!("Failed to copy encoder parameters to stream: {}", ff_err(ret)); } // SAFETY: Copy encoder time_base to stream. @@ -462,15 +479,16 @@ impl EncState { }; if ret < 0 { bail!( - "Failed to open output file '{}': error {ret}", - output_path.display() + "Failed to open output file '{}': {}", + output_path.display(), + ff_err(ret) ); } // SAFETY: avformat_write_header writes the container header. let ret = unsafe { ffi::avformat_write_header(fmt_ctx_ptr, ptr::null_mut()) }; if ret < 0 { - bail!("Failed to write output header: error {ret}"); + bail!("Failed to write output header: {}", ff_err(ret)); } // SAFETY: We created fmt_ctx_ptr above and it's valid. @@ -492,9 +510,9 @@ impl EncState { } pub fn encode_frame(&mut self, hw_frame: &ff::frame::Video) -> Result<()> { - let mut filter_src_ctx = self.video_filter.get("in").unwrap(); + let mut filter_src_ctx = self.video_filter.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?; let mut filter_src = filter_src_ctx.source(); - let mut filter_sink_ctx = self.video_filter.get("out").unwrap(); + let mut filter_sink_ctx = self.video_filter.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?; let mut filter_sink = filter_sink_ctx.sink(); // SAFETY: hw_frame is a valid VAAPI hardware frame from capture. @@ -524,7 +542,7 @@ impl EncState { let ret = unsafe { ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), filtered.as_ptr()) }; if ret < 0 { - bail!("avcodec_send_frame failed: error {ret}"); + bail!("avcodec_send_frame failed: {}", ff_err(ret)); } self.drain_encoder(start_ts)?; } @@ -534,12 +552,14 @@ impl EncState { pub fn flush(&mut self) -> Result<()> { // Flush filter graph - let mut filter_src_ctx = self.video_filter.get("in").unwrap(); + let mut filter_src_ctx = self.video_filter.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?; let mut filter_src = filter_src_ctx.source(); - let _ = filter_src.flush(); + if let Err(e) = filter_src.flush() { + tracing::debug!("filter source flush error: {e}"); + } // Drain filter - let mut filter_sink_ctx = self.video_filter.get("out").unwrap(); + let mut filter_sink_ctx = self.video_filter.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?; let mut filter_sink = filter_sink_ctx.sink(); loop { let mut filtered = ff::frame::Video::empty(); @@ -552,7 +572,7 @@ impl EncState { ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), filtered.as_ptr()) }; if ret < 0 { - bail!("avcodec_send_frame failed during flush: error {ret}"); + bail!("avcodec_send_frame failed during flush: {}", ff_err(ret)); } self.drain_encoder(start_ts)?; } @@ -589,7 +609,7 @@ impl EncState { if ret == ffi::AVERROR(ffi::EAGAIN) || ret == ffi::AVERROR_EOF { break; } - bail!("avcodec_receive_packet failed: error {ret}"); + bail!("avcodec_receive_packet failed: {}", ff_err(ret)); } // Rescale timestamps from encoder time_base to stream time_base @@ -633,39 +653,34 @@ pub enum FrameOutput { Channel(crossbeam_channel::Sender>), } -pub struct SwEncState { +/// Owned CPU NV12 frame data for cross-thread transfer. +/// Produced by main thread (VAAPI import + GPU scale + transfer), consumed by encode thread. +pub struct CpuNv12Frame { + pub y_data: Vec, + pub uv_data: Vec, + pub y_stride: usize, + pub uv_stride: usize, + pub pts: i64, +} + +pub struct SwEncImport { hw_dev: AvHwDevCtx, frames_rgb: AvHwFrameCtx, filter_graph: ff::filter::Graph, - sws_ctx: *mut ffi::SwsContext, - enc_video: ff::codec::encoder::video::Video, - output: Option, - yuv_frame: *mut ffi::AVFrame, - starting_timestamp: Option, - frames_written: bool, - webrtc_disconnected: bool, - webrtc_paused: Option>, + enc_width: u32, + enc_height: u32, } -unsafe impl Send for SwEncState {} - -impl SwEncState { +impl SwEncImport { #[allow(clippy::too_many_arguments)] pub fn new( drm_device: &Path, - output_path: &Path, width: u32, height: u32, enc_width: u32, enc_height: u32, fps: u32, - bitrate: u64, - gop_size: u32, ) -> Result { - tracing::info!( - "SwEncState::new: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264" - ); - let hw_dev = AvHwDevCtx::new_vaapi(drm_device)?; let frames_rgb = AvHwFrameCtx::for_capture(&hw_dev, width, height, ff::format::Pixel::BGRA)?; @@ -679,88 +694,32 @@ impl SwEncState { fps, )?; - let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?; - let (enc_video, octx) = - create_software_h264_muxer(output_path, enc_width, enc_height, fps, bitrate, gop_size)?; - let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?; - Ok(Self { hw_dev, frames_rgb, filter_graph, - sws_ctx, - enc_video, - output: Some(FrameOutput::Muxer(octx)), - yuv_frame, - starting_timestamp: None, - frames_written: false, - webrtc_disconnected: false, - webrtc_paused: None, - }) - } - - #[allow(clippy::too_many_arguments)] - pub fn new_webrtc( - drm_device: &Path, - width: u32, - height: u32, - enc_width: u32, - enc_height: u32, - fps: u32, - bitrate: u64, - gop_size: u32, - tx: crossbeam_channel::Sender>, - webrtc_paused: Arc, - ) -> Result { - tracing::info!( - "SwEncState::new_webrtc: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264 -> WebRTC" - ); - - let hw_dev = AvHwDevCtx::new_vaapi(drm_device)?; - let frames_rgb = - AvHwFrameCtx::for_capture(&hw_dev, width, height, ff::format::Pixel::BGRA)?; - let filter_graph = build_swenc_filter_graph( - &hw_dev, - &frames_rgb, - width, - height, enc_width, enc_height, - fps, - )?; - let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?; - let enc_video = create_software_h264_encoder(enc_width, enc_height, fps, bitrate, gop_size)?; - let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?; - - Ok(Self { - hw_dev, - frames_rgb, - filter_graph, - sws_ctx, - enc_video, - output: Some(FrameOutput::Channel(tx)), - yuv_frame, - starting_timestamp: None, - frames_written: false, - webrtc_disconnected: false, - webrtc_paused: Some(webrtc_paused), }) } pub fn frames_rgb(&self) -> &AvHwFrameCtx { + let _ = self.hw_dev.as_ptr(); &self.frames_rgb } - pub fn encode_frame(&mut self, hw_frame: &ff::frame::Video) -> Result<()> { - let mut filter_src_ctx = self.filter_graph.get("in").unwrap(); + pub fn import_and_scale(&mut self, hw_frame: &ff::frame::Video) -> Result { + let mut filter_src_ctx = self.filter_graph.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?; let mut filter_src = filter_src_ctx.source(); - let mut filter_sink_ctx = self.filter_graph.get("out").unwrap(); + let mut filter_sink_ctx = self.filter_graph.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?; let mut filter_sink = filter_sink_ctx.sink(); filter_src .add(hw_frame) .map_err(|e| anyhow::anyhow!("software pipeline filter source add failed: {e}"))?; + let mut first = None; + let mut extra_count = 0usize; loop { let mut filtered = ff::frame::Video::empty(); match filter_sink.frame(&mut filtered) { @@ -768,61 +727,47 @@ impl SwEncState { if filtered.pts().is_none() { filtered.set_pts(hw_frame.pts()); } - self.encode_filtered_frame(&filtered)?; + let cpu_frame = self.transfer_filtered_to_cpu(&filtered)?; + if first.is_none() { + first = Some(cpu_frame); + } else { + extra_count += 1; + } } Err(ff::Error::Other { errno }) if errno == ffi::EAGAIN => break, Err(e) => bail!("software pipeline filter sink get frame failed: {e}"), } } - Ok(()) + if extra_count > 0 { + tracing::warn!("software import filter produced {extra_count} extra frame(s); dropping extras"); + } + + first.ok_or_else(|| anyhow::anyhow!("software pipeline produced no scaled frame")) } - pub fn flush(&mut self) -> Result<()> { - let mut filter_src_ctx = self.filter_graph.get("in").unwrap(); + pub fn flush_import(&mut self) -> Result> { + let mut filter_src_ctx = self.filter_graph.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?; let mut filter_src = filter_src_ctx.source(); - let _ = filter_src.flush(); + if let Err(e) = filter_src.flush() { + tracing::debug!("filter source flush error: {e}"); + } - let mut filter_sink_ctx = self.filter_graph.get("out").unwrap(); + let mut filter_sink_ctx = self.filter_graph.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?; let mut filter_sink = filter_sink_ctx.sink(); + let mut frames = Vec::new(); loop { let mut filtered = ff::frame::Video::empty(); match filter_sink.frame(&mut filtered) { - Ok(()) => self.encode_filtered_frame(&filtered)?, + Ok(()) => frames.push(self.transfer_filtered_to_cpu(&filtered)?), Err(_) => break, } } - // SAFETY: Sending a null frame flushes the opened software encoder; - // no frame data is dereferenced. enc_video is exclusively borrowed via &mut self. - unsafe { - let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), ptr::null()); - if ret < 0 && ret != ffi::AVERROR_EOF { - bail!("software encoder flush send failed: error {ret}"); - } - } - let start_ts = self.starting_timestamp.unwrap_or(0); - self.drain_encoder(start_ts)?; - - if self.frames_written { - if let Some(FrameOutput::Muxer(ref mut octx)) = self.output { - octx.write_trailer() - .map_err(|e| anyhow::anyhow!("Failed to write trailer: {e}"))?; - } - } - - Ok(()) + Ok(frames) } - fn encode_filtered_frame(&mut self, filtered: &ff::frame::Video) -> Result<()> { - if self.webrtc_disconnected { - return Ok(()); - } - if let Some(ref paused) = self.webrtc_paused { - if paused.load(Ordering::Relaxed) { - return Ok(()); - } - } + fn transfer_filtered_to_cpu(&self, filtered: &ff::frame::Video) -> Result { // SAFETY: av_frame_alloc returns a newly allocated AVFrame or null, // which is checked below. let mut sw_nv12 = unsafe { ffi::av_frame_alloc() }; @@ -837,32 +782,171 @@ impl SwEncState { // SAFETY: sw_nv12 was allocated above and has not been freed yet. unsafe { ffi::av_frame_free(&mut sw_nv12) }; bail!( - "av_hwframe_transfer_data failed for GPU-downscaled frame: error {transfer_ret} ({})", - av_err_to_string(transfer_ret) + "av_hwframe_transfer_data failed for GPU-downscaled frame: {}", + ff_err(transfer_ret) ); } + // SAFETY: sw_nv12 was filled by av_hwframe_transfer_data. NV12 planes 0 and 1 are + // initialized for enc_width x enc_height; linesize values define each row's byte span. + let frame = unsafe { + let y_ptr = (*sw_nv12).data[0]; + let uv_ptr = (*sw_nv12).data[1]; + if y_ptr.is_null() || uv_ptr.is_null() { + ffi::av_frame_free(&mut sw_nv12); + bail!("NV12 transfer frame missing Y/UV plane data"); + } + let y_stride = (*sw_nv12).linesize[0] as usize; + let uv_stride = (*sw_nv12).linesize[1] as usize; + if (*sw_nv12).width != self.enc_width as i32 || (*sw_nv12).height != self.enc_height as i32 { + ffi::av_frame_free(&mut sw_nv12); + bail!("NV12 transfer frame has unexpected dimensions"); + } + let y_len = y_stride * self.enc_height as usize; + let uv_len = uv_stride * (self.enc_height as usize / 2); + let y_data = slice::from_raw_parts(y_ptr, y_len).to_vec(); + let uv_data = slice::from_raw_parts(uv_ptr, uv_len).to_vec(); + let pts = filtered.pts().unwrap_or(0); + ffi::av_frame_free(&mut sw_nv12); + CpuNv12Frame { + y_data, + uv_data, + y_stride, + uv_stride, + pts, + } + }; + + Ok(frame) + } +} + +pub struct SwEncEncode { + sws_ctx: *mut ffi::SwsContext, + enc_video: ff::codec::encoder::video::Video, + output: Option, + yuv_frame: *mut ffi::AVFrame, + starting_timestamp: Option, + frames_written: bool, + webrtc_disconnected: bool, + webrtc_paused: Option>, + enc_width: u32, + enc_height: u32, +} + +// SAFETY: SwEncEncode owns sws_ctx/yuv_frame/enc_video exclusively after construction. +// It is moved to a single encode thread and only accessed through &mut self there. +unsafe impl Send for SwEncEncode {} + +impl SwEncEncode { + #[allow(clippy::too_many_arguments)] + fn new_muxer( + output_path: &Path, + enc_width: u32, + enc_height: u32, + fps: u32, + bitrate: u64, + gop_size: u32, + ) -> Result { + let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?; + let (enc_video, octx) = + create_software_h264_muxer(output_path, enc_width, enc_height, fps, bitrate, gop_size)?; + let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?; + + Ok(Self { + sws_ctx, + enc_video, + output: Some(FrameOutput::Muxer(octx)), + yuv_frame, + starting_timestamp: None, + frames_written: false, + webrtc_disconnected: false, + webrtc_paused: None, + enc_width, + enc_height, + }) + } + + #[allow(clippy::too_many_arguments)] + pub fn new_webrtc( + enc_width: u32, + enc_height: u32, + fps: u32, + bitrate: u64, + gop_size: u32, + tx: crossbeam_channel::Sender>, + webrtc_paused: Arc, + ) -> Result { + let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?; + let enc_video = create_software_h264_encoder(enc_width, enc_height, fps, bitrate, gop_size)?; + let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?; + + Ok(Self { + sws_ctx, + enc_video, + output: Some(FrameOutput::Channel(tx)), + yuv_frame, + starting_timestamp: None, + frames_written: false, + webrtc_disconnected: false, + webrtc_paused: Some(webrtc_paused), + enc_width, + enc_height, + }) + } + + pub fn flush(&mut self) -> Result<()> { + // SAFETY: Sending a null frame flushes the opened software encoder; + // no frame data is dereferenced. enc_video is exclusively borrowed via &mut self. + unsafe { + let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), ptr::null()); + if ret < 0 && ret != ffi::AVERROR_EOF { + bail!("software encoder flush send failed: {}", ff_err(ret)); + } + } + let start_ts = self.starting_timestamp.unwrap_or(0); + self.drain_encoder(start_ts)?; + + Ok(()) + } + + pub fn encode_cpu_frame(&mut self, frame: &CpuNv12Frame) -> Result<()> { + if self.webrtc_disconnected { + return Ok(()); + } + if frame.y_stride < self.enc_width as usize || frame.uv_stride < self.enc_width as usize { + bail!("CPU NV12 frame stride is smaller than encoder width"); + } + if let Some(ref paused) = self.webrtc_paused { + if paused.load(Ordering::Relaxed) { + return Ok(()); + } + } + // SAFETY: yuv_frame is an owned reusable YUV420P frame at the same dimensions as sw_nv12; // sws_ctx was created for NV12 -> YUV420P with no resize, so sws_scale only converts format. unsafe { let ret = ffi::av_frame_make_writable(self.yuv_frame); if ret < 0 { - ffi::av_frame_free(&mut sw_nv12); - bail!("av_frame_make_writable failed: error {ret}"); + bail!("av_frame_make_writable failed: {}", ff_err(ret)); } - ffi::sws_scale( + let src_slices = [frame.y_data.as_ptr(), frame.uv_data.as_ptr(), ptr::null(), ptr::null()]; + let src_strides = [frame.y_stride as i32, frame.uv_stride as i32, 0, 0]; + let scaled = ffi::sws_scale( self.sws_ctx, - (*sw_nv12).data.as_ptr() as *const *const u8, - (*sw_nv12).linesize.as_ptr() as *const i32, + src_slices.as_ptr(), + src_strides.as_ptr(), 0, - (*sw_nv12).height, + self.enc_height as i32, (*self.yuv_frame).data.as_ptr() as *mut *mut u8, (*self.yuv_frame).linesize.as_ptr() as *const i32, ); - ffi::av_frame_free(&mut sw_nv12); + if scaled < 0 { + bail!("sws_scale failed for software encoder: {scaled}"); + } } - let pts = filtered.pts().unwrap_or(0); + let pts = frame.pts; if self.starting_timestamp.is_none() { self.starting_timestamp = Some(pts); } @@ -873,13 +957,23 @@ impl SwEncState { (*self.yuv_frame).pts = pts; let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), self.yuv_frame); if ret < 0 { - bail!("avcodec_send_frame failed for software encoder: error {ret}"); + bail!("avcodec_send_frame failed for software encoder: {}", ff_err(ret)); } } self.drain_encoder(start_ts) } + fn write_trailer_if_needed(&mut self) -> Result<()> { + if self.frames_written { + if let Some(FrameOutput::Muxer(ref mut octx)) = self.output { + octx.write_trailer() + .map_err(|e| anyhow::anyhow!("Failed to write trailer: {e}"))?; + } + } + Ok(()) + } + fn drain_encoder(&mut self, start_ts: i64) -> Result<()> { loop { let mut pkt = ff::Packet::empty(); @@ -891,7 +985,7 @@ impl SwEncState { if ret == ffi::AVERROR(ffi::EAGAIN) || ret == ffi::AVERROR_EOF { break; } - bail!("avcodec_receive_packet failed: error {ret}"); + bail!("avcodec_receive_packet failed: {}", ff_err(ret)); } match self.output { @@ -961,7 +1055,7 @@ impl SwEncState { } } -impl Drop for SwEncState { +impl Drop for SwEncEncode { fn drop(&mut self) { if !self.sws_ctx.is_null() { // SAFETY: sws_ctx is owned by this state and was returned by sws_getContext. @@ -975,6 +1069,83 @@ impl Drop for SwEncState { } } +pub struct SwEncState { + import: SwEncImport, + encode: SwEncEncode, +} + +// SAFETY: SwEncState owns import and encode state exclusively and existing sync callers move it +// between threads only with external serialization; all FFI handles are accessed through &mut self. +unsafe impl Send for SwEncState {} + +impl SwEncState { + #[allow(clippy::too_many_arguments)] + pub fn new( + drm_device: &Path, + output_path: &Path, + width: u32, + height: u32, + enc_width: u32, + enc_height: u32, + fps: u32, + bitrate: u64, + gop_size: u32, + ) -> Result { + tracing::info!( + "SwEncState::new: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264" + ); + let import = SwEncImport::new(drm_device, width, height, enc_width, enc_height, fps)?; + let encode = SwEncEncode::new_muxer(output_path, enc_width, enc_height, fps, bitrate, gop_size)?; + Ok(Self { import, encode }) + } + + #[allow(clippy::too_many_arguments)] + pub fn new_webrtc( + drm_device: &Path, + width: u32, + height: u32, + enc_width: u32, + enc_height: u32, + fps: u32, + bitrate: u64, + gop_size: u32, + tx: crossbeam_channel::Sender>, + webrtc_paused: Arc, + ) -> Result { + tracing::info!( + "SwEncState::new_webrtc: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264 -> WebRTC" + ); + let import = SwEncImport::new(drm_device, width, height, enc_width, enc_height, fps)?; + let encode = SwEncEncode::new_webrtc( + enc_width, + enc_height, + fps, + bitrate, + gop_size, + tx, + webrtc_paused, + )?; + Ok(Self { import, encode }) + } + + pub fn frames_rgb(&self) -> &AvHwFrameCtx { + self.import.frames_rgb() + } + + pub fn encode_frame(&mut self, hw_frame: &ff::frame::Video) -> Result<()> { + let cpu_frame = self.import.import_and_scale(hw_frame)?; + self.encode.encode_cpu_frame(&cpu_frame) + } + + pub fn flush(&mut self) -> Result<()> { + for frame in self.import.flush_import()? { + self.encode.encode_cpu_frame(&frame)?; + } + self.encode.flush()?; + self.encode.write_trailer_if_needed() + } +} + // --------------------------------------------------------------------------- // Shared encoder creation (used by both wlr-screencopy and portal paths) // --------------------------------------------------------------------------- @@ -1065,7 +1236,7 @@ fn build_swenc_filter_graph( let ret = ffi::av_buffersrc_parameters_set(src_ctx.as_mut_ptr(), par); ffi::av_free(par as *mut _); if ret < 0 { - bail!("av_buffersrc_parameters_set failed: error {ret}"); + bail!("av_buffersrc_parameters_set failed: {}", ff_err(ret)); } } @@ -1124,7 +1295,7 @@ fn alloc_yuv420p_frame(width: u32, height: u32) -> Result<*mut ffi::AVFrame> { let ret = ffi::av_frame_get_buffer(frame, 0); if ret < 0 { ffi::av_frame_free(&mut frame); - bail!("av_frame_get_buffer failed: error {ret}"); + bail!("av_frame_get_buffer failed: {}", ff_err(ret)); } Ok(frame) } @@ -1212,7 +1383,7 @@ fn create_software_h264_muxer( ) }; if ret < 0 || fmt_ctx_ptr.is_null() { - bail!("Failed to allocate output format context: error {ret}"); + bail!("Failed to allocate output format context: {}", ff_err(ret)); } // SAFETY: fmt_ctx_ptr is valid; stream and codec parameters are owned by the format context. @@ -1225,7 +1396,7 @@ fn create_software_h264_muxer( let ret = unsafe { ffi::avcodec_parameters_from_context((*stream_ptr).codecpar, enc_video.as_ptr()) }; if ret < 0 { - bail!("Failed to copy codec parameters to stream: error {ret}"); + bail!("Failed to copy codec parameters to stream: {}", ff_err(ret)); } // SAFETY: stream_ptr is valid and writable during muxer setup. unsafe { @@ -1242,8 +1413,9 @@ fn create_software_h264_muxer( ); if ret < 0 { bail!( - "Failed to open output file '{}': error {ret}", - output_path.display() + "Failed to open output file '{}': {}", + output_path.display(), + ff_err(ret) ); } } @@ -1252,7 +1424,7 @@ fn create_software_h264_muxer( // SAFETY: fmt_ctx_ptr is fully configured. let ret = unsafe { ffi::avformat_write_header(fmt_ctx_ptr, ptr::null_mut()) }; if ret < 0 { - bail!("Failed to write output header: error {ret}"); + bail!("Failed to write output header: {}", ff_err(ret)); } // SAFETY: ownership of fmt_ctx_ptr transfers to ffmpeg-next Output wrapper. @@ -1361,7 +1533,7 @@ fn build_filter_graph( let ret = ffi::av_buffersrc_parameters_set(src_ctx.as_mut_ptr(), par); ffi::av_free(par as *mut _); if ret < 0 { - bail!("av_buffersrc_parameters_set failed: error {ret}"); + bail!("av_buffersrc_parameters_set failed: {}", ff_err(ret)); } } diff --git a/src/backend_detect.rs b/src/backend_detect.rs index 38ea435..01fea98 100644 --- a/src/backend_detect.rs +++ b/src/backend_detect.rs @@ -190,6 +190,7 @@ mod tests { backend: backend.map(String::from), port: 0, no_persist: false, + stats: false, } } diff --git a/src/bin/sw_encode_bench.rs b/src/bin/sw_encode_bench.rs index a1bcb5d..46d010d 100644 --- a/src/bin/sw_encode_bench.rs +++ b/src/bin/sw_encode_bench.rs @@ -114,6 +114,7 @@ fn main() -> Result<()> { backend: Some("portal".to_string()), port: 0, no_persist: false, + stats: false, }; let cap = CapPortal::new(&portal_args)?; diff --git a/src/bin/vaapi_import_bench.rs b/src/bin/vaapi_import_bench.rs index 0e2e025..91272bb 100644 --- a/src/bin/vaapi_import_bench.rs +++ b/src/bin/vaapi_import_bench.rs @@ -883,6 +883,7 @@ fn main() -> Result<()> { backend: Some("portal".to_string()), port: 0, no_persist: false, + stats: false, }; let cap = CapPortal::new(&portal_args)?; diff --git a/src/main.rs b/src/main.rs index 8ea5840..6557511 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ mod backend_detect; // 截屏后端自动检测(wlroots vs Portal/PipeWire) mod cap_portal; // XDG Portal 屏幕捕获 mod cap_wlr_screencopy; // wlroots wlr-screencopy 截屏协议 mod fps_limit; // 帧率限制器 +mod stats; // 管道性能统计(卡顿诊断) mod state; // wlr-screencopy 后端的主状态机 mod state_portal; // Portal/PipeWire 后端的主状态机 mod transform; // 图像变换(旋转/翻转) @@ -43,18 +44,23 @@ fn main() -> Result<()> { // 解析命令行参数 let args = Args::parse(); - // 根据是否启用 verbose 模式设置日志级别 + // 根据 verbose 模式或 RUST_LOG 环境变量设置日志级别 + // 支持 RUST_LOG 粒度控制(如 RUST_LOG=wl_webrtc::webrtc=trace) + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| { + if args.verbose { + tracing_subscriber::EnvFilter::new("debug") + } else { + tracing_subscriber::EnvFilter::new("info") + } + }); tracing_subscriber::fmt() - .with_max_level(if args.verbose { - tracing::Level::DEBUG - } else { - tracing::Level::INFO - }) + .with_env_filter(env_filter) .with_writer(std::io::stderr) .init(); tracing::info!("wl-webrtc starting"); - tracing::debug!("Args: {:?}", args); + tracing::debug!("Args: output={:?} fps={} codec={} port={} verbose={}", args.output, args.fps, args.codec, args.port, args.verbose); // MVP 阶段仅支持 H.264 编码,不支持 HEVC if args.codec != "h264" { @@ -250,7 +256,7 @@ fn run_wlr_screencopy(args: Args) -> Result<()> { // 状态机遇到致命错误时退出 if state.errored { - tracing::error!("Fatal error in state machine, exiting"); + tracing::error!("Fatal error in state machine (check preceding error logs), exiting"); running = false; } @@ -346,7 +352,7 @@ fn run_portal_pipewire(args: Args) -> Result<()> { // Portal 状态机遇到致命错误时退出 if state.is_errored() { - tracing::error!("Fatal error in portal state machine, exiting"); + tracing::error!("Fatal error in portal state machine (check preceding error logs), exiting"); running = false; } } diff --git a/src/state_portal.rs b/src/state_portal.rs index 3e0a61e..409ab37 100644 --- a/src/state_portal.rs +++ b/src/state_portal.rs @@ -3,13 +3,14 @@ use std::os::fd::AsRawFd; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use anyhow::{bail, Result}; // 错误处理工具 use crate::args::Args; // 命令行参数 -use crate::avhw::{self, SwEncState}; // 软件编码器状态(VAAPI 导入 + H.264 编码) +use crate::avhw::{self, CpuNv12Frame, SwEncEncode, SwEncImport, SwEncState}; // 软件编码器状态(VAAPI 导入 + H.264 编码) use crate::cap_portal::{CapPortal, PwCtrlEvent, PwDmaBufFrame}; // PipeWire 屏幕采集端点 +use crate::stats::{FrameTimings, PipelineStats}; // 管道统计(帧计时、每秒快照) use crate::webrtc::WebRtcState; // WebRTC 信令与媒体传输 /// 门户采集的阶段状态 @@ -20,6 +21,18 @@ enum PortalStage { Streaming, } +struct EncodeThreadTiming { + sws_us: u64, + encode_us: u64, + output_bytes: usize, +} + +struct EncodeThread { + handle: Option>, + input_tx: crossbeam_channel::Sender, + timing_rx: crossbeam_channel::Receiver, +} + /// 门户模式的主状态机 /// /// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。 @@ -27,16 +40,17 @@ enum PortalStage { pub struct StatePortal { stage: PortalStage, // 当前采集阶段(等待首帧 / 流式编码中) enc: Option, // 软件编码器,首帧到达后初始化 + enc_import: Option, + enc_thread: Option, cap: CapPortal, // PipeWire 屏幕采集端点 args: Args, // 用户命令行参数 errored: bool, // 是否遇到不可恢复的错误 drm_device: Option, // DRM 渲染设备路径(可自动检测) - frames_encoded: u64, // 已编码帧数 + frames_encoded: u64, // 已编码帧数(用于 PTS 编号) start_time: Option, // 编码开始时间 - last_stats_time: Option, // 上一次统计日志时间 - last_stats_frames: u64, // 上一次统计时的已编码帧数 + stats: PipelineStats, // 管道统计(窗口化帧计时 + 每秒快照) + pw_dropped_prev: u64, // 上一窗口的 PipeWire 丢弃帧数(用于增量计算) webrtc: Option, // WebRTC 状态(仅 WebRTC 模式启用) - webrtc_tx: Option>>, // 编码帧发送通道 webrtc_rx: Option>>, webrtc_frames_sent: u64, webrtc_paused: Option>, @@ -56,29 +70,29 @@ impl StatePortal { let cap = CapPortal::new(&args)?; - let (webrtc, webrtc_tx, webrtc_rx, webrtc_paused) = if args.port > 0 { - let (tx, rx) = crossbeam_channel::bounded(32); + let (webrtc, webrtc_paused) = if args.port > 0 { let wrtc = WebRtcState::new(args.port, args.fps)?; let paused = Arc::new(AtomicBool::new(true)); - (Some(wrtc), Some(tx), Some(rx), Some(paused)) + (Some(wrtc), Some(paused)) } else { - (None, None, None, None) + (None, None) }; Ok(Self { stage: PortalStage::WaitingForFormat, enc: None, + enc_import: None, + enc_thread: None, cap, args, errored: false, drm_device, frames_encoded: 0, start_time: None, - last_stats_time: None, - last_stats_frames: 0, + stats: PipelineStats::new(), + pw_dropped_prev: 0, webrtc, - webrtc_tx, - webrtc_rx, + webrtc_rx: None, webrtc_frames_sent: 0, webrtc_paused, }) @@ -155,7 +169,7 @@ impl StatePortal { }); // GOP 大小:WebRTC 模式使用更小的 GOP(fps/2,最低10),MP4 模式使用 fps let actual_gop_size = self.args.gop_size.unwrap_or_else(|| { - if self.webrtc_tx.is_some() { + if self.webrtc.is_some() { (self.args.fps / 2).max(10) } else { self.args.fps @@ -163,26 +177,40 @@ impl StatePortal { }); // 根据是否启用 WebRTC 选择不同的编码器构造方式 - let enc = if let Some(ref tx) = self.webrtc_tx { + if self.webrtc.is_some() { let paused = self.webrtc_paused.as_ref() .ok_or_else(|| anyhow::anyhow!("internal invariant broken: webrtc_paused missing while WebRTC mode is active"))?; - avhw::SwEncState::new_webrtc( + let import = SwEncImport::new( &drm_path, frame.width, frame.height, enc_width, enc_height, self.args.fps, + )?; + let (webrtc_tx, webrtc_rx) = crossbeam_channel::bounded(32); + let (input_tx, input_rx) = crossbeam_channel::bounded::(2); + let (timing_tx, timing_rx) = crossbeam_channel::bounded::(32); + let encode = SwEncEncode::new_webrtc( + enc_width, + enc_height, + self.args.fps, actual_bitrate, actual_gop_size, - tx.clone(), + webrtc_tx, paused.clone(), - )? + )?; + let handle = std::thread::Builder::new() + .name("wl-webrtc-encode".into()) + .spawn(move || encode_thread_loop(encode, input_rx, timing_tx))?; + self.enc_import = Some(import); + self.enc_thread = Some(EncodeThread { handle: Some(handle), input_tx, timing_rx }); + self.webrtc_rx = Some(webrtc_rx); } else { // MP4 模式:编码输出写入文件 let output_path = self.args.output.as_deref() .ok_or_else(|| anyhow::anyhow!("--output is required in MP4 file output mode; use --port > 0 for WebRTC mode"))?; - avhw::SwEncState::new( + let enc = avhw::SwEncState::new( &drm_path, std::path::Path::new(output_path), frame.width, @@ -192,17 +220,17 @@ impl StatePortal { self.args.fps, actual_bitrate, actual_gop_size, - )? + )?; + self.enc = Some(enc); }; - - self.enc = Some(enc); self.stage = PortalStage::Streaming; // 切换到流式编码阶段 self.start_time = Some(Instant::now()); - self.last_stats_time = Some(Instant::now()); tracing::info!("First frame processed, encoder initialized, transitioning to Streaming"); drop(frame); // 首帧仅用于初始化,不参与编码 } PortalStage::Streaming => { + // 记录采集帧到达(用于 capture gap 和 capture_fps 统计) + self.stats.record_capture(); // 流式编码阶段:直接处理帧 self.handle_pw_frame(frame)?; } @@ -212,6 +240,16 @@ impl StatePortal { // WebRTC: drain encoded frames produced by this poll before returning. self.poll_webrtc()?; + // 每秒输出一次结构化管道统计(仅 --stats 启用时记录日志) + if self.args.stats && self.stats.should_snapshot() { + // PipeWire 丢弃帧数:CapPortal 尚未暴露 dropped_count(),暂用占位 + self.stats.set_pipewire_dropped(0, 0); + let enc_q = self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0); + self.stats.set_queue_depths(0, enc_q); + let snap = self.stats.snapshot_and_reset(); + tracing::info!("stats: {snap}"); + } + Ok(true) } @@ -271,49 +309,84 @@ impl StatePortal { /// 通过 `av_hwframe_map` 零拷贝导入 VAAPI,然后交给 SwEncState 完成: /// scale_vaapi GPU 缩放、2K NV12 回读、YUV420P 格式转换、软件 H.264 编码。 fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> { - // 获取已初始化的编码器引用 - let enc = match self.enc.as_mut() { - Some(enc) => enc, - None => bail!("encoder not initialized"), - }; - - // 将 DMA-BUF 帧零拷贝导入 VAAPI 硬件帧池 - let mut vaapi_frame = unsafe { - avhw::import_dma_buf_to_vaapi( - enc.frames_rgb().as_ptr(), - frame.fd.as_raw_fd(), - frame.width, - frame.height, - frame.format, - frame.modifier, - frame.stride, - frame.offset, - ) - }?; - - // 设置帧的显示时间戳(PTS),基于已编码帧序号 + let t_import_start = Instant::now(); let pts = self.frames_encoded as i64; - unsafe { - (*vaapi_frame.as_mut_ptr()).pts = pts; - } - // 送入编码器完成:缩放 → 回读 → 格式转换 → H.264 编码 - enc.encode_frame(&vaapi_frame)?; - self.frames_encoded += 1; + if let Some(enc) = self.enc.as_mut() { + // 将 DMA-BUF 帧零拷贝导入 VAAPI 硬件帧池 + let mut vaapi_frame = unsafe { + avhw::import_dma_buf_to_vaapi( + enc.frames_rgb().as_ptr(), + frame.fd.as_raw_fd(), + frame.width, + frame.height, + frame.format, + frame.modifier, + frame.stride, + frame.offset, + ) + }?; - // 每 10 秒输出一次编码统计(已编码帧数、实时帧率) - if let Some(last) = self.last_stats_time { - if last.elapsed() >= Duration::from_secs(10) { - let delta_frames = self.frames_encoded - self.last_stats_frames; - let delta_secs = last.elapsed().as_secs_f64(); - let fps = delta_frames as f64 / delta_secs; - tracing::info!( - "encoded={}, fps={fps:.1}", - self.frames_encoded, - ); - self.last_stats_time = Some(Instant::now()); - self.last_stats_frames = self.frames_encoded; + let import_us = t_import_start.elapsed().as_micros() as u64; + let t_encode_start = Instant::now(); + + // 设置帧的显示时间戳(PTS),基于已编码帧序号 + unsafe { + (*vaapi_frame.as_mut_ptr()).pts = pts; } + + // 送入编码器完成:缩放 → 回读 → 格式转换 → H.264 编码 + enc.encode_frame(&vaapi_frame)?; + let total_us = t_import_start.elapsed().as_micros() as u64; + let encode_us = t_encode_start.elapsed().as_micros() as u64; + + self.frames_encoded += 1; + + // 记录帧计时到管道统计(import + encode 内部各阶段暂不可分离,用 total 覆盖) + let timings = FrameTimings { + import_us, + encode_us, + total_us, + ..Default::default() + }; + self.stats.record_encode(&timings); + } else if let Some(import) = self.enc_import.as_mut() { + let mut vaapi_frame = unsafe { + avhw::import_dma_buf_to_vaapi( + import.frames_rgb().as_ptr(), + frame.fd.as_raw_fd(), + frame.width, + frame.height, + frame.format, + frame.modifier, + frame.stride, + frame.offset, + ) + }?; + unsafe { + (*vaapi_frame.as_mut_ptr()).pts = pts; + } + + let cpu_nv12 = import.import_and_scale(&vaapi_frame)?; + let import_us = t_import_start.elapsed().as_micros() as u64; + self.stats.record_import(import_us); + + let enc_thread = self.enc_thread.as_ref() + .ok_or_else(|| anyhow::anyhow!("internal invariant broken: encode thread missing while async import is active"))?; + match enc_thread.input_tx.try_send(cpu_nv12) { + Ok(()) => { + self.frames_encoded += 1; + } + Err(crossbeam_channel::TrySendError::Full(_frame)) => { + tracing::warn!("Encode thread input full, dropping portal frame"); + } + Err(crossbeam_channel::TrySendError::Disconnected(_frame)) => { + tracing::error!("Encode thread input disconnected"); + self.errored = true; + } + } + } else { + bail!("encoder not initialized"); } Ok(()) @@ -326,6 +399,15 @@ impl StatePortal { // 先 drop receiver,使 flush() 中的 try_send() 立即返回 Disconnected // 而非在满通道上阻塞(修复 issue #8 死锁) self.webrtc_rx = None; + if let Some(mut enc_thread) = self.enc_thread.take() { + drop(enc_thread.input_tx); + if let Some(handle) = enc_thread.handle.take() { + if handle.join().is_err() { + tracing::error!("Encode thread panicked during shutdown"); + } + } + } + self.enc_import = None; if let Some(mut enc) = self.enc.take() { if let Err(e) = enc.flush() { tracing::error!("Flush error during shutdown: {e}"); @@ -384,10 +466,21 @@ impl StatePortal { if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) { tracing::debug!("WebRTC write frame error: {e}"); } + self.stats.record_send(0.0, None); self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1); } if count > 0 { - tracing::info!("WebRTC forwarded {count} frames from channel"); + tracing::debug!("WebRTC forwarded {count} frames from channel"); + } + } + + if let Some(ref enc_thread) = self.enc_thread { + while let Ok(timing) = enc_thread.timing_rx.try_recv() { + self.stats.record_encode_thread( + timing.sws_us, + timing.encode_us, + timing.output_bytes, + ); } } @@ -395,6 +488,42 @@ impl StatePortal { } } +fn encode_thread_loop( + mut encode: SwEncEncode, + input_rx: crossbeam_channel::Receiver, + timing_tx: crossbeam_channel::Sender, +) { + loop { + match input_rx.recv() { + Ok(frame) => { + let t_start = Instant::now(); + match encode.encode_cpu_frame(&frame) { + Ok(()) => { + let elapsed = t_start.elapsed().as_micros() as u64; + let _ = timing_tx.try_send(EncodeThreadTiming { + sws_us: 0, + encode_us: elapsed, + output_bytes: 0, + }); + } + Err(e) => { + tracing::error!("Encode thread error: {e}"); + break; + } + } + } + Err(_) => { + tracing::info!("Encode thread input closed, flushing encoder"); + if let Err(e) = encode.flush() { + tracing::error!("Encode thread flush error: {e}"); + } + break; + } + } + } + tracing::info!("Encode thread exiting"); +} + impl Drop for StatePortal { // 析构时自动调用 shutdown,确保编码器被刷新、资源被释放 fn drop(&mut self) { @@ -510,6 +639,7 @@ mod tests { backend: None, port: 0, no_persist: false, + stats: false, }; let result = resolve_drm_device(&args).unwrap(); assert_eq!( @@ -533,6 +663,7 @@ mod tests { backend: None, port: 0, no_persist: false, + stats: false, }; let result = resolve_drm_device(&args).unwrap(); assert_eq!(result, None);