Compare commits

..

10 Commits

Author SHA1 Message Date
dailz
b0ed6548a6 feat: add WebRTC streaming via str0m + portal session persistence
- Add src/webrtc.rs: HTTP signaling server + str0m Sans-IO WebRTC transport
  with H.264 Annex-B → RTP packetization and key-frame request handling
- avhw: introduce FrameOutput enum (Muxer | Channel) so SwEncState can
  output to either MP4 muxer or crossbeam channel for WebRTC
- cap_portal: support portal session restore tokens (PersistMode::ExplicitlyRevoked)
  to skip re-authorization dialog; add --no-persist flag to force fresh dialog
- args: make --output optional when --port is used for WebRTC mode
- state_portal: integrate WebRTC pipeline (encoder channel → RTP forwarding)
  with shorter GOP for WebRTC (fps/2, min 10)
- main: redirect tracing to stderr; validate --output or --port required
- Add dependencies: str0m 0.20, serde_json 1, dirs 6
2026-06-04 20:54:16 +08:00
dailz
74f4dc826d perf(portal): achieve 58-60fps PipeWire screen capture
- Force PipeWire quantum=512 via NODE_FORCE_QUANTUM (48000/512=93Hz scheduling)
- Switch to libx264 ultrafast/zerolatency with 6 threads
- Use two-phase poll_and_encode: blocking recv_timeout for first frame,
  non-blocking try_recv drain for subsequent frames
- Remove fps_limit from portal path (PW already rate-limits via quantum/KWin;
  fps_limit's min_interval was silently dropping ~10% of valid frames)
- Remove diagnostic instrumentation (TIMING/PIPEWIRE logs, timing fields,
  pw_stats counters)
- Add lightweight production stats: per-10s fps log + shutdown summary
- Prefer libx264 over libopenh264 (better quality at same speed)
2026-05-30 08:44:15 +08:00
dailz
a83d146ed3 fix: FPS limiter never passes frames when input > target rate
The old FpsLimit compared timestamps between CONSECUTIVE frames.
When PipeWire delivers at 60fps (16ms intervals) and target is 30fps
(33ms min_interval), the gap between consecutive frames is always
16ms < 33ms, so EVERY frame was rejected after the first.

Fix: track last_output_time and compare against that instead of the
previous frame's timestamp. Now frames pass when enough time has
elapsed since the last OUTPUT, not since the last INPUT.

Also adds PipeWire process callback counter logging and frame
diagnostic STATS in state_portal.rs for debugging.
2026-05-29 22:09:35 +08:00
dailz
d80b34f44f feat: GPU-downscale + software H.264 encode pipeline (WIP)
Add SwEncState in avhw.rs: GPU pipeline using scale_vaapi to downscale
4K BGRA -> 2K NV12 on AMD iGPU, then software encode with libopenh264.

- import_dma_buf_to_vaapi: av_hwframe_map based DMA-BUF import
- SwEncState: GPU filter graph (scale_vaapi) + NV12->YUV420P + libopenh264
- state_portal.rs: integrated SwEncState, auto DRM device detection
- vaapi_import_bench.rs: CPU vs GPU pipeline benchmark
- sw_encode_bench.rs: software encode benchmark

Benchmark results: GPU pipeline ~91 FPS theoretical (10.95ms/frame)
vs CPU pipeline ~33 FPS (30.21ms/frame).

Known issue: only 1 frame encoded in production recording,
diagnostic STATS logging added to debug frame flow.
2026-05-29 22:04:12 +08:00
dailz
55abb5e56d fix(backend_detect): use raw zbus for portal check to avoid OnceLock connection poisoning
ashpd caches zbus::Connection in a global OnceLock. When check_portal_available()
created a Screencast proxy, the connection was cached there. When the function
returned and its tokio Runtime dropped, the cached connection became dead.
Subsequent setup_portal() calls reused this dead connection and hung forever.

Fix: replace ashpd Screencast proxy with direct zbus D-Bus interface check,
which does not touch the ashpd global connection cache.

Add examples/test_portal.rs for minimal Portal ScreenCast testing.
2026-05-27 22:07:11 +08:00
dailz
715a9c0bab refactor(cap_portal): split PwEvent into separate ctrl/frame channels
- Rename PwEvent to PwCtrlEvent, separate frame data into its own channel
- Add null chunk check to prevent crash on malformed PipeWire buffer
- Remove redundant inline comments and signal handlers
- Use try_send for error events to avoid blocking on full channel
2026-05-27 09:25:00 +08:00
dailz
60a55c17f2 fix(state_portal): add Drop impl, null dangling pointers, extract compute_pts, add tests
- Add Drop impl for StatePortal to flush encoder on drop (bug #2)
- Use enc.take() in shutdown() to prevent double-flush of write_trailer
- Null out data[0] after Box::from_raw recovery to avoid dangling pointer
- Extract compute_pts() for testable PTS calculation
- Add 8 tests: PTS calculation, DRM device resolution, descriptor building
2026-05-27 09:22:59 +08:00
dailz
5100d78aa8 fix: resolve SHM hang, DRM device mismatch, and duplicate VAAPI context
BUG-2 (HIGH): SHM Buffer event caused permanent hang
  In the ZwlrScreencopyFrameV1 dispatcher, receiving a SHM Buffer event
  left in_flight_surface stuck at AllocQueued forever, preventing
  queue_alloc_frame() from requesting new frames.
  Fix: treat Buffer as a metadata offer (v3 protocol), wait for
  BufferDone to decide failure, and add AllocQueued state guard to
  LinuxDmabuf handler.

BUG-3 (MEDIUM): Portal backend picked wrong GPU on multi-GPU systems
  state_portal.rs hardcoded /dev/dri/renderD128 then renderD129, which
  selects the wrong GPU when PipeWire uses a different device.
  Fix: extract find_drm_render_nodes() as shared utility; defer DRM
  device selection to first PipeWire frame; test each candidate with
  av_hwframe_transfer_data to find the GPU that can actually import
  the DMA-BUF frame.

BUG-4 (LOW): VAAPI device context created twice unnecessarily
  try_finalize_output() created an AvHwDevCtx stored in EverythingButFmt,
  but negotiate_format() discarded it (_hw_device_ctx) and EncState::new
  created a new one.
  Fix: thread the existing hw_device_ctx through negotiate_format() and
  create_encoder() to EncState::new() which reuses it when provided.
2026-05-25 14:32:58 +08:00
dailz
460a3ee711 fix(cap_portal): remove unsafe pw::deinit() to prevent global state corruption
pw::init() is guarded by an internal OnceCell (process-global one-shot).
pw::deinit() is unsafe and requires 'only called once per process lifetime
after all PipeWire use has permanently stopped'. Since CapPortal can be
created/destroyed multiple times, calling deinit() from a function-local
scope would prevent re-initialization (OnceCell already consumed) and
violate the unsafe contract.

The 5 early-return error paths in pipewire_thread() that previously
leaked global state are now consistent with the success path — neither
calls pw::deinit(). Process exit reclaims global PipeWire state.
2026-05-25 14:32:19 +08:00
dailz
b8026981d2 feat(examples): add Wayland globals lister utility
Minimal example that connects to the Wayland compositor and prints
all advertised globals (interface name, ID, version).
2026-05-25 08:56:55 +08:00
16 changed files with 4432 additions and 529 deletions

795
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -20,7 +20,11 @@ drm = "0.12"
drm-fourcc = "2" drm-fourcc = "2"
libc = "0.2" libc = "0.2"
ashpd = { version = "0.13", features = ["tokio", "screencast"] } ashpd = { version = "0.13", features = ["tokio", "screencast"] }
zbus = { version = "5", default-features = false, features = ["tokio"] }
tokio = { version = "1", features = ["rt"] } tokio = { version = "1", features = ["rt"] }
pipewire = "0.9" pipewire = { version = "0.9", features = ["v0_3_45"] }
libspa = "0.9" libspa = "0.9"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
str0m = "0.20"
serde_json = "1"
dirs = "6"

26
examples/list_globals.rs Normal file
View File

@@ -0,0 +1,26 @@
use wayland_client::globals::registry_queue_init;
use wayland_client::globals::GlobalListContents;
use wayland_client::protocol::wl_registry::{Event, WlRegistry};
use wayland_client::{Connection, Dispatch, QueueHandle};
struct Ls;
impl Dispatch<WlRegistry, GlobalListContents> for Ls {
fn event(
_state: &mut Self,
_registry: &WlRegistry,
_event: Event,
_data: &GlobalListContents,
_conn: &Connection,
_qhandle: &QueueHandle<Self>,
) {
}
}
fn main() {
let conn = Connection::connect_to_env().unwrap();
let (globals, _queue) = registry_queue_init::<Ls>(&conn).unwrap();
for g in globals.contents().clone_list() {
println!("{}: {} v{}", g.name, g.interface, g.version);
}
}

68
examples/test_portal.rs Normal file
View File

@@ -0,0 +1,68 @@
use ashpd::desktop::screencast::{CursorMode, Screencast, SelectSourcesOptions, SourceType};
use ashpd::desktop::PersistMode;
use ashpd::enumflags2::BitFlags;
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
eprintln!("1. Creating Screencast proxy...");
let proxy = match Screencast::new().await {
Ok(p) => {
eprintln!(" OK");
p
}
Err(e) => {
eprintln!(" FAIL: {e}");
return;
}
};
eprintln!("2. Creating session...");
let session = match proxy.create_session(Default::default()).await {
Ok(s) => {
eprintln!(" OK");
s
}
Err(e) => {
eprintln!(" FAIL: {e}");
return;
}
};
eprintln!("3. Selecting sources...");
let sources: BitFlags<SourceType> = SourceType::Monitor.into();
let result = proxy
.select_sources(
&session,
SelectSourcesOptions::default()
.set_cursor_mode(CursorMode::Embedded)
.set_sources(sources)
.set_multiple(false)
.set_persist_mode(PersistMode::DoNot),
)
.await;
match result {
Ok(_) => eprintln!(" OK"),
Err(e) => {
eprintln!(" FAIL: {e}");
return;
}
}
eprintln!("4. Starting (should show dialog)...");
let response = match proxy.start(&session, None, Default::default()).await {
Ok(r) => {
eprintln!(" OK");
r
}
Err(e) => {
eprintln!(" FAIL: {e}");
return;
}
};
match response.response() {
Ok(r) => eprintln!(" Got {} stream(s)", r.streams().len()),
Err(e) => eprintln!(" Response error: {e}"),
}
});
}

View File

@@ -3,9 +3,9 @@ use clap::Parser;
#[derive(Parser, Debug, Clone)] #[derive(Parser, Debug, Clone)]
#[command(name = "wl-webrtc", about = "Wayland screen capture and encoding tool")] #[command(name = "wl-webrtc", about = "Wayland screen capture and encoding tool")]
pub struct Args { pub struct Args {
/// Output file path (e.g., output.mp4, output.mkv) /// Output file path (e.g., output.mp4, output.mkv). Optional when using --port for WebRTC mode
#[arg(short, long)] #[arg(short, long)]
pub output: String, pub output: Option<String>,
/// Wayland output name to capture /// Wayland output name to capture
#[arg(long)] #[arg(long)]
@@ -43,7 +43,11 @@ pub struct Args {
#[arg(long)] #[arg(long)]
pub backend: Option<String>, pub backend: Option<String>,
/// Port for WebTransport server (Phase 2, unused in MVP) /// Port for WebRTC HTTP signaling server; 0 keeps MP4 file output mode
#[arg(long, default_value_t = 0)] #[arg(long, default_value_t = 0)]
pub port: u16, pub port: u16,
/// Force re-authorization dialog (ignore saved portal restore token)
#[arg(long)]
pub no_persist: bool,
} }

View File

@@ -1,4 +1,7 @@
use std::ffi::CString; use std::ffi::CString;
use std::mem;
use std::os::fd::{AsRawFd, RawFd};
use std::os::raw::c_void;
use std::path::Path; use std::path::Path;
use std::ptr; use std::ptr;
@@ -7,6 +10,7 @@ use ffmpeg_next as ff;
use ffmpeg_next::ffi; use ffmpeg_next::ffi;
use ffmpeg_next::packet::Mut as _; use ffmpeg_next::packet::Mut as _;
use crate::cap_portal::PwDmaBufFrame;
use crate::transform::{transpose_if_transform_transposed, Transform}; use crate::transform::{transpose_if_transform_transposed, Transform};
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -123,6 +127,128 @@ impl Drop for AvHwFrameCtx {
} }
} }
/// Test whether `drm_device` can import the PipeWire DMA-BUF frame via VAAPI.
pub fn test_dma_buf_import(drm_device: &Path, frame: &PwDmaBufFrame) -> Result<()> {
let hw_dev = AvHwDevCtx::new_vaapi(drm_device)?;
let frames =
AvHwFrameCtx::for_capture(&hw_dev, frame.width, frame.height, ff::format::Pixel::BGRA)?;
// SAFETY: frames is a live VAAPI frames context; frame carries valid DMA-BUF metadata.
unsafe {
import_dma_buf_to_vaapi(
frames.as_ptr(),
frame.fd.as_raw_fd(),
frame.width,
frame.height,
frame.format,
frame.modifier,
frame.stride,
frame.offset,
)
}?;
Ok(())
}
/// Import a DMA-BUF into a VAAPI hardware frame via zero-copy `av_hwframe_map`.
///
/// # Safety
/// - `frames_ctx` must point to an initialized AVHWCramesContext for VAAPI
/// - `raw_fd` must be a valid DMA-BUF file descriptor
pub unsafe fn import_dma_buf_to_vaapi(
frames_ctx: *mut ffi::AVBufferRef,
raw_fd: RawFd,
width: u32,
height: u32,
drm_format: u32,
modifier: u64,
stride: u32,
offset: u64,
) -> Result<ff::frame::Video> {
let duped_fd = libc::dup(raw_fd);
if duped_fd < 0 {
bail!("dup(fd) failed: {}", std::io::Error::last_os_error());
}
let mut desc: ffi::AVDRMFrameDescriptor = mem::zeroed();
desc.nb_objects = 1;
desc.objects[0].fd = duped_fd;
desc.objects[0].size = (height as usize) * (stride as usize);
desc.objects[0].format_modifier = modifier;
desc.nb_layers = 1;
desc.layers[0].format = drm_format;
desc.layers[0].nb_planes = 1;
desc.layers[0].planes[0].object_index = 0;
desc.layers[0].planes[0].offset = offset as isize;
desc.layers[0].planes[0].pitch = stride as isize;
let desc_box = Box::new(desc);
let desc_ptr = Box::into_raw(desc_box);
let buf_ref = ffi::av_buffer_create(
desc_ptr as *mut u8,
std::mem::size_of::<ffi::AVDRMFrameDescriptor>(),
Some(cleanup_drm_descriptor),
ptr::null_mut(),
0,
);
if buf_ref.is_null() {
let desc_box = Box::from_raw(desc_ptr);
libc::close(desc_box.objects[0].fd);
bail!("av_buffer_create returned null for DRM descriptor");
}
let mut src = ff::frame::Video::empty();
{
let sp = src.as_mut_ptr();
(*sp).format = ffi::AVPixelFormat::AV_PIX_FMT_DRM_PRIME as i32;
(*sp).width = width as i32;
(*sp).height = height as i32;
(*sp).data[0] = (*buf_ref).data;
(*sp).buf[0] = buf_ref;
}
let mut dst = ff::frame::Video::empty();
unsafe {
let dp = dst.as_mut_ptr();
(*dp).format = ffi::AVPixelFormat::AV_PIX_FMT_VAAPI as i32;
(*dp).hw_frames_ctx = ffi::av_buffer_ref(frames_ctx);
if (*dp).hw_frames_ctx.is_null() {
bail!("av_buffer_ref(frames_ctx) returned null");
}
}
let ret = unsafe {
ffi::av_hwframe_map(
dst.as_mut_ptr(),
src.as_ptr(),
ffi::AV_HWFRAME_MAP_READ as i32,
)
};
if ret < 0 {
let err_str = av_err_to_string(ret);
bail!("av_hwframe_map failed: error {ret} ({err_str})");
}
Ok(dst)
}
unsafe extern "C" fn cleanup_drm_descriptor(_opaque: *mut c_void, data: *mut u8) {
let desc = data as *mut ffi::AVDRMFrameDescriptor;
if !desc.is_null() && (*desc).nb_objects > 0 && (*desc).objects[0].fd >= 0 {
libc::close((*desc).objects[0].fd);
}
let _ = Box::from_raw(data as *mut ffi::AVDRMFrameDescriptor);
}
fn av_err_to_string(err: i32) -> String {
let mut buf = vec![0u8; 128];
unsafe {
ffi::av_strerror(err, buf.as_mut_ptr() as *mut i8, buf.len());
}
String::from_utf8_lossy(&buf)
.trim_end_matches('\0')
.to_string()
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// EncState // EncState
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -139,8 +265,8 @@ pub struct EncState {
unsafe impl Send for EncState {} unsafe impl Send for EncState {}
#[allow(clippy::too_many_arguments)]
impl EncState { impl EncState {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
drm_device: &Path, drm_device: &Path,
output_path: &Path, output_path: &Path,
@@ -152,16 +278,19 @@ impl EncState {
gop_size: u32, gop_size: u32,
fps: u32, fps: u32,
transform: Transform, transform: Transform,
existing_hw_ctx: Option<AvHwDevCtx>,
) -> Result<Self> { ) -> Result<Self> {
tracing::info!( tracing::info!(
"EncState::new: {width}x{height} enc={enc_width}x{enc_height} transform={transform:?}" "EncState::new: {width}x{height} enc={enc_width}x{enc_height} transform={transform:?}"
); );
// 1. VAAPI device // 1. VAAPI device — reuse existing context if provided
let hw_device_ctx = AvHwDevCtx::new_vaapi(drm_device)?; let hw_device_ctx = match existing_hw_ctx {
Some(ctx) => ctx,
None => AvHwDevCtx::new_vaapi(drm_device)?,
};
// 2. Frame context for capture (XRGB/RGBZ)
let frames_rgb = let frames_rgb =
AvHwFrameCtx::for_capture(&hw_device_ctx, width, height, ff::format::Pixel::RGBZ)?; AvHwFrameCtx::for_capture(&hw_device_ctx, width, height, ff::format::Pixel::BGRA)?;
// 3. Filter graph — must be built BEFORE encoder config so we can derive // 3. Filter graph — must be built BEFORE encoder config so we can derive
// hw_frames_ctx from the buffersink output (correct surface pool dimensions). // hw_frames_ctx from the buffersink output (correct surface pool dimensions).
@@ -463,6 +592,309 @@ impl EncState {
} }
} }
// ---------------------------------------------------------------------------
// SwEncState - VAAPI GPU downscale + software H.264 encode
// ---------------------------------------------------------------------------
pub enum FrameOutput {
Muxer(ff::format::context::Output),
Channel(crossbeam_channel::Sender<Vec<u8>>),
}
pub struct SwEncState {
hw_dev: AvHwDevCtx,
frames_rgb: AvHwFrameCtx,
filter_graph: ff::filter::Graph,
sws_ctx: *mut ffi::SwsContext,
enc_video: ff::codec::encoder::video::Video,
output: Option<FrameOutput>,
yuv_frame: *mut ffi::AVFrame,
starting_timestamp: Option<i64>,
frames_written: bool,
}
unsafe impl Send for SwEncState {}
impl SwEncState {
#[allow(clippy::too_many_arguments)]
pub fn new(
drm_device: &Path,
output_path: &Path,
width: u32,
height: u32,
enc_width: u32,
enc_height: u32,
fps: u32,
bitrate: u64,
gop_size: u32,
) -> Result<Self> {
tracing::info!(
"SwEncState::new: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264"
);
let hw_dev = AvHwDevCtx::new_vaapi(drm_device)?;
let frames_rgb =
AvHwFrameCtx::for_capture(&hw_dev, width, height, ff::format::Pixel::BGRA)?;
let filter_graph = build_swenc_filter_graph(
&hw_dev,
&frames_rgb,
width,
height,
enc_width,
enc_height,
fps,
)?;
let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?;
let (enc_video, octx) =
create_software_h264_muxer(output_path, enc_width, enc_height, fps, bitrate, gop_size)?;
let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?;
Ok(Self {
hw_dev,
frames_rgb,
filter_graph,
sws_ctx,
enc_video,
output: Some(FrameOutput::Muxer(octx)),
yuv_frame,
starting_timestamp: None,
frames_written: false,
})
}
#[allow(clippy::too_many_arguments)]
pub fn new_webrtc(
drm_device: &Path,
width: u32,
height: u32,
enc_width: u32,
enc_height: u32,
fps: u32,
bitrate: u64,
gop_size: u32,
tx: crossbeam_channel::Sender<Vec<u8>>,
) -> Result<Self> {
tracing::info!(
"SwEncState::new_webrtc: GPU downscale {width}x{height} BGRA -> {enc_width}x{enc_height} NV12, software H.264 -> WebRTC"
);
let hw_dev = AvHwDevCtx::new_vaapi(drm_device)?;
let frames_rgb =
AvHwFrameCtx::for_capture(&hw_dev, width, height, ff::format::Pixel::BGRA)?;
let filter_graph = build_swenc_filter_graph(
&hw_dev,
&frames_rgb,
width,
height,
enc_width,
enc_height,
fps,
)?;
let sws_ctx = create_nv12_to_yuv420p_sws(enc_width, enc_height)?;
let enc_video = create_software_h264_encoder(enc_width, enc_height, fps, bitrate, gop_size)?;
let yuv_frame = alloc_yuv420p_frame(enc_width, enc_height)?;
Ok(Self {
hw_dev,
frames_rgb,
filter_graph,
sws_ctx,
enc_video,
output: Some(FrameOutput::Channel(tx)),
yuv_frame,
starting_timestamp: None,
frames_written: false,
})
}
pub fn frames_rgb(&self) -> &AvHwFrameCtx {
&self.frames_rgb
}
pub fn encode_frame(&mut self, hw_frame: &ff::frame::Video) -> Result<()> {
let mut filter_src_ctx = self.filter_graph.get("in").unwrap();
let mut filter_src = filter_src_ctx.source();
let mut filter_sink_ctx = self.filter_graph.get("out").unwrap();
let mut filter_sink = filter_sink_ctx.sink();
filter_src
.add(hw_frame)
.map_err(|e| anyhow::anyhow!("software pipeline filter source add failed: {e}"))?;
loop {
let mut filtered = ff::frame::Video::empty();
match filter_sink.frame(&mut filtered) {
Ok(()) => {
if filtered.pts().is_none() {
filtered.set_pts(hw_frame.pts());
}
self.encode_filtered_frame(&filtered)?;
}
Err(ff::Error::Other { errno }) if errno == ffi::EAGAIN => break,
Err(e) => bail!("software pipeline filter sink get frame failed: {e}"),
}
}
Ok(())
}
pub fn flush(&mut self) -> Result<()> {
let mut filter_src_ctx = self.filter_graph.get("in").unwrap();
let mut filter_src = filter_src_ctx.source();
let _ = filter_src.flush();
let mut filter_sink_ctx = self.filter_graph.get("out").unwrap();
let mut filter_sink = filter_sink_ctx.sink();
loop {
let mut filtered = ff::frame::Video::empty();
match filter_sink.frame(&mut filtered) {
Ok(()) => self.encode_filtered_frame(&filtered)?,
Err(_) => break,
}
}
unsafe {
let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), ptr::null());
if ret < 0 && ret != ffi::AVERROR_EOF {
bail!("software encoder flush send failed: error {ret}");
}
}
let start_ts = self.starting_timestamp.unwrap_or(0);
self.drain_encoder(start_ts)?;
if self.frames_written {
if let Some(FrameOutput::Muxer(ref mut octx)) = self.output {
octx.write_trailer()
.map_err(|e| anyhow::anyhow!("Failed to write trailer: {e}"))?;
}
}
Ok(())
}
fn encode_filtered_frame(&mut self, filtered: &ff::frame::Video) -> Result<()> {
let mut sw_nv12 = unsafe { ffi::av_frame_alloc() };
if sw_nv12.is_null() {
bail!("av_frame_alloc failed for NV12 transfer frame");
}
// SAFETY: sw_nv12 is an allocated destination frame; filtered is a valid VAAPI NV12
// surface produced by scale_vaapi at encoder dimensions.
let transfer_ret = unsafe { ffi::av_hwframe_transfer_data(sw_nv12, filtered.as_ptr(), 0) };
if transfer_ret < 0 {
// SAFETY: sw_nv12 was allocated above and has not been freed yet.
unsafe { ffi::av_frame_free(&mut sw_nv12) };
bail!(
"av_hwframe_transfer_data failed for GPU-downscaled frame: error {transfer_ret} ({})",
av_err_to_string(transfer_ret)
);
}
// SAFETY: yuv_frame is an owned reusable YUV420P frame at the same dimensions as sw_nv12;
// sws_ctx was created for NV12 -> YUV420P with no resize, so sws_scale only converts format.
unsafe {
let ret = ffi::av_frame_make_writable(self.yuv_frame);
if ret < 0 {
ffi::av_frame_free(&mut sw_nv12);
bail!("av_frame_make_writable failed: error {ret}");
}
ffi::sws_scale(
self.sws_ctx,
(*sw_nv12).data.as_ptr() as *const *const u8,
(*sw_nv12).linesize.as_ptr() as *const i32,
0,
(*sw_nv12).height,
(*self.yuv_frame).data.as_ptr() as *mut *mut u8,
(*self.yuv_frame).linesize.as_ptr() as *const i32,
);
ffi::av_frame_free(&mut sw_nv12);
}
let pts = filtered.pts().unwrap_or(0);
if self.starting_timestamp.is_none() {
self.starting_timestamp = Some(pts);
}
let start_ts = self.starting_timestamp.unwrap_or(0);
// SAFETY: yuv_frame is initialized, writable, and matches the opened encoder format.
unsafe {
(*self.yuv_frame).pts = pts;
let ret = ffi::avcodec_send_frame(self.enc_video.as_mut_ptr(), self.yuv_frame);
if ret < 0 {
bail!("avcodec_send_frame failed for software encoder: error {ret}");
}
}
self.drain_encoder(start_ts)
}
fn drain_encoder(&mut self, start_ts: i64) -> Result<()> {
loop {
let mut pkt = ff::Packet::empty();
// SAFETY: enc_video is an open encoder; pkt is writable packet storage.
let ret = unsafe {
ffi::avcodec_receive_packet(self.enc_video.as_mut_ptr(), pkt.as_mut_ptr())
};
if ret < 0 {
if ret == ffi::AVERROR(ffi::EAGAIN) || ret == ffi::AVERROR_EOF {
break;
}
bail!("avcodec_receive_packet failed: error {ret}");
}
match self.output {
Some(FrameOutput::Muxer(ref mut octx)) => {
let enc_tb = self.enc_video.time_base();
let stream_tb = unsafe {
let streams = (*octx.as_ptr()).streams;
let st = *streams.add(0);
ff::Rational::from((*st).time_base)
};
pkt.rescale_ts(enc_tb, stream_tb);
if let Some(pts) = pkt.pts() {
pkt.set_pts(Some(pts - start_ts));
}
if let Some(dts) = pkt.dts() {
pkt.set_dts(Some(dts - start_ts));
}
pkt.set_stream(0);
pkt.write_interleaved(octx)
.map_err(|e| anyhow::anyhow!("Failed to write packet: {e}"))?;
self.frames_written = true;
}
Some(FrameOutput::Channel(ref tx)) => {
let data: &[u8] = unsafe {
std::slice::from_raw_parts(
(*pkt.as_mut_ptr()).data,
(*pkt.as_mut_ptr()).size as usize,
)
};
let _ = tx.send(data.to_vec());
}
None => {}
}
}
Ok(())
}
}
impl Drop for SwEncState {
fn drop(&mut self) {
if !self.sws_ctx.is_null() {
// SAFETY: sws_ctx is owned by this state and was returned by sws_getContext.
unsafe { ffi::sws_freeContext(self.sws_ctx) };
self.sws_ctx = ptr::null_mut();
}
if !self.yuv_frame.is_null() {
// SAFETY: yuv_frame is owned by this state and was allocated by av_frame_alloc.
unsafe { ffi::av_frame_free(&mut self.yuv_frame) };
}
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Shared encoder creation (used by both wlr-screencopy and portal paths) // Shared encoder creation (used by both wlr-screencopy and portal paths)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -482,12 +914,11 @@ pub fn create_encoder(
transform: Transform, transform: Transform,
bitrate: Option<u64>, bitrate: Option<u64>,
gop_size: Option<u32>, gop_size: Option<u32>,
existing_hw_ctx: Option<AvHwDevCtx>,
) -> Result<EncState> { ) -> Result<EncState> {
let (enc_w, enc_h) = let (enc_w, enc_h) = transpose_if_transform_transposed(transform, width as i32, height as i32);
transpose_if_transform_transposed(transform, width as i32, height as i32); let actual_bitrate =
let actual_bitrate = bitrate.unwrap_or_else(|| { bitrate.unwrap_or_else(|| 2 * (width as u64) * (height as u64) * (fps as u64) / 100);
2 * (width as u64) * (height as u64) * (fps as u64) / 100
});
let actual_gop_size = gop_size.unwrap_or(fps); let actual_gop_size = gop_size.unwrap_or(fps);
EncState::new( EncState::new(
drm_device, drm_device,
@@ -500,9 +931,302 @@ pub fn create_encoder(
actual_gop_size, actual_gop_size,
fps, fps,
transform, transform,
existing_hw_ctx,
) )
} }
// ---------------------------------------------------------------------------
// Software-encode GPU-downscale helpers
// ---------------------------------------------------------------------------
#[allow(clippy::too_many_arguments)]
fn build_swenc_filter_graph(
hw_dev: &AvHwDevCtx,
frames_rgb: &AvHwFrameCtx,
width: u32,
height: u32,
enc_width: u32,
enc_height: u32,
fps: u32,
) -> Result<ff::filter::Graph> {
let mut graph = ff::filter::Graph::new();
let buffersrc =
ff::filter::find("buffer").ok_or_else(|| anyhow::anyhow!("filter 'buffer' not found"))?;
let buffersink = ff::filter::find("buffersink")
.ok_or_else(|| anyhow::anyhow!("filter 'buffersink' not found"))?;
let scale_vaapi = ff::filter::find("scale_vaapi")
.ok_or_else(|| anyhow::anyhow!("filter 'scale_vaapi' not found"))?;
// FFmpeg 8.0+ rejects VAAPI pix_fmt in buffer args before hw_frames_ctx is attached.
// Use a SW placeholder, then override format/hw_frames_ctx with av_buffersrc_parameters_set.
let args = format!(
"video_size={}x{}:pix_fmt=bgra:time_base=1/{fps}:pixel_aspect=1/1",
width, height,
);
let mut src_ctx = graph.add(&buffersrc, "in", &args)?;
let par = unsafe { ffi::av_buffersrc_parameters_alloc() };
if par.is_null() {
bail!("av_buffersrc_parameters_alloc returned null");
}
// SAFETY: par and src_ctx are valid; frames_rgb.ref_clone returns an owned hw_frames_ctx ref
// that buffersrc consumes on successful parameter set.
unsafe {
(*par).format = Into::<ffi::AVPixelFormat>::into(ff::format::Pixel::VAAPI) as i32;
(*par).width = width as i32;
(*par).height = height as i32;
(*par).time_base = ffi::AVRational {
num: 1,
den: fps as i32,
};
(*par).hw_frames_ctx = frames_rgb.ref_clone();
let ret = ffi::av_buffersrc_parameters_set(src_ctx.as_mut_ptr(), par);
ffi::av_free(par as *mut _);
if ret < 0 {
bail!("av_buffersrc_parameters_set failed: error {ret}");
}
}
let mut scale_ctx = graph.add(
&scale_vaapi,
"scale",
&format!("{enc_width}:{enc_height}:format=nv12"),
)?;
// SAFETY: scale_vaapi keeps a ref-counted device context while the graph is alive.
unsafe {
(*scale_ctx.as_mut_ptr()).hw_device_ctx = hw_dev.ref_clone();
}
let mut sink_ctx = graph.add(&buffersink, "out", "")?;
src_ctx.link(0, &mut scale_ctx, 0);
scale_ctx.link(0, &mut sink_ctx, 0);
graph
.validate()
.map_err(|e| anyhow::anyhow!("software GPU filter graph validation failed: {e}"))?;
Ok(graph)
}
fn create_nv12_to_yuv420p_sws(width: u32, height: u32) -> Result<*mut ffi::SwsContext> {
// SAFETY: sws_getContext creates an owned scaler context for same-size NV12 -> YUV420P.
let ctx = unsafe {
ffi::sws_getContext(
width as i32,
height as i32,
ffi::AVPixelFormat::AV_PIX_FMT_NV12,
width as i32,
height as i32,
ffi::AVPixelFormat::AV_PIX_FMT_YUV420P,
2,
ptr::null_mut(),
ptr::null_mut(),
ptr::null_mut(),
)
};
if ctx.is_null() {
bail!("Failed to create NV12 -> YUV420P sws_scale context");
}
Ok(ctx)
}
fn alloc_yuv420p_frame(width: u32, height: u32) -> Result<*mut ffi::AVFrame> {
// SAFETY: Allocate an AVFrame, configure format/dimensions, then allocate writable buffers.
unsafe {
let mut frame = ffi::av_frame_alloc();
if frame.is_null() {
bail!("av_frame_alloc failed");
}
(*frame).width = width as i32;
(*frame).height = height as i32;
(*frame).format = ffi::AVPixelFormat::AV_PIX_FMT_YUV420P as i32;
let ret = ffi::av_frame_get_buffer(frame, 0);
if ret < 0 {
ffi::av_frame_free(&mut frame);
bail!("av_frame_get_buffer failed: error {ret}");
}
Ok(frame)
}
}
fn create_software_h264_muxer(
output_path: &Path,
width: u32,
height: u32,
fps: u32,
bitrate: u64,
gop_size: u32,
) -> Result<(
ff::codec::encoder::video::Video,
ff::format::context::Output,
)> {
let output_cstr = CString::new(output_path.to_str().unwrap())?;
let codec = ff::encoder::find_by_name("libx264")
.or_else(|| ff::encoder::find_by_name("libopenh264"))
.ok_or_else(|| {
anyhow::anyhow!("No H.264 software encoder found (tried libx264, libopenh264)")
})?;
let codec_name = codec.name().to_string();
let mut enc = {
let ctx = ff::codec::Context::new_with_codec(codec);
ctx.encoder().video()?
};
enc.set_width(width);
enc.set_height(height);
enc.set_format(ff::format::Pixel::YUV420P);
enc.set_bit_rate(bitrate as usize);
enc.set_gop(gop_size);
enc.set_time_base(ff::Rational::new(1, fps as i32));
enc.set_max_b_frames(0);
// SAFETY: global headers are needed by MP4 and harmless for other common muxers.
unsafe {
(*enc.as_mut_ptr()).flags |= ffi::AV_CODEC_FLAG_GLOBAL_HEADER as i32;
}
if codec_name == "libx264" {
// SAFETY: priv_data belongs to the unopened encoder; strings live for each call.
unsafe {
let key = CString::new("preset").unwrap();
let val = CString::new("ultrafast").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("tune").unwrap();
let val = CString::new("zerolatency").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("threads").unwrap();
let val = CString::new("6").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
}
}
let opened = enc
.open()
.map_err(|e| anyhow::anyhow!("Failed to open {codec_name} encoder: {e}"))?;
let enc_video = opened.0;
let use_null = output_path
.to_str()
.map(|s| s.contains("null"))
.unwrap_or(false);
let fmt_name = if use_null {
CString::new("null").unwrap()
} else {
CString::new("").unwrap()
};
let fmt_name_ptr = if use_null {
fmt_name.as_ptr()
} else {
ptr::null()
};
let mut fmt_ctx_ptr: *mut ffi::AVFormatContext = ptr::null_mut();
// SAFETY: fmt_ctx_ptr is initialized by FFmpeg; C strings live across the call.
let ret = unsafe {
ffi::avformat_alloc_output_context2(
&mut fmt_ctx_ptr,
ptr::null_mut(),
fmt_name_ptr,
output_cstr.as_ptr(),
)
};
if ret < 0 || fmt_ctx_ptr.is_null() {
bail!("Failed to allocate output format context: error {ret}");
}
// SAFETY: fmt_ctx_ptr is valid; stream and codec parameters are owned by the format context.
let stream_ptr = unsafe { ffi::avformat_new_stream(fmt_ctx_ptr, ptr::null()) };
if stream_ptr.is_null() {
bail!("Failed to create output stream");
}
// SAFETY: stream_ptr and encoder context are valid; parameters are copied into stream.
let ret =
unsafe { ffi::avcodec_parameters_from_context((*stream_ptr).codecpar, enc_video.as_ptr()) };
if ret < 0 {
bail!("Failed to copy codec parameters to stream: error {ret}");
}
// SAFETY: stream_ptr is valid and writable during muxer setup.
unsafe {
(*stream_ptr).time_base = (*enc_video.as_ptr()).time_base;
}
// SAFETY: open an AVIO only for muxers that require files; null muxer advertises NOFILE.
unsafe {
if (*(*fmt_ctx_ptr).oformat).flags & ffi::AVFMT_NOFILE == 0 {
let ret = ffi::avio_open(
&mut (*fmt_ctx_ptr).pb,
output_cstr.as_ptr(),
ffi::AVIO_FLAG_WRITE,
);
if ret < 0 {
bail!(
"Failed to open output file '{}': error {ret}",
output_path.display()
);
}
}
}
// SAFETY: fmt_ctx_ptr is fully configured.
let ret = unsafe { ffi::avformat_write_header(fmt_ctx_ptr, ptr::null_mut()) };
if ret < 0 {
bail!("Failed to write output header: error {ret}");
}
// SAFETY: ownership of fmt_ctx_ptr transfers to ffmpeg-next Output wrapper.
let octx = unsafe { ff::format::context::Output::wrap(fmt_ctx_ptr) };
tracing::info!("Using software H.264 encoder: {codec_name}");
Ok((enc_video, octx))
}
fn create_software_h264_encoder(
width: u32,
height: u32,
fps: u32,
bitrate: u64,
gop_size: u32,
) -> Result<ff::codec::encoder::video::Video> {
let codec = ff::encoder::find_by_name("libx264")
.or_else(|| ff::encoder::find_by_name("libopenh264"))
.ok_or_else(|| anyhow::anyhow!("No H.264 software encoder found"))?;
let codec_name = codec.name().to_string();
let mut enc = {
let ctx = ff::codec::Context::new_with_codec(codec);
ctx.encoder().video()?
};
enc.set_width(width);
enc.set_height(height);
enc.set_format(ff::format::Pixel::YUV420P);
enc.set_bit_rate(bitrate as usize);
enc.set_gop(gop_size);
enc.set_time_base(ff::Rational::new(1, fps as i32));
enc.set_max_b_frames(0);
if codec_name == "libx264" {
unsafe {
let key = CString::new("preset").unwrap();
let val = CString::new("ultrafast").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("tune").unwrap();
let val = CString::new("zerolatency").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("threads").unwrap();
let val = CString::new("6").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("x264opts").unwrap();
let val = CString::new("repeat_headers=1").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
}
}
let opened = enc
.open()
.map_err(|e| anyhow::anyhow!("Failed to open {codec_name} encoder: {e}"))?;
tracing::info!("WebRTC encoder: {codec_name} {width}x{height} @ {fps}fps {bitrate}bps");
Ok(opened.0)
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Filter graph (inline) // Filter graph (inline)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -591,8 +1315,7 @@ fn build_filter_graph(
Transform::Flipped270 => "0", Transform::Flipped270 => "0",
Transform::Normal => unreachable!(), Transform::Normal => unreachable!(),
}; };
let mut trans_ctx = let mut trans_ctx = graph.add(&transpose, "transpose", &format!("dir={dir_val}"))?;
graph.add(&transpose, "transpose", &format!("dir={dir_val}"))?;
unsafe { unsafe {
(*trans_ctx.as_mut_ptr()).hw_device_ctx = hw_dev.ref_clone(); (*trans_ctx.as_mut_ptr()).hw_device_ctx = hw_dev.ref_clone();
} }

View File

@@ -37,11 +37,10 @@ impl Dispatch<WlRegistry, GlobalListContents> for RegistryLs {
} }
} }
// 通过 D-Bus 检测 XDG Desktop Portal 的 ScreenCast 接口是否可用 // CAUTION: must NOT use ashpd here — ashpd caches zbus::Connection in a global
// 尝试创建 Screencast proxy如果 Portal 服务未运行则返回 false // OnceLock; if the tokio runtime owning that connection is dropped before
// setup_portal() runs, the cached connection becomes dead and hangs forever.
fn check_portal_available() -> bool { fn check_portal_available() -> bool {
use ashpd::desktop::screencast::Screencast;
let rt = match tokio::runtime::Runtime::new() { let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt, Ok(rt) => rt,
Err(e) => { Err(e) => {
@@ -51,30 +50,43 @@ fn check_portal_available() -> bool {
}; };
rt.block_on(async { rt.block_on(async {
let proxy = match Screencast::new().await { let conn = match zbus::Connection::session().await {
Ok(p) => p, Ok(c) => c,
Err(e) => { Err(e) => {
tracing::info!("Portal not available: {e}"); tracing::info!("D-Bus session bus unavailable: {e}");
return false; return false;
} }
}; };
// Verify the portal actually exposes ScreenCast capabilities, let inner: zbus::Proxy = match zbus::proxy::Builder::new(&conn)
// not just that the D-Bus service is running. .destination("org.freedesktop.portal.Desktop")
match proxy.available_source_types().await { .and_then(|b| b.path("/org/freedesktop/portal/desktop"))
Ok(types) if !types.is_empty() => { .and_then(|b| b.interface("org.freedesktop.portal.ScreenCast"))
tracing::info!("Portal ScreenCast available (source types: {types:?})"); {
Ok(b) => match b.build().await {
Ok(p) => p,
Err(e) => {
tracing::info!("Portal ScreenCast interface not available: {e}");
return false;
}
},
Err(e) => {
tracing::info!("Portal ScreenCast proxy build failed: {e}");
return false;
}
};
let version = match inner.get_property::<u32>("version").await {
Ok(version) => {
tracing::info!("Portal ScreenCast available (version: {version})");
true true
} }
Ok(types) => {
tracing::info!("Portal ScreenCast proxy exists but no source types available ({types:?})");
false
}
Err(e) => { Err(e) => {
tracing::info!("Portal ScreenCast available_source_types query failed: {e}"); tracing::info!("Portal ScreenCast version query failed: {e}");
false false
} }
} };
version
}) })
} }
@@ -125,10 +137,7 @@ pub fn detect_backend(args: &Args) -> Result<CaptureBackend> {
} }
other => { other => {
// 未知后端名称,返回错误 // 未知后端名称,返回错误
anyhow::bail!( anyhow::bail!("Unknown backend '{}'. Use 'screencopy' or 'portal'.", other);
"Unknown backend '{}'. Use 'screencopy' or 'portal'.",
other
);
} }
}; };
} }

545
src/bin/sw_encode_bench.rs Normal file
View File

@@ -0,0 +1,545 @@
// sw_encode_bench.rs — Software encoding pipeline benchmark for screen capture
//
// Benchmarks: Portal capture -> mmap DMA-BUF -> sws_scale BGR0->YUV420P -> libx264 encode
//
// Usage: cargo run --bin sw_encode_bench -- --output /tmp/bench_test.mp4
use std::ffi::CString;
use std::os::fd::AsRawFd;
use std::path::Path;
use std::ptr;
use std::time::Instant;
use anyhow::{bail, Result};
use clap::Parser;
use ffmpeg_next as ff;
use ffmpeg_next::ffi;
use ffmpeg_next::packet::Mut;
use wl_webrtc::args::Args;
use wl_webrtc::cap_portal::{CapPortal, PwCtrlEvent};
#[derive(Parser, Debug)]
#[command(
name = "sw_encode_bench",
about = "Software encoding pipeline benchmark"
)]
struct BenchArgs {
#[arg(short, long)]
output: String,
#[arg(long, default_value_t = 120)]
frames: u32,
#[arg(long, default_value_t = 2560)]
enc_width: u32,
#[arg(long, default_value_t = 1440)]
enc_height: u32,
}
#[derive(Default)]
struct FrameStats {
mmap_us: Vec<u64>,
scale_us: Vec<u64>,
encode_us: Vec<u64>,
total_us: Vec<u64>,
mmap_failures: u32,
}
impl FrameStats {
fn avg_ms(data: &[u64]) -> f64 {
if data.is_empty() {
return 0.0;
}
data.iter().sum::<u64>() as f64 / data.len() as f64 / 1000.0
}
}
fn pix_fmt(p: ff::format::Pixel) -> ffi::AVPixelFormat {
Into::<ffi::AVPixelFormat>::into(p)
}
fn receive_first_frame(cap: &CapPortal) -> Result<wl_webrtc::cap_portal::PwDmaBufFrame> {
loop {
if let Ok(ctrl) = cap.event_receiver().try_recv() {
match ctrl {
PwCtrlEvent::StreamEnded => bail!("PipeWire stream ended before first frame"),
PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"),
}
}
match cap
.frame_receiver()
.recv_timeout(std::time::Duration::from_secs(10))
{
Ok(frame) => return Ok(frame),
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
bail!("Timeout waiting for first frame (10s)");
}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
bail!("PipeWire frame channel disconnected");
}
}
}
}
fn main() -> Result<()> {
let bench_args = BenchArgs::parse();
println!("=== Software Encode Benchmark ===");
println!("Output: {}", bench_args.output);
println!("Target frames: {}", bench_args.frames);
println!(
"Encode resolution: {}x{}",
bench_args.enc_width, bench_args.enc_height
);
println!();
ff::init()?;
println!("[1/4] Requesting screen capture via XDG Portal...");
println!(" (Select a screen to share in the portal dialog)");
let portal_args = Args {
output: bench_args.output.clone(),
output_name: None,
fps: 60,
codec: "h264".to_string(),
hw_accel: "vaapi".to_string(),
drm_device: None,
bitrate: None,
gop_size: None,
verbose: false,
backend: Some("portal".to_string()),
port: 0,
};
let cap = CapPortal::new(&portal_args)?;
println!("[1/4] Portal connected, PipeWire stream active\n");
println!("[2/4] Waiting for first frame from PipeWire...");
let first_frame = receive_first_frame(&cap)?;
let src_width = first_frame.width;
let src_height = first_frame.height;
let src_stride = first_frame.stride;
let enc_width = bench_args.enc_width;
let enc_height = bench_args.enc_height;
println!(
"[2/4] First frame: {}x{}, stride={}, format=0x{:08X}",
src_width, src_height, src_stride, first_frame.format
);
println!(
" Capture: {}x{} Encode: {}x{}\n",
src_width, src_height, enc_width, enc_height
);
println!("[3/4] Testing mmap on DMA-BUF...");
let mmap_size = (src_stride as usize) * (src_height as usize);
let mmap_ptr = unsafe {
libc::mmap(
ptr::null_mut(),
mmap_size,
libc::PROT_READ,
libc::MAP_SHARED,
first_frame.fd.as_raw_fd(),
first_frame.offset as i64,
)
};
if mmap_ptr == libc::MAP_FAILED {
let errno = std::io::Error::last_os_error();
bail!(
"mmap on DMA-BUF fd FAILED — AMD driver may not support \
CPU read of screen capture DMA-BUF buffers.\n\
Error: {} (errno={})\n\
\n\
Workarounds:\n\
1. Use VAAPI hardware import (av_hwframe_map) instead of mmap\n\
2. Use wlroots compositor with wlr-screencopy (SHM-based)\n\
3. Use a virtual display or software renderer",
errno,
errno.raw_os_error().unwrap_or(-1)
);
}
println!(
"[3/4] mmap SUCCESS — CPU can read DMA-BUF ({:.1} MB)\n",
mmap_size as f64 / 1024.0 / 1024.0
);
unsafe {
libc::munmap(mmap_ptr, mmap_size);
}
drop(first_frame);
// Set up libx264 encoder via FFI (same pattern as avhw.rs)
println!("[4/4] Setting up libx264 encoder...");
let output_path = Path::new(&bench_args.output);
let output_cstr = CString::new(output_path.to_str().unwrap())?;
// Try libx264 first (best quality/speed), fall back to openh264
let codec = ff::encoder::find_by_name("libx264")
.or_else(|| ff::encoder::find_by_name("libopenh264"))
.ok_or_else(|| {
anyhow::anyhow!("No H.264 software encoder found (tried libx264, libopenh264)")
})?;
println!("[4/4] Using encoder: {}\n", codec.name());
let mut enc = {
let ctx = ff::codec::Context::new_with_codec(codec);
ctx.encoder().video()?
};
enc.set_width(enc_width);
enc.set_height(enc_height);
enc.set_format(ff::format::Pixel::YUV420P);
enc.set_time_base(ff::Rational::new(1, 60));
enc.set_max_b_frames(0);
enc.set_gop(60);
let codec_name = codec.name();
if codec_name == "libx264" {
unsafe {
let key = CString::new("preset").unwrap();
let val = CString::new("veryfast").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
let key = CString::new("tune").unwrap();
let val = CString::new("zerolatency").unwrap();
ffi::av_opt_set((*enc.as_mut_ptr()).priv_data, key.as_ptr(), val.as_ptr(), 0);
}
}
let opened = enc.open()?;
let mut enc_video = opened.0;
// Create output format context via FFI
let mut fmt_ctx_ptr: *mut ffi::AVFormatContext = ptr::null_mut();
let ret = unsafe {
ffi::avformat_alloc_output_context2(
&mut fmt_ctx_ptr,
ptr::null_mut(),
ptr::null(),
output_cstr.as_ptr(),
)
};
if ret < 0 || fmt_ctx_ptr.is_null() {
bail!("Failed to allocate output format context: error {ret}");
}
let stream_ptr = unsafe { ffi::avformat_new_stream(fmt_ctx_ptr, ptr::null()) };
if stream_ptr.is_null() {
bail!("Failed to create new stream");
}
let ret =
unsafe { ffi::avcodec_parameters_from_context((*stream_ptr).codecpar, enc_video.as_ptr()) };
if ret < 0 {
bail!("Failed to copy encoder parameters: error {ret}");
}
unsafe {
(*stream_ptr).time_base = (*enc_video.as_ptr()).time_base;
}
let ret = unsafe {
ffi::avio_open(
&mut (*fmt_ctx_ptr).pb,
output_cstr.as_ptr(),
ffi::AVIO_FLAG_WRITE,
)
};
if ret < 0 {
bail!(
"Failed to open output file '{}': error {ret}",
output_path.display()
);
}
let ret = unsafe { ffi::avformat_write_header(fmt_ctx_ptr, ptr::null_mut()) };
if ret < 0 {
bail!("Failed to write header: error {ret}");
}
let mut octx = unsafe { ff::format::context::Output::wrap(fmt_ctx_ptr) };
// Create sws_scale context: BGRZ (BGR0) -> YUV420P
let bgr0_fmt = pix_fmt(ff::format::Pixel::BGRZ);
let yuv420p_fmt = pix_fmt(ff::format::Pixel::YUV420P);
let sws_ctx = unsafe {
ffi::sws_getContext(
src_width as i32,
src_height as i32,
bgr0_fmt,
enc_width as i32,
enc_height as i32,
yuv420p_fmt,
2,
ptr::null_mut(),
ptr::null_mut(),
ptr::null_mut(),
)
};
if sws_ctx.is_null() {
bail!("Failed to create sws_scale context");
}
// Allocate reusable YUV frame
let mut yuv_frame = unsafe {
let mut f = ffi::av_frame_alloc();
if f.is_null() {
bail!("av_frame_alloc failed");
}
(*f).width = enc_width as i32;
(*f).height = enc_height as i32;
(*f).format = yuv420p_fmt as i32;
let ret = ffi::av_frame_get_buffer(f, 0);
if ret < 0 {
ffi::av_frame_free(&mut f);
bail!("av_frame_get_buffer failed: {ret}");
}
f
};
println!(
"[4/4] Encoder ready: {}, {}x{}\n",
codec_name, enc_width, enc_height
);
println!("=== Encoding {} frames ===\n", bench_args.frames);
let mut stats = FrameStats::default();
let total_start = Instant::now();
let mut frames_encoded: u32 = 0;
let mut pts: i64 = 0;
while frames_encoded < bench_args.frames {
if let Ok(ctrl) = cap.event_receiver().try_recv() {
match ctrl {
PwCtrlEvent::StreamEnded => {
eprintln!("PipeWire stream ended after {} frames", frames_encoded);
break;
}
PwCtrlEvent::Error(e) => {
eprintln!("PipeWire error after {} frames: {}", frames_encoded, e);
break;
}
}
}
let frame = match cap
.frame_receiver()
.recv_timeout(std::time::Duration::from_secs(5))
{
Ok(f) => f,
Err(_) => {
eprintln!("Frame timeout/disconnect after {} frames", frames_encoded);
break;
}
};
let frame_start = Instant::now();
let mmap_start = Instant::now();
let frame_size = (frame.stride as usize) * (frame.height as usize);
let mmap_ptr = unsafe {
libc::mmap(
ptr::null_mut(),
frame_size,
libc::PROT_READ,
libc::MAP_SHARED,
frame.fd.as_raw_fd(),
frame.offset as i64,
)
};
if mmap_ptr == libc::MAP_FAILED {
stats.mmap_failures += 1;
eprintln!("mmap failed on frame {}", frames_encoded);
drop(frame);
continue;
}
stats.mmap_us.push(mmap_start.elapsed().as_micros() as u64);
let scale_start = Instant::now();
let src_data = unsafe { std::slice::from_raw_parts(mmap_ptr as *const u8, frame_size) };
unsafe {
ffi::av_frame_make_writable(yuv_frame);
let src_ptr = src_data.as_ptr();
let src_linesize = frame.stride as i32;
ffi::sws_scale(
sws_ctx,
&src_ptr as *const *const u8,
&src_linesize as *const i32,
0,
frame.height as i32,
(*yuv_frame).data.as_ptr() as *mut *mut u8,
(*yuv_frame).linesize.as_ptr() as *mut i32,
);
}
stats
.scale_us
.push(scale_start.elapsed().as_micros() as u64);
unsafe {
libc::munmap(mmap_ptr, frame_size);
}
drop(frame);
let encode_start = Instant::now();
unsafe {
(*yuv_frame).pts = pts;
pts += 1;
let ret = ffi::avcodec_send_frame(enc_video.as_mut_ptr(), yuv_frame);
if ret < 0 {
eprintln!("avcodec_send_frame failed: {ret}");
continue;
}
}
drain_encoder(&mut enc_video, &mut octx)?;
stats
.encode_us
.push(encode_start.elapsed().as_micros() as u64);
stats
.total_us
.push(frame_start.elapsed().as_micros() as u64);
frames_encoded += 1;
if frames_encoded % 30 == 0 {
let fps = frames_encoded as f64 / total_start.elapsed().as_secs_f64();
println!(
" [{}/{}] {:.1} FPS",
frames_encoded, bench_args.frames, fps
);
}
}
let total_elapsed = total_start.elapsed();
println!("\nFlushing encoder...");
unsafe {
ffi::avcodec_send_frame(enc_video.as_mut_ptr(), ptr::null());
}
drain_encoder(&mut enc_video, &mut octx)?;
octx.write_trailer()
.map_err(|e| anyhow::anyhow!("Failed to write trailer: {e}"))?;
// Cleanup
unsafe {
ffi::av_frame_free(&mut yuv_frame as *mut _);
ffi::sws_freeContext(sws_ctx);
}
drop(cap);
// Print results
let mmap_count = stats.mmap_us.len() as u32;
let mmap_success_rate = if mmap_count + stats.mmap_failures > 0 {
mmap_count as f64 / (mmap_count + stats.mmap_failures) as f64 * 100.0
} else {
0.0
};
let total_fps = frames_encoded as f64 / total_elapsed.as_secs_f64();
let avg_total_ms = FrameStats::avg_ms(&stats.total_us);
let max_fps = if avg_total_ms > 0.0 {
1000.0 / avg_total_ms
} else {
0.0
};
println!();
println!("╔══════════════════════════════════════════════════════════════╗");
println!("║ Software Encode Benchmark Results ║");
println!("╚══════════════════════════════════════════════════════════════╝");
println!();
println!("Capture resolution: {}x{}", src_width, src_height);
println!("Encode resolution: {}x{}", enc_width, enc_height);
println!("Frames encoded: {}", frames_encoded);
println!("Total time: {:.2}s", total_elapsed.as_secs_f64());
println!();
println!("mmap (DMA-BUF -> CPU):");
println!(
" avg: {:.2} ms/frame",
FrameStats::avg_ms(&stats.mmap_us)
);
println!(
" success rate: {:.1}% ({}/{})",
mmap_success_rate,
mmap_count,
mmap_count + stats.mmap_failures
);
println!();
println!("scale (BGR0 -> YUV420P via sws_scale):");
println!(
" avg: {:.2} ms/frame",
FrameStats::avg_ms(&stats.scale_us)
);
println!();
println!("encode ({}):", codec_name);
println!(
" avg: {:.2} ms/frame",
FrameStats::avg_ms(&stats.encode_us)
);
println!();
println!("total pipeline:");
println!(" avg: {:.2} ms/frame", avg_total_ms);
println!(" achieved FPS: {:.1}", total_fps);
println!(" max theoretical: {:.1} FPS", max_fps);
println!();
if mmap_success_rate < 100.0 {
println!(
"WARNING: Some mmap operations failed ({}/{})",
stats.mmap_failures,
stats.mmap_failures + mmap_count
);
}
if total_fps < 30.0 {
println!(
"NOTE: Achieved FPS ({:.1}) is below 30 FPS target.",
total_fps
);
}
println!("Output written to: {}", bench_args.output);
Ok(())
}
fn drain_encoder(
enc_video: &mut ff::encoder::video::Video,
octx: &mut ff::format::context::Output,
) -> Result<()> {
loop {
let mut pkt = ff::Packet::empty();
let ret = unsafe { ffi::avcodec_receive_packet(enc_video.as_mut_ptr(), pkt.as_mut_ptr()) };
if ret < 0 {
if ret == ffi::AVERROR(ffi::EAGAIN) || ret == ffi::AVERROR_EOF {
break;
}
eprintln!("avcodec_receive_packet failed: {ret}");
break;
}
let enc_tb = enc_video.time_base();
let stream_tb = unsafe {
let streams = (*octx.as_ptr()).streams;
let st = *streams.add(0);
ff::Rational::from((*st).time_base)
};
pkt.rescale_ts(enc_tb, stream_tb);
pkt.set_stream(0);
pkt.write_interleaved(octx)
.map_err(|e| anyhow::anyhow!("write packet failed: {e}"))?;
}
Ok(())
}

File diff suppressed because it is too large Load Diff

View File

@@ -12,11 +12,13 @@
// - crossbeam-channel: 高性能有界通道,用于线程间帧传递 // - crossbeam-channel: 高性能有界通道,用于线程间帧传递
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use anyhow::Result; use anyhow::Result;
use crossbeam_channel::{Receiver, Sender, bounded}; use crossbeam_channel::{bounded, Receiver, Sender};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use crate::args::Args; use crate::args::Args;
@@ -45,13 +47,11 @@ pub struct PwDmaBufFrame {
pub pts: i64, pub pts: i64,
} }
/// PipeWire 事件枚举 /// PipeWire 控制事件枚举
/// ///
/// 从 PipeWire 捕获线程发送给消费者的事件类型 /// 从 PipeWire 捕获线程发送给消费者的控制事件。
/// 消费者通过 frame_receiver() 获取的 Receiver 接收这些事件 /// 与帧数据分离,通过独立的 channel 传输,确保控制事件不被帧数据淹没
pub enum PwEvent { pub enum PwCtrlEvent {
/// 收到一帧新的 DMA-BUF 视频帧
Frame(PwDmaBufFrame),
/// 流已结束PipeWire 流断开连接或进入错误状态) /// 流已结束PipeWire 流断开连接或进入错误状态)
StreamEnded, StreamEnded,
/// 发生错误,包含错误描述信息 /// 发生错误,包含错误描述信息
@@ -68,13 +68,10 @@ pub enum PwEvent {
/// 2. frame_receiver() — 获取帧接收端,供消费者轮询 /// 2. frame_receiver() — 获取帧接收端,供消费者轮询
/// 3. Drop — 通过 eventfd 通知 PipeWire 线程安全退出 /// 3. Drop — 通过 eventfd 通知 PipeWire 线程安全退出
pub struct CapPortal { pub struct CapPortal {
/// eventfd 的写入端,用于在 drop 时通知 PipeWire 线程退出
shutdown_fd: OwnedFd, shutdown_fd: OwnedFd,
/// 帧事件接收端,消费者通过此 Receiver 获取帧数据 frame_rx: Receiver<PwDmaBufFrame>,
frame_rx: Receiver<PwEvent>, event_rx: Receiver<PwCtrlEvent>,
/// PipeWire 捕获线程的 JoinHandledrop 时等待线程退出
pw_thread: Option<JoinHandle<()>>, pw_thread: Option<JoinHandle<()>>,
/// Tokio 运行时,仅用于 setup_portal() 中的异步 Portal 调用
rt: Runtime, rt: Runtime,
} }
@@ -83,17 +80,12 @@ pub struct CapPortal {
/// 从主线程传递给 PipeWire 捕获线程的所有必要资源。 /// 从主线程传递给 PipeWire 捕获线程的所有必要资源。
/// 该结构体在线程创建时一次性 move 到线程中使用。 /// 该结构体在线程创建时一次性 move 到线程中使用。
struct PwThreadCtx { struct PwThreadCtx {
/// 帧事件发送端,用于向消费者线程发送帧数据或错误/结束事件 frame_tx: Sender<PwDmaBufFrame>,
frame_tx: Sender<PwEvent>, event_tx: Sender<PwCtrlEvent>,
/// 已丢弃帧的计数器(原子操作),用于统计因通道满而丢弃的帧数 dropped: Arc<AtomicU64>,
dropped: AtomicU64,
/// eventfd 的读取端,注册到 PipeWire 事件循环中,用于接收关闭信号
shutdown_read: OwnedFd, shutdown_read: OwnedFd,
/// Portal 返回的 PipeWire 远程连接文件描述符
pw_fd: OwnedFd, pw_fd: OwnedFd,
/// Portal 返回的 PipeWire 节点 ID标识要捕获的屏幕流
node_id: u32, node_id: u32,
/// 目标帧率(当前保留,未直接用于 PipeWire 协商)
fps: u32, fps: u32,
} }
@@ -103,27 +95,18 @@ impl CapPortal {
/// 执行流程: /// 执行流程:
/// 1. 创建 Tokio 运行时(用于异步 Portal 调用) /// 1. 创建 Tokio 运行时(用于异步 Portal 调用)
/// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id /// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id
/// 3. 创建有界通道(容量 3)用于帧传递 /// 3. 创建有界通道(容量 16)用于帧传递
/// 4. 创建 eventfd 对,用于线程安全的关闭信号传递 /// 4. 创建 eventfd 对,用于线程安全的关闭信号传递
/// 5. 启动 PipeWire 捕获线程 /// 5. 启动 PipeWire 捕获线程
pub fn new(args: &Args) -> Result<Self> { pub fn new(args: &Args) -> Result<Self> {
// 创建独立的 Tokio 运行时,仅用于 setup_portal 中的异步 Portal D-Bus 调用
let rt = Runtime::new()?; let rt = Runtime::new()?;
// 通过 Portal 获取 PipeWire 连接 fd 和节点 ID let no_persist = args.no_persist;
// block_on 在此处同步等待异步 Portal 调用完成 let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal(no_persist).await })?;
let (pw_fd, node_id) = rt.block_on(async {
Self::setup_portal().await
})?;
// 创建有界通道,容量为 3 帧 let (frame_tx, frame_rx) = bounded(16);
// 使用有界通道实现背压:当消费者处理不过来时,生产者会丢弃帧而非无限堆积 let (event_tx, event_rx) = bounded(8);
let (frame_tx, frame_rx) = bounded(3);
// 创建 eventfd 对,用于线程安全的关闭信号传递
// eventfd 是 Linux 内核提供的轻量级进程/线程间通知机制
// 写入端保存在 CapPortal主线程读取端注册到 PipeWire 事件循环中
// 这样 CapPortal drop 时可以安全地通知 PipeWire 线程退出
let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
if efd < 0 { if efd < 0 {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
@@ -131,8 +114,6 @@ impl CapPortal {
std::io::Error::last_os_error() std::io::Error::last_os_error()
)); ));
} }
// 复制 eventfd 得到写入端,原始 fd 作为读取端
// 需要 dup 是因为读取端和写入端需要各自独立的 OwnedFd 所有权
let write_fd = unsafe { libc::dup(efd) }; let write_fd = unsafe { libc::dup(efd) };
if write_fd < 0 { if write_fd < 0 {
let err = std::io::Error::last_os_error(); let err = std::io::Error::last_os_error();
@@ -140,38 +121,45 @@ impl CapPortal {
return Err(anyhow::anyhow!("dup eventfd failed: {err}")); return Err(anyhow::anyhow!("dup eventfd failed: {err}"));
} }
// 构建 PipeWire 线程上下文,将所有必要资源 move 进去 let pw_dropped = Arc::new(AtomicU64::new(0));
let ctx = PwThreadCtx { let ctx = PwThreadCtx {
frame_tx, frame_tx,
dropped: AtomicU64::new(0), event_tx,
dropped: pw_dropped.clone(),
shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) }, shutdown_read: unsafe { OwnedFd::from_raw_fd(efd) },
pw_fd, pw_fd,
node_id, node_id,
fps: args.fps, fps: args.fps,
}; };
// 启动 PipeWire 捕获线程,命名便于调试和性能分析
let pw_thread = thread::Builder::new() let pw_thread = thread::Builder::new()
.name("pipewire-capture".into()) .name("pipewire-capture".into())
.spawn(move || { .spawn(move || {
pipewire_thread(ctx); pipewire_thread(ctx);
})
.map_err(|e| {
unsafe { libc::close(write_fd) };
anyhow::anyhow!("thread spawn failed: {e}")
})?; })?;
Ok(Self { Ok(Self {
shutdown_fd: unsafe { OwnedFd::from_raw_fd(write_fd) }, shutdown_fd: unsafe { OwnedFd::from_raw_fd(write_fd) },
frame_rx, frame_rx,
event_rx,
pw_thread: Some(pw_thread), pw_thread: Some(pw_thread),
rt, rt,
}) })
} }
/// 获取帧事件接收端的引用 pub fn frame_receiver(&self) -> &Receiver<PwDmaBufFrame> {
///
/// 消费者通过此方法获取 Receiver然后不断接收 PwEvent 事件来获取帧数据。
pub fn frame_receiver(&self) -> &Receiver<PwEvent> {
&self.frame_rx &self.frame_rx
} }
pub fn event_receiver(&self) -> &Receiver<PwCtrlEvent> {
&self.event_rx
}
/// 通过 XDG Desktop Portal 建立屏幕录制会话 /// 通过 XDG Desktop Portal 建立屏幕录制会话
/// ///
/// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。 /// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。
@@ -183,44 +171,50 @@ impl CapPortal {
/// 5. 打开 PipeWire 远程连接,获取文件描述符 /// 5. 打开 PipeWire 远程连接,获取文件描述符
/// ///
/// 返回 (PipeWire fd, node_id),供 PipeWire 线程连接使用 /// 返回 (PipeWire fd, node_id),供 PipeWire 线程连接使用
async fn setup_portal() -> Result<(OwnedFd, u32)> { async fn setup_portal(no_persist: bool) -> Result<(OwnedFd, u32)> {
use ashpd::desktop::screencast::{ use ashpd::desktop::screencast::{
CursorMode, Screencast, SelectSourcesOptions, SourceType, CursorMode, Screencast, SelectSourcesOptions, SourceType,
}; };
use ashpd::desktop::PersistMode; use ashpd::desktop::PersistMode;
// 创建 Screencast D-Bus 代理,与桌面环境的 Portal 服务通信 let proxy = Screencast::new()
let proxy = Screencast::new().await.map_err(|e| { .await
anyhow::anyhow!("Failed to create Screencast proxy: {e}") .map_err(|e| anyhow::anyhow!("Failed to create Screencast proxy: {e}"))?;
})?;
// 创建 ScreenCast 会话(每个会话对应一次屏幕录制请求)
let session = proxy let session = proxy
.create_session(Default::default()) .create_session(Default::default())
.await .await
.map_err(|e| anyhow::anyhow!("Failed to create ScreenCast session: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to create ScreenCast session: {e}"))?;
// 配置录制源选择参数: let version_supported = proxy.version() >= 4;
// - CursorMode::Embedded: 光标嵌入到帧数据中(而非单独的元数据)
// - SourceType::Monitor: 仅捕获显示器(不捕获窗口) let (persist_mode, saved_token) = if !no_persist && version_supported {
// - multiple: false: 不允许多源选择 let token = load_restore_token();
// - PersistMode::DoNot: 不持久化会话(每次需要重新授权) if token.is_some() {
proxy tracing::info!("Attempting to restore portal session with saved token");
.select_sources( }
&session, (PersistMode::ExplicitlyRevoked, token)
SelectSourcesOptions::default() } else {
(PersistMode::DoNot, None)
};
let mut options = SelectSourcesOptions::default()
.set_cursor_mode(CursorMode::Embedded) .set_cursor_mode(CursorMode::Embedded)
.set_sources(ashpd::enumflags2::BitFlags::from(SourceType::Monitor)) .set_sources(ashpd::enumflags2::BitFlags::from(SourceType::Monitor))
.set_multiple(false) .set_multiple(false)
.set_persist_mode(PersistMode::DoNot), .set_persist_mode(persist_mode);
)
if let Some(ref token) = saved_token {
options = options.set_restore_token(token.as_str());
}
proxy
.select_sources(&session, options)
.await .await
.map_err(|e| { .map_err(|e| {
anyhow::anyhow!("屏幕共享权限被拒绝 / Screen sharing permission denied: {e}") anyhow::anyhow!("Screen sharing permission denied: {e}")
})?; })?;
// 启动录制会话,此时桌面环境会弹出权限确认对话框
// 用户确认后返回包含 PipeWire 流信息的响应
let response = proxy let response = proxy
.start(&session, None, Default::default()) .start(&session, None, Default::default())
.await .await
@@ -228,18 +222,19 @@ impl CapPortal {
.response() .response()
.map_err(|e| anyhow::anyhow!("ScreenCast response error: {e}"))?; .map_err(|e| anyhow::anyhow!("ScreenCast response error: {e}"))?;
// 获取返回的第一个(也是唯一的)视频流 if !no_persist && version_supported {
// 每个流对应一个 PipeWire 节点 if let Some(new_token) = response.restore_token() {
save_restore_token(new_token);
}
}
let stream = response let stream = response
.streams() .streams()
.first() .first()
.ok_or_else(|| anyhow::anyhow!("No streams returned from ScreenCast"))?; .ok_or_else(|| anyhow::anyhow!("No streams returned from ScreenCast"))?;
// 提取 PipeWire 节点 ID用于后续连接到该节点的视频流
let node_id = stream.pipe_wire_node_id(); let node_id = stream.pipe_wire_node_id();
// 打开 PipeWire 远程连接,获取文件描述符
// 这个 fd 允许直接与 PipeWire 守护进程通信
let fd = proxy let fd = proxy
.open_pipe_wire_remote(&session, Default::default()) .open_pipe_wire_remote(&session, Default::default())
.await .await
@@ -251,6 +246,30 @@ impl CapPortal {
} }
} }
fn token_path() -> PathBuf {
let base = dirs::cache_dir()
.unwrap_or_else(|| PathBuf::from("/tmp"));
base.join("wl-webrtc").join("portal-restore-token")
}
fn load_restore_token() -> Option<String> {
let path = token_path();
let token = std::fs::read_to_string(&path).ok()?;
let trimmed = token.trim().to_string();
if trimmed.is_empty() { None } else { Some(trimmed) }
}
fn save_restore_token(token: &str) {
let path = token_path();
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
match std::fs::write(&path, token) {
Ok(()) => tracing::info!("Saved portal restore token"),
Err(e) => tracing::warn!("Failed to save restore token: {e}"),
}
}
impl Drop for CapPortal { impl Drop for CapPortal {
/// 析构时安全关闭 PipeWire 线程 /// 析构时安全关闭 PipeWire 线程
/// ///
@@ -296,52 +315,49 @@ impl Drop for CapPortal {
fn pipewire_thread(ctx: PwThreadCtx) { fn pipewire_thread(ctx: PwThreadCtx) {
use pipewire as pw; use pipewire as pw;
use pw::properties::properties; use pw::properties::properties;
use pw::spa::param::video::VideoInfoRaw;
use pw::stream::{StreamBox, StreamFlags}; use pw::stream::{StreamBox, StreamFlags};
use std::cell::Cell; use std::cell::Cell;
use std::rc::Rc; use std::rc::Rc;
use pw::spa::param::video::VideoInfoRaw;
// 初始化 PipeWire 库,必须在任何 PipeWire 操作之前调用 // 初始化 PipeWire 进程全局库。
//
// pipewire-rs 内部使用 OnceCell 保护 pw::init(),确保只调用一次。
// pw::deinit() 是 unsafe 且要求"进程生命周期内仅调用一次,且所有
// PipeWire 使用已停止"。由于 CapPortal 可被多次创建销毁,此函数
// 不调用 pw::deinit()——进程退出时全局状态由 OS 回收。
pw::init(); pw::init();
// 解构上下文,取出所有必要资源
// fps 重命名为 _fps 表示当前未使用(保留供将来帧率控制使用)
let PwThreadCtx { let PwThreadCtx {
frame_tx, frame_tx,
event_tx,
dropped, dropped,
shutdown_read, shutdown_read,
pw_fd, pw_fd,
node_id, node_id,
fps: _fps, fps,
} = ctx; } = ctx;
// 创建 PipeWire MainLoop主事件循环
// MainLoopBox 是栈分配的 PipeWire 主循环封装
let mainloop = match pw::main_loop::MainLoopBox::new(None) { let mainloop = match pw::main_loop::MainLoopBox::new(None) {
Ok(ml) => ml, Ok(ml) => ml,
Err(e) => { Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!("MainLoop::new failed: {e}"))); let _ = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}")));
return; return;
} }
}; };
// 创建 PipeWire Context用于管理核心对象和协议处理
let context = match pw::context::ContextBox::new(mainloop.loop_(), None) { let context = match pw::context::ContextBox::new(mainloop.loop_(), None) {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!("Context::new failed: {e}"))); let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}")));
return; return;
} }
}; };
// 使用 Portal 提供的 fd 连接到 PipeWire 核心守护进程
// connect_fd 接管该 fd 的所有权(通过 dup不关闭原始 fd
let core = match context.connect_fd(pw_fd, None) { let core = match context.connect_fd(pw_fd, None) {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!( let _ = event_tx.try_send(PwCtrlEvent::Error(format!("connect_fd failed: {e}")));
"connect_fd failed: {e}"
)));
return; return;
} }
}; };
@@ -358,37 +374,30 @@ fn pipewire_thread(ctx: PwThreadCtx) {
*pw::keys::MEDIA_TYPE => "Video", *pw::keys::MEDIA_TYPE => "Video",
*pw::keys::MEDIA_CATEGORY => "Capture", *pw::keys::MEDIA_CATEGORY => "Capture",
*pw::keys::MEDIA_ROLE => "Screen", *pw::keys::MEDIA_ROLE => "Screen",
*pw::keys::NODE_FORCE_QUANTUM => "512",
}, },
) { ) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
let _ = frame_tx.send(PwEvent::Error(format!("Stream::new failed: {e}"))); let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}")));
return; return;
} }
}; };
// 共享的格式状态: (宽度, 高度, DRM FourCC 格式, 修饰符) let format_info: Rc<Cell<Option<(u32, u32, u32, u64)>>> = Rc::new(Cell::new(None));
// 使用 Rc<Cell<>> 因为 PipeWire 回调在同一个线程内执行,无需跨线程同步
// Cell<Option<...>> 允许在不可变引用中修改值(内部可变性)
// format_info 在 param_changed 回调中设置,在 process 回调中读取
let format_info: Rc<Cell<Option<(u32, u32, u32, u64)>>> =
Rc::new(Cell::new(None));
let frame_tx_clone = frame_tx.clone(); let event_tx_state = event_tx.clone();
// 注册流事件监听器,包含三个回调:
// - state_changed: 流状态变化通知
// - param_changed: 格式协商完成通知
// - process: 每帧数据处理
let _listener = stream let _listener = stream
.add_local_listener::<()>() .add_local_listener::<()>()
// 流状态变化回调
// 当流进入 Error 或 Unconnected 状态时,通知消费者流已结束
.state_changed(move |_, _, old, new| { .state_changed(move |_, _, old, new| {
tracing::debug!("PipeWire stream state: {old:?} -> {new:?}"); tracing::info!("PipeWire stream state: {old:?} -> {new:?}");
match new { match new {
pw::stream::StreamState::Error(_) pw::stream::StreamState::Error(e) => {
| pw::stream::StreamState::Unconnected => { tracing::error!("PipeWire stream error: {e}");
let _ = frame_tx_clone.send(PwEvent::StreamEnded); let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded);
}
pw::stream::StreamState::Unconnected => {
let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded);
} }
_ => {} _ => {}
} }
@@ -416,11 +425,16 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let drm_format = spa_to_drm_fourcc(info.format()); let drm_format = spa_to_drm_fourcc(info.format());
// 获取 DRM 修饰符,描述 GPU buffer 的内存布局(如 tiling 模式) // 获取 DRM 修饰符,描述 GPU buffer 的内存布局(如 tiling 模式)
let modifier = info.modifier(); let modifier = info.modifier();
let framerate = info.framerate();
let max_framerate = info.max_framerate();
// 保存协商后的格式信息,供 process 回调读取 // 保存协商后的格式信息,供 process 回调读取
format_info.set(Some((width, height, drm_format, modifier))); format_info.set(Some((width, height, drm_format, modifier)));
tracing::info!( tracing::info!(
"PipeWire format negotiated: {width}x{height}, \ "PipeWire format negotiated: {width}x{height}, \
drm_format={drm_format:#010x}, modifier={modifier:#x}" drm_format={drm_format:#010x}, modifier={modifier:#x}, \
framerate={}/{}, max_framerate={}/{}",
framerate.num, framerate.denom,
max_framerate.num, max_framerate.denom,
); );
} }
}) })
@@ -432,7 +446,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let frame_tx = frame_tx.clone(); let frame_tx = frame_tx.clone();
let dropped = dropped; let dropped = dropped;
move |stream, _| { move |stream, _| {
// 从流中出队原始 buffer包含帧数据的元信息
let raw_buf = unsafe { stream.dequeue_raw_buffer() }; let raw_buf = unsafe { stream.dequeue_raw_buffer() };
if raw_buf.is_null() { if raw_buf.is_null() {
return; return;
@@ -456,14 +470,18 @@ fn pipewire_thread(ctx: PwThreadCtx) {
// 从第一个数据项中获取 DMA-BUF 文件描述符 // 从第一个数据项中获取 DMA-BUF 文件描述符
// 通过 libspa 的 Data 包装类型安全地访问 SPA 数据结构 // 通过 libspa 的 Data 包装类型安全地访问 SPA 数据结构
let data_ref: &pw::spa::buffer::Data = unsafe { &*(datas_ptr as *const pw::spa::buffer::Data) }; let data_ref: &pw::spa::buffer::Data =
unsafe { &*(datas_ptr as *const pw::spa::buffer::Data) };
let fd = data_ref.fd(); let fd = data_ref.fd();
if fd < 0 { if fd < 0 {
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
return; return;
} }
// 获取 chunk 信息,包含帧数据在 DMA-BUF 中的偏移量和行跨度 if data_ref.as_raw().chunk.is_null() {
unsafe { stream.queue_raw_buffer(raw_buf) };
return;
}
let chunk = data_ref.chunk(); let chunk = data_ref.chunk();
let offset = chunk.offset() as u64; let offset = chunk.offset() as u64;
let stride = chunk.stride() as u32; let stride = chunk.stride() as u32;
@@ -479,7 +497,8 @@ fn pipewire_thread(ctx: PwThreadCtx) {
for i in 0..n_metas { for i in 0..n_metas {
let meta = &*metas.add(i as usize); let meta = &*metas.add(i as usize);
if meta.type_ == libspa::sys::SPA_META_Header if meta.type_ == libspa::sys::SPA_META_Header
&& meta.size as usize >= std::mem::size_of::<libspa::sys::spa_meta_header>() && meta.size as usize
>= std::mem::size_of::<libspa::sys::spa_meta_header>()
&& !meta.data.is_null() && !meta.data.is_null()
{ {
let header = &*(meta.data as *const libspa::sys::spa_meta_header); let header = &*(meta.data as *const libspa::sys::spa_meta_header);
@@ -522,52 +541,30 @@ fn pipewire_thread(ctx: PwThreadCtx) {
pts, pts,
}; };
// 尝试非阻塞发送帧到通道 if let Err(crossbeam_channel::TrySendError::Full(_)) = frame_tx.try_send(frame) {
// 如果通道已满(消费者处理不过来),丢弃该帧并增加丢弃计数
// 每 30 帧丢弃时输出一条警告日志,避免日志洪泛
if let Err(crossbeam_channel::TrySendError::Full(_)) =
frame_tx.try_send(PwEvent::Frame(frame))
{
let prev = dropped.fetch_add(1, Ordering::Relaxed); let prev = dropped.fetch_add(1, Ordering::Relaxed);
if prev > 0 && prev % 30 == 0 { if prev > 0 && prev % 30 == 0 {
tracing::warn!("dropped {prev} frames total: encoder backlog"); tracing::warn!("dropped {prev} frames total: encoder backlog");
} }
} }
// 无论是否成功发送帧,都必须将 buffer 重新入队
// PipeWire 会复用这些 buffer不入队会导致 buffer 泄漏
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
} }
}) })
.register(); .register();
// 空的参数数组,不主动请求特定格式(由 PipeWire 和源端协商决定)
let mut params: [&pw::spa::pod::Pod; 0] = []; let mut params: [&pw::spa::pod::Pod; 0] = [];
// 连接到指定的 PipeWire 节点
// Direction::Input: 作为消费者(输入方向接收数据)
// AUTOCONNECT: 允许 PipeWire 自动连接源和消费者
// MAP_BUFFERS: 映射 buffer 到用户空间DMA-BUF 模式下必须设置)
if let Err(e) = stream.connect( if let Err(e) = stream.connect(
pw::spa::utils::Direction::Input, pw::spa::utils::Direction::Input,
Some(node_id), Some(node_id),
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS, StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
&mut params, &mut params,
) { ) {
let _ = frame_tx.send(PwEvent::Error(format!("stream.connect failed: {e}"))); let _ = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}")));
return; return;
} }
let loop_ = mainloop.loop_(); let loop_ = mainloop.loop_();
// 注册信号处理(空回调),阻止 SIGINT/SIGTERM 默认行为终止线程
// 真正的退出通过 shutdown eventfd 控制
loop_.add_signal_local(
pw::loop_::Signal::SIGINT,
Box::new(|| {}),
);
loop_.add_signal_local(
pw::loop_::Signal::SIGTERM,
Box::new(|| {}),
);
// Register the shutdown eventfd on the PipeWire loop. // Register the shutdown eventfd on the PipeWire loop.
// //
@@ -607,10 +604,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
// run() returned — _shutdown_source drops first (reverse declaration order), // run() returned — _shutdown_source drops first (reverse declaration order),
// which unregisters the callback from the loop. Then mainloop drops. // which unregisters the callback from the loop. Then mainloop drops.
// No dangling raw pointers are possible. // No dangling raw pointers are possible.
// PipeWire global state is intentionally not deinitialized here — see pw::init() comment above.
// SAFETY: pipewire has been initialized with pw::init() above and all
// PipeWire resources (mainloop, stream) have been dropped.
unsafe { pw::deinit() };
} }
/// 将四个 ASCII 字符编码为 32 位 FourCC (Four Character Code) 标识符 /// 将四个 ASCII 字符编码为 32 位 FourCC (Four Character Code) 标识符
@@ -628,35 +622,65 @@ const fn fourcc(a: u8, b: u8, c: u8, d: u8) -> u32 {
/// 此函数建立了两者之间的映射关系。 /// 此函数建立了两者之间的映射关系。
/// ///
/// 支持的格式: /// 支持的格式:
/// - BGRA/BGRx: 蓝绿红(Alpha/X) 32位格式
/// - RGBA/RGBx: 红绿蓝(Alpha/X) 32位格式
/// - ARGB/xRGB: Alpha/X-红绿蓝 32位格式 (映射为 AR24/XR24)
/// - ABGR/xBGR: Alpha/X-蓝绿红 32位格式 (映射为 AB24/XB24)
///
/// 不支持的格式返回 0 /// 不支持的格式返回 0
/// DRM 格式名描述像素值位布局(大端序),而非内存字节序。
/// 例如 DRM_FORMAT_ARGB8888 在小端 x86 上内存为 [B,G,R,A] = PipeWire BGRA。
fn spa_to_drm_fourcc(format: libspa::param::video::VideoFormat) -> u32 { fn spa_to_drm_fourcc(format: libspa::param::video::VideoFormat) -> u32 {
use drm_fourcc::DrmFourcc;
use libspa::param::video::VideoFormat; use libspa::param::video::VideoFormat;
match format { match format {
VideoFormat::BGRA => fourcc(b'B', b'G', b'R', b'A'), VideoFormat::BGRA => DrmFourcc::Argb8888 as u32,
VideoFormat::BGRx => fourcc(b'B', b'G', b'R', b'X'), VideoFormat::BGRx => DrmFourcc::Xrgb8888 as u32,
VideoFormat::RGBA => fourcc(b'R', b'G', b'B', b'A'), VideoFormat::RGBA => DrmFourcc::Abgr8888 as u32,
VideoFormat::RGBx => fourcc(b'R', b'G', b'B', b'X'), VideoFormat::RGBx => DrmFourcc::Xbgr8888 as u32,
VideoFormat::ARGB => fourcc(b'A', b'R', b'2', b'4'), VideoFormat::ARGB => DrmFourcc::Bgra8888 as u32,
VideoFormat::xRGB => fourcc(b'X', b'R', b'2', b'4'), VideoFormat::xRGB => DrmFourcc::Bgrx8888 as u32,
VideoFormat::ABGR => fourcc(b'A', b'B', b'2', b'4'), VideoFormat::ABGR => DrmFourcc::Rgba8888 as u32,
VideoFormat::xBGR => fourcc(b'X', b'B', b'2', b'4'), VideoFormat::xBGR => DrmFourcc::Rgbx8888 as u32,
// 不支持的格式返回 0调用者应检查此值 _ => 0,
_ => 0, } }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use drm_fourcc::DrmFourcc;
#[test] #[test]
fn spa_to_drm_fourcc_bgra() { fn spa_to_drm_fourcc_all_32bit() {
use libspa::param::video::VideoFormat; use libspa::param::video::VideoFormat;
assert_eq!(spa_to_drm_fourcc(VideoFormat::BGRA), fourcc(b'B', b'G', b'R', b'A')); assert_eq!(
spa_to_drm_fourcc(VideoFormat::BGRA),
DrmFourcc::Argb8888 as u32
);
assert_eq!(
spa_to_drm_fourcc(VideoFormat::BGRx),
DrmFourcc::Xrgb8888 as u32
);
assert_eq!(
spa_to_drm_fourcc(VideoFormat::RGBA),
DrmFourcc::Abgr8888 as u32
);
assert_eq!(
spa_to_drm_fourcc(VideoFormat::RGBx),
DrmFourcc::Xbgr8888 as u32
);
assert_eq!(
spa_to_drm_fourcc(VideoFormat::ARGB),
DrmFourcc::Bgra8888 as u32
);
assert_eq!(
spa_to_drm_fourcc(VideoFormat::xRGB),
DrmFourcc::Bgrx8888 as u32
);
assert_eq!(
spa_to_drm_fourcc(VideoFormat::ABGR),
DrmFourcc::Rgba8888 as u32
);
assert_eq!(
spa_to_drm_fourcc(VideoFormat::xBGR),
DrmFourcc::Rgbx8888 as u32
);
} }
#[test] #[test]
@@ -664,10 +688,4 @@ mod tests {
use libspa::param::video::VideoFormat; use libspa::param::video::VideoFormat;
assert_eq!(spa_to_drm_fourcc(VideoFormat::NV12), 0); assert_eq!(spa_to_drm_fourcc(VideoFormat::NV12), 0);
} }
#[test]
fn fourcc_values() {
assert_eq!(fourcc(b'B', b'G', b'R', b'A'), 0x41524742);
assert_eq!(fourcc(b'R', b'G', b'B', b'A'), 0x41424752);
}
} }

View File

@@ -1,7 +1,8 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
pub struct FpsLimit<T> { pub struct FpsLimit<T> {
on_deck: Option<(T, Instant)>, on_deck: Option<T>,
last_output_time: Option<Instant>,
min_interval: Duration, min_interval: Duration,
} }
@@ -9,30 +10,32 @@ impl<T> FpsLimit<T> {
pub fn new(fps: u32) -> Self { pub fn new(fps: u32) -> Self {
Self { Self {
on_deck: None, on_deck: None,
last_output_time: None,
min_interval: Duration::from_secs_f64(1.0 / fps as f64), min_interval: Duration::from_secs_f64(1.0 / fps as f64),
} }
} }
/// Feed a new frame. Returns: /// Feed a new frame. Returns:
/// - Some(previous_frame) if enough time elapsed since previous frame /// - Some(()) if enough time elapsed since the last output — proceed to encode current frame
/// - None if frame is buffered (first frame) or previous is dropped (too close) /// - None if too close to the last output — drop current frame
pub fn on_new_frame(&mut self, frame: T, timestamp: Instant) -> Option<T> { pub fn on_new_frame(&mut self, frame: T, timestamp: Instant) -> Option<T> {
let old = self.on_deck.replace((frame, timestamp)); let ready = match self.last_output_time {
match old { None => true,
None => None, // First frame — buffer it Some(last) => timestamp.duration_since(last) >= self.min_interval,
Some((old_frame, old_ts)) => { };
if timestamp.duration_since(old_ts) >= self.min_interval {
Some(old_frame) // Enough time — output previous if ready {
self.last_output_time = Some(timestamp);
self.on_deck = Some(frame);
self.on_deck.take()
} else { } else {
None // Too close — discard previous, keep new let _ = self.on_deck.replace(frame);
} None
}
} }
} }
/// Flush the last buffered frame at end of stream
pub fn flush(&mut self) -> Option<T> { pub fn flush(&mut self) -> Option<T> {
self.on_deck.take().map(|(frame, _ts)| frame) self.on_deck.take()
} }
} }
@@ -41,15 +44,15 @@ mod tests {
use super::*; use super::*;
#[test] #[test]
fn first_frame_is_buffered() { fn first_frame_passes_immediately() {
let mut limiter: FpsLimit<u32> = FpsLimit::new(30); let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
let now = Instant::now(); let now = Instant::now();
let result = limiter.on_new_frame(1u32, now); let result = limiter.on_new_frame(1u32, now);
assert!(result.is_none()); assert_eq!(result, Some(1));
} }
#[test] #[test]
fn frames_too_close_drops_old() { fn frames_too_close_are_dropped() {
let mut limiter: FpsLimit<u32> = FpsLimit::new(30); let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
let now = Instant::now(); let now = Instant::now();
limiter.on_new_frame(1, now); limiter.on_new_frame(1, now);
@@ -58,12 +61,29 @@ mod tests {
} }
#[test] #[test]
fn frames_far_enough_output_old() { fn frames_far_enough_pass() {
let mut limiter: FpsLimit<u32> = FpsLimit::new(30); let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
let now = Instant::now(); let now = Instant::now();
limiter.on_new_frame(1, now); limiter.on_new_frame(1, now);
let result = limiter.on_new_frame(2, now + Duration::from_millis(40)); let result = limiter.on_new_frame(2, now + Duration::from_millis(34));
assert_eq!(result, Some(1)); assert_eq!(result, Some(2));
}
#[test]
fn high_fps_input_downsampled_correctly() {
let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
let base = Instant::now();
let mut outputs = Vec::new();
for i in 0..10u32 {
let t = base + Duration::from_millis(i as u64 * 16);
if let Some(f) = limiter.on_new_frame(i, t) {
outputs.push(f);
}
}
assert!(outputs.len() >= 3, "expected at least 3 outputs, got {} ({:?})", outputs.len(), outputs);
assert_eq!(outputs[0], 0);
} }
#[test] #[test]
@@ -71,7 +91,8 @@ mod tests {
let mut limiter: FpsLimit<u32> = FpsLimit::new(30); let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
let now = Instant::now(); let now = Instant::now();
limiter.on_new_frame(1, now); limiter.on_new_frame(1, now);
assert_eq!(limiter.flush(), Some(1)); limiter.on_new_frame(2, now + Duration::from_millis(1));
assert_eq!(limiter.flush(), Some(2));
assert_eq!(limiter.flush(), None); assert_eq!(limiter.flush(), None);
} }
} }

10
src/lib.rs Normal file
View File

@@ -0,0 +1,10 @@
pub mod args;
pub mod avhw;
pub mod backend_detect;
pub mod cap_portal;
pub mod cap_wlr_screencopy;
pub mod fps_limit;
pub mod state;
pub mod state_portal;
pub mod transform;
pub mod webrtc;

View File

@@ -18,6 +18,7 @@ mod fps_limit; // 帧率限制器
mod state; // wlr-screencopy 后端的主状态机 mod state; // wlr-screencopy 后端的主状态机
mod state_portal; // Portal/PipeWire 后端的主状态机 mod state_portal; // Portal/PipeWire 后端的主状态机
mod transform; // 图像变换(旋转/翻转) mod transform; // 图像变换(旋转/翻转)
mod webrtc; // WebRTC 传输str0m Sans-IO
use crate::args::Args; use crate::args::Args;
use crate::cap_wlr_screencopy::CapWlrScreencopy; use crate::cap_wlr_screencopy::CapWlrScreencopy;
@@ -49,6 +50,7 @@ fn main() -> Result<()> {
} else { } else {
tracing::Level::INFO tracing::Level::INFO
}) })
.with_writer(std::io::stderr)
.init(); .init();
tracing::info!("wl-webrtc starting"); tracing::info!("wl-webrtc starting");
@@ -59,18 +61,18 @@ fn main() -> Result<()> {
anyhow::bail!("HEVC not supported in MVP. Use --codec h264"); anyhow::bail!("HEVC not supported in MVP. Use --codec h264");
} }
if args.output.is_none() && args.port == 0 {
anyhow::bail!("Either --output or --port is required");
}
// 自动检测当前桌面环境可用的截屏后端 // 自动检测当前桌面环境可用的截屏后端
// 会尝试列举 Wayland 全局对象,判断合成器是否支持 wlr-screencopy 协议 // 会尝试列举 Wayland 全局对象,判断合成器是否支持 wlr-screencopy 协议
let backend = crate::backend_detect::detect_backend(&args)?; let backend = crate::backend_detect::detect_backend(&args)?;
// 根据检测结果进入对应的事件循环 // 根据检测结果进入对应的事件循环
match backend { match backend {
crate::backend_detect::CaptureBackend::WlrScreencopy => { crate::backend_detect::CaptureBackend::WlrScreencopy => run_wlr_screencopy(args),
run_wlr_screencopy(args) crate::backend_detect::CaptureBackend::PortalPipeWire => run_portal_pipewire(args),
}
crate::backend_detect::CaptureBackend::PortalPipeWire => {
run_portal_pipewire(args)
}
} }
} }
@@ -305,21 +307,17 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
// 只注册信号 fd没有 Wayland fd // 只注册信号 fd没有 Wayland fd
// 所以 poll.poll 在这里只负责检测 SIGINT/SIGTERM // 所以 poll.poll 在这里只负责检测 SIGINT/SIGTERM
// 实际的帧采集完全依赖 poll_and_encode 的轮询 // 实际的帧采集完全依赖 poll_and_encode 的轮询
poll.registry().register( poll.registry()
&mut signals, .register(&mut signals, mio::Token(1), mio::Interest::READABLE)?;
mio::Token(1),
mio::Interest::READABLE,
)?;
// 主事件循环(超时 10ms比 wlr-screencopy 更短,因为不依赖 Wayland fd 唤醒 // 主事件循环(非阻塞信号检测 + recv_timeout 等待帧
// 10ms 超时的作用是让循环高频转动,以便及时处理 PipeWire 投递的帧 // poll 超时为 0ms非阻塞实际等待由 poll_and_encode 的 recv_timeout 实现
// 如果没有信号poll 最多阻塞 10ms 就会超时返回
let mut running = true; let mut running = true;
while running { while running {
// poll 在此循环中只监听信号 fd,所以 // poll 在此循环中只监听信号 fd(非阻塞)
// - 收到 SIGINT/SIGTERM → 事件触发,设置 running=false // - 收到 SIGINT/SIGTERM → 事件触发,设置 running=false
// - 超时 10ms → 事件为空,继续执行 poll_and_encode // - 无事件 → 立即返回,继续执行 poll_and_encode(内部 recv_timeout 等待帧)
poll.poll(&mut events, Some(std::time::Duration::from_millis(10))) poll.poll(&mut events, Some(std::time::Duration::from_millis(0)))
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
if e.kind() == std::io::ErrorKind::Interrupted { if e.kind() == std::io::ErrorKind::Interrupted {
return; return;
@@ -328,7 +326,6 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
running = false; running = false;
}); });
// 遍历事件,检查是否收到退出信号
for event in &events { for event in &events {
if event.token() == mio::Token(1) { if event.token() == mio::Token(1) {
tracing::info!("Received quit signal"); tracing::info!("Received quit signal");
@@ -341,7 +338,9 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
// poll_and_encode 会从 PipeWire 缓冲区取出帧, // poll_and_encode 会从 PipeWire 缓冲区取出帧,
// 编码为 H.264 并推送。返回 true 表示还有更多帧待处理, // 编码为 H.264 并推送。返回 true 表示还有更多帧待处理,
// 返回 false 表示当前没有帧了while 循环退出等待下一轮 poll // 返回 false 表示当前没有帧了while 循环退出等待下一轮 poll
while state.poll_and_encode()? {} if state.poll_and_encode(true)? {
while state.poll_and_encode(false)? {}
}
// Portal 状态机遇到致命错误时退出 // Portal 状态机遇到致命错误时退出
if state.is_errored() { if state.is_errored() {

View File

@@ -188,23 +188,29 @@ pub struct State<S: CaptureSource> {
// Helpers // Helpers
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Scan /dev/dri for all available DRM render nodes (renderD*), sorted by node number.
pub(crate) fn find_drm_render_nodes() -> Vec<PathBuf> {
let Ok(entries) = std::fs::read_dir("/dev/dri") else {
return Vec::new();
};
let mut nodes: Vec<(u32, PathBuf)> = entries
.filter_map(Result::ok)
.filter_map(|entry| {
let path = entry.path();
let name = path.file_name()?.to_str()?;
let number = name.strip_prefix("renderD")?.parse::<u32>().ok()?;
std::fs::metadata(&path).ok()?;
Some((number, path))
})
.collect();
nodes.sort_by_key(|(number, _)| *number);
nodes.into_iter().map(|(_, path)| path).collect()
}
/// Scan /dev/dri for the first available DRM render node (renderD*). /// Scan /dev/dri for the first available DRM render node (renderD*).
fn find_drm_render_node() -> Option<PathBuf> { fn find_drm_render_node() -> Option<PathBuf> {
std::fs::read_dir("/dev/dri") find_drm_render_nodes().into_iter().next()
.ok()?
.filter_map(|e| e.ok())
.filter(|e| {
e.file_name()
.to_str()
.map(|s| s.starts_with("renderD"))
.unwrap_or(false)
})
.filter_map(|e| {
let path = e.path();
std::fs::metadata(&path).ok()?;
Some(path)
})
.min_by_key(|e| e.to_path_buf())
} }
impl<S: CaptureSource> State<S> { impl<S: CaptureSource> State<S> {
@@ -562,11 +568,7 @@ impl<S: CaptureSource> State<S> {
tracing::error!("compositor copy failed"); tracing::error!("compositor copy failed");
let taken = mem::replace(&mut self.in_flight_surface, InFlightSurface::None); let taken = mem::replace(&mut self.in_flight_surface, InFlightSurface::None);
match taken { match taken {
InFlightSurface::CopyQueued { InFlightSurface::CopyQueued { buffer, frame, .. } => {
buffer,
frame,
..
} => {
drop(buffer); drop(buffer);
if let EncConstructionStage::Streaming { cap, .. } = &mut self.stage { if let EncConstructionStage::Streaming { cap, .. } = &mut self.stage {
cap.on_done_with_frame(frame); cap.on_done_with_frame(frame);
@@ -584,32 +586,41 @@ impl<S: CaptureSource> State<S> {
EncConstructionStage::EverythingButFmt { EncConstructionStage::EverythingButFmt {
output_info, output_info,
output, output,
hw_device_ctx: _hw_device_ctx, hw_device_ctx,
cap, cap,
screencopy_manager, screencopy_manager,
dmabuf, dmabuf,
} => (output_info, output, cap, screencopy_manager, dmabuf), } => (
output_info,
output,
hw_device_ctx,
cap,
screencopy_manager,
dmabuf,
),
other => { other => {
tracing::warn!("negotiate_format: not in EverythingButFmt stage"); tracing::warn!("negotiate_format: not in EverythingButFmt stage");
self.stage = other; self.stage = other;
return; return;
} }
}; };
let (output_info, output, cap, screencopy_manager, dmabuf) = stage_data; let (output_info, output, hw_device_ctx, cap, screencopy_manager, dmabuf) = stage_data;
let drm_path = self.resolve_drm_path(); let drm_path = self.resolve_drm_path();
let fps = self.args.fps; let fps = self.args.fps;
let bitrate = self.args.bitrate.unwrap_or_else(|| { let bitrate = self
2 * (width as u64) * (height as u64) * (fps as u64) / 100 .args
}); .bitrate
.unwrap_or_else(|| 2 * (width as u64) * (height as u64) * (fps as u64) / 100);
let enc = match crate::avhw::create_encoder( let enc = match crate::avhw::create_encoder(
&drm_path, &drm_path,
Path::new(&self.args.output), Path::new(self.args.output.as_deref().expect("output required for MP4 mode")),
width, width,
height, height,
fps, fps,
output_info.transform, output_info.transform,
self.args.bitrate, self.args.bitrate,
self.args.gop_size, self.args.gop_size,
Some(hw_device_ctx),
) { ) {
Ok(enc) => enc, Ok(enc) => enc,
Err(e) => { Err(e) => {
@@ -1192,11 +1203,7 @@ impl<S: CaptureSource> Dispatch<ZwpLinuxBufferParamsV1, ()> for State<S> {
tracing::error!("DMA-BUF buffer creation failed"); tracing::error!("DMA-BUF buffer creation failed");
let taken = mem::replace(&mut state.in_flight_surface, InFlightSurface::None); let taken = mem::replace(&mut state.in_flight_surface, InFlightSurface::None);
match taken { match taken {
InFlightSurface::CopyQueued { InFlightSurface::CopyQueued { buffer, frame, .. } => {
buffer,
frame,
..
} => {
drop(buffer); drop(buffer);
if let EncConstructionStage::Streaming { cap, .. } = &mut state.stage { if let EncConstructionStage::Streaming { cap, .. } = &mut state.stage {
cap.on_done_with_frame(frame); cap.on_done_with_frame(frame);
@@ -1228,11 +1235,11 @@ impl Dispatch<ZwlrScreencopyFrameV1, ()> for State<CapWlrScreencopy> {
_qhandle: &QueueHandle<State<CapWlrScreencopy>>, _qhandle: &QueueHandle<State<CapWlrScreencopy>>,
) { ) {
match event { match event {
// SHM buffer offer — in v3 the compositor enumerates supported buffer
// types (buffer and/or linux_dmabuf) before buffer_done. We only
// support DMA-BUF, so just log and wait for linux_dmabuf / buffer_done.
ScreencopyFrameEvent::Buffer { .. } => { ScreencopyFrameEvent::Buffer { .. } => {
tracing::warn!( tracing::debug!("Received SHM Buffer offer — only DMA-BUF capture is supported");
"Received SHM Buffer event — only DMA-BUF capture is supported. Ignoring."
);
return;
} }
ScreencopyFrameEvent::LinuxDmabuf { ScreencopyFrameEvent::LinuxDmabuf {
format, format,
@@ -1240,6 +1247,12 @@ impl Dispatch<ZwlrScreencopyFrameV1, ()> for State<CapWlrScreencopy> {
height, height,
} => { } => {
tracing::debug!("Screencopy LinuxDmabuf: format={format}, {width}x{height}"); tracing::debug!("Screencopy LinuxDmabuf: format={format}, {width}x{height}");
if !matches!(state.in_flight_surface, InFlightSurface::AllocQueued) {
tracing::warn!("Received LinuxDmabuf while no frame allocation was queued");
return;
}
if matches!(state.stage, EncConstructionStage::EverythingButFmt { .. }) { if matches!(state.stage, EncConstructionStage::EverythingButFmt { .. }) {
state.negotiate_format(format, width, height); state.negotiate_format(format, width, height);
if state.errored { if state.errored {
@@ -1251,6 +1264,20 @@ impl Dispatch<ZwlrScreencopyFrameV1, ()> for State<CapWlrScreencopy> {
} }
state.on_frame_allocd((), format, width, height); state.on_frame_allocd((), format, width, height);
} }
// v3 terminal event: all buffer offers have been enumerated.
// If still AllocQueued, the compositor never sent linux_dmabuf —
// DMA-BUF screencopy is unsupported, so we must error out.
ScreencopyFrameEvent::BufferDone => {
if matches!(state.in_flight_surface, InFlightSurface::AllocQueued) {
tracing::error!(
"Compositor did not offer DMA-BUF screencopy (only SHM); \
DMA-BUF capture is required"
);
state.in_flight_surface = InFlightSurface::None;
proxy.destroy();
state.errored = true;
}
}
ScreencopyFrameEvent::Ready { ScreencopyFrameEvent::Ready {
tv_sec_hi, tv_sec_hi,
tv_sec_lo, tv_sec_lo,

View File

@@ -1,16 +1,14 @@
// 采集门户状态模块 —— 通过 PipeWire/DMA-BUF 进行屏幕采集并编码 // 采集门户状态模块 —— 通过 PipeWire/DMA-BUF 进行屏幕采集并编码
use std::mem; use std::os::fd::AsRawFd;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::{Duration, Instant};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use ffmpeg_next as ff;
use ffmpeg_next::ffi;
use crate::args::Args; use crate::args::Args;
use crate::avhw::{self, EncState}; use crate::avhw::{self, SwEncState};
use crate::cap_portal::{CapPortal, PwDmaBufFrame, PwEvent}; use crate::cap_portal::{CapPortal, PwCtrlEvent, PwDmaBufFrame};
use crate::fps_limit::FpsLimit; use crate::webrtc::WebRtcState;
use crate::transform::Transform;
/// 门户采集的阶段状态 /// 门户采集的阶段状态
/// - WaitingForFormat: 等待接收到第一帧 DMA-BUF 以确定视频格式参数 /// - WaitingForFormat: 等待接收到第一帧 DMA-BUF 以确定视频格式参数
@@ -25,24 +23,20 @@ enum PortalStage {
/// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。 /// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。
/// 工作流程:等待第一帧 → 创建编码器 → 持续编码帧数据。 /// 工作流程:等待第一帧 → 创建编码器 → 持续编码帧数据。
pub struct StatePortal { pub struct StatePortal {
/// 当前采集阶段
stage: PortalStage, stage: PortalStage,
/// 硬件编码器状态(第一帧到达后才初始化) enc: Option<SwEncState>,
enc: Option<EncState>,
/// 帧率限制器
fps_limit: FpsLimit<()>,
/// PipeWire 屏幕采集端点
cap: CapPortal, cap: CapPortal,
/// 命令行参数
args: Args, args: Args,
/// 是否遇到错误
errored: bool, errored: bool,
/// 是否为第一帧(首帧跳过帧率限制) drm_device: Option<PathBuf>,
first_frame: bool, frames_encoded: u64,
/// DRM 渲染设备路径(如 /dev/dri/renderD128 start_time: Option<Instant>,
drm_device: PathBuf, last_stats_time: Option<Instant>,
/// 第一帧的时间戳(纳秒),用于计算相对 PTS last_stats_frames: u64,
first_pts_ns: Option<i64>, webrtc: Option<WebRtcState>,
webrtc_tx: Option<crossbeam_channel::Sender<Vec<u8>>>,
webrtc_rx: Option<crossbeam_channel::Receiver<Vec<u8>>>,
webrtc_frames_sent: u64,
} }
impl StatePortal { impl StatePortal {
@@ -51,38 +45,78 @@ impl StatePortal {
/// 初始化 DRM 设备路径和 PipeWire 采集端点,编码器延迟到第一帧到达时创建。 /// 初始化 DRM 设备路径和 PipeWire 采集端点,编码器延迟到第一帧到达时创建。
pub fn new(args: Args) -> Result<Self> { pub fn new(args: Args) -> Result<Self> {
let drm_device = resolve_drm_device(&args)?; let drm_device = resolve_drm_device(&args)?;
if let Some(ref drm_device) = drm_device {
tracing::info!("Using DRM device: {}", drm_device.display()); tracing::info!("Using DRM device: {}", drm_device.display());
} else {
tracing::info!("DRM device auto-detection enabled");
}
let cap = CapPortal::new(&args)?; let cap = CapPortal::new(&args)?;
let (webrtc, webrtc_tx, webrtc_rx) = if args.port > 0 {
let (tx, rx) = crossbeam_channel::bounded(32);
let wrtc = WebRtcState::new(args.port, args.fps)?;
(Some(wrtc), Some(tx), Some(rx))
} else {
(None, None, None)
};
Ok(Self { Ok(Self {
stage: PortalStage::WaitingForFormat, stage: PortalStage::WaitingForFormat,
enc: None, enc: None,
fps_limit: FpsLimit::new(args.fps),
cap, cap,
args, args,
errored: false, errored: false,
first_frame: true,
drm_device, drm_device,
first_pts_ns: None, frames_encoded: 0,
start_time: None,
last_stats_time: None,
last_stats_frames: 0,
webrtc,
webrtc_tx,
webrtc_rx,
webrtc_frames_sent: 0,
}) })
} }
/// 轮询 PipeWire 事件并编码帧 /// 轮询 PipeWire 事件并编码帧
/// ///
/// 尝试从采集端点接收一帧事件。返回 `Ok(true)` 表示已处理事件 /// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 10ms
/// `Ok(false)` 表示暂无数据。内部根据当前阶段(等待格式/流式)分发处理 /// `block=false` 时使用 try_recv 非阻塞检查
pub fn poll_and_encode(&mut self) -> Result<bool> { /// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。
let event = match self.cap.frame_receiver().try_recv() { pub fn poll_and_encode(&mut self, block: bool) -> Result<bool> {
Ok(event) => event, // WebRTC: process signaling, network, and forward encoded frames
self.poll_webrtc()?;
if let Ok(ctrl) = self.cap.event_receiver().try_recv() {
match ctrl {
PwCtrlEvent::StreamEnded => {
tracing::warn!("PipeWire stream ended");
self.errored = true;
return Ok(true);
}
PwCtrlEvent::Error(e) => {
tracing::error!("PipeWire error: {e}");
self.errored = true;
return Ok(true);
}
}
}
let frame = if block {
match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) {
Ok(frame) => frame,
Err(_) => return Ok(false), Err(_) => return Ok(false),
}
} else {
match self.cap.frame_receiver().try_recv() {
Ok(frame) => frame,
Err(_) => return Ok(false),
}
}; };
match event {
PwEvent::Frame(frame) => {
match self.stage { match self.stage {
PortalStage::WaitingForFormat => { PortalStage::WaitingForFormat => {
// 第一帧到达:记录格式信息并用该分辨率创建编码器
tracing::info!( tracing::info!(
"First DMA-BUF frame: {}x{} format=0x{:08X} stride={} modifier=0x{:X}", "First DMA-BUF frame: {}x{} format=0x{:08X} stride={} modifier=0x{:X}",
frame.width, frame.width,
@@ -92,192 +126,182 @@ impl StatePortal {
frame.modifier frame.modifier
); );
let enc = avhw::create_encoder( let drm_path = self.resolve_drm_device_for_frame(&frame)?;
&self.drm_device, let (enc_width, enc_height) = portal_encode_dimensions(frame.width, frame.height);
self.args.output.as_ref(), tracing::info!(
"Portal software encode target: {}x{} -> {}x{} @ {} fps",
frame.width, frame.width,
frame.height, frame.height,
enc_width,
enc_height,
self.args.fps, self.args.fps,
Transform::Normal, );
self.args.bitrate, let actual_bitrate = self.args.bitrate.unwrap_or_else(|| {
self.args.gop_size, 2 * (enc_width as u64) * (enc_height as u64) * (self.args.fps as u64) / 100
)?; });
let actual_gop_size = self.args.gop_size.unwrap_or_else(|| {
if self.webrtc_tx.is_some() {
(self.args.fps / 2).max(10)
} else {
self.args.fps
}
});
let enc = if let Some(ref tx) = self.webrtc_tx {
avhw::SwEncState::new_webrtc(
&drm_path,
frame.width,
frame.height,
enc_width,
enc_height,
self.args.fps,
actual_bitrate,
actual_gop_size,
tx.clone(),
)?
} else {
avhw::SwEncState::new(
&drm_path,
std::path::Path::new(self.args.output.as_deref().expect("output required for MP4 mode")),
frame.width,
frame.height,
enc_width,
enc_height,
self.args.fps,
actual_bitrate,
actual_gop_size,
)?
};
self.enc = Some(enc); self.enc = Some(enc);
self.stage = PortalStage::Streaming; self.stage = PortalStage::Streaming;
self.start_time = Some(Instant::now());
self.last_stats_time = Some(Instant::now());
tracing::info!("First frame processed, encoder initialized, transitioning to Streaming");
drop(frame); drop(frame);
} }
PortalStage::Streaming => { PortalStage::Streaming => {
// 流式阶段:处理每一帧 DMA-BUF 数据
self.handle_pw_frame(frame)?; self.handle_pw_frame(frame)?;
} }
} }
}
PwEvent::StreamEnded => { // WebRTC: drain encoded frames produced by this poll before returning.
// PipeWire 流结束(如用户停止了屏幕共享) self.poll_webrtc()?;
tracing::warn!("PipeWire stream ended");
self.errored = true;
}
PwEvent::Error(e) => {
// PipeWire 返回错误
tracing::error!("PipeWire error: {e}");
self.errored = true;
}
}
Ok(true) Ok(true)
} }
/// 处理单帧 DMA-BUF 数据 fn resolve_drm_device_for_frame(&mut self, frame: &PwDmaBufFrame) -> Result<PathBuf> {
/// if let Some(ref drm) = self.drm_device {
/// 完整的帧处理流水线: return Ok(drm.clone());
/// 1. 帧率限制(首帧跳过) }
/// 2. 构建 DRM 描述符
/// 3. 分配 DRM_PRIME 源帧 let candidates = crate::state::find_drm_render_nodes();
/// 4. 分配 VAAPI 硬件目标帧 if candidates.is_empty() {
/// 5. 通过 DMA-BUF 导入将帧数据导入 VAAPI bail!("No DRM render device found. Specify --drm-device.");
/// 6. 计算 PTS 时间戳 }
/// 7. 回收 DRM 描述符内存
/// 8. 编码输出 let mut failures = Vec::new();
fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> { for candidate in &candidates {
// 1. FPS limiting (first frame bypasses) match crate::avhw::test_dma_buf_import(candidate, frame) {
// 帧率限制(首帧跳过限制,确保立即编码) Ok(()) => {
if self.first_frame { tracing::info!(
self.first_frame = false; "Auto-detected DRM device: {} (tested {} candidates)",
} else { candidate.display(),
let now = std::time::Instant::now(); candidates.len(),
if self.fps_limit.on_new_frame((), now).is_none() { );
return Ok(()); self.drm_device = Some(candidate.clone());
return Ok(candidate.clone());
}
Err(e) => {
tracing::debug!(
"DRM device {} cannot import DMA-BUF: {e}",
candidate.display(),
);
failures.push((candidate, e));
}
} }
} }
// 2. Build DRM descriptor for DMA-BUF import
// 根据 DMA-BUF 帧信息构建 FFmpeg DRM 描述符
let desc = build_drm_descriptor(&frame);
let desc_box = Box::new(desc);
// 3. Allocate raw DRM_PRIME source frame using Video wrapper
// 分配 DRM_PRIME 格式的源帧,将描述符指针挂载到 data[0]
let mut raw_frame = ff::frame::Video::empty();
unsafe {
let raw_ptr = raw_frame.as_mut_ptr();
(*raw_ptr).data[0] = Box::into_raw(desc_box) as *mut u8;
(*raw_ptr).format = ffi::AVPixelFormat::AV_PIX_FMT_DRM_PRIME as i32;
(*raw_ptr).width = frame.width as i32;
(*raw_ptr).height = frame.height as i32;
}
// 4. Get encoder reference
// 获取编码器引用
let enc = match self.enc.as_mut() {
Some(e) => e,
None => {
// Recover the Box to prevent memory leak of the descriptor
// 编码器未初始化时回收描述符以防止内存泄漏
unsafe {
let desc_ptr = (*raw_frame.as_ptr()).data[0] as *mut ffi::AVDRMFrameDescriptor;
if !desc_ptr.is_null() {
let _ = Box::from_raw(desc_ptr);
}
}
bail!("encoder not initialized");
}
};
// 5. Allocate VAAPI hardware target frame
// 分配 VAAPI 硬件帧缓冲区
let mut hw_frame = ff::frame::Video::empty();
let ret = unsafe {
ffi::av_hwframe_get_buffer(enc.frames_rgb().as_ptr(), hw_frame.as_mut_ptr(), 0)
};
if ret < 0 {
// Recover the Box to prevent memory leak of the descriptor
// 分配失败时回收描述符防止内存泄漏
unsafe {
let desc_ptr = (*raw_frame.as_ptr()).data[0] as *mut ffi::AVDRMFrameDescriptor;
if !desc_ptr.is_null() {
let _ = Box::from_raw(desc_ptr);
}
}
bail!("av_hwframe_get_buffer failed: error {ret}");
}
// 6. Import DMA-BUF into VAAPI via transfer_data
// 通过 DMA-BUF 导入将帧数据从 DRM 传输到 VAAPI 硬件表面
let ret = unsafe {
ffi::av_hwframe_transfer_data(hw_frame.as_mut_ptr(), raw_frame.as_ptr(), 0)
};
if ret < 0 {
// 传输失败时回收描述符防止内存泄漏
unsafe {
let desc_ptr = (*raw_frame.as_ptr()).data[0] as *mut ffi::AVDRMFrameDescriptor;
if !desc_ptr.is_null() {
let _ = Box::from_raw(desc_ptr);
}
}
if ret == -(ffi::EINVAL as i32) {
bail!( bail!(
"VAAPI does not support DMA-BUF modifier 0x{:X}", "No DRM render device can import the DMA-BUF frame. Tried: {}",
frame.modifier failures
.into_iter()
.map(|(p, e)| format!("{} ({e})", p.display()))
.collect::<Vec<_>>()
.join(", ")
); );
} }
bail!("av_hwframe_transfer_data failed: error {ret}");
}
// 7. Set PTS — convert PipeWire nanoseconds to encoder frame-number units /// 处理单帧 DMA-BUF 数据
// PipeWire PTS is CLOCK_MONOTONIC in nanoseconds. ///
// Encoder time_base = 1/fps, so PTS must be in frame numbers. /// 通过 `av_hwframe_map` 零拷贝导入 VAAPI然后交给 SwEncState 完成:
// Use elapsed time since first frame to avoid i64 overflow on absolute timestamps. /// scale_vaapi GPU 缩放、2K NV12 回读、YUV420P 格式转换、软件 H.264 编码。
// fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> {
// PTS 计算:将 PipeWire 的纳秒时间戳转换为编码器的帧号单位 let enc = match self.enc.as_mut() {
// PipeWire 使用 CLOCK_MONOTONIC 纳秒时间戳,编码器 time_base = 1/fps Some(enc) => enc,
// 使用相对时间避免绝对时间戳导致的 i64 溢出 None => bail!("encoder not initialized"),
let fps_i64 = self.args.fps as i64; };
let base_ns = *self.first_pts_ns.get_or_insert(frame.pts.max(0));
let elapsed_ns = (frame.pts.max(0) - base_ns).max(0); let mut vaapi_frame = unsafe {
let pts = elapsed_ns * fps_i64 / 1_000_000_000; avhw::import_dma_buf_to_vaapi(
enc.frames_rgb().as_ptr(),
frame.fd.as_raw_fd(),
frame.width,
frame.height,
frame.format,
frame.modifier,
frame.stride,
frame.offset,
)
}?;
let pts = self.frames_encoded as i64;
unsafe { unsafe {
(*hw_frame.as_mut_ptr()).pts = pts; (*vaapi_frame.as_mut_ptr()).pts = pts;
} }
// 8. Recover the Boxed descriptor from raw_frame *before* encoding. enc.encode_frame(&vaapi_frame)?;
// av_hwframe_transfer_data has already imported the DMA-BUF into the self.frames_encoded += 1;
// VAAPI surface, so FFmpeg no longer references the descriptor struct.
// Doing this before encode_frame ensures the descriptor is reclaimed if let Some(last) = self.last_stats_time {
// even if encode_frame returns early via `?`. if last.elapsed() >= Duration::from_secs(10) {
// let delta_frames = self.frames_encoded - self.last_stats_frames;
// 在编码前回收描述符内存。 let delta_secs = last.elapsed().as_secs_f64();
// 此时 DMA-BUF 数据已导入 VAAPI 表面FFmpeg 不再引用描述符结构体。 let fps = delta_frames as f64 / delta_secs;
// 在 encode_frame 之前回收确保即使编码返回错误也能正确释放内存。 tracing::info!(
unsafe { "encoded={}, fps={fps:.1}",
let desc_ptr = (*raw_frame.as_ptr()).data[0] as *mut ffi::AVDRMFrameDescriptor; self.frames_encoded,
if !desc_ptr.is_null() { );
let _ = Box::from_raw(desc_ptr); self.last_stats_time = Some(Instant::now());
self.last_stats_frames = self.frames_encoded;
} }
} }
// 9. Encode — safe to early-return via `?` now that descriptor is recovered.
// 编码帧数据(此时描述符已回收,可安全通过 `?` 提前返回)
enc.encode_frame(&hw_frame)?;
// raw_frame and hw_frame drop here via Video::drop → av_frame_free
// raw_frame 和 hw_frame 在此处通过 Video::drop → av_frame_free 释放
Ok(())
}
/// 刷新编码器缓冲区,输出所有剩余帧
pub fn flush(&mut self) -> Result<()> {
if let Some(enc) = &mut self.enc {
enc.flush()?;
}
Ok(()) Ok(())
} }
/// 关闭状态:刷新编码器并清理资源 /// 关闭状态:刷新编码器并清理资源
///
/// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。
pub fn shutdown(&mut self) { pub fn shutdown(&mut self) {
if let Err(e) = self.flush() { if let Some(mut enc) = self.enc.take() {
if let Err(e) = enc.flush() {
tracing::error!("Flush error during shutdown: {e}"); tracing::error!("Flush error during shutdown: {e}");
} }
}
if let Some(start) = self.start_time {
if self.frames_encoded > 0 {
let elapsed = start.elapsed().as_secs_f64();
let fps = self.frames_encoded as f64 / elapsed;
tracing::info!(
"Total: {} frames in {:.1}s, avg {:.1}fps",
self.frames_encoded,
elapsed,
fps,
);
}
}
tracing::info!("StatePortal shutdown complete"); tracing::info!("StatePortal shutdown complete");
} }
@@ -285,53 +309,80 @@ impl StatePortal {
pub fn is_errored(&self) -> bool { pub fn is_errored(&self) -> bool {
self.errored self.errored
} }
fn poll_webrtc(&mut self) -> Result<()> {
let Some(ref mut wrtc) = self.webrtc else { return Ok(()); };
wrtc.handle_signaling()?;
wrtc.poll_and_feed()?;
if let Some(ref rx) = self.webrtc_rx {
let mut count = 0u32;
while let Ok(data) = rx.try_recv() {
count += 1;
if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1);
}
if count > 0 {
tracing::info!("WebRTC forwarded {count} frames from channel");
}
}
Ok(())
}
} }
/// 根据 DMA-BUF 帧信息构建 FFmpeg DRM 帧描述符 impl Drop for StatePortal {
/// fn drop(&mut self) {
/// 将 PipeWire 提供的 DMA-BUF 参数fd、偏移量、步长、修饰符等 self.shutdown();
/// 转换为 FFmpeg 的 AVDRMFrameDescriptor 结构体,用于零拷贝硬件导入。 }
fn build_drm_descriptor(frame: &PwDmaBufFrame) -> ffi::AVDRMFrameDescriptor { }
let mut desc: ffi::AVDRMFrameDescriptor = unsafe { mem::zeroed() };
// DMA-BUF 对象层:一个 fd 对应一个内存对象 fn portal_encode_dimensions(width: u32, height: u32) -> (u32, u32) {
const TARGET_W: u32 = 2560;
const TARGET_H: u32 = 1440;
if width <= TARGET_W && height <= TARGET_H {
return (width & !1, height & !1);
}
let width_limited_h = ((height as u64) * (TARGET_W as u64) / (width as u64)) as u32;
if width_limited_h <= TARGET_H {
(TARGET_W & !1, width_limited_h & !1)
} else {
let height_limited_w = ((width as u64) * (TARGET_H as u64) / (height as u64)) as u32;
(height_limited_w & !1, TARGET_H & !1)
}
}
/// 解析 DRM 渲染设备路径
///
/// 仅使用命令行指定的设备路径;未指定则在首帧到达时自动检测。
fn resolve_drm_device(args: &Args) -> Result<Option<PathBuf>> {
if let Some(ref drm) = args.drm_device {
return Ok(Some(PathBuf::from(drm)));
}
Ok(None)
}
#[cfg(test)]
fn build_drm_descriptor(frame: &PwDmaBufFrame) -> ffmpeg_next::ffi::AVDRMFrameDescriptor {
let mut desc: ffmpeg_next::ffi::AVDRMFrameDescriptor = unsafe { std::mem::zeroed() };
desc.nb_objects = 1; desc.nb_objects = 1;
desc.objects[0].fd = frame.fd.as_raw_fd(); desc.objects[0].fd = frame.fd.as_raw_fd();
desc.objects[0].size = 0; // 大小为 0 表示整个 fd desc.objects[0].size = 0;
desc.objects[0].format_modifier = frame.modifier; desc.objects[0].format_modifier = frame.modifier;
// 像素格式层:单层单平面布局(如 XR24 格式)
desc.nb_layers = 1; desc.nb_layers = 1;
desc.layers[0].format = frame.format; desc.layers[0].format = frame.format;
desc.layers[0].nb_planes = 1; desc.layers[0].nb_planes = 1;
desc.layers[0].planes[0].object_index = 0; desc.layers[0].planes[0].object_index = 0;
desc.layers[0].planes[0].offset = frame.offset as isize; desc.layers[0].planes[0].offset = frame.offset as isize;
desc.layers[0].planes[0].pitch = frame.stride as isize; desc.layers[0].planes[0].pitch = frame.stride as isize;
desc desc
} }
use std::os::fd::AsRawFd;
/// 解析 DRM 渲染设备路径
///
/// 优先使用命令行指定的设备路径,否则依次尝试
/// `/dev/dri/renderD128` 和 `/dev/dri/renderD129`。
fn resolve_drm_device(args: &Args) -> Result<PathBuf> {
if let Some(ref drm) = args.drm_device {
return Ok(PathBuf::from(drm));
}
for render in &["/dev/dri/renderD128", "/dev/dri/renderD129"] {
let path = PathBuf::from(render);
if path.exists() {
return Ok(path);
}
}
bail!("No DRM render device found. Specify --drm-device.")
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -385,8 +436,50 @@ mod tests {
backend: None, backend: None,
port: 0, port: 0,
}; };
let result = resolve_drm_device(&args); let result = resolve_drm_device(&args).unwrap();
assert!(result.is_ok()); assert_eq!(
assert_eq!(result.unwrap(), std::path::PathBuf::from("/dev/dri/renderD128")); result,
Some(std::path::PathBuf::from("/dev/dri/renderD128"))
);
} }
#[test]
fn resolve_drm_device_none_when_not_specified() {
let args = Args {
output: "test.mp4".to_string(),
output_name: None,
fps: 30,
codec: "h264".to_string(),
hw_accel: "vaapi".to_string(),
drm_device: None,
bitrate: None,
gop_size: None,
verbose: false,
backend: None,
port: 0,
};
let result = resolve_drm_device(&args).unwrap();
assert_eq!(result, None);
}
#[test]
fn build_drm_descriptor_custom_offset_and_stride() {
let frame = PwDmaBufFrame {
fd: unsafe { OwnedFd::from_raw_fd(libc::dup(2)) },
offset: 4096,
stride: 3840 * 4,
modifier: 0x0100000000000001, // AMD modifiers
width: 3840,
height: 2160,
format: 0x34325258,
pts: 0,
};
let desc = build_drm_descriptor(&frame);
assert_eq!(desc.nb_objects, 1);
assert_eq!(desc.objects[0].format_modifier, 0x0100000000000001);
assert_eq!(desc.layers[0].planes[0].offset, 4096);
assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4);
}
} }

531
src/webrtc.rs Normal file
View File

@@ -0,0 +1,531 @@
// WebRTC 传输模块 — 使用 str0m (Sans-IO) 将 H.264 编码帧推送到浏览器
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, UdpSocket};
use std::time::Instant;
use anyhow::{bail, Result};
use str0m::change::SdpOffer;
use str0m::format::Codec;
use str0m::media::{Frequency, MediaKind, MediaTime, Mid, Pt};
use str0m::net::{Protocol, Receive};
use str0m::{Candidate, Event, IceConnectionState, Input, Output, Rtc, RtcConfig};
// ── 嵌入式 HTML 测试页面 ──────────────────────────────────────────────────
const HTML_PAGE: &str = r#"<!DOCTYPE html>
<html>
<head><title>wl-webrtc P0</title>
<style>body{background:#000;color:#fff;font-family:monospace;display:flex;flex-direction:column;align-items:center;justify-content:center;height:100vh;margin:0}
video{max-width:90vw;max-height:80vh;border:1px solid #333}
#status{margin:12px;font-size:14px;color:#aaa}
#debug{position:fixed;bottom:8px;left:8px;font-size:11px;color:#666;max-width:90vw;white-space:pre-wrap}
</style></head>
<body>
<div id="status">Connecting...</div>
<video id="video" autoplay playsinline muted></video>
<pre id="debug"></pre>
<script>
const status = document.getElementById('status');
const video = document.getElementById('video');
const debug = document.getElementById('debug');
let pc = null;
const log = msg => { debug.textContent += msg + '\n'; console.log(msg); };
function preferH264(sdp) {
const lines = sdp.split('\r\n');
const h264Pts = lines
.filter(line => line.startsWith('a=rtpmap:') && line.toUpperCase().includes('H264/90000'))
.map(line => line.match(/^a=rtpmap:(\d+)/)?.[1])
.filter(Boolean);
if (h264Pts.length === 0) return sdp;
return lines.map(line => {
if (!line.startsWith('m=video ')) return line;
const parts = line.split(' ');
const header = parts.slice(0, 3);
const pts = parts.slice(3);
const preferred = h264Pts.filter(pt => pts.includes(pt));
const rest = pts.filter(pt => !preferred.includes(pt));
return [...header, ...preferred, ...rest].join(' ');
}).join('\r\n');
}
function installStatsLogger(peer) {
setInterval(() => {
if (peer !== pc) return;
const v = video;
log(`video: readyState=${v.readyState} currentTime=${v.currentTime.toFixed(2)} ` +
`paused=${v.paused} width=${v.videoWidth} height=${v.videoHeight} ` +
`srcObject=${v.srcObject ? 'yes' : 'no'}`);
peer.getStats().then(stats => {
stats.forEach(report => {
if (report.type === 'inbound-rtp' && report.kind === 'video') {
log(`RTP-in: packetsReceived=${report.packetsReceived} packetsLost=${report.packetsLost} ` +
`bytesReceived=${report.bytesReceived} framesDecoded=${report.framesDecoded} ` +
`framesDropped=${report.framesDropped} codecId=${report.codecId}`);
}
if (report.type === 'codec' && report.mimeType && report.mimeType.includes('H264')) {
log(`Codec: ${report.mimeType} ${report.payloadType} sdpFmtpLine=${report.sdpFmtpLine}`);
}
});
}).catch(() => {});
}, 2000);
}
function connect() {
if (pc) pc.close();
pc = new RTCPeerConnection();
const peer = pc;
peer.ontrack = e => {
log('ontrack: streams=' + e.streams.length + ' kind=' + e.track.kind);
video.srcObject = e.streams[0];
status.textContent = 'Track received';
};
peer.oniceconnectionstatechange = () => {
log('ICE: ' + peer.iceConnectionState);
status.textContent = 'ICE: ' + peer.iceConnectionState;
};
peer.addTransceiver('video', { direction: 'recvonly' });
installStatsLogger(peer);
peer.createOffer().then(offer => {
offer.sdp = preferH264(offer.sdp);
return peer.setLocalDescription(offer);
})
.then(() => new Promise(resolve => {
if (peer.iceGatheringState === 'complete') resolve();
else peer.onicegatheringstatechange = () => { if (peer.iceGatheringState === 'complete') resolve(); };
}))
.then(() => fetch('/sdp', { method: 'POST', body: JSON.stringify(peer.localDescription) }))
.then(r => { if (!r.ok) throw new Error('SDP exchange failed: ' + r.status); return r.json(); })
.then(answer => { if (answer.error) throw new Error(answer.error); return peer.setRemoteDescription(answer); })
.then(() => log('SDP answer set'))
.catch(e => {
status.textContent = 'Error: ' + e.message;
log('ERROR: ' + e.message + ' — retrying in 2s...');
console.error(e);
setTimeout(connect, 2000);
});
}
connect();
</script>
</body></html>"#;
// ── WebRTC 状态 ───────────────────────────────────────────────────────────
pub struct WebRtcState {
signal_listener: TcpListener,
inner: Option<WebRtcInner>,
fps: u32,
}
struct WebRtcInner {
rtc: Rtc,
socket: UdpSocket,
udp_addr: SocketAddr,
video_mid: Option<Mid>,
video_pt: Option<Pt>,
connected: bool,
need_keyframe: bool,
rtp_clock: u32,
buf: Vec<u8>,
}
impl WebRtcState {
pub fn new(port: u16, fps: u32) -> Result<Self> {
let signal_listener = TcpListener::bind(format!("0.0.0.0:{port}"))?;
signal_listener.set_nonblocking(true)?;
tracing::info!("WebRTC signaling on http://0.0.0.0:{port}/");
Ok(Self {
signal_listener,
inner: None,
fps,
})
}
pub fn handle_signaling(&mut self) -> Result<bool> {
let mut handled = false;
loop {
let (mut stream, _addr) = match self.signal_listener.accept() {
Ok(s) => s,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(e) => bail!("TCP accept error: {e}"),
};
handled = true;
stream.set_nonblocking(true)?;
let mut req = vec![0u8; 65536];
let n = match stream.read(&mut req) {
Ok(n) => n,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
Err(e) => {
tracing::warn!("TCP read error: {e}");
continue;
}
};
let req_str = String::from_utf8_lossy(&req[..n]);
if req_str.starts_with("GET / ")
|| req_str.starts_with("GET /sdp ")
&& !req_str.contains("Content-Type: application/json")
{
let resp = format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
HTML_PAGE.len(),
HTML_PAGE
);
let _ = stream.write_all(resp.as_bytes());
} else if req_str.starts_with("POST /sdp") {
let body = extract_body(&req_str);
if body.is_empty() {
let resp = "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\nempty body";
let _ = stream.write_all(resp.as_bytes());
continue;
}
match WebRtcInner::new(self.fps)
.and_then(|mut new_inner| {
let answer_json = new_inner.handle_sdp_offer(body.as_bytes())?;
Ok((new_inner, answer_json))
}) {
Ok((new_inner, answer_json)) => {
let replacing = self.inner.is_some();
self.inner = Some(new_inner);
if replacing {
tracing::info!("Replaced WebRTC connection (old dropped)");
} else {
tracing::info!("New WebRTC connection");
}
let resp = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
answer_json.len(),
answer_json
);
let _ = stream.write_all(resp.as_bytes());
}
Err(e) => {
tracing::error!("SDP offer handling failed: {e}");
let resp = format!("HTTP/1.1 500 Error\r\nConnection: close\r\n\r\n{e}");
let _ = stream.write_all(resp.as_bytes());
}
}
} else {
let resp = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n";
let _ = stream.write_all(resp.as_bytes());
}
}
Ok(handled)
}
pub fn poll_rtc(&mut self) -> Result<()> {
if let Some(inner) = self.inner.as_mut() {
if inner.poll_rtc()? {
tracing::warn!("WebRTC connection closed/failed; clearing connection state");
self.inner = None;
}
}
Ok(())
}
pub fn feed_network(&mut self) -> Result<()> {
if let Some(inner) = self.inner.as_mut() {
inner.feed_network()?;
}
Ok(())
}
pub fn poll_and_feed(&mut self) -> Result<()> {
self.poll_rtc()?;
self.feed_network()?;
self.poll_rtc()
}
pub fn write_h264_frame(&mut self, data: &[u8], frame_number: u64, fps: u32) -> Result<()> {
if let Some(inner) = self.inner.as_mut() {
inner.write_h264_frame(data, frame_number, fps)?;
}
Ok(())
}
pub fn is_connected(&self) -> bool {
self.inner.as_ref().is_some_and(WebRtcInner::is_connected)
}
}
impl WebRtcInner {
fn new(fps: u32) -> Result<Self> {
let _ = fps;
let mut rtc = RtcConfig::new().build(Instant::now());
let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?;
let local_addr = socket.local_addr()?;
let lan_ip = local_ip().unwrap_or_else(|| {
tracing::warn!("Failed to detect LAN IP, falling back to 127.0.0.1");
"127.0.0.1".to_string()
});
let candidate_addr: SocketAddr = format!("{lan_ip}:{}", local_addr.port()).parse()?;
let candidate = Candidate::host(candidate_addr, "udp")
.map_err(|e| anyhow::anyhow!("candidate: {e}"))?;
rtc.add_local_candidate(candidate);
tracing::info!("WebRTC UDP: {candidate_addr} (bound 0.0.0.0)");
Ok(Self {
rtc,
socket,
udp_addr: candidate_addr,
video_mid: None,
video_pt: None,
connected: false,
need_keyframe: false,
rtp_clock: 0,
buf: vec![0u8; 65535],
})
}
fn handle_sdp_offer(&mut self, body: &[u8]) -> Result<String> {
let offer: SdpOffer = serde_json::from_slice(body)
.map_err(|e| anyhow::anyhow!("parse SDP offer: {e}"))?;
let answer = self
.rtc
.sdp_api()
.accept_offer(offer)
.map_err(|e| anyhow::anyhow!("accept_offer: {e}"))?;
self.need_keyframe = true;
tracing::info!("SDP exchange complete, waiting for ICE/DTLS...");
self.discover_video_params();
let answer_json =
serde_json::to_vec(&answer).map_err(|e| anyhow::anyhow!("serialize answer: {e}"))?;
String::from_utf8(answer_json).map_err(|e| anyhow::anyhow!("answer utf8: {e}"))
}
fn discover_video_params(&mut self) {
for s in ["0", "1", "2", "3"] {
let mid: Mid = s.into();
if let Some(media) = self.rtc.media(mid) {
if media.kind() == MediaKind::Video {
tracing::info!("Found video media: mid={mid}");
self.video_mid = Some(mid);
break;
}
}
}
if let Some(mid) = self.video_mid {
if let Some(writer) = self.rtc.writer(mid) {
for pp in writer.payload_params() {
tracing::debug!("Codec: pt={:?} spec={:?}", pp.pt(), pp.spec());
if pp.spec().codec.is_video() && pp.spec().codec == Codec::H264 {
self.video_pt = Some(pp.pt());
tracing::info!("H.264 payload type: {:?}", pp.pt());
break;
}
}
}
}
}
fn poll_rtc(&mut self) -> Result<bool> {
loop {
match self.rtc.poll_output() {
Ok(Output::Transmit(t)) => {
tracing::info!("TX {} bytes -> {}", t.contents.len(), t.destination);
if let Err(e) = self.socket.send_to(&t.contents, t.destination) {
tracing::warn!("UDP send error: {e}");
}
}
Ok(Output::Event(e)) => {
tracing::info!("RTC event: {e:?}");
match &e {
Event::Connected => {
tracing::info!("WebRTC connected!");
self.connected = true;
self.need_keyframe = true;
self.discover_video_params();
}
Event::IceConnectionStateChange(IceConnectionState::Disconnected) => {
tracing::warn!("WebRTC disconnected");
self.connected = false;
}
Event::MediaAdded(ma) => {
tracing::info!("Media added: mid={:?}", ma.mid);
}
_ => {
tracing::debug!("WebRTC event: {:?}", e);
}
}
}
Ok(Output::Timeout(_t)) => break,
Err(e) => {
tracing::error!("rtc.poll_output error: {e}");
break;
}
}
}
Ok(false)
}
fn feed_network(&mut self) -> Result<()> {
let mut recv_count = 0u32;
loop {
match self.socket.recv_from(&mut self.buf) {
Ok((n, source)) => {
recv_count += 1;
if recv_count <= 5 {
tracing::info!("UDP recv {} bytes from {}", n, source);
}
let input = Input::Receive(
Instant::now(),
Receive {
proto: Protocol::Udp,
source,
destination: self.udp_addr,
contents: self.buf[..n]
.try_into()
.map_err(|e| anyhow::anyhow!("receive contents: {e}"))?,
},
);
self.rtc
.handle_input(input)
.map_err(|e| anyhow::anyhow!("handle_input({n} bytes from {source}): {e}"))?;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => bail!("UDP recv error: {e}"),
}
}
self.rtc
.handle_input(Input::Timeout(Instant::now()))
.map_err(|e| anyhow::anyhow!("handle timeout: {e}"))?;
Ok(())
}
fn write_h264_frame(&mut self, data: &[u8], frame_number: u64, fps: u32) -> Result<()> {
if !self.connected {
return Ok(());
}
let mid = match self.video_mid {
Some(m) => m,
None => {
tracing::warn!("write_h264: no video_mid");
return Ok(());
}
};
let pt = match self.video_pt {
Some(p) => p,
None => {
tracing::warn!("write_h264: no video_pt");
return Ok(());
}
};
if self.need_keyframe {
if !is_idr_nalu(data) {
tracing::debug!(
"write_h264: skipping non-IDR frame ({} bytes), waiting for keyframe",
data.len()
);
return Ok(());
}
tracing::info!(
"write_h264: got IDR keyframe ({} bytes), starting playback",
data.len()
);
self.need_keyframe = false;
}
let ticks_per_second = 90_000u64;
let fps = fps.max(1) as u64;
let rtp_timestamp = frame_number.saturating_mul(ticks_per_second) / fps;
self.rtp_clock = rtp_timestamp as u32;
let rtp_time = MediaTime::new(rtp_timestamp, Frequency::NINETY_KHZ);
let writer = match self.rtc.writer(mid) {
Some(w) => w,
None => {
tracing::warn!("write_h264: no writer for mid={mid}");
return Ok(());
}
};
tracing::debug!(
"write_h264: {} bytes, pt={:?}, rtp={}",
data.len(),
pt,
self.rtp_clock
);
writer
.write(pt, Instant::now(), rtp_time, data)
.map_err(|e| anyhow::anyhow!("writer.write: {e}"))?;
self.poll_rtc()?;
Ok(())
}
fn is_connected(&self) -> bool {
self.connected
}
}
// ── 工具函数 ──────────────────────────────────────────────────────────────
/// 从 HTTP 请求中提取 body在 \r\n\r\n 之后)
fn extract_body(req: &str) -> &str {
if let Some(idx) = req.find("\r\n\r\n") {
req.get(idx + 4..).unwrap_or("")
} else {
""
}
}
fn local_ip() -> Option<String> {
std::net::UdpSocket::bind("0.0.0.0:0")
.ok()
.and_then(|s| {
s.connect("1.1.1.1:80").ok()?;
let addr = s.local_addr().ok()?;
drop(s);
let ip = addr.ip().to_string();
if ip == "0.0.0.0" || ip.starts_with("127.") {
return None;
}
Some(ip)
})
}
fn is_idr_nalu(data: &[u8]) -> bool {
let mut i = 0;
while i + 4 < data.len() {
if data[i..i + 4] == [0, 0, 0, 1] {
let nal_type = data[i + 4] & 0x1F;
if nal_type == 5 {
return true;
}
i += 5;
} else if i + 3 < data.len() && data[i..i + 3] == [0, 0, 1] {
let nal_type = data[i + 3] & 0x1F;
if nal_type == 5 {
return true;
}
i += 4;
} else {
i += 1;
}
}
false
}