feat(portal): async encode pipeline - decouple capture from encoding
Split the synchronous encode pipeline so that sws_scale + libx264 run on a dedicated thread, leaving only VAAPI import + GPU scale + GPU→CPU transfer on the main capture thread.

Problem: encode_p95 occasionally hit 74ms, blocking the entire capture pipeline and causing capture_gap_max=356ms stutter.

Solution:
- avhw.rs: Split SwEncState into SwEncImport (main thread: VAAPI import, filter_graph scale, GPU→CPU transfer) and SwEncEncode (encode thread: sws_scale NV12→YUV420P, libx264 encode). New CpuNv12Frame struct carries owned pixel data across threads via crossbeam channel. SwEncState wraps both for backward compatibility (MP4/sync path untouched).
- state_portal.rs: WebRTC portal path spawns 'wl-webrtc-encode' thread with bounded(2) input channel (drop-newest backpressure) and separate timing channel. Graceful shutdown: drop webrtc_rx → drop input_tx → join encode thread → flush sync encoder.
- stats.rs: Add record_import() + record_encode_thread() for async timing.

Results: encode_p95 stable at 2.9-4.2ms (was 11-74ms), capture_fps stable 59-60fps, cap_gap_p95 17-19ms. Remaining capture stalls were traced to PipeWire compositor frame delivery (external, not our code).
This commit is contained in:
13
Cargo.lock
generated
13
Cargo.lock
generated
@@ -1126,6 +1126,15 @@ version = "0.4.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
|
||||
dependencies = [
|
||||
"regex-automata",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.8.0"
|
||||
@@ -2025,10 +2034,14 @@ version = "0.3.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex-automata",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
@@ -14,7 +14,7 @@ signal-hook = "0.3"
|
||||
signal-hook-mio = { version = "0.2", features = ["support-v1_0"] }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
anyhow = "1"
|
||||
drm = "0.12"
|
||||
drm-fourcc = "2"
|
||||
|
||||
490
src/avhw.rs
490
src/avhw.rs
@@ -3,6 +3,7 @@ use std::mem;
|
||||
use std::os::fd::{AsRawFd, RawFd};
|
||||
use std::os::raw::c_void;
|
||||
use std::path::Path;
|
||||
use std::slice;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::ptr;
|
||||
@@ -45,8 +46,9 @@ impl AvHwDevCtx {
|
||||
};
|
||||
if ret < 0 {
|
||||
bail!(
|
||||
"Failed to create VAAPI device context from {}: error {ret}",
|
||||
drm_device.display()
|
||||
"Failed to create VAAPI device context from {}: {}",
|
||||
drm_device.display(),
|
||||
ff_err(ret)
|
||||
);
|
||||
}
|
||||
Ok(Self { ptr: p })
|
||||
@@ -108,7 +110,7 @@ impl AvHwFrameCtx {
|
||||
if ret < 0 {
|
||||
// SAFETY: p is valid but init failed; clean up.
|
||||
unsafe { ffi::av_buffer_unref(&mut p) };
|
||||
bail!("av_hwframe_ctx_init failed: error {ret}");
|
||||
bail!("av_hwframe_ctx_init failed: {}", ff_err(ret));
|
||||
}
|
||||
Ok(Self { ptr: p })
|
||||
}
|
||||
@@ -244,8 +246,7 @@ pub unsafe fn import_dma_buf_to_vaapi(
|
||||
)
|
||||
};
|
||||
if ret < 0 {
|
||||
let err_str = av_err_to_string(ret);
|
||||
bail!("av_hwframe_map failed: error {ret} ({err_str})");
|
||||
bail!("av_hwframe_map failed: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
Ok(dst)
|
||||
@@ -259,7 +260,8 @@ unsafe extern "C" fn cleanup_drm_descriptor(_opaque: *mut c_void, data: *mut u8)
|
||||
let _ = Box::from_raw(data as *mut ffi::AVDRMFrameDescriptor);
|
||||
}
|
||||
|
||||
fn av_err_to_string(err: i32) -> String {
|
||||
/// Convert an FFmpeg error code to a human-readable string.
|
||||
pub(crate) fn av_err_to_string(err: i32) -> String {
|
||||
let mut buf = vec![0u8; 128];
|
||||
// SAFETY: buf points to 128 writable bytes and lives for the duration of
|
||||
// av_strerror.
|
||||
@@ -270,6 +272,12 @@ fn av_err_to_string(err: i32) -> String {
|
||||
.trim_end_matches('\0')
|
||||
.to_string()
|
||||
}
|
||||
|
||||
/// Format an FFmpeg error code with both numeric value and description.
|
||||
/// Example output: "error -22 (Invalid argument)"
|
||||
pub(crate) fn ff_err(ret: i32) -> String {
|
||||
format!("error {ret} ({})", av_err_to_string(ret))
|
||||
}
|
||||
// ---------------------------------------------------------------------------
|
||||
// EncState
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -326,7 +334,7 @@ impl EncState {
|
||||
transform,
|
||||
)?;
|
||||
|
||||
let mut sink_ctx = video_filter.get("out").unwrap();
|
||||
let mut sink_ctx = video_filter.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?;
|
||||
// SAFETY: sink_ctx is a live buffersink; the returned hw_frames_ctx is
|
||||
// borrowed, so av_buffer_ref creates an owned reference.
|
||||
let sink_hw_frames = unsafe {
|
||||
@@ -371,6 +379,15 @@ impl EncState {
|
||||
enc.set_time_base(ff::Rational::new(1, fps as i32));
|
||||
enc.set_max_b_frames(0);
|
||||
|
||||
// VBV rate limiting: caps IDR burst size for WebRTC. Without this a 4K
|
||||
// scene change can produce a 256KB keyframe that overflows the UDP send
|
||||
// buffer. bufsize=bitrate/4 ≈ 250ms of video at the target bitrate.
|
||||
unsafe {
|
||||
let ctx_ptr = enc.as_mut_ptr();
|
||||
(*ctx_ptr).rc_max_rate = bitrate as i64;
|
||||
(*ctx_ptr).rc_buffer_size = (bitrate / 4) as i32;
|
||||
}
|
||||
|
||||
// SAFETY: AV_CODEC_FLAG_GLOBAL_HEADER must be set BEFORE opening the encoder.
|
||||
// It triggers SPS/PPS extradata generation needed by the muxer for
|
||||
// Annex B to AVCC conversion.
|
||||
@@ -395,7 +412,7 @@ impl EncState {
|
||||
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0)
|
||||
};
|
||||
if ret < 0 {
|
||||
tracing::warn!("av_opt_set repeat_pps failed (error {ret}), likely FFmpeg < 7.0; continuing without per-frame PPS");
|
||||
tracing::warn!("av_opt_set repeat_pps failed ({}), likely FFmpeg < 7.0; continuing without per-frame PPS", ff_err(ret));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -420,7 +437,7 @@ impl EncState {
|
||||
)
|
||||
};
|
||||
if ret < 0 || fmt_ctx_ptr.is_null() {
|
||||
bail!("Failed to allocate output format context: error {ret}");
|
||||
bail!("Failed to allocate output format context: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
// SAFETY: avformat_query_codec checks codec+format compatibility.
|
||||
@@ -444,7 +461,7 @@ impl EncState {
|
||||
ffi::avcodec_parameters_from_context((*stream_ptr).codecpar, enc_video.as_ptr())
|
||||
};
|
||||
if ret < 0 {
|
||||
bail!("Failed to copy encoder parameters to stream: error {ret}");
|
||||
bail!("Failed to copy encoder parameters to stream: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
// SAFETY: Copy encoder time_base to stream.
|
||||
@@ -462,15 +479,16 @@ impl EncState {
|
||||
};
|
||||
if ret < 0 {
|
||||
bail!(
|
||||
"Failed to open output file '{}': error {ret}",
|
||||
output_path.display()
|
||||
"Failed to open output file '{}': {}",
|
||||
output_path.display(),
|
||||
ff_err(ret)
|
||||
);
|
||||
}
|
||||
|
||||
// SAFETY: avformat_write_header writes the container header.
|
||||
let ret = unsafe { ffi::avformat_write_header(fmt_ctx_ptr, ptr::null_mut()) };
|
||||
if ret < 0 {
|
||||
bail!("Failed to write output header: error {ret}");
|
||||
bail!("Failed to write output header: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
// SAFETY: We created fmt_ctx_ptr above and it's valid.
|
||||
@@ -492,9 +510,9 @@ impl EncState {
|
||||
}
|
||||
|
||||
pub fn encode_frame(&mut self, hw_frame: &ff::frame::Video) -> Result<()> {
|
||||
let mut filter_src_ctx = self.video_filter.get("in").unwrap();
|
||||
let mut filter_src_ctx = self.video_filter.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?;
|
||||
let mut filter_src = filter_src_ctx.source();
|
||||
let mut filter_sink_ctx = self.video_filter.get("out").unwrap();
|
||||
let mut filter_sink_ctx = self.video_filter.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?;
|
||||
let mut filter_sink = filter_sink_ctx.sink();
|
||||
|
||||
// SAFETY: hw_frame is a valid VAAPI hardware frame from capture.
|
||||
@@ -524,7 +542,7 @@ impl EncState {
|
||||
let ret =
|
||||
unsafe { ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), filtered.as_ptr()) };
|
||||
if ret < 0 {
|
||||
bail!("avcodec_send_frame failed: error {ret}");
|
||||
bail!("avcodec_send_frame failed: {}", ff_err(ret));
|
||||
}
|
||||
self.drain_encoder(start_ts)?;
|
||||
}
|
||||
@@ -534,12 +552,14 @@ impl EncState {
|
||||
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
// Flush filter graph
|
||||
let mut filter_src_ctx = self.video_filter.get("in").unwrap();
|
||||
let mut filter_src_ctx = self.video_filter.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?;
|
||||
let mut filter_src = filter_src_ctx.source();
|
||||
let _ = filter_src.flush();
|
||||
if let Err(e) = filter_src.flush() {
|
||||
tracing::debug!("filter source flush error: {e}");
|
||||
}
|
||||
|
||||
// Drain filter
|
||||
let mut filter_sink_ctx = self.video_filter.get("out").unwrap();
|
||||
let mut filter_sink_ctx = self.video_filter.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?;
|
||||
let mut filter_sink = filter_sink_ctx.sink();
|
||||
loop {
|
||||
let mut filtered = ff::frame::Video::empty();
|
||||
@@ -552,7 +572,7 @@ impl EncState {
|
||||
ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), filtered.as_ptr())
|
||||
};
|
||||
if ret < 0 {
|
||||
bail!("avcodec_send_frame failed during flush: error {ret}");
|
||||
bail!("avcodec_send_frame failed during flush: {}", ff_err(ret));
|
||||
}
|
||||
self.drain_encoder(start_ts)?;
|
||||
}
|
||||
@@ -589,7 +609,7 @@ impl EncState {
|
||||
if ret == ffi::AVERROR(ffi::EAGAIN) || ret == ffi::AVERROR_EOF {
|
||||
break;
|
||||
}
|
||||
bail!("avcodec_receive_packet failed: error {ret}");
|
||||
bail!("avcodec_receive_packet failed: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
// Rescale timestamps from encoder time_base to stream time_base
|
||||
@@ -633,39 +653,34 @@ pub enum FrameOutput {
|
||||
Channel(crossbeam_channel::Sender<Vec<u8>>),
|
||||
}
|
||||
|
||||
pub struct SwEncState {
|
||||
/// Owned CPU NV12 frame data for cross-thread transfer.
|
||||
/// Produced by main thread (VAAPI import + GPU scale + transfer), consumed by encode thread.
|
||||
pub struct CpuNv12Frame {
|
||||
pub y_data: Vec<u8>,
|
||||
pub uv_data: Vec<u8>,
|
||||
pub y_stride: usize,
|
||||
pub uv_stride: usize,
|
||||
pub pts: i64,
|
||||
}
|
||||
|
||||
pub struct SwEncImport {
|
||||
hw_dev: AvHwDevCtx,
|
||||
frames_rgb: AvHwFrameCtx,
|
||||
filter_graph: ff::filter::Graph,
|
||||
sws_ctx: *mut ffi::SwsContext,
|
||||
enc_video: ff::codec::encoder::video::Video,
|
||||
output: Option<FrameOutput>,
|
||||
yuv_frame: *mut ffi::AVFrame,
|
||||
starting_timestamp: Option<i64>,
|
||||
frames_written: bool,
|
||||
webrtc_disconnected: bool,
|
||||
webrtc_paused: Option<Arc<AtomicBool>>,
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
}
|
||||
|
||||
unsafe impl Send for SwEncState {}
|
||||
|
||||
impl SwEncState {
|
||||
impl SwEncImport {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
drm_device: &Path,
|
||||
output_path: &Path,
|
||||
width: u32,
|
||||
height: u32,
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
fps: u32,
|
||||
bitrate: u64,
|
||||
gop_size: u32,
|
||||
) -> Result<Self> {
|
||||
tracing::info!(
|
||||
"SwEncState::new: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264"
|
||||
);
|
||||
|
||||
let hw_dev = AvHwDevCtx::new_vaapi(drm_device)?;
|
||||
let frames_rgb =
|
||||
AvHwFrameCtx::for_capture(&hw_dev, width, height, ff::format::Pixel::BGRA)?;
|
||||
@@ -679,88 +694,32 @@ impl SwEncState {
|
||||
fps,
|
||||
)?;
|
||||
|
||||
let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?;
|
||||
let (enc_video, octx) =
|
||||
create_software_h264_muxer(output_path, enc_width, enc_height, fps, bitrate, gop_size)?;
|
||||
let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?;
|
||||
|
||||
Ok(Self {
|
||||
hw_dev,
|
||||
frames_rgb,
|
||||
filter_graph,
|
||||
sws_ctx,
|
||||
enc_video,
|
||||
output: Some(FrameOutput::Muxer(octx)),
|
||||
yuv_frame,
|
||||
starting_timestamp: None,
|
||||
frames_written: false,
|
||||
webrtc_disconnected: false,
|
||||
webrtc_paused: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_webrtc(
|
||||
drm_device: &Path,
|
||||
width: u32,
|
||||
height: u32,
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
fps: u32,
|
||||
bitrate: u64,
|
||||
gop_size: u32,
|
||||
tx: crossbeam_channel::Sender<Vec<u8>>,
|
||||
webrtc_paused: Arc<AtomicBool>,
|
||||
) -> Result<Self> {
|
||||
tracing::info!(
|
||||
"SwEncState::new_webrtc: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264 -> WebRTC"
|
||||
);
|
||||
|
||||
let hw_dev = AvHwDevCtx::new_vaapi(drm_device)?;
|
||||
let frames_rgb =
|
||||
AvHwFrameCtx::for_capture(&hw_dev, width, height, ff::format::Pixel::BGRA)?;
|
||||
let filter_graph = build_swenc_filter_graph(
|
||||
&hw_dev,
|
||||
&frames_rgb,
|
||||
width,
|
||||
height,
|
||||
enc_width,
|
||||
enc_height,
|
||||
fps,
|
||||
)?;
|
||||
let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?;
|
||||
let enc_video = create_software_h264_encoder(enc_width, enc_height, fps, bitrate, gop_size)?;
|
||||
let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?;
|
||||
|
||||
Ok(Self {
|
||||
hw_dev,
|
||||
frames_rgb,
|
||||
filter_graph,
|
||||
sws_ctx,
|
||||
enc_video,
|
||||
output: Some(FrameOutput::Channel(tx)),
|
||||
yuv_frame,
|
||||
starting_timestamp: None,
|
||||
frames_written: false,
|
||||
webrtc_disconnected: false,
|
||||
webrtc_paused: Some(webrtc_paused),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn frames_rgb(&self) -> &AvHwFrameCtx {
|
||||
let _ = self.hw_dev.as_ptr();
|
||||
&self.frames_rgb
|
||||
}
|
||||
|
||||
pub fn encode_frame(&mut self, hw_frame: &ff::frame::Video) -> Result<()> {
|
||||
let mut filter_src_ctx = self.filter_graph.get("in").unwrap();
|
||||
pub fn import_and_scale(&mut self, hw_frame: &ff::frame::Video) -> Result<CpuNv12Frame> {
|
||||
let mut filter_src_ctx = self.filter_graph.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?;
|
||||
let mut filter_src = filter_src_ctx.source();
|
||||
let mut filter_sink_ctx = self.filter_graph.get("out").unwrap();
|
||||
let mut filter_sink_ctx = self.filter_graph.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?;
|
||||
let mut filter_sink = filter_sink_ctx.sink();
|
||||
|
||||
filter_src
|
||||
.add(hw_frame)
|
||||
.map_err(|e| anyhow::anyhow!("software pipeline filter source add failed: {e}"))?;
|
||||
|
||||
let mut first = None;
|
||||
let mut extra_count = 0usize;
|
||||
loop {
|
||||
let mut filtered = ff::frame::Video::empty();
|
||||
match filter_sink.frame(&mut filtered) {
|
||||
@@ -768,61 +727,47 @@ impl SwEncState {
|
||||
if filtered.pts().is_none() {
|
||||
filtered.set_pts(hw_frame.pts());
|
||||
}
|
||||
self.encode_filtered_frame(&filtered)?;
|
||||
let cpu_frame = self.transfer_filtered_to_cpu(&filtered)?;
|
||||
if first.is_none() {
|
||||
first = Some(cpu_frame);
|
||||
} else {
|
||||
extra_count += 1;
|
||||
}
|
||||
}
|
||||
Err(ff::Error::Other { errno }) if errno == ffi::EAGAIN => break,
|
||||
Err(e) => bail!("software pipeline filter sink get frame failed: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
if extra_count > 0 {
|
||||
tracing::warn!("software import filter produced {extra_count} extra frame(s); dropping extras");
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
let mut filter_src_ctx = self.filter_graph.get("in").unwrap();
|
||||
let mut filter_src = filter_src_ctx.source();
|
||||
let _ = filter_src.flush();
|
||||
first.ok_or_else(|| anyhow::anyhow!("software pipeline produced no scaled frame"))
|
||||
}
|
||||
|
||||
let mut filter_sink_ctx = self.filter_graph.get("out").unwrap();
|
||||
pub fn flush_import(&mut self) -> Result<Vec<CpuNv12Frame>> {
|
||||
let mut filter_src_ctx = self.filter_graph.get("in").ok_or_else(|| anyhow::anyhow!("filter 'in' not found"))?;
|
||||
let mut filter_src = filter_src_ctx.source();
|
||||
if let Err(e) = filter_src.flush() {
|
||||
tracing::debug!("filter source flush error: {e}");
|
||||
}
|
||||
|
||||
let mut filter_sink_ctx = self.filter_graph.get("out").ok_or_else(|| anyhow::anyhow!("filter 'out' not found"))?;
|
||||
let mut filter_sink = filter_sink_ctx.sink();
|
||||
let mut frames = Vec::new();
|
||||
loop {
|
||||
let mut filtered = ff::frame::Video::empty();
|
||||
match filter_sink.frame(&mut filtered) {
|
||||
Ok(()) => self.encode_filtered_frame(&filtered)?,
|
||||
Ok(()) => frames.push(self.transfer_filtered_to_cpu(&filtered)?),
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: Sending a null frame flushes the opened software encoder;
|
||||
// no frame data is dereferenced. enc_video is exclusively borrowed via &mut self.
|
||||
unsafe {
|
||||
let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), ptr::null());
|
||||
if ret < 0 && ret != ffi::AVERROR_EOF {
|
||||
bail!("software encoder flush send failed: error {ret}");
|
||||
}
|
||||
}
|
||||
let start_ts = self.starting_timestamp.unwrap_or(0);
|
||||
self.drain_encoder(start_ts)?;
|
||||
|
||||
if self.frames_written {
|
||||
if let Some(FrameOutput::Muxer(ref mut octx)) = self.output {
|
||||
octx.write_trailer()
|
||||
.map_err(|e| anyhow::anyhow!("Failed to write trailer: {e}"))?;
|
||||
}
|
||||
Ok(frames)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn encode_filtered_frame(&mut self, filtered: &ff::frame::Video) -> Result<()> {
|
||||
if self.webrtc_disconnected {
|
||||
return Ok(());
|
||||
}
|
||||
if let Some(ref paused) = self.webrtc_paused {
|
||||
if paused.load(Ordering::Relaxed) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
fn transfer_filtered_to_cpu(&self, filtered: &ff::frame::Video) -> Result<CpuNv12Frame> {
|
||||
// SAFETY: av_frame_alloc returns a newly allocated AVFrame or null,
|
||||
// which is checked below.
|
||||
let mut sw_nv12 = unsafe { ffi::av_frame_alloc() };
|
||||
@@ -837,32 +782,171 @@ impl SwEncState {
|
||||
// SAFETY: sw_nv12 was allocated above and has not been freed yet.
|
||||
unsafe { ffi::av_frame_free(&mut sw_nv12) };
|
||||
bail!(
|
||||
"av_hwframe_transfer_data failed for GPU-downscaled frame: error {transfer_ret} ({})",
|
||||
av_err_to_string(transfer_ret)
|
||||
"av_hwframe_transfer_data failed for GPU-downscaled frame: {}",
|
||||
ff_err(transfer_ret)
|
||||
);
|
||||
}
|
||||
|
||||
// SAFETY: sw_nv12 was filled by av_hwframe_transfer_data. NV12 planes 0 and 1 are
|
||||
// initialized for enc_width x enc_height; linesize values define each row's byte span.
|
||||
let frame = unsafe {
|
||||
let y_ptr = (*sw_nv12).data[0];
|
||||
let uv_ptr = (*sw_nv12).data[1];
|
||||
if y_ptr.is_null() || uv_ptr.is_null() {
|
||||
ffi::av_frame_free(&mut sw_nv12);
|
||||
bail!("NV12 transfer frame missing Y/UV plane data");
|
||||
}
|
||||
let y_stride = (*sw_nv12).linesize[0] as usize;
|
||||
let uv_stride = (*sw_nv12).linesize[1] as usize;
|
||||
if (*sw_nv12).width != self.enc_width as i32 || (*sw_nv12).height != self.enc_height as i32 {
|
||||
ffi::av_frame_free(&mut sw_nv12);
|
||||
bail!("NV12 transfer frame has unexpected dimensions");
|
||||
}
|
||||
let y_len = y_stride * self.enc_height as usize;
|
||||
let uv_len = uv_stride * (self.enc_height as usize / 2);
|
||||
let y_data = slice::from_raw_parts(y_ptr, y_len).to_vec();
|
||||
let uv_data = slice::from_raw_parts(uv_ptr, uv_len).to_vec();
|
||||
let pts = filtered.pts().unwrap_or(0);
|
||||
ffi::av_frame_free(&mut sw_nv12);
|
||||
CpuNv12Frame {
|
||||
y_data,
|
||||
uv_data,
|
||||
y_stride,
|
||||
uv_stride,
|
||||
pts,
|
||||
}
|
||||
};
|
||||
|
||||
Ok(frame)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SwEncEncode {
|
||||
sws_ctx: *mut ffi::SwsContext,
|
||||
enc_video: ff::codec::encoder::video::Video,
|
||||
output: Option<FrameOutput>,
|
||||
yuv_frame: *mut ffi::AVFrame,
|
||||
starting_timestamp: Option<i64>,
|
||||
frames_written: bool,
|
||||
webrtc_disconnected: bool,
|
||||
webrtc_paused: Option<Arc<AtomicBool>>,
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
}
|
||||
|
||||
// SAFETY: SwEncEncode owns sws_ctx/yuv_frame/enc_video exclusively after construction.
|
||||
// It is moved to a single encode thread and only accessed through &mut self there.
|
||||
unsafe impl Send for SwEncEncode {}
|
||||
|
||||
impl SwEncEncode {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new_muxer(
|
||||
output_path: &Path,
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
fps: u32,
|
||||
bitrate: u64,
|
||||
gop_size: u32,
|
||||
) -> Result<Self> {
|
||||
let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?;
|
||||
let (enc_video, octx) =
|
||||
create_software_h264_muxer(output_path, enc_width, enc_height, fps, bitrate, gop_size)?;
|
||||
let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?;
|
||||
|
||||
Ok(Self {
|
||||
sws_ctx,
|
||||
enc_video,
|
||||
output: Some(FrameOutput::Muxer(octx)),
|
||||
yuv_frame,
|
||||
starting_timestamp: None,
|
||||
frames_written: false,
|
||||
webrtc_disconnected: false,
|
||||
webrtc_paused: None,
|
||||
enc_width,
|
||||
enc_height,
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_webrtc(
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
fps: u32,
|
||||
bitrate: u64,
|
||||
gop_size: u32,
|
||||
tx: crossbeam_channel::Sender<Vec<u8>>,
|
||||
webrtc_paused: Arc<AtomicBool>,
|
||||
) -> Result<Self> {
|
||||
let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?;
|
||||
let enc_video = create_software_h264_encoder(enc_width, enc_height, fps, bitrate, gop_size)?;
|
||||
let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?;
|
||||
|
||||
Ok(Self {
|
||||
sws_ctx,
|
||||
enc_video,
|
||||
output: Some(FrameOutput::Channel(tx)),
|
||||
yuv_frame,
|
||||
starting_timestamp: None,
|
||||
frames_written: false,
|
||||
webrtc_disconnected: false,
|
||||
webrtc_paused: Some(webrtc_paused),
|
||||
enc_width,
|
||||
enc_height,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
// SAFETY: Sending a null frame flushes the opened software encoder;
|
||||
// no frame data is dereferenced. enc_video is exclusively borrowed via &mut self.
|
||||
unsafe {
|
||||
let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), ptr::null());
|
||||
if ret < 0 && ret != ffi::AVERROR_EOF {
|
||||
bail!("software encoder flush send failed: {}", ff_err(ret));
|
||||
}
|
||||
}
|
||||
let start_ts = self.starting_timestamp.unwrap_or(0);
|
||||
self.drain_encoder(start_ts)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn encode_cpu_frame(&mut self, frame: &CpuNv12Frame) -> Result<()> {
|
||||
if self.webrtc_disconnected {
|
||||
return Ok(());
|
||||
}
|
||||
if frame.y_stride < self.enc_width as usize || frame.uv_stride < self.enc_width as usize {
|
||||
bail!("CPU NV12 frame stride is smaller than encoder width");
|
||||
}
|
||||
if let Some(ref paused) = self.webrtc_paused {
|
||||
if paused.load(Ordering::Relaxed) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: yuv_frame is an owned reusable YUV420P frame at the same dimensions as sw_nv12;
|
||||
// sws_ctx was created for NV12 -> YUV420P with no resize, so sws_scale only converts format.
|
||||
unsafe {
|
||||
let ret = ffi::av_frame_make_writable(self.yuv_frame);
|
||||
if ret < 0 {
|
||||
ffi::av_frame_free(&mut sw_nv12);
|
||||
bail!("av_frame_make_writable failed: error {ret}");
|
||||
bail!("av_frame_make_writable failed: {}", ff_err(ret));
|
||||
}
|
||||
ffi::sws_scale(
|
||||
let src_slices = [frame.y_data.as_ptr(), frame.uv_data.as_ptr(), ptr::null(), ptr::null()];
|
||||
let src_strides = [frame.y_stride as i32, frame.uv_stride as i32, 0, 0];
|
||||
let scaled = ffi::sws_scale(
|
||||
self.sws_ctx,
|
||||
(*sw_nv12).data.as_ptr() as *const *const u8,
|
||||
(*sw_nv12).linesize.as_ptr() as *const i32,
|
||||
src_slices.as_ptr(),
|
||||
src_strides.as_ptr(),
|
||||
0,
|
||||
(*sw_nv12).height,
|
||||
self.enc_height as i32,
|
||||
(*self.yuv_frame).data.as_ptr() as *mut *mut u8,
|
||||
(*self.yuv_frame).linesize.as_ptr() as *const i32,
|
||||
);
|
||||
ffi::av_frame_free(&mut sw_nv12);
|
||||
if scaled < 0 {
|
||||
bail!("sws_scale failed for software encoder: {scaled}");
|
||||
}
|
||||
}
|
||||
|
||||
let pts = filtered.pts().unwrap_or(0);
|
||||
let pts = frame.pts;
|
||||
if self.starting_timestamp.is_none() {
|
||||
self.starting_timestamp = Some(pts);
|
||||
}
|
||||
@@ -873,13 +957,23 @@ impl SwEncState {
|
||||
(*self.yuv_frame).pts = pts;
|
||||
let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), self.yuv_frame);
|
||||
if ret < 0 {
|
||||
bail!("avcodec_send_frame failed for software encoder: error {ret}");
|
||||
bail!("avcodec_send_frame failed for software encoder: {}", ff_err(ret));
|
||||
}
|
||||
}
|
||||
|
||||
self.drain_encoder(start_ts)
|
||||
}
|
||||
|
||||
fn write_trailer_if_needed(&mut self) -> Result<()> {
|
||||
if self.frames_written {
|
||||
if let Some(FrameOutput::Muxer(ref mut octx)) = self.output {
|
||||
octx.write_trailer()
|
||||
.map_err(|e| anyhow::anyhow!("Failed to write trailer: {e}"))?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn drain_encoder(&mut self, start_ts: i64) -> Result<()> {
|
||||
loop {
|
||||
let mut pkt = ff::Packet::empty();
|
||||
@@ -891,7 +985,7 @@ impl SwEncState {
|
||||
if ret == ffi::AVERROR(ffi::EAGAIN) || ret == ffi::AVERROR_EOF {
|
||||
break;
|
||||
}
|
||||
bail!("avcodec_receive_packet failed: error {ret}");
|
||||
bail!("avcodec_receive_packet failed: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
match self.output {
|
||||
@@ -961,7 +1055,7 @@ impl SwEncState {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SwEncState {
|
||||
impl Drop for SwEncEncode {
|
||||
fn drop(&mut self) {
|
||||
if !self.sws_ctx.is_null() {
|
||||
// SAFETY: sws_ctx is owned by this state and was returned by sws_getContext.
|
||||
@@ -975,6 +1069,83 @@ impl Drop for SwEncState {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SwEncState {
|
||||
import: SwEncImport,
|
||||
encode: SwEncEncode,
|
||||
}
|
||||
|
||||
// SAFETY: SwEncState owns import and encode state exclusively and existing sync callers move it
|
||||
// between threads only with external serialization; all FFI handles are accessed through &mut self.
|
||||
unsafe impl Send for SwEncState {}
|
||||
|
||||
impl SwEncState {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
drm_device: &Path,
|
||||
output_path: &Path,
|
||||
width: u32,
|
||||
height: u32,
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
fps: u32,
|
||||
bitrate: u64,
|
||||
gop_size: u32,
|
||||
) -> Result<Self> {
|
||||
tracing::info!(
|
||||
"SwEncState::new: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264"
|
||||
);
|
||||
let import = SwEncImport::new(drm_device, width, height, enc_width, enc_height, fps)?;
|
||||
let encode = SwEncEncode::new_muxer(output_path, enc_width, enc_height, fps, bitrate, gop_size)?;
|
||||
Ok(Self { import, encode })
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_webrtc(
|
||||
drm_device: &Path,
|
||||
width: u32,
|
||||
height: u32,
|
||||
enc_width: u32,
|
||||
enc_height: u32,
|
||||
fps: u32,
|
||||
bitrate: u64,
|
||||
gop_size: u32,
|
||||
tx: crossbeam_channel::Sender<Vec<u8>>,
|
||||
webrtc_paused: Arc<AtomicBool>,
|
||||
) -> Result<Self> {
|
||||
tracing::info!(
|
||||
"SwEncState::new_webrtc: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264 -> WebRTC"
|
||||
);
|
||||
let import = SwEncImport::new(drm_device, width, height, enc_width, enc_height, fps)?;
|
||||
let encode = SwEncEncode::new_webrtc(
|
||||
enc_width,
|
||||
enc_height,
|
||||
fps,
|
||||
bitrate,
|
||||
gop_size,
|
||||
tx,
|
||||
webrtc_paused,
|
||||
)?;
|
||||
Ok(Self { import, encode })
|
||||
}
|
||||
|
||||
pub fn frames_rgb(&self) -> &AvHwFrameCtx {
|
||||
self.import.frames_rgb()
|
||||
}
|
||||
|
||||
pub fn encode_frame(&mut self, hw_frame: &ff::frame::Video) -> Result<()> {
|
||||
let cpu_frame = self.import.import_and_scale(hw_frame)?;
|
||||
self.encode.encode_cpu_frame(&cpu_frame)
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
for frame in self.import.flush_import()? {
|
||||
self.encode.encode_cpu_frame(&frame)?;
|
||||
}
|
||||
self.encode.flush()?;
|
||||
self.encode.write_trailer_if_needed()
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Shared encoder creation (used by both wlr-screencopy and portal paths)
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -1065,7 +1236,7 @@ fn build_swenc_filter_graph(
|
||||
let ret = ffi::av_buffersrc_parameters_set(src_ctx.as_mut_ptr(), par);
|
||||
ffi::av_free(par as *mut _);
|
||||
if ret < 0 {
|
||||
bail!("av_buffersrc_parameters_set failed: error {ret}");
|
||||
bail!("av_buffersrc_parameters_set failed: {}", ff_err(ret));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1124,7 +1295,7 @@ fn alloc_yuv420p_frame(width: u32, height: u32) -> Result<*mut ffi::AVFrame> {
|
||||
let ret = ffi::av_frame_get_buffer(frame, 0);
|
||||
if ret < 0 {
|
||||
ffi::av_frame_free(&mut frame);
|
||||
bail!("av_frame_get_buffer failed: error {ret}");
|
||||
bail!("av_frame_get_buffer failed: {}", ff_err(ret));
|
||||
}
|
||||
Ok(frame)
|
||||
}
|
||||
@@ -1212,7 +1383,7 @@ fn create_software_h264_muxer(
|
||||
)
|
||||
};
|
||||
if ret < 0 || fmt_ctx_ptr.is_null() {
|
||||
bail!("Failed to allocate output format context: error {ret}");
|
||||
bail!("Failed to allocate output format context: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
// SAFETY: fmt_ctx_ptr is valid; stream and codec parameters are owned by the format context.
|
||||
@@ -1225,7 +1396,7 @@ fn create_software_h264_muxer(
|
||||
let ret =
|
||||
unsafe { ffi::avcodec_parameters_from_context((*stream_ptr).codecpar, enc_video.as_ptr()) };
|
||||
if ret < 0 {
|
||||
bail!("Failed to copy codec parameters to stream: error {ret}");
|
||||
bail!("Failed to copy codec parameters to stream: {}", ff_err(ret));
|
||||
}
|
||||
// SAFETY: stream_ptr is valid and writable during muxer setup.
|
||||
unsafe {
|
||||
@@ -1242,8 +1413,9 @@ fn create_software_h264_muxer(
|
||||
);
|
||||
if ret < 0 {
|
||||
bail!(
|
||||
"Failed to open output file '{}': error {ret}",
|
||||
output_path.display()
|
||||
"Failed to open output file '{}': {}",
|
||||
output_path.display(),
|
||||
ff_err(ret)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1252,7 +1424,7 @@ fn create_software_h264_muxer(
|
||||
// SAFETY: fmt_ctx_ptr is fully configured.
|
||||
let ret = unsafe { ffi::avformat_write_header(fmt_ctx_ptr, ptr::null_mut()) };
|
||||
if ret < 0 {
|
||||
bail!("Failed to write output header: error {ret}");
|
||||
bail!("Failed to write output header: {}", ff_err(ret));
|
||||
}
|
||||
|
||||
// SAFETY: ownership of fmt_ctx_ptr transfers to ffmpeg-next Output wrapper.
|
||||
@@ -1361,7 +1533,7 @@ fn build_filter_graph(
|
||||
let ret = ffi::av_buffersrc_parameters_set(src_ctx.as_mut_ptr(), par);
|
||||
ffi::av_free(par as *mut _);
|
||||
if ret < 0 {
|
||||
bail!("av_buffersrc_parameters_set failed: error {ret}");
|
||||
bail!("av_buffersrc_parameters_set failed: {}", ff_err(ret));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -190,6 +190,7 @@ mod tests {
|
||||
backend: backend.map(String::from),
|
||||
port: 0,
|
||||
no_persist: false,
|
||||
stats: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -114,6 +114,7 @@ fn main() -> Result<()> {
|
||||
backend: Some("portal".to_string()),
|
||||
port: 0,
|
||||
no_persist: false,
|
||||
stats: false,
|
||||
};
|
||||
|
||||
let cap = CapPortal::new(&portal_args)?;
|
||||
|
||||
@@ -883,6 +883,7 @@ fn main() -> Result<()> {
|
||||
backend: Some("portal".to_string()),
|
||||
port: 0,
|
||||
no_persist: false,
|
||||
stats: false,
|
||||
};
|
||||
|
||||
let cap = CapPortal::new(&portal_args)?;
|
||||
|
||||
24
src/main.rs
24
src/main.rs
@@ -15,6 +15,7 @@ mod backend_detect; // 截屏后端自动检测(wlroots vs Portal/PipeWire)
|
||||
mod cap_portal; // XDG Portal 屏幕捕获
|
||||
mod cap_wlr_screencopy; // wlroots wlr-screencopy 截屏协议
|
||||
mod fps_limit; // 帧率限制器
|
||||
mod stats; // 管道性能统计(卡顿诊断)
|
||||
mod state; // wlr-screencopy 后端的主状态机
|
||||
mod state_portal; // Portal/PipeWire 后端的主状态机
|
||||
mod transform; // 图像变换(旋转/翻转)
|
||||
@@ -43,18 +44,23 @@ fn main() -> Result<()> {
|
||||
// 解析命令行参数
|
||||
let args = Args::parse();
|
||||
|
||||
// 根据是否启用 verbose 模式设置日志级别
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(if args.verbose {
|
||||
tracing::Level::DEBUG
|
||||
// 根据 verbose 模式或 RUST_LOG 环境变量设置日志级别
|
||||
// 支持 RUST_LOG 粒度控制(如 RUST_LOG=wl_webrtc::webrtc=trace)
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| {
|
||||
if args.verbose {
|
||||
tracing_subscriber::EnvFilter::new("debug")
|
||||
} else {
|
||||
tracing::Level::INFO
|
||||
})
|
||||
tracing_subscriber::EnvFilter::new("info")
|
||||
}
|
||||
});
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(env_filter)
|
||||
.with_writer(std::io::stderr)
|
||||
.init();
|
||||
|
||||
tracing::info!("wl-webrtc starting");
|
||||
tracing::debug!("Args: {:?}", args);
|
||||
tracing::debug!("Args: output={:?} fps={} codec={} port={} verbose={}", args.output, args.fps, args.codec, args.port, args.verbose);
|
||||
|
||||
// MVP 阶段仅支持 H.264 编码,不支持 HEVC
|
||||
if args.codec != "h264" {
|
||||
@@ -250,7 +256,7 @@ fn run_wlr_screencopy(args: Args) -> Result<()> {
|
||||
|
||||
// 状态机遇到致命错误时退出
|
||||
if state.errored {
|
||||
tracing::error!("Fatal error in state machine, exiting");
|
||||
tracing::error!("Fatal error in state machine (check preceding error logs), exiting");
|
||||
running = false;
|
||||
}
|
||||
|
||||
@@ -346,7 +352,7 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
|
||||
|
||||
// Portal 状态机遇到致命错误时退出
|
||||
if state.is_errored() {
|
||||
tracing::error!("Fatal error in portal state machine, exiting");
|
||||
tracing::error!("Fatal error in portal state machine (check preceding error logs), exiting");
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,13 +3,14 @@ use std::os::fd::AsRawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{bail, Result}; // 错误处理工具
|
||||
|
||||
use crate::args::Args; // 命令行参数
|
||||
use crate::avhw::{self, SwEncState}; // 软件编码器状态(VAAPI 导入 + H.264 编码)
|
||||
use crate::avhw::{self, CpuNv12Frame, SwEncEncode, SwEncImport, SwEncState}; // 软件编码器状态(VAAPI 导入 + H.264 编码)
|
||||
use crate::cap_portal::{CapPortal, PwCtrlEvent, PwDmaBufFrame}; // PipeWire 屏幕采集端点
|
||||
use crate::stats::{FrameTimings, PipelineStats}; // 管道统计(帧计时、每秒快照)
|
||||
use crate::webrtc::WebRtcState; // WebRTC 信令与媒体传输
|
||||
|
||||
/// 门户采集的阶段状态
|
||||
@@ -20,6 +21,18 @@ enum PortalStage {
|
||||
Streaming,
|
||||
}
|
||||
|
||||
struct EncodeThreadTiming {
|
||||
sws_us: u64,
|
||||
encode_us: u64,
|
||||
output_bytes: usize,
|
||||
}
|
||||
|
||||
struct EncodeThread {
|
||||
handle: Option<std::thread::JoinHandle<()>>,
|
||||
input_tx: crossbeam_channel::Sender<CpuNv12Frame>,
|
||||
timing_rx: crossbeam_channel::Receiver<EncodeThreadTiming>,
|
||||
}
|
||||
|
||||
/// 门户模式的主状态机
|
||||
///
|
||||
/// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。
|
||||
@@ -27,16 +40,17 @@ enum PortalStage {
|
||||
pub struct StatePortal {
|
||||
stage: PortalStage, // 当前采集阶段(等待首帧 / 流式编码中)
|
||||
enc: Option<SwEncState>, // 软件编码器,首帧到达后初始化
|
||||
enc_import: Option<SwEncImport>,
|
||||
enc_thread: Option<EncodeThread>,
|
||||
cap: CapPortal, // PipeWire 屏幕采集端点
|
||||
args: Args, // 用户命令行参数
|
||||
errored: bool, // 是否遇到不可恢复的错误
|
||||
drm_device: Option<PathBuf>, // DRM 渲染设备路径(可自动检测)
|
||||
frames_encoded: u64, // 已编码帧数
|
||||
frames_encoded: u64, // 已编码帧数(用于 PTS 编号)
|
||||
start_time: Option<Instant>, // 编码开始时间
|
||||
last_stats_time: Option<Instant>, // 上一次统计日志时间
|
||||
last_stats_frames: u64, // 上一次统计时的已编码帧数
|
||||
stats: PipelineStats, // 管道统计(窗口化帧计时 + 每秒快照)
|
||||
pw_dropped_prev: u64, // 上一窗口的 PipeWire 丢弃帧数(用于增量计算)
|
||||
webrtc: Option<WebRtcState>, // WebRTC 状态(仅 WebRTC 模式启用)
|
||||
webrtc_tx: Option<crossbeam_channel::Sender<Vec<u8>>>, // 编码帧发送通道
|
||||
webrtc_rx: Option<crossbeam_channel::Receiver<Vec<u8>>>,
|
||||
webrtc_frames_sent: u64,
|
||||
webrtc_paused: Option<Arc<AtomicBool>>,
|
||||
@@ -56,29 +70,29 @@ impl StatePortal {
|
||||
|
||||
let cap = CapPortal::new(&args)?;
|
||||
|
||||
let (webrtc, webrtc_tx, webrtc_rx, webrtc_paused) = if args.port > 0 {
|
||||
let (tx, rx) = crossbeam_channel::bounded(32);
|
||||
let (webrtc, webrtc_paused) = if args.port > 0 {
|
||||
let wrtc = WebRtcState::new(args.port, args.fps)?;
|
||||
let paused = Arc::new(AtomicBool::new(true));
|
||||
(Some(wrtc), Some(tx), Some(rx), Some(paused))
|
||||
(Some(wrtc), Some(paused))
|
||||
} else {
|
||||
(None, None, None, None)
|
||||
(None, None)
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
stage: PortalStage::WaitingForFormat,
|
||||
enc: None,
|
||||
enc_import: None,
|
||||
enc_thread: None,
|
||||
cap,
|
||||
args,
|
||||
errored: false,
|
||||
drm_device,
|
||||
frames_encoded: 0,
|
||||
start_time: None,
|
||||
last_stats_time: None,
|
||||
last_stats_frames: 0,
|
||||
stats: PipelineStats::new(),
|
||||
pw_dropped_prev: 0,
|
||||
webrtc,
|
||||
webrtc_tx,
|
||||
webrtc_rx,
|
||||
webrtc_rx: None,
|
||||
webrtc_frames_sent: 0,
|
||||
webrtc_paused,
|
||||
})
|
||||
@@ -155,7 +169,7 @@ impl StatePortal {
|
||||
});
|
||||
// GOP 大小:WebRTC 模式使用更小的 GOP(fps/2,最低10),MP4 模式使用 fps
|
||||
let actual_gop_size = self.args.gop_size.unwrap_or_else(|| {
|
||||
if self.webrtc_tx.is_some() {
|
||||
if self.webrtc.is_some() {
|
||||
(self.args.fps / 2).max(10)
|
||||
} else {
|
||||
self.args.fps
|
||||
@@ -163,26 +177,40 @@ impl StatePortal {
|
||||
});
|
||||
|
||||
// 根据是否启用 WebRTC 选择不同的编码器构造方式
|
||||
let enc = if let Some(ref tx) = self.webrtc_tx {
|
||||
if self.webrtc.is_some() {
|
||||
let paused = self.webrtc_paused.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("internal invariant broken: webrtc_paused missing while WebRTC mode is active"))?;
|
||||
avhw::SwEncState::new_webrtc(
|
||||
let import = SwEncImport::new(
|
||||
&drm_path,
|
||||
frame.width,
|
||||
frame.height,
|
||||
enc_width,
|
||||
enc_height,
|
||||
self.args.fps,
|
||||
)?;
|
||||
let (webrtc_tx, webrtc_rx) = crossbeam_channel::bounded(32);
|
||||
let (input_tx, input_rx) = crossbeam_channel::bounded::<CpuNv12Frame>(2);
|
||||
let (timing_tx, timing_rx) = crossbeam_channel::bounded::<EncodeThreadTiming>(32);
|
||||
let encode = SwEncEncode::new_webrtc(
|
||||
enc_width,
|
||||
enc_height,
|
||||
self.args.fps,
|
||||
actual_bitrate,
|
||||
actual_gop_size,
|
||||
tx.clone(),
|
||||
webrtc_tx,
|
||||
paused.clone(),
|
||||
)?
|
||||
)?;
|
||||
let handle = std::thread::Builder::new()
|
||||
.name("wl-webrtc-encode".into())
|
||||
.spawn(move || encode_thread_loop(encode, input_rx, timing_tx))?;
|
||||
self.enc_import = Some(import);
|
||||
self.enc_thread = Some(EncodeThread { handle: Some(handle), input_tx, timing_rx });
|
||||
self.webrtc_rx = Some(webrtc_rx);
|
||||
} else {
|
||||
// MP4 模式:编码输出写入文件
|
||||
let output_path = self.args.output.as_deref()
|
||||
.ok_or_else(|| anyhow::anyhow!("--output is required in MP4 file output mode; use --port > 0 for WebRTC mode"))?;
|
||||
avhw::SwEncState::new(
|
||||
let enc = avhw::SwEncState::new(
|
||||
&drm_path,
|
||||
std::path::Path::new(output_path),
|
||||
frame.width,
|
||||
@@ -192,17 +220,17 @@ impl StatePortal {
|
||||
self.args.fps,
|
||||
actual_bitrate,
|
||||
actual_gop_size,
|
||||
)?
|
||||
};
|
||||
|
||||
)?;
|
||||
self.enc = Some(enc);
|
||||
};
|
||||
self.stage = PortalStage::Streaming; // 切换到流式编码阶段
|
||||
self.start_time = Some(Instant::now());
|
||||
self.last_stats_time = Some(Instant::now());
|
||||
tracing::info!("First frame processed, encoder initialized, transitioning to Streaming");
|
||||
drop(frame); // 首帧仅用于初始化,不参与编码
|
||||
}
|
||||
PortalStage::Streaming => {
|
||||
// 记录采集帧到达(用于 capture gap 和 capture_fps 统计)
|
||||
self.stats.record_capture();
|
||||
// 流式编码阶段:直接处理帧
|
||||
self.handle_pw_frame(frame)?;
|
||||
}
|
||||
@@ -212,6 +240,16 @@ impl StatePortal {
|
||||
// WebRTC: drain encoded frames produced by this poll before returning.
|
||||
self.poll_webrtc()?;
|
||||
|
||||
// 每秒输出一次结构化管道统计(仅 --stats 启用时记录日志)
|
||||
if self.args.stats && self.stats.should_snapshot() {
|
||||
// PipeWire 丢弃帧数:CapPortal 尚未暴露 dropped_count(),暂用占位
|
||||
self.stats.set_pipewire_dropped(0, 0);
|
||||
let enc_q = self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0);
|
||||
self.stats.set_queue_depths(0, enc_q);
|
||||
let snap = self.stats.snapshot_and_reset();
|
||||
tracing::info!("stats: {snap}");
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -271,12 +309,10 @@ impl StatePortal {
|
||||
/// 通过 `av_hwframe_map` 零拷贝导入 VAAPI,然后交给 SwEncState 完成:
|
||||
/// scale_vaapi GPU 缩放、2K NV12 回读、YUV420P 格式转换、软件 H.264 编码。
|
||||
fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> {
|
||||
// 获取已初始化的编码器引用
|
||||
let enc = match self.enc.as_mut() {
|
||||
Some(enc) => enc,
|
||||
None => bail!("encoder not initialized"),
|
||||
};
|
||||
let t_import_start = Instant::now();
|
||||
let pts = self.frames_encoded as i64;
|
||||
|
||||
if let Some(enc) = self.enc.as_mut() {
|
||||
// 将 DMA-BUF 帧零拷贝导入 VAAPI 硬件帧池
|
||||
let mut vaapi_frame = unsafe {
|
||||
avhw::import_dma_buf_to_vaapi(
|
||||
@@ -291,29 +327,66 @@ impl StatePortal {
|
||||
)
|
||||
}?;
|
||||
|
||||
let import_us = t_import_start.elapsed().as_micros() as u64;
|
||||
let t_encode_start = Instant::now();
|
||||
|
||||
// 设置帧的显示时间戳(PTS),基于已编码帧序号
|
||||
let pts = self.frames_encoded as i64;
|
||||
unsafe {
|
||||
(*vaapi_frame.as_mut_ptr()).pts = pts;
|
||||
}
|
||||
|
||||
// 送入编码器完成:缩放 → 回读 → 格式转换 → H.264 编码
|
||||
enc.encode_frame(&vaapi_frame)?;
|
||||
let total_us = t_import_start.elapsed().as_micros() as u64;
|
||||
let encode_us = t_encode_start.elapsed().as_micros() as u64;
|
||||
|
||||
self.frames_encoded += 1;
|
||||
|
||||
// 每 10 秒输出一次编码统计(已编码帧数、实时帧率)
|
||||
if let Some(last) = self.last_stats_time {
|
||||
if last.elapsed() >= Duration::from_secs(10) {
|
||||
let delta_frames = self.frames_encoded - self.last_stats_frames;
|
||||
let delta_secs = last.elapsed().as_secs_f64();
|
||||
let fps = delta_frames as f64 / delta_secs;
|
||||
tracing::info!(
|
||||
"encoded={}, fps={fps:.1}",
|
||||
self.frames_encoded,
|
||||
);
|
||||
self.last_stats_time = Some(Instant::now());
|
||||
self.last_stats_frames = self.frames_encoded;
|
||||
// 记录帧计时到管道统计(import + encode 内部各阶段暂不可分离,用 total 覆盖)
|
||||
let timings = FrameTimings {
|
||||
import_us,
|
||||
encode_us,
|
||||
total_us,
|
||||
..Default::default()
|
||||
};
|
||||
self.stats.record_encode(&timings);
|
||||
} else if let Some(import) = self.enc_import.as_mut() {
|
||||
let mut vaapi_frame = unsafe {
|
||||
avhw::import_dma_buf_to_vaapi(
|
||||
import.frames_rgb().as_ptr(),
|
||||
frame.fd.as_raw_fd(),
|
||||
frame.width,
|
||||
frame.height,
|
||||
frame.format,
|
||||
frame.modifier,
|
||||
frame.stride,
|
||||
frame.offset,
|
||||
)
|
||||
}?;
|
||||
unsafe {
|
||||
(*vaapi_frame.as_mut_ptr()).pts = pts;
|
||||
}
|
||||
|
||||
let cpu_nv12 = import.import_and_scale(&vaapi_frame)?;
|
||||
let import_us = t_import_start.elapsed().as_micros() as u64;
|
||||
self.stats.record_import(import_us);
|
||||
|
||||
let enc_thread = self.enc_thread.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("internal invariant broken: encode thread missing while async import is active"))?;
|
||||
match enc_thread.input_tx.try_send(cpu_nv12) {
|
||||
Ok(()) => {
|
||||
self.frames_encoded += 1;
|
||||
}
|
||||
Err(crossbeam_channel::TrySendError::Full(_frame)) => {
|
||||
tracing::warn!("Encode thread input full, dropping portal frame");
|
||||
}
|
||||
Err(crossbeam_channel::TrySendError::Disconnected(_frame)) => {
|
||||
tracing::error!("Encode thread input disconnected");
|
||||
self.errored = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
bail!("encoder not initialized");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -326,6 +399,15 @@ impl StatePortal {
|
||||
// 先 drop receiver,使 flush() 中的 try_send() 立即返回 Disconnected
|
||||
// 而非在满通道上阻塞(修复 issue #8 死锁)
|
||||
self.webrtc_rx = None;
|
||||
if let Some(mut enc_thread) = self.enc_thread.take() {
|
||||
drop(enc_thread.input_tx);
|
||||
if let Some(handle) = enc_thread.handle.take() {
|
||||
if handle.join().is_err() {
|
||||
tracing::error!("Encode thread panicked during shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
self.enc_import = None;
|
||||
if let Some(mut enc) = self.enc.take() {
|
||||
if let Err(e) = enc.flush() {
|
||||
tracing::error!("Flush error during shutdown: {e}");
|
||||
@@ -384,10 +466,21 @@ impl StatePortal {
|
||||
if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) {
|
||||
tracing::debug!("WebRTC write frame error: {e}");
|
||||
}
|
||||
self.stats.record_send(0.0, None);
|
||||
self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1);
|
||||
}
|
||||
if count > 0 {
|
||||
tracing::info!("WebRTC forwarded {count} frames from channel");
|
||||
tracing::debug!("WebRTC forwarded {count} frames from channel");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref enc_thread) = self.enc_thread {
|
||||
while let Ok(timing) = enc_thread.timing_rx.try_recv() {
|
||||
self.stats.record_encode_thread(
|
||||
timing.sws_us,
|
||||
timing.encode_us,
|
||||
timing.output_bytes,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -395,6 +488,42 @@ impl StatePortal {
|
||||
}
|
||||
}
|
||||
|
||||
fn encode_thread_loop(
|
||||
mut encode: SwEncEncode,
|
||||
input_rx: crossbeam_channel::Receiver<CpuNv12Frame>,
|
||||
timing_tx: crossbeam_channel::Sender<EncodeThreadTiming>,
|
||||
) {
|
||||
loop {
|
||||
match input_rx.recv() {
|
||||
Ok(frame) => {
|
||||
let t_start = Instant::now();
|
||||
match encode.encode_cpu_frame(&frame) {
|
||||
Ok(()) => {
|
||||
let elapsed = t_start.elapsed().as_micros() as u64;
|
||||
let _ = timing_tx.try_send(EncodeThreadTiming {
|
||||
sws_us: 0,
|
||||
encode_us: elapsed,
|
||||
output_bytes: 0,
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Encode thread error: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!("Encode thread input closed, flushing encoder");
|
||||
if let Err(e) = encode.flush() {
|
||||
tracing::error!("Encode thread flush error: {e}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!("Encode thread exiting");
|
||||
}
|
||||
|
||||
impl Drop for StatePortal {
|
||||
// 析构时自动调用 shutdown,确保编码器被刷新、资源被释放
|
||||
fn drop(&mut self) {
|
||||
@@ -510,6 +639,7 @@ mod tests {
|
||||
backend: None,
|
||||
port: 0,
|
||||
no_persist: false,
|
||||
stats: false,
|
||||
};
|
||||
let result = resolve_drm_device(&args).unwrap();
|
||||
assert_eq!(
|
||||
@@ -533,6 +663,7 @@ mod tests {
|
||||
backend: None,
|
||||
port: 0,
|
||||
no_persist: false,
|
||||
stats: false,
|
||||
};
|
||||
let result = resolve_drm_device(&args).unwrap();
|
||||
assert_eq!(result, None);
|
||||
|
||||
Reference in New Issue
Block a user