refactor(cap_portal): split PwEvent into separate ctrl/frame channels

- Rename PwEvent to PwCtrlEvent, separate frame data into its own channel
- Add null chunk check to prevent crash on malformed PipeWire buffer
- Remove redundant inline comments and signal handlers
- Use try_send for error events to avoid blocking on full channel
This commit is contained in:
dailz
2026-05-27 09:25:00 +08:00
parent 60a55c17f2
commit 715a9c0bab

View File

@@ -45,13 +45,11 @@ pub struct PwDmaBufFrame {
pub pts: i64,
}
/// PipeWire 事件枚举
/// PipeWire 控制事件枚举
///
/// 从 PipeWire 捕获线程发送给消费者的事件类型
/// 消费者通过 frame_receiver() 获取的 Receiver 接收这些事件
pub enum PwEvent {
/// 收到一帧新的 DMA-BUF 视频帧
Frame(PwDmaBufFrame),
/// 从 PipeWire 捕获线程发送给消费者的控制事件。
/// 与帧数据分离,通过独立的 channel 传输,确保控制事件不被帧数据淹没
pub enum PwCtrlEvent {
/// 流已结束PipeWire 流断开连接或进入错误状态)
StreamEnded,
/// 发生错误,包含错误描述信息
@@ -68,13 +66,10 @@ pub enum PwEvent {
/// 2. frame_receiver() — 获取帧接收端,供消费者轮询
/// 3. Drop — 通过 eventfd 通知 PipeWire 线程安全退出
pub struct CapPortal {
/// eventfd 的写入端,用于在 drop 时通知 PipeWire 线程退出
shutdown_fd: OwnedFd,
/// 帧事件接收端,消费者通过此 Receiver 获取帧数据
frame_rx: Receiver<PwEvent>,
/// PipeWire 捕获线程的 JoinHandledrop 时等待线程退出
frame_rx: Receiver<PwDmaBufFrame>,
event_rx: Receiver<PwCtrlEvent>,
pw_thread: Option<JoinHandle<()>>,
/// Tokio 运行时,仅用于 setup_portal() 中的异步 Portal 调用
rt: Runtime,
}
@@ -83,11 +78,9 @@ pub struct CapPortal {
/// 从主线程传递给 PipeWire 捕获线程的所有必要资源。
/// 该结构体在线程创建时一次性 move 到线程中使用。
struct PwThreadCtx {
/// 帧事件发送端,用于向消费者线程发送帧数据或错误/结束事件
frame_tx: Sender<PwEvent>,
/// 已丢弃帧的计数器(原子操作),用于统计因通道满而丢弃的帧数
frame_tx: Sender<PwDmaBufFrame>,
event_tx: Sender<PwCtrlEvent>,
dropped: AtomicU64,
/// eventfd 的读取端,注册到 PipeWire 事件循环中,用于接收关闭信号
shutdown_read: OwnedFd,
/// Portal 返回的 PipeWire 远程连接文件描述符
pw_fd: OwnedFd,
@@ -116,14 +109,9 @@ impl CapPortal {
Self::setup_portal().await
})?;
// 创建有界通道,容量为 3 帧
// 使用有界通道实现背压:当消费者处理不过来时,生产者会丢弃帧而非无限堆积
let (frame_tx, frame_rx) = bounded(3);
let (event_tx, event_rx) = bounded(8);
// 创建 eventfd 对,用于线程安全的关闭信号传递
// eventfd 是 Linux 内核提供的轻量级进程/线程间通知机制
// 写入端保存在 CapPortal主线程读取端注册到 PipeWire 事件循环中
// 这样 CapPortal drop 时可以安全地通知 PipeWire 线程退出
let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
if efd < 0 {
return Err(anyhow::anyhow!(
@@ -131,8 +119,6 @@ impl CapPortal {
std::io::Error::last_os_error()
));
}
// 复制 eventfd 得到写入端,原始 fd 作为读取端
// 需要 dup 是因为读取端和写入端需要各自独立的 OwnedFd 所有权
let write_fd = unsafe { libc::dup(efd) };
if write_fd < 0 {
let err = std::io::Error::last_os_error();
@@ -140,9 +126,9 @@ impl CapPortal {
return Err(anyhow::anyhow!("dup eventfd failed: {err}"));
}
// 构建 PipeWire 线程上下文,将所有必要资源 move 进去
let ctx = PwThreadCtx {
frame_tx,
event_tx,
dropped: AtomicU64::new(0),
shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) },
pw_fd,
@@ -150,28 +136,33 @@ impl CapPortal {
fps: args.fps,
};
// 启动 PipeWire 捕获线程,命名便于调试和性能分析
let pw_thread = thread::Builder::new()
.name("pipewire-capture".into())
.spawn(move || {
pipewire_thread(ctx);
})
.map_err(|e| {
unsafe { libc::close(write_fd) };
anyhow::anyhow!("thread spawn failed: {e}")
})?;
Ok(Self {
shutdown_fd: unsafe { OwnedFd::from_raw_fd(write_fd) },
frame_rx,
event_rx,
pw_thread: Some(pw_thread),
rt,
})
}
/// 获取帧事件接收端的引用
///
/// 消费者通过此方法获取 Receiver然后不断接收 PwEvent 事件来获取帧数据。
pub fn frame_receiver(&self) -> &Receiver<PwEvent> {
pub fn frame_receiver(&self) -> &Receiver<PwDmaBufFrame> {
&self.frame_rx
}
pub fn event_receiver(&self) -> &Receiver<PwCtrlEvent> {
&self.event_rx
}
/// 通过 XDG Desktop Portal 建立屏幕录制会话
///
/// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。
@@ -309,10 +300,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
// 不调用 pw::deinit()——进程退出时全局状态由 OS 回收。
pw::init();
// 解构上下文,取出所有必要资源
// fps 重命名为 _fps 表示当前未使用(保留供将来帧率控制使用)
let PwThreadCtx {
frame_tx,
event_tx,
dropped,
shutdown_read,
pw_fd,
@@ -320,31 +310,26 @@ fn pipewire_thread(ctx: PwThreadCtx) {
fps: _fps,
} = ctx;
// 创建 PipeWire MainLoop主事件循环
// MainLoopBox 是栈分配的 PipeWire 主循环封装
let mainloop = match pw::main_loop::MainLoopBox::new(None) {
Ok(ml) => ml,
Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!("MainLoop::new failed: {e}")));
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}")));
return;
}
};
// 创建 PipeWire Context用于管理核心对象和协议处理
let context = match pw::context::ContextBox::new(mainloop.loop_(), None) {
Ok(c) => c,
Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!("Context::new failed: {e}")));
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}")));
return;
}
};
// 使用 Portal 提供的 fd 连接到 PipeWire 核心守护进程
// connect_fd 接管该 fd 的所有权(通过 dup不关闭原始 fd
let core = match context.connect_fd(pw_fd, None) {
Ok(c) => c,
Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!(
let _ = event_tx.try_send(PwCtrlEvent::Error(format!(
"connect_fd failed: {e}"
)));
return;
@@ -367,33 +352,23 @@ fn pipewire_thread(ctx: PwThreadCtx) {
) {
Ok(s) => s,
Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!("Stream::new failed: {e}")));
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}")));
return;
}
};
// 共享的格式状态: (宽度, 高度, DRM FourCC 格式, 修饰符)
// 使用 Rc<Cell<>> 因为 PipeWire 回调在同一个线程内执行,无需跨线程同步
// Cell<Option<...>> 允许在不可变引用中修改值(内部可变性)
// format_info 在 param_changed 回调中设置,在 process 回调中读取
let format_info: Rc<Cell<Option<(u32, u32, u32, u64)>>> =
Rc::new(Cell::new(None));
let frame_tx_clone = frame_tx.clone();
// 注册流事件监听器,包含三个回调:
// - state_changed: 流状态变化通知
// - param_changed: 格式协商完成通知
// - process: 每帧数据处理
let event_tx_state = event_tx.clone();
let _listener = stream
.add_local_listener::<()>()
// 流状态变化回调
// 当流进入 Error 或 Unconnected 状态时,通知消费者流已结束
.state_changed(move |_, _, old, new| {
tracing::debug!("PipeWire stream state: {old:?} -> {new:?}");
match new {
pw::stream::StreamState::Error(_)
| pw::stream::StreamState::Unconnected => {
let _ = frame_tx_clone.send(PwEvent::StreamEnded);
let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded);
}
_ => {}
}
@@ -468,7 +443,10 @@ fn pipewire_thread(ctx: PwThreadCtx) {
return;
}
// 获取 chunk 信息,包含帧数据在 DMA-BUF 中的偏移量和行跨度
if data_ref.as_raw().chunk.is_null() {
unsafe { stream.queue_raw_buffer(raw_buf) };
return;
}
let chunk = data_ref.chunk();
let offset = chunk.offset() as u64;
let stride = chunk.stride() as u32;
@@ -527,19 +505,14 @@ fn pipewire_thread(ctx: PwThreadCtx) {
pts,
};
// 尝试非阻塞发送帧到通道
// 如果通道已满(消费者处理不过来),丢弃该帧并增加丢弃计数
// 每 30 帧丢弃时输出一条警告日志,避免日志洪泛
if let Err(crossbeam_channel::TrySendError::Full(_)) =
frame_tx.try_send(PwEvent::Frame(frame))
frame_tx.try_send(frame)
{
let prev = dropped.fetch_add(1, Ordering::Relaxed);
if prev > 0 && prev % 30 == 0 {
tracing::warn!("dropped {prev} frames total: encoder backlog");
}
}
// 无论是否成功发送帧,都必须将 buffer 重新入队
// PipeWire 会复用这些 buffer不入队会导致 buffer 泄漏
unsafe { stream.queue_raw_buffer(raw_buf) };
}
})
@@ -558,21 +531,11 @@ fn pipewire_thread(ctx: PwThreadCtx) {
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
&mut params,
) {
let _ = frame_tx.send(PwEvent::Error(format!("stream.connect failed: {e}")));
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}")));
return;
}
let loop_ = mainloop.loop_();
// 注册信号处理(空回调),阻止 SIGINT/SIGTERM 默认行为终止线程
// 真正的退出通过 shutdown eventfd 控制
loop_.add_signal_local(
pw::loop_::Signal::SIGINT,
Box::new(|| {}),
);
loop_.add_signal_local(
pw::loop_::Signal::SIGTERM,
Box::new(|| {}),
);
// Register the shutdown eventfd on the PipeWire loop.
//