diff --git a/Cargo.toml b/Cargo.toml index 431d9d7..81290c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,6 @@ libc = "0.2" ashpd = { version = "0.13", features = ["tokio", "screencast"] } zbus = { version = "5", default-features = false, features = ["tokio"] } tokio = { version = "1", features = ["rt"] } -pipewire = "0.9" +pipewire = { version = "0.9", features = ["v0_3_45"] } libspa = "0.9" crossbeam-channel = "0.5" diff --git a/src/avhw.rs b/src/avhw.rs index 8198b7d..9a73a69 100644 --- a/src/avhw.rs +++ b/src/avhw.rs @@ -996,10 +996,10 @@ fn create_software_h264_muxer( ff::format::context::Output, )> { let output_cstr = CString::new(output_path.to_str().unwrap())?; - let codec = ff::encoder::find_by_name("libopenh264") - .or_else(|| ff::encoder::find_by_name("libx264")) + let codec = ff::encoder::find_by_name("libx264") + .or_else(|| ff::encoder::find_by_name("libopenh264")) .ok_or_else(|| { - anyhow::anyhow!("No H.264 software encoder found (tried libopenh264, libx264)") + anyhow::anyhow!("No H.264 software encoder found (tried libx264, libopenh264)") })?; let codec_name = codec.name().to_string(); @@ -1024,11 +1024,14 @@ fn create_software_h264_muxer( // SAFETY: priv_data belongs to the unopened encoder; strings live for each call. unsafe { let key = CString::new("preset").unwrap(); - let val = CString::new("veryfast").unwrap(); + let val = CString::new("ultrafast").unwrap(); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); let key = CString::new("tune").unwrap(); let val = CString::new("zerolatency").unwrap(); ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); + let key = CString::new("threads").unwrap(); + let val = CString::new("6").unwrap(); + ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0); } } diff --git a/src/bin/vaapi_import_bench.rs b/src/bin/vaapi_import_bench.rs index ba2a898..6af1b94 100644 --- a/src/bin/vaapi_import_bench.rs +++ b/src/bin/vaapi_import_bench.rs @@ -186,10 +186,10 @@ fn drain_encoder( fn create_software_encoder(output_path: &Path, width: u32, height: u32) -> Result { let output_cstr = CString::new(output_path.to_str().unwrap())?; - let codec = ff::encoder::find_by_name("libopenh264") - .or_else(|| ff::encoder::find_by_name("libx264")) + let codec = ff::encoder::find_by_name("libx264") + .or_else(|| ff::encoder::find_by_name("libopenh264")) .ok_or_else(|| { - anyhow::anyhow!("No H.264 software encoder found (tried libopenh264, libx264)") + anyhow::anyhow!("No H.264 software encoder found (tried libx264, libopenh264)") })?; let codec_name = codec.name().to_string(); diff --git a/src/cap_portal.rs b/src/cap_portal.rs index 5332057..9c157f1 100644 --- a/src/cap_portal.rs +++ b/src/cap_portal.rs @@ -13,6 +13,7 @@ use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::thread::{self, JoinHandle}; use anyhow::Result; @@ -80,13 +81,10 @@ pub struct CapPortal { struct PwThreadCtx { frame_tx: Sender, event_tx: Sender, - dropped: AtomicU64, + dropped: Arc, shutdown_read: OwnedFd, - /// Portal 返回的 PipeWire 远程连接文件描述符 pw_fd: OwnedFd, - /// Portal 返回的 PipeWire 节点 ID,标识要捕获的屏幕流 node_id: u32, - /// 目标帧率(当前保留,未直接用于 PipeWire 协商) fps: u32, } @@ -96,7 +94,7 @@ impl CapPortal { /// 执行流程: /// 1. 创建 Tokio 运行时(用于异步 Portal 调用) /// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id - /// 3. 创建有界通道(容量 3)用于帧传递 + /// 3. 创建有界通道(容量 16)用于帧传递 /// 4. 创建 eventfd 对,用于线程安全的关闭信号传递 /// 5. 启动 PipeWire 捕获线程 pub fn new(args: &Args) -> Result { @@ -107,7 +105,7 @@ impl CapPortal { // block_on 在此处同步等待异步 Portal 调用完成 let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal().await })?; - let (frame_tx, frame_rx) = bounded(3); + let (frame_tx, frame_rx) = bounded(16); let (event_tx, event_rx) = bounded(8); let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; @@ -124,10 +122,12 @@ impl CapPortal { return Err(anyhow::anyhow!("dup eventfd failed: {err}")); } + let pw_dropped = Arc::new(AtomicU64::new(0)); + let ctx = PwThreadCtx { frame_tx, event_tx, - dropped: AtomicU64::new(0), + dropped: pw_dropped.clone(), shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) }, pw_fd, node_id, @@ -305,7 +305,7 @@ fn pipewire_thread(ctx: PwThreadCtx) { shutdown_read, pw_fd, node_id, - fps: _fps, + fps, } = ctx; let mainloop = match pw::main_loop::MainLoopBox::new(None) { @@ -344,6 +344,7 @@ fn pipewire_thread(ctx: PwThreadCtx) { *pw::keys::MEDIA_TYPE => "Video", *pw::keys::MEDIA_CATEGORY => "Capture", *pw::keys::MEDIA_ROLE => "Screen", + *pw::keys::NODE_FORCE_QUANTUM => "512", }, ) { Ok(s) => s, @@ -359,9 +360,13 @@ fn pipewire_thread(ctx: PwThreadCtx) { let _listener = stream .add_local_listener::<()>() .state_changed(move |_, _, old, new| { - tracing::debug!("PipeWire stream state: {old:?} -> {new:?}"); + tracing::info!("PipeWire stream state: {old:?} -> {new:?}"); match new { - pw::stream::StreamState::Error(_) | pw::stream::StreamState::Unconnected => { + pw::stream::StreamState::Error(e) => { + tracing::error!("PipeWire stream error: {e}"); + let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded); + } + pw::stream::StreamState::Unconnected => { let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded); } _ => {} @@ -390,11 +395,16 @@ fn pipewire_thread(ctx: PwThreadCtx) { let drm_format = spa_to_drm_fourcc(info.format()); // 获取 DRM 修饰符,描述 GPU buffer 的内存布局(如 tiling 模式) let modifier = info.modifier(); + let framerate = info.framerate(); + let max_framerate = info.max_framerate(); // 保存协商后的格式信息,供 process 回调读取 format_info.set(Some((width, height, drm_format, modifier))); tracing::info!( "PipeWire format negotiated: {width}x{height}, \ - drm_format={drm_format:#010x}, modifier={modifier:#x}" + drm_format={drm_format:#010x}, modifier={modifier:#x}, \ + framerate={}/{}, max_framerate={}/{}", + framerate.num, framerate.denom, + max_framerate.num, max_framerate.denom, ); } }) @@ -405,15 +415,8 @@ fn pipewire_thread(ctx: PwThreadCtx) { let format_info = format_info.clone(); let frame_tx = frame_tx.clone(); let dropped = dropped; - let process_count = Rc::new(Cell::new(0u64)); - let process_count_clone = process_count.clone(); move |stream, _| { - let count = process_count_clone.get() + 1; - process_count_clone.set(count); - if count <= 5 || count % 60 == 0 { - tracing::info!("PipeWire process callback #{count}"); - } - // 从流中出队原始 buffer(包含帧数据的元信息) + let raw_buf = unsafe { stream.dequeue_raw_buffer() }; if raw_buf.is_null() { return; @@ -519,13 +522,8 @@ fn pipewire_thread(ctx: PwThreadCtx) { }) .register(); - // 空的参数数组,不主动请求特定格式(由 PipeWire 和源端协商决定) let mut params: [&pw::spa::pod::Pod; 0] = []; - // 连接到指定的 PipeWire 节点 - // Direction::Input: 作为消费者(输入方向接收数据) - // AUTOCONNECT: 允许 PipeWire 自动连接源和消费者 - // MAP_BUFFERS: 映射 buffer 到用户空间(DMA-BUF 模式下必须设置) if let Err(e) = stream.connect( pw::spa::utils::Direction::Input, Some(node_id), diff --git a/src/main.rs b/src/main.rs index 2990b12..074a5ec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -304,15 +304,14 @@ fn run_portal_pipewire(args: Args) -> Result<()> { poll.registry() .register(&mut signals, mio::Token(1), mio::Interest::READABLE)?; - // 主事件循环(超时 10ms,比 wlr-screencopy 更短,因为不依赖 Wayland fd 唤醒) - // 10ms 超时的作用是让循环高频转动,以便及时处理 PipeWire 投递的帧 - // 如果没有信号,poll 最多阻塞 10ms 就会超时返回 + // 主事件循环(非阻塞信号检测 + recv_timeout 等待帧) + // poll 超时为 0ms(非阻塞),实际等待由 poll_and_encode 的 recv_timeout 实现 let mut running = true; while running { - // poll 在此循环中只监听信号 fd,所以: + // poll 在此循环中只监听信号 fd(非阻塞): // - 收到 SIGINT/SIGTERM → 事件触发,设置 running=false - // - 超时 10ms → 事件为空,继续执行 poll_and_encode - poll.poll(&mut events, Some(std::time::Duration::from_millis(10))) + // - 无事件 → 立即返回,继续执行 poll_and_encode(内部 recv_timeout 等待帧) + poll.poll(&mut events, Some(std::time::Duration::from_millis(0))) .unwrap_or_else(|e| { if e.kind() == std::io::ErrorKind::Interrupted { return; @@ -321,7 +320,6 @@ fn run_portal_pipewire(args: Args) -> Result<()> { running = false; }); - // 遍历事件,检查是否收到退出信号 for event in &events { if event.token() == mio::Token(1) { tracing::info!("Received quit signal"); @@ -334,7 +332,9 @@ fn run_portal_pipewire(args: Args) -> Result<()> { // poll_and_encode 会从 PipeWire 缓冲区取出帧, // 编码为 H.264 并推送。返回 true 表示还有更多帧待处理, // 返回 false 表示当前没有帧了,while 循环退出等待下一轮 poll - while state.poll_and_encode()? {} + if state.poll_and_encode(true)? { + while state.poll_and_encode(false)? {} + } // Portal 状态机遇到致命错误时退出 if state.is_errored() { diff --git a/src/state_portal.rs b/src/state_portal.rs index 4206092..8ab1815 100644 --- a/src/state_portal.rs +++ b/src/state_portal.rs @@ -1,13 +1,13 @@ // 采集门户状态模块 —— 通过 PipeWire/DMA-BUF 进行屏幕采集并编码 use std::os::fd::AsRawFd; use std::path::PathBuf; +use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use crate::args::Args; use crate::avhw::{self, SwEncState}; use crate::cap_portal::{CapPortal, PwCtrlEvent, PwDmaBufFrame}; -use crate::fps_limit::FpsLimit; /// 门户采集的阶段状态 /// - WaitingForFormat: 等待接收到第一帧 DMA-BUF 以确定视频格式参数 @@ -22,32 +22,16 @@ enum PortalStage { /// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。 /// 工作流程:等待第一帧 → 创建编码器 → 持续编码帧数据。 pub struct StatePortal { - /// 当前采集阶段 stage: PortalStage, - /// GPU 缩放 + 软件编码器状态(第一帧到达后才初始化) enc: Option, - /// 帧率限制器 - fps_limit: FpsLimit<()>, - /// PipeWire 屏幕采集端点 cap: CapPortal, - /// 命令行参数 args: Args, - /// 是否遇到错误 errored: bool, - /// 是否为第一帧(首帧跳过帧率限制) - first_frame: bool, - /// DRM 渲染设备路径(如 /dev/dri/renderD128);None 表示首帧自动检测 drm_device: Option, - /// 第一帧的时间戳(纳秒),用于计算相对 PTS - first_pts_ns: Option, - /// Diagnostic: frames received from PipeWire channel - frames_received: u64, - /// Diagnostic: frames dropped by FPS limiter - frames_fps_dropped: u64, - /// Diagnostic: frames successfully encoded frames_encoded: u64, - /// Diagnostic: last time we printed stats - last_stats_time: Option, + start_time: Option, + last_stats_time: Option, + last_stats_frames: u64, } impl StatePortal { @@ -67,25 +51,23 @@ impl StatePortal { Ok(Self { stage: PortalStage::WaitingForFormat, enc: None, - fps_limit: FpsLimit::new(args.fps), cap, args, errored: false, - first_frame: true, drm_device, - first_pts_ns: None, - frames_received: 0, - frames_fps_dropped: 0, frames_encoded: 0, + start_time: None, last_stats_time: None, + last_stats_frames: 0, }) } /// 轮询 PipeWire 事件并编码帧 /// - /// 尝试从采集端点接收一帧事件。返回 `Ok(true)` 表示已处理事件, - /// `Ok(false)` 表示暂无数据。内部根据当前阶段(等待格式/流式)分发处理。 - pub fn poll_and_encode(&mut self) -> Result { + /// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 10ms), + /// `block=false` 时使用 try_recv 非阻塞检查。 + /// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。 + pub fn poll_and_encode(&mut self, block: bool) -> Result { if let Ok(ctrl) = self.cap.event_receiver().try_recv() { match ctrl { PwCtrlEvent::StreamEnded => { @@ -101,13 +83,16 @@ impl StatePortal { } } - let frame = match self.cap.frame_receiver().try_recv() { - Ok(frame) => { - self.frames_received += 1; - tracing::debug!("poll_and_encode: got frame #{} from channel", self.frames_received); - frame + let frame = if block { + match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) { + Ok(frame) => frame, + Err(_) => return Ok(false), + } + } else { + match self.cap.frame_receiver().try_recv() { + Ok(frame) => frame, + Err(_) => return Ok(false), } - Err(_) => return Ok(false), }; match self.stage { @@ -150,6 +135,8 @@ impl StatePortal { self.enc = Some(enc); self.stage = PortalStage::Streaming; + self.start_time = Some(Instant::now()); + self.last_stats_time = Some(Instant::now()); tracing::info!("First frame processed, encoder initialized, transitioning to Streaming"); drop(frame); } @@ -208,27 +195,11 @@ impl StatePortal { /// 通过 `av_hwframe_map` 零拷贝导入 VAAPI,然后交给 SwEncState 完成: /// scale_vaapi GPU 缩放、2K NV12 回读、YUV420P 格式转换、软件 H.264 编码。 fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> { - if self.first_frame { - self.first_frame = false; - } else { - let now = std::time::Instant::now(); - if self.fps_limit.on_new_frame((), now).is_none() { - self.frames_fps_dropped += 1; - tracing::debug!("handle_pw_frame: FPS limit, dropping frame (#{})", self.frames_fps_dropped); - self.maybe_print_stats(now); - return Ok(()); - } - } - - tracing::debug!("handle_pw_frame: processing frame, pts={}", frame.pts); - let enc = match self.enc.as_mut() { Some(enc) => enc, None => bail!("encoder not initialized"), }; - // SAFETY: frames_rgb is a live VAAPI frames context configured for capture; frame carries - // valid DMA-BUF fd/format/modifier/stride/offset metadata for the duration of this call. let mut vaapi_frame = unsafe { avhw::import_dma_buf_to_vaapi( enc.frames_rgb().as_ptr(), @@ -242,39 +213,31 @@ impl StatePortal { ) }?; - tracing::debug!("handle_pw_frame: DMA-BUF import OK"); - - let pts = compute_pts(&mut self.first_pts_ns, frame.pts, self.args.fps); + let pts = self.frames_encoded as i64; unsafe { (*vaapi_frame.as_mut_ptr()).pts = pts; } enc.encode_frame(&vaapi_frame)?; self.frames_encoded += 1; - tracing::info!("handle_pw_frame: frame #{} encoded OK, pts={}", self.frames_encoded, pts); - let now = std::time::Instant::now(); - self.maybe_print_stats(now); + if let Some(last) = self.last_stats_time { + if last.elapsed() >= Duration::from_secs(10) { + let delta_frames = self.frames_encoded - self.last_stats_frames; + let delta_secs = last.elapsed().as_secs_f64(); + let fps = delta_frames as f64 / delta_secs; + tracing::info!( + "encoded={}, fps={fps:.1}", + self.frames_encoded, + ); + self.last_stats_time = Some(Instant::now()); + self.last_stats_frames = self.frames_encoded; + } + } Ok(()) } - fn maybe_print_stats(&mut self, now: std::time::Instant) { - let should_print = match self.last_stats_time { - None => true, - Some(last) => now.duration_since(last) >= std::time::Duration::from_secs(2), - }; - if should_print { - self.last_stats_time = Some(now); - tracing::info!( - "STATS: received={}, fps_dropped={}, encoded={}", - self.frames_received, - self.frames_fps_dropped, - self.frames_encoded, - ); - } - } - /// 关闭状态:刷新编码器并清理资源 /// /// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。 @@ -284,6 +247,18 @@ impl StatePortal { tracing::error!("Flush error during shutdown: {e}"); } } + if let Some(start) = self.start_time { + if self.frames_encoded > 0 { + let elapsed = start.elapsed().as_secs_f64(); + let fps = self.frames_encoded as f64 / elapsed; + tracing::info!( + "Total: {} frames in {:.1}s, avg {:.1}fps", + self.frames_encoded, + elapsed, + fps, + ); + } + } tracing::info!("StatePortal shutdown complete"); } @@ -316,17 +291,6 @@ fn portal_encode_dimensions(width: u32, height: u32) -> (u32, u32) { } } -/// Convert PipeWire nanosecond PTS to encoder frame-number units. -/// -/// Uses elapsed time since the first frame to avoid i64 overflow on absolute timestamps. -/// PipeWire PTS is CLOCK_MONOTONIC in nanoseconds; encoder time_base = 1/fps. -fn compute_pts(first_pts_ns: &mut Option, frame_pts: i64, fps: u32) -> i64 { - let fps_i64 = fps as i64; - let base_ns = *first_pts_ns.get_or_insert(frame_pts.max(0)); - let elapsed_ns = (frame_pts.max(0) - base_ns).max(0); - elapsed_ns * fps_i64 / 1_000_000_000 -} - /// 解析 DRM 渲染设备路径 /// /// 仅使用命令行指定的设备路径;未指定则在首帧到达时自动检测。 @@ -452,68 +416,4 @@ mod tests { assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4); } - // --- compute_pts tests --- - - #[test] - fn compute_pts_first_frame_is_zero() { - let mut base = None; - let pts = compute_pts(&mut base, 1_000_000_000, 30); - assert_eq!(pts, 0); - assert_eq!(base, Some(1_000_000_000)); - } - - #[test] - fn compute_pts_second_frame_at_30fps() { - let mut base = Some(1_000_000_000); - // 33_333_333 * 30 / 1_000_000_000 = 0 (integer division) - let pts = compute_pts(&mut base, 1_000_000_000 + 33_333_333, 30); - assert_eq!(pts, 0); - - // 100ms later = frame 3 - let pts = compute_pts(&mut base, 1_000_000_000 + 100_000_000, 30); - assert_eq!(pts, 3); - } - - #[test] - fn compute_pts_multiple_frames_accumulate() { - let mut base = None; - let fps = 60; - - let pts0 = compute_pts(&mut base, 0, fps); - assert_eq!(pts0, 0); - - let pts1 = compute_pts(&mut base, 16_666_666, fps); - assert_eq!(pts1, 0); // 16_666_666 * 60 / 1_000_000_000 = 0 - - let pts2 = compute_pts(&mut base, 33_333_333, fps); - assert_eq!(pts2, 1); // 33_333_333 * 60 / 1_000_000_000 = 1 - - let pts3 = compute_pts(&mut base, 50_000_000, fps); - assert_eq!(pts3, 3); // 50ms * 60 / 1000 = 3 - } - - #[test] - fn compute_pts_negative_pts_clamped_to_zero() { - let mut base = None; - let pts = compute_pts(&mut base, -999_999, 30); - assert_eq!(pts, 0); - assert_eq!(base, Some(0)); // max(0) clamps negative - } - - #[test] - fn compute_pts_late_frame_after_negative() { - let mut base = Some(0); - let pts = compute_pts(&mut base, 1_000_000_000, 30); - assert_eq!(pts, 30); - } - - #[test] - fn compute_pts_base_not_overwritten_after_first_call() { - let mut base = None; - let _ = compute_pts(&mut base, 5_000_000_000, 30); - assert_eq!(base, Some(5_000_000_000)); - - let _ = compute_pts(&mut base, 10_000_000_000, 30); - assert_eq!(base, Some(5_000_000_000)); // base stays at first frame - } }