diff --git a/src/cap_portal.rs b/src/cap_portal.rs index 065d719..cb1495c 100644 --- a/src/cap_portal.rs +++ b/src/cap_portal.rs @@ -45,13 +45,11 @@ pub struct PwDmaBufFrame { pub pts: i64, } -/// PipeWire 事件枚举 +/// PipeWire 控制事件枚举 /// -/// 从 PipeWire 捕获线程发送给消费者的事件类型。 -/// 消费者通过 frame_receiver() 获取的 Receiver 接收这些事件。 -pub enum PwEvent { - /// 收到一帧新的 DMA-BUF 视频帧 - Frame(PwDmaBufFrame), +/// 从 PipeWire 捕获线程发送给消费者的控制事件。 +/// 与帧数据分离,通过独立的 channel 传输,确保控制事件不被帧数据淹没。 +pub enum PwCtrlEvent { /// 流已结束(PipeWire 流断开连接或进入错误状态) StreamEnded, /// 发生错误,包含错误描述信息 @@ -68,13 +66,10 @@ pub enum PwEvent { /// 2. frame_receiver() — 获取帧接收端,供消费者轮询 /// 3. Drop — 通过 eventfd 通知 PipeWire 线程安全退出 pub struct CapPortal { - /// eventfd 的写入端,用于在 drop 时通知 PipeWire 线程退出 shutdown_fd: OwnedFd, - /// 帧事件接收端,消费者通过此 Receiver 获取帧数据 - frame_rx: Receiver, - /// PipeWire 捕获线程的 JoinHandle,drop 时等待线程退出 + frame_rx: Receiver, + event_rx: Receiver, pw_thread: Option>, - /// Tokio 运行时,仅用于 setup_portal() 中的异步 Portal 调用 rt: Runtime, } @@ -83,11 +78,9 @@ pub struct CapPortal { /// 从主线程传递给 PipeWire 捕获线程的所有必要资源。 /// 该结构体在线程创建时一次性 move 到线程中使用。 struct PwThreadCtx { - /// 帧事件发送端,用于向消费者线程发送帧数据或错误/结束事件 - frame_tx: Sender, - /// 已丢弃帧的计数器(原子操作),用于统计因通道满而丢弃的帧数 + frame_tx: Sender, + event_tx: Sender, dropped: AtomicU64, - /// eventfd 的读取端,注册到 PipeWire 事件循环中,用于接收关闭信号 shutdown_read: OwnedFd, /// Portal 返回的 PipeWire 远程连接文件描述符 pw_fd: OwnedFd, @@ -116,14 +109,9 @@ impl CapPortal { Self::setup_portal().await })?; - // 创建有界通道,容量为 3 帧 - // 使用有界通道实现背压:当消费者处理不过来时,生产者会丢弃帧而非无限堆积 let (frame_tx, frame_rx) = bounded(3); + let (event_tx, event_rx) = bounded(8); - // 创建 eventfd 对,用于线程安全的关闭信号传递 - // eventfd 是 Linux 内核提供的轻量级进程/线程间通知机制 - // 写入端保存在 CapPortal(主线程),读取端注册到 PipeWire 事件循环中 - // 这样 CapPortal drop 时可以安全地通知 PipeWire 线程退出 let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; if efd < 0 { return Err(anyhow::anyhow!( @@ -131,8 +119,6 @@ impl CapPortal { std::io::Error::last_os_error() )); } - // 复制 eventfd 得到写入端,原始 fd 作为读取端 - // 需要 dup 是因为读取端和写入端需要各自独立的 OwnedFd 所有权 let write_fd = unsafe { libc::dup(efd) }; if write_fd < 0 { let err = std::io::Error::last_os_error(); @@ -140,9 +126,9 @@ impl CapPortal { return Err(anyhow::anyhow!("dup eventfd failed: {err}")); } - // 构建 PipeWire 线程上下文,将所有必要资源 move 进去 let ctx = PwThreadCtx { frame_tx, + event_tx, dropped: AtomicU64::new(0), shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) }, pw_fd, @@ -150,28 +136,33 @@ impl CapPortal { fps: args.fps, }; - // 启动 PipeWire 捕获线程,命名便于调试和性能分析 let pw_thread = thread::Builder::new() .name("pipewire-capture".into()) .spawn(move || { pipewire_thread(ctx); + }) + .map_err(|e| { + unsafe { libc::close(write_fd) }; + anyhow::anyhow!("thread spawn failed: {e}") })?; Ok(Self { shutdown_fd: unsafe { OwnedFd::from_raw_fd(write_fd) }, frame_rx, + event_rx, pw_thread: Some(pw_thread), rt, }) } - /// 获取帧事件接收端的引用 - /// - /// 消费者通过此方法获取 Receiver,然后不断接收 PwEvent 事件来获取帧数据。 - pub fn frame_receiver(&self) -> &Receiver { + pub fn frame_receiver(&self) -> &Receiver { &self.frame_rx } + pub fn event_receiver(&self) -> &Receiver { + &self.event_rx + } + /// 通过 XDG Desktop Portal 建立屏幕录制会话 /// /// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。 @@ -309,10 +300,9 @@ fn pipewire_thread(ctx: PwThreadCtx) { // 不调用 pw::deinit()——进程退出时全局状态由 OS 回收。 pw::init(); - // 解构上下文,取出所有必要资源 - // fps 重命名为 _fps 表示当前未使用(保留供将来帧率控制使用) let PwThreadCtx { frame_tx, + event_tx, dropped, shutdown_read, pw_fd, @@ -320,31 +310,26 @@ fn pipewire_thread(ctx: PwThreadCtx) { fps: _fps, } = ctx; - // 创建 PipeWire MainLoop(主事件循环) - // MainLoopBox 是栈分配的 PipeWire 主循环封装 let mainloop = match pw::main_loop::MainLoopBox::new(None) { Ok(ml) => ml, Err(e) => { - let _ = frame_tx.send(PwEvent::Error(format!("MainLoop::new failed: {e}"))); + let _ = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}"))); return; } }; - // 创建 PipeWire Context,用于管理核心对象和协议处理 let context = match pw::context::ContextBox::new(mainloop.loop_(), None) { Ok(c) => c, Err(e) => { - let _ = frame_tx.send(PwEvent::Error(format!("Context::new failed: {e}"))); + let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}"))); return; } }; - // 使用 Portal 提供的 fd 连接到 PipeWire 核心守护进程 - // connect_fd 接管该 fd 的所有权(通过 dup),不关闭原始 fd let core = match context.connect_fd(pw_fd, None) { Ok(c) => c, Err(e) => { - let _ = frame_tx.send(PwEvent::Error(format!( + let _ = event_tx.try_send(PwCtrlEvent::Error(format!( "connect_fd failed: {e}" ))); return; @@ -367,33 +352,23 @@ fn pipewire_thread(ctx: PwThreadCtx) { ) { Ok(s) => s, Err(e) => { - let _ = frame_tx.send(PwEvent::Error(format!("Stream::new failed: {e}"))); + let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}"))); return; } }; - // 共享的格式状态: (宽度, 高度, DRM FourCC 格式, 修饰符) - // 使用 Rc> 因为 PipeWire 回调在同一个线程内执行,无需跨线程同步 - // Cell> 允许在不可变引用中修改值(内部可变性) - // format_info 在 param_changed 回调中设置,在 process 回调中读取 let format_info: Rc>> = Rc::new(Cell::new(None)); - let frame_tx_clone = frame_tx.clone(); - // 注册流事件监听器,包含三个回调: - // - state_changed: 流状态变化通知 - // - param_changed: 格式协商完成通知 - // - process: 每帧数据处理 + let event_tx_state = event_tx.clone(); let _listener = stream .add_local_listener::<()>() - // 流状态变化回调 - // 当流进入 Error 或 Unconnected 状态时,通知消费者流已结束 .state_changed(move |_, _, old, new| { tracing::debug!("PipeWire stream state: {old:?} -> {new:?}"); match new { pw::stream::StreamState::Error(_) | pw::stream::StreamState::Unconnected => { - let _ = frame_tx_clone.send(PwEvent::StreamEnded); + let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded); } _ => {} } @@ -468,7 +443,10 @@ fn pipewire_thread(ctx: PwThreadCtx) { return; } - // 获取 chunk 信息,包含帧数据在 DMA-BUF 中的偏移量和行跨度 + if data_ref.as_raw().chunk.is_null() { + unsafe { stream.queue_raw_buffer(raw_buf) }; + return; + } let chunk = data_ref.chunk(); let offset = chunk.offset() as u64; let stride = chunk.stride() as u32; @@ -527,19 +505,14 @@ fn pipewire_thread(ctx: PwThreadCtx) { pts, }; - // 尝试非阻塞发送帧到通道 - // 如果通道已满(消费者处理不过来),丢弃该帧并增加丢弃计数 - // 每 30 帧丢弃时输出一条警告日志,避免日志洪泛 if let Err(crossbeam_channel::TrySendError::Full(_)) = - frame_tx.try_send(PwEvent::Frame(frame)) + frame_tx.try_send(frame) { let prev = dropped.fetch_add(1, Ordering::Relaxed); if prev > 0 && prev % 30 == 0 { tracing::warn!("dropped {prev} frames total: encoder backlog"); } } - // 无论是否成功发送帧,都必须将 buffer 重新入队 - // PipeWire 会复用这些 buffer,不入队会导致 buffer 泄漏 unsafe { stream.queue_raw_buffer(raw_buf) }; } }) @@ -558,21 +531,11 @@ fn pipewire_thread(ctx: PwThreadCtx) { StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS, &mut params, ) { - let _ = frame_tx.send(PwEvent::Error(format!("stream.connect failed: {e}"))); + let _ = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}"))); return; } let loop_ = mainloop.loop_(); - // 注册信号处理(空回调),阻止 SIGINT/SIGTERM 默认行为终止线程 - // 真正的退出通过 shutdown eventfd 控制 - loop_.add_signal_local( - pw::loop_::Signal::SIGINT, - Box::new(|| {}), - ); - loop_.add_signal_local( - pw::loop_::Signal::SIGTERM, - Box::new(|| {}), - ); // Register the shutdown eventfd on the PipeWire loop. //