use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::thread::{self, JoinHandle}; use anyhow::Result; use crossbeam_channel::{Receiver, Sender, bounded}; use tokio::runtime::Runtime; use crate::args::Args; pub struct PwDmaBufFrame { pub fd: OwnedFd, pub offset: u64, pub stride: u32, pub modifier: u64, pub width: u32, pub height: u32, pub format: u32, pub pts: i64, } pub enum PwEvent { Frame(PwDmaBufFrame), StreamEnded, Error(String), } pub struct CapPortal { shutdown_fd: OwnedFd, frame_rx: Receiver, pw_thread: Option>, rt: Runtime, } struct PwThreadCtx { frame_tx: Sender, shutdown_read: OwnedFd, pw_fd: OwnedFd, node_id: u32, fps: u32, } impl CapPortal { pub fn new(args: &Args) -> Result { let rt = Runtime::new()?; let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal().await })?; let (frame_tx, frame_rx) = bounded(3); // Create eventfd pair for thread-safe shutdown signaling. // The write end lives in CapPortal (main thread), the read end is // registered on the PipeWire loop so quit() happens on the loop thread // where mainloop is guaranteed alive. let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; if efd < 0 { return Err(anyhow::anyhow!( "eventfd failed: {}", std::io::Error::last_os_error() )); } let write_fd = unsafe { libc::dup(efd) }; if write_fd < 0 { let err = std::io::Error::last_os_error(); unsafe { libc::close(efd) }; return Err(anyhow::anyhow!("dup eventfd failed: {err}")); } let ctx = PwThreadCtx { frame_tx, shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) }, pw_fd, node_id, fps: args.fps, }; let pw_thread = thread::Builder::new() .name("pipewire-capture".into()) .spawn(move || { pipewire_thread(ctx); })?; Ok(Self { shutdown_fd: unsafe { OwnedFd::from_raw_fd(write_fd) }, frame_rx, pw_thread: Some(pw_thread), rt, }) } pub fn frame_receiver(&self) -> &Receiver { &self.frame_rx } async fn setup_portal() -> Result<(OwnedFd, u32)> { use ashpd::desktop::screencast::{ CursorMode, Screencast, SelectSourcesOptions, SourceType, }; use ashpd::desktop::PersistMode; let proxy = Screencast::new().await.map_err(|e| { anyhow::anyhow!("Failed to create Screencast proxy: {e}") })?; let session = proxy .create_session(Default::default()) .await .map_err(|e| anyhow::anyhow!("Failed to create ScreenCast session: {e}"))?; proxy .select_sources( &session, SelectSourcesOptions::default() .set_cursor_mode(CursorMode::Embedded) .set_sources(ashpd::enumflags2::BitFlags::from(SourceType::Monitor)) .set_multiple(false) .set_persist_mode(PersistMode::DoNot), ) .await .map_err(|e| { anyhow::anyhow!("屏幕共享权限被拒绝 / Screen sharing permission denied: {e}") })?; let response = proxy .start(&session, None, Default::default()) .await .map_err(|e| anyhow::anyhow!("ScreenCast start failed: {e}"))? .response() .map_err(|e| anyhow::anyhow!("ScreenCast response error: {e}"))?; let stream = response .streams() .first() .ok_or_else(|| anyhow::anyhow!("No streams returned from ScreenCast"))?; let node_id = stream.pipe_wire_node_id(); let fd = proxy .open_pipe_wire_remote(&session, Default::default()) .await .map_err(|e| anyhow::anyhow!("Failed to open PipeWire remote: {e}"))?; tracing::info!("Portal session established: node_id={node_id}"); Ok((fd, node_id)) } } impl Drop for CapPortal { fn drop(&mut self) { // Signal the PipeWire loop to quit via eventfd. // eventfd write is a kernel syscall — thread-safe and lock-free. let val: u64 = 1u64; let _ = unsafe { libc::write( self.shutdown_fd.as_raw_fd(), &val as *const u64 as *const _, std::mem::size_of::(), ) }; if let Some(handle) = self.pw_thread.take() { let _ = handle.join(); } } } fn pipewire_thread(ctx: PwThreadCtx) { use pipewire as pw; use pw::properties::properties; use pw::stream::{StreamBox, StreamFlags}; use std::cell::Cell; use std::rc::Rc; use pw::spa::param::video::VideoInfoRaw; pw::init(); let PwThreadCtx { frame_tx, shutdown_read, pw_fd, node_id, fps: _fps, } = ctx; let mainloop = match pw::main_loop::MainLoopBox::new(None) { Ok(ml) => ml, Err(e) => { let _ = frame_tx.send(PwEvent::Error(format!("MainLoop::new failed: {e}"))); return; } }; let context = match pw::context::ContextBox::new(mainloop.loop_(), None) { Ok(c) => c, Err(e) => { let _ = frame_tx.send(PwEvent::Error(format!("Context::new failed: {e}"))); return; } }; let core = match context.connect_fd(pw_fd, None) { Ok(c) => c, Err(e) => { let _ = frame_tx.send(PwEvent::Error(format!( "connect_fd failed: {e}" ))); return; } }; let stream = match StreamBox::new( &core, "wl-webrtc", properties! { *pw::keys::MEDIA_TYPE => "Video", *pw::keys::MEDIA_CATEGORY => "Capture", *pw::keys::MEDIA_ROLE => "Screen", }, ) { Ok(s) => s, Err(e) => { let _ = frame_tx.send(PwEvent::Error(format!("Stream::new failed: {e}"))); return; } }; // Shared format state: (width, height, drm_fourcc, modifier) let format_info: Rc>> = Rc::new(Cell::new(None)); let frame_tx_clone = frame_tx.clone(); let _listener = stream .add_local_listener::<()>() .state_changed(move |_, _, old, new| { tracing::debug!("PipeWire stream state: {old:?} -> {new:?}"); match new { pw::stream::StreamState::Error(_) | pw::stream::StreamState::Unconnected => { let _ = frame_tx_clone.send(PwEvent::StreamEnded); } _ => {} } }) .param_changed({ let format_info = format_info.clone(); move |_, _, id, param| { let Some(param) = param else { return }; if id != pw::spa::param::ParamType::Format.as_raw() { return; } let mut info = VideoInfoRaw::new(); if let Err(e) = info.parse(param) { tracing::warn!("Failed to parse video format: {e}"); return; } let width = info.size().width; let height = info.size().height; let drm_format = spa_to_drm_fourcc(info.format()); let modifier = info.modifier(); format_info.set(Some((width, height, drm_format, modifier))); tracing::info!( "PipeWire format negotiated: {width}x{height}, \ drm_format={drm_format:#010x}, modifier={modifier:#x}" ); } }) .process({ let format_info = format_info.clone(); let frame_tx = frame_tx.clone(); move |stream, _| { let raw_buf = unsafe { stream.dequeue_raw_buffer() }; if raw_buf.is_null() { return; } let spa_buf = unsafe { (*raw_buf).buffer }; if spa_buf.is_null() { unsafe { stream.queue_raw_buffer(raw_buf) }; return; } let n_datas = unsafe { (*spa_buf).n_datas }; let datas_ptr = unsafe { (*spa_buf).datas }; if n_datas == 0 || datas_ptr.is_null() { unsafe { stream.queue_raw_buffer(raw_buf) }; return; } // Access first data item through libspa Data wrapper let data_ref: &pw::spa::buffer::Data = unsafe { &*(datas_ptr as *const pw::spa::buffer::Data) }; let fd = data_ref.fd(); if fd < 0 { unsafe { stream.queue_raw_buffer(raw_buf) }; return; } let chunk = data_ref.chunk(); let offset = chunk.offset() as u64; let stride = chunk.stride() as u32; // Get PTS from SPA_META_Header metadata let pts: i64 = unsafe { let mut pts_val: i64 = 0; let n_metas = (*spa_buf).n_metas; let metas = (*spa_buf).metas; if !metas.is_null() { for i in 0..n_metas { let meta = &*metas.add(i as usize); if meta.type_ == libspa::sys::SPA_META_Header && meta.size as usize >= std::mem::size_of::() && !meta.data.is_null() { let header = &*(meta.data as *const libspa::sys::spa_meta_header); pts_val = header.pts; break; } } } pts_val }; let dup_fd = unsafe { libc::dup(fd) }; if dup_fd < 0 { unsafe { stream.queue_raw_buffer(raw_buf) }; return; } let (width, height, format, modifier) = format_info.get().unwrap_or((0, 0, 0, 0)); let frame = PwDmaBufFrame { fd: unsafe { OwnedFd::from_raw_fd(dup_fd) }, offset, stride, modifier, width, height, format, pts, }; let _ = frame_tx.send(PwEvent::Frame(frame)); unsafe { stream.queue_raw_buffer(raw_buf) }; } }) .register(); let mut params: [&pw::spa::pod::Pod; 0] = []; if let Err(e) = stream.connect( pw::spa::utils::Direction::Input, Some(node_id), StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS, &mut params, ) { let _ = frame_tx.send(PwEvent::Error(format!("stream.connect failed: {e}"))); return; } let loop_ = mainloop.loop_(); loop_.add_signal_local( pw::loop_::Signal::SIGINT, Box::new(|| {}), ); loop_.add_signal_local( pw::loop_::Signal::SIGTERM, Box::new(|| {}), ); // Register the shutdown eventfd on the PipeWire loop. // // When CapPortal::drop writes to the eventfd, the loop wakes up and // dispatches this callback on the loop thread. Because the callback // only fires while mainloop.run() is blocking this thread, mainloop // is guaranteed alive — eliminating the UAF that existed with the // previous detached helper thread approach. let mainloop_ptr = mainloop.as_raw_ptr(); let _shutdown_source = loop_.add_io( shutdown_read, libspa::support::system::IoFlags::IN, move |fd| { // Drain the eventfd so it doesn't re-trigger let mut buf: u64 = 0; let _ = unsafe { libc::read( fd.as_raw_fd(), &mut buf as *mut u64 as *mut _, std::mem::size_of::(), ) }; // SAFETY: This callback only executes while mainloop.run() is // blocking this thread, so mainloop is guaranteed alive. unsafe { pipewire::sys::pw_main_loop_quit(mainloop_ptr) }; }, ); mainloop.run(); // run() returned — _shutdown_source drops first (reverse declaration order), // which unregisters the callback from the loop. Then mainloop drops. // No dangling raw pointers are possible. // SAFETY: pipewire has been initialized with pw::init() above and all // PipeWire resources (mainloop, stream) have been dropped. unsafe { pw::deinit() }; } const fn fourcc(a: u8, b: u8, c: u8, d: u8) -> u32 { (a as u32) | ((b as u32) << 8) | ((c as u32) << 16) | ((d as u32) << 24) } fn spa_to_drm_fourcc(format: libspa::param::video::VideoFormat) -> u32 { use libspa::param::video::VideoFormat; match format { VideoFormat::BGRA => fourcc(b'B', b'G', b'R', b'A'), VideoFormat::BGRx => fourcc(b'B', b'G', b'R', b'X'), VideoFormat::RGBA => fourcc(b'R', b'G', b'B', b'A'), VideoFormat::RGBx => fourcc(b'R', b'G', b'B', b'X'), VideoFormat::ARGB => fourcc(b'A', b'R', b'2', b'4'), VideoFormat::xRGB => fourcc(b'X', b'R', b'2', b'4'), VideoFormat::ABGR => fourcc(b'A', b'B', b'2', b'4'), VideoFormat::xBGR => fourcc(b'X', b'B', b'2', b'4'), _ => 0, } } #[cfg(test)] mod tests { use super::*; #[test] fn spa_to_drm_fourcc_bgra() { use libspa::param::video::VideoFormat; assert_eq!(spa_to_drm_fourcc(VideoFormat::BGRA), fourcc(b'B', b'G', b'R', b'A')); } #[test] fn spa_to_drm_fourcc_unsupported() { use libspa::param::video::VideoFormat; assert_eq!(spa_to_drm_fourcc(VideoFormat::NV12), 0); } #[test] fn fourcc_values() { assert_eq!(fourcc(b'B', b'G', b'R', b'A'), 0x41524742); assert_eq!(fourcc(b'R', b'G', b'B', b'A'), 0x41424752); } }