perf(portal): achieve 58-60fps PipeWire screen capture

- Force PipeWire quantum=512 via NODE_FORCE_QUANTUM (48000/512=93Hz scheduling)
- Switch to libx264 ultrafast/zerolatency with 6 threads
- Use two-phase poll_and_encode: blocking recv_timeout for first frame,
  non-blocking try_recv drain for subsequent frames
- Remove fps_limit from portal path (PW already rate-limits via quantum/KWin;
  fps_limit's min_interval was silently dropping ~10% of valid frames)
- Remove diagnostic instrumentation (TIMING/PIPEWIRE logs, timing fields,
  pw_stats counters)
- Add lightweight production stats: per-10s fps log + shutdown summary
- Prefer libx264 over libopenh264 (better quality at same speed)
This commit is contained in:
dailz
2026-05-30 08:44:15 +08:00
parent a83d146ed3
commit 74f4dc826d
6 changed files with 88 additions and 187 deletions

View File

@@ -22,6 +22,6 @@ libc = "0.2"
ashpd = { version = "0.13", features = ["tokio", "screencast"] }
zbus = { version = "5", default-features = false, features = ["tokio"] }
tokio = { version = "1", features = ["rt"] }
pipewire = "0.9"
pipewire = { version = "0.9", features = ["v0_3_45"] }
libspa = "0.9"
crossbeam-channel = "0.5"

View File

@@ -996,10 +996,10 @@ fn create_software_h264_muxer(
ff::format::context::Output,
)> {
let output_cstr = CString::new(output_path.to_str().unwrap())?;
let codec = ff::encoder::find_by_name("libopenh264")
.or_else(|| ff::encoder::find_by_name("libx264"))
let codec = ff::encoder::find_by_name("libx264")
.or_else(|| ff::encoder::find_by_name("libopenh264"))
.ok_or_else(|| {
anyhow::anyhow!("No H.264 software encoder found (tried libopenh264, libx264)")
anyhow::anyhow!("No H.264 software encoder found (tried libx264, libopenh264)")
})?;
let codec_name = codec.name().to_string();
@@ -1024,11 +1024,14 @@ fn create_software_h264_muxer(
// SAFETY: priv_data belongs to the unopened encoder; strings live for each call.
unsafe {
let key = CString::new("preset").unwrap();
let val = CString::new("veryfast").unwrap();
let val = CString::new("ultrafast").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("tune").unwrap();
let val = CString::new("zerolatency").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("threads").unwrap();
let val = CString::new("6").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
}
}

View File

@@ -186,10 +186,10 @@ fn drain_encoder(
fn create_software_encoder(output_path: &Path, width: u32, height: u32) -> Result<SoftwareEncoder> {
let output_cstr = CString::new(output_path.to_str().unwrap())?;
let codec = ff::encoder::find_by_name("libopenh264")
.or_else(|| ff::encoder::find_by_name("libx264"))
let codec = ff::encoder::find_by_name("libx264")
.or_else(|| ff::encoder::find_by_name("libopenh264"))
.ok_or_else(|| {
anyhow::anyhow!("No H.264 software encoder found (tried libopenh264, libx264)")
anyhow::anyhow!("No H.264 software encoder found (tried libx264, libopenh264)")
})?;
let codec_name = codec.name().to_string();

View File

@@ -13,6 +13,7 @@
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use anyhow::Result;
@@ -80,13 +81,10 @@ pub struct CapPortal {
struct PwThreadCtx {
frame_tx: Sender<PwDmaBufFrame>,
event_tx: Sender<PwCtrlEvent>,
dropped: AtomicU64,
dropped: Arc<AtomicU64>,
shutdown_read: OwnedFd,
/// Portal 返回的 PipeWire 远程连接文件描述符
pw_fd: OwnedFd,
/// Portal 返回的 PipeWire 节点 ID,标识要捕获的屏幕流
node_id: u32,
/// 目标帧率(当前保留,未直接用于 PipeWire 协商)
fps: u32,
}
@@ -96,7 +94,7 @@ impl CapPortal {
/// 执行流程:
/// 1. 创建 Tokio 运行时(用于异步 Portal 调用)
/// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id
/// 3. 创建有界通道(容量 3)用于帧传递
/// 3. 创建有界通道(容量 16)用于帧传递
/// 4. 创建 eventfd 对,用于线程安全的关闭信号传递
/// 5. 启动 PipeWire 捕获线程
pub fn new(args: &Args) -> Result<Self> {
@@ -107,7 +105,7 @@ impl CapPortal {
// block_on 在此处同步等待异步 Portal 调用完成
let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal().await })?;
let (frame_tx, frame_rx) = bounded(3);
let (frame_tx, frame_rx) = bounded(16);
let (event_tx, event_rx) = bounded(8);
let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
@@ -124,10 +122,12 @@ impl CapPortal {
return Err(anyhow::anyhow!("dup eventfd failed: {err}"));
}
let pw_dropped = Arc::new(AtomicU64::new(0));
let ctx = PwThreadCtx {
frame_tx,
event_tx,
dropped: AtomicU64::new(0),
dropped: pw_dropped.clone(),
shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) },
pw_fd,
node_id,
@@ -305,7 +305,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
shutdown_read,
pw_fd,
node_id,
fps: _fps,
fps,
} = ctx;
let mainloop = match pw::main_loop::MainLoopBox::new(None) {
@@ -344,6 +344,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
*pw::keys::MEDIA_TYPE => "Video",
*pw::keys::MEDIA_CATEGORY => "Capture",
*pw::keys::MEDIA_ROLE => "Screen",
*pw::keys::NODE_FORCE_QUANTUM => "512",
},
) {
Ok(s) => s,
@@ -359,9 +360,13 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let _listener = stream
.add_local_listener::<()>()
.state_changed(move |_, _, old, new| {
tracing::debug!("PipeWire stream state: {old:?} -> {new:?}");
tracing::info!("PipeWire stream state: {old:?} -> {new:?}");
match new {
pw::stream::StreamState::Error(_) | pw::stream::StreamState::Unconnected => {
pw::stream::StreamState::Error(e) => {
tracing::error!("PipeWire stream error: {e}");
let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded);
}
pw::stream::StreamState::Unconnected => {
let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded);
}
_ => {}
@@ -390,11 +395,16 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let drm_format = spa_to_drm_fourcc(info.format());
// 获取 DRM 修饰符,描述 GPU buffer 的内存布局(如 tiling 模式)
let modifier = info.modifier();
let framerate = info.framerate();
let max_framerate = info.max_framerate();
// 保存协商后的格式信息,供 process 回调读取
format_info.set(Some((width, height, drm_format, modifier)));
tracing::info!(
"PipeWire format negotiated: {width}x{height}, \
drm_format={drm_format:#010x}, modifier={modifier:#x}"
drm_format={drm_format:#010x}, modifier={modifier:#x}, \
framerate={}/{}, max_framerate={}/{}",
framerate.num, framerate.denom,
max_framerate.num, max_framerate.denom,
);
}
})
@@ -405,15 +415,8 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let format_info = format_info.clone();
let frame_tx = frame_tx.clone();
let dropped = dropped;
let process_count = Rc::new(Cell::new(0u64));
let process_count_clone = process_count.clone();
move |stream, _| {
let count = process_count_clone.get() + 1;
process_count_clone.set(count);
if count <= 5 || count % 60 == 0 {
tracing::info!("PipeWire process callback #{count}");
}
// 从流中出队原始 buffer(包含帧数据的元信息)
let raw_buf = unsafe { stream.dequeue_raw_buffer() };
if raw_buf.is_null() {
return;
@@ -519,13 +522,8 @@ fn pipewire_thread(ctx: PwThreadCtx) {
})
.register();
// 空的参数数组,不主动请求特定格式(由 PipeWire 和源端协商决定)
let mut params: [&pw::spa::pod::Pod; 0] = [];
// 连接到指定的 PipeWire 节点
// Direction::Input: 作为消费者(输入方向接收数据)
// AUTOCONNECT: 允许 PipeWire 自动连接源和消费者
// MAP_BUFFERS: 映射 buffer 到用户空间(DMA-BUF 模式下必须设置)
if let Err(e) = stream.connect(
pw::spa::utils::Direction::Input,
Some(node_id),

View File

@@ -304,15 +304,14 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
poll.registry()
.register(&mut signals, mio::Token(1), mio::Interest::READABLE)?;
// 主事件循环(超时 10ms,比 wlr-screencopy 更短,因为不依赖 Wayland fd 唤醒)
// 10ms 超时的作用是让循环高频转动,以便及时处理 PipeWire 投递的帧
// 如果没有信号,poll 最多阻塞 10ms 就会超时返回
// 主事件循环(非阻塞信号检测 + recv_timeout 等待帧)
// poll 超时为 0ms(非阻塞),实际等待由 poll_and_encode 的 recv_timeout 实现
let mut running = true;
while running {
// poll 在此循环中只监听信号 fd,所以:
// poll 在此循环中只监听信号 fd(非阻塞):
// - 收到 SIGINT/SIGTERM → 事件触发,设置 running=false
// - 超时 10ms → 事件为空,继续执行 poll_and_encode
poll.poll(&mut events, Some(std::time::Duration::from_millis(10)))
// - 无事件 → 立即返回,继续执行 poll_and_encode(内部 recv_timeout 等待帧)
poll.poll(&mut events, Some(std::time::Duration::from_millis(0)))
.unwrap_or_else(|e| {
if e.kind() == std::io::ErrorKind::Interrupted {
return;
@@ -321,7 +320,6 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
running = false;
});
// 遍历事件,检查是否收到退出信号
for event in &events {
if event.token() == mio::Token(1) {
tracing::info!("Received quit signal");
@@ -334,7 +332,9 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
// poll_and_encode 会从 PipeWire 缓冲区取出帧,
// 编码为 H.264 并推送。返回 true 表示还有更多帧待处理,
// 返回 false 表示当前没有帧了,while 循环退出等待下一轮 poll
while state.poll_and_encode()? {}
if state.poll_and_encode(true)? {
while state.poll_and_encode(false)? {}
}
// Portal 状态机遇到致命错误时退出
if state.is_errored() {

View File

@@ -1,13 +1,13 @@
// 采集门户状态模块 —— 通过 PipeWire/DMA-BUF 进行屏幕采集并编码
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use crate::args::Args;
use crate::avhw::{self, SwEncState};
use crate::cap_portal::{CapPortal, PwCtrlEvent, PwDmaBufFrame};
use crate::fps_limit::FpsLimit;
/// 门户采集的阶段状态
/// - WaitingForFormat: 等待接收到第一帧 DMA-BUF 以确定视频格式参数
@@ -22,32 +22,16 @@ enum PortalStage {
/// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。
/// 工作流程:等待第一帧 → 创建编码器 → 持续编码帧数据。
pub struct StatePortal {
/// 当前采集阶段
stage: PortalStage,
/// GPU 缩放 + 软件编码器状态(第一帧到达后才初始化)
enc: Option<SwEncState>,
/// 帧率限制器
fps_limit: FpsLimit<()>,
/// PipeWire 屏幕采集端点
cap: CapPortal,
/// 命令行参数
args: Args,
/// 是否遇到错误
errored: bool,
/// 是否为第一帧(首帧跳过帧率限制)
first_frame: bool,
/// DRM 渲染设备路径(如 /dev/dri/renderD128),None 表示首帧自动检测
drm_device: Option<PathBuf>,
/// 第一帧的时间戳(纳秒),用于计算相对 PTS
first_pts_ns: Option<i64>,
/// Diagnostic: frames received from PipeWire channel
frames_received: u64,
/// Diagnostic: frames dropped by FPS limiter
frames_fps_dropped: u64,
/// Diagnostic: frames successfully encoded
frames_encoded: u64,
/// Diagnostic: last time we printed stats
last_stats_time: Option<std::time::Instant>,
start_time: Option<Instant>,
last_stats_time: Option<Instant>,
last_stats_frames: u64,
}
impl StatePortal {
@@ -67,25 +51,23 @@ impl StatePortal {
Ok(Self {
stage: PortalStage::WaitingForFormat,
enc: None,
fps_limit: FpsLimit::new(args.fps),
cap,
args,
errored: false,
first_frame: true,
drm_device,
first_pts_ns: None,
frames_received: 0,
frames_fps_dropped: 0,
frames_encoded: 0,
start_time: None,
last_stats_time: None,
last_stats_frames: 0,
})
}
/// 轮询 PipeWire 事件并编码帧
///
/// 尝试从采集端点接收一帧事件。返回 `Ok(true)` 表示已处理事件,
/// `Ok(false)` 表示暂无数据。内部根据当前阶段(等待格式/流式)分发处理。
pub fn poll_and_encode(&mut self) -> Result<bool> {
/// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 10ms);
/// `block=false` 时使用 try_recv 非阻塞检查。
/// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。
pub fn poll_and_encode(&mut self, block: bool) -> Result<bool> {
if let Ok(ctrl) = self.cap.event_receiver().try_recv() {
match ctrl {
PwCtrlEvent::StreamEnded => {
@@ -101,13 +83,16 @@ impl StatePortal {
}
}
let frame = match self.cap.frame_receiver().try_recv() {
Ok(frame) => {
self.frames_received += 1;
tracing::debug!("poll_and_encode: got frame #{} from channel", self.frames_received);
frame
let frame = if block {
match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) {
Ok(frame) => frame,
Err(_) => return Ok(false),
}
} else {
match self.cap.frame_receiver().try_recv() {
Ok(frame) => frame,
Err(_) => return Ok(false),
}
Err(_) => return Ok(false),
};
match self.stage {
@@ -150,6 +135,8 @@ impl StatePortal {
self.enc = Some(enc);
self.stage = PortalStage::Streaming;
self.start_time = Some(Instant::now());
self.last_stats_time = Some(Instant::now());
tracing::info!("First frame processed, encoder initialized, transitioning to Streaming");
drop(frame);
}
@@ -208,27 +195,11 @@ impl StatePortal {
/// 通过 `av_hwframe_map` 零拷贝导入 VAAPI然后交给 SwEncState 完成:
/// scale_vaapi GPU 缩放、2K NV12 回读、YUV420P 格式转换、软件 H.264 编码。
fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> {
if self.first_frame {
self.first_frame = false;
} else {
let now = std::time::Instant::now();
if self.fps_limit.on_new_frame((), now).is_none() {
self.frames_fps_dropped += 1;
tracing::debug!("handle_pw_frame: FPS limit, dropping frame (#{})", self.frames_fps_dropped);
self.maybe_print_stats(now);
return Ok(());
}
}
tracing::debug!("handle_pw_frame: processing frame, pts={}", frame.pts);
let enc = match self.enc.as_mut() {
Some(enc) => enc,
None => bail!("encoder not initialized"),
};
// SAFETY: frames_rgb is a live VAAPI frames context configured for capture; frame carries
// valid DMA-BUF fd/format/modifier/stride/offset metadata for the duration of this call.
let mut vaapi_frame = unsafe {
avhw::import_dma_buf_to_vaapi(
enc.frames_rgb().as_ptr(),
@@ -242,39 +213,31 @@ impl StatePortal {
)
}?;
tracing::debug!("handle_pw_frame: DMA-BUF import OK");
let pts = compute_pts(&mut self.first_pts_ns, frame.pts, self.args.fps);
let pts = self.frames_encoded as i64;
unsafe {
(*vaapi_frame.as_mut_ptr()).pts = pts;
}
enc.encode_frame(&vaapi_frame)?;
self.frames_encoded += 1;
tracing::info!("handle_pw_frame: frame #{} encoded OK, pts={}", self.frames_encoded, pts);
let now = std::time::Instant::now();
self.maybe_print_stats(now);
if let Some(last) = self.last_stats_time {
if last.elapsed() >= Duration::from_secs(10) {
let delta_frames = self.frames_encoded - self.last_stats_frames;
let delta_secs = last.elapsed().as_secs_f64();
let fps = delta_frames as f64 / delta_secs;
tracing::info!(
"encoded={}, fps={fps:.1}",
self.frames_encoded,
);
self.last_stats_time = Some(Instant::now());
self.last_stats_frames = self.frames_encoded;
}
}
Ok(())
}
/// Log the diagnostic frame counters, rate-limited to one report every two seconds.
///
/// A report is emitted when no previous report time is recorded, or when at least
/// two seconds separate `now` from the last one. `last_stats_time` is updated only
/// when a report is actually written.
fn maybe_print_stats(&mut self, now: std::time::Instant) {
    let interval = std::time::Duration::from_secs(2);

    // Guard clause: the previous report is still too recent, stay quiet.
    if let Some(previous) = self.last_stats_time {
        if now.duration_since(previous) < interval {
            return;
        }
    }

    self.last_stats_time = Some(now);
    let (received, fps_dropped, encoded) = (
        self.frames_received,
        self.frames_fps_dropped,
        self.frames_encoded,
    );
    tracing::info!(
        "STATS: received={}, fps_dropped={}, encoded={}",
        received,
        fps_dropped,
        encoded,
    );
}
/// 关闭状态:刷新编码器并清理资源
///
/// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。
@@ -284,6 +247,18 @@ impl StatePortal {
tracing::error!("Flush error during shutdown: {e}");
}
}
if let Some(start) = self.start_time {
if self.frames_encoded > 0 {
let elapsed = start.elapsed().as_secs_f64();
let fps = self.frames_encoded as f64 / elapsed;
tracing::info!(
"Total: {} frames in {:.1}s, avg {:.1}fps",
self.frames_encoded,
elapsed,
fps,
);
}
}
tracing::info!("StatePortal shutdown complete");
}
@@ -316,17 +291,6 @@ fn portal_encode_dimensions(width: u32, height: u32) -> (u32, u32) {
}
}
/// Convert PipeWire nanosecond PTS to encoder frame-number units.
///
/// Uses elapsed time since the first frame to avoid i64 overflow on absolute timestamps.
/// PipeWire PTS is CLOCK_MONOTONIC in nanoseconds; encoder time_base = 1/fps.
///
/// * `first_pts_ns` - origin timestamp; latched to `frame_pts` (clamped to zero) on the
///   first call and left untouched afterwards.
/// * `frame_pts` - timestamp of the current frame in nanoseconds; negative values are
///   treated as zero.
/// * `fps` - encoder frame rate, i.e. the number of ticks per second.
///
/// Returns the elapsed time since the origin expressed in `1/fps` ticks, truncated toward
/// zero. Frames older than the origin map to tick 0. The result saturates at `i64::MAX`
/// instead of overflowing.
fn compute_pts(first_pts_ns: &mut Option<i64>, frame_pts: i64, fps: u32) -> i64 {
    const NANOS_PER_SEC: i128 = 1_000_000_000;

    let frame_ns = frame_pts.max(0);
    let base_ns = *first_pts_ns.get_or_insert(frame_ns);

    // A caller-supplied negative origin could push the difference past i64::MAX, so
    // saturate; a frame that precedes the origin is clamped to zero elapsed time.
    let elapsed_ns = frame_ns.saturating_sub(base_ns).max(0);

    // Widen before multiplying: `elapsed_ns * fps` does not fit in i64 once the elapsed
    // time exceeds roughly i64::MAX / fps nanoseconds.
    let ticks = i128::from(elapsed_ns) * i128::from(fps) / NANOS_PER_SEC;
    ticks.min(i128::from(i64::MAX)) as i64
}
/// 解析 DRM 渲染设备路径
///
/// 仅使用命令行指定的设备路径;未指定则在首帧到达时自动检测。
@@ -452,68 +416,4 @@ mod tests {
assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4);
}
// --- compute_pts tests ---
#[test]
fn compute_pts_first_frame_is_zero() {
    // The very first frame defines the origin, so it maps to tick 0 and is latched.
    let first_ns = 1_000_000_000;
    let mut origin = None;
    assert_eq!(compute_pts(&mut origin, first_ns, 30), 0);
    assert_eq!(origin, Some(first_ns));
}
#[test]
fn compute_pts_second_frame_at_30fps() {
    const ORIGIN_NS: i64 = 1_000_000_000;
    let mut origin = Some(ORIGIN_NS);
    // (offset from the origin in ns, expected tick): one 30 fps interval truncates to 0
    // under integer division, while 100 ms lands on frame 3.
    let samples = [(33_333_333_i64, 0_i64), (100_000_000, 3)];
    for &(offset_ns, expected) in samples.iter() {
        assert_eq!(compute_pts(&mut origin, ORIGIN_NS + offset_ns, 30), expected);
    }
}
#[test]
fn compute_pts_multiple_frames_accumulate() {
    let mut origin = None;
    // (timestamp in ns, expected tick) at 60 fps; ticks are truncated toward zero, so
    // 16_666_666 ns is still tick 0 and 33_333_333 ns is tick 1.
    let samples = [
        (0_i64, 0_i64),
        (16_666_666, 0),
        (33_333_333, 1),
        (50_000_000, 3),
    ];
    for &(stamp_ns, expected) in samples.iter() {
        assert_eq!(compute_pts(&mut origin, stamp_ns, 60), expected);
    }
}
#[test]
fn compute_pts_negative_pts_clamped_to_zero() {
    // A negative first timestamp is clamped, so both the tick and the origin are zero.
    let mut origin = None;
    let tick = compute_pts(&mut origin, -999_999, 30);
    assert_eq!((tick, origin), (0, Some(0)));
}
#[test]
fn compute_pts_late_frame_after_negative() {
    // With the origin already at zero, one full second at 30 fps is tick 30.
    let mut origin = Some(0);
    assert_eq!(compute_pts(&mut origin, 1_000_000_000, 30), 30);
}
#[test]
fn compute_pts_base_not_overwritten_after_first_call() {
    let first_ns = 5_000_000_000;
    let mut origin = None;
    // The origin is latched by the first call and must survive every later call.
    for &stamp_ns in [first_ns, 10_000_000_000].iter() {
        let _ = compute_pts(&mut origin, stamp_ns, 30);
        assert_eq!(origin, Some(first_ns));
    }
}
}