Compare commits

...

17 Commits

Author SHA1 Message Date
dailz
503e4dbc22 feat(portal): independent WebRTC thread + channel tuning for 60fps mouse latency
- Move WebRTC send to dedicated wl-webrtc-webrtc thread (was inline in main loop)
- Reduce frame_rx 16→1, input_tx 2→1 (drop-on-full), webrtc_tx 32→2
- Recv_timeout 10ms→2ms to reduce pipeline latency
- Fix sent_gap_p95 stats bug: compute gap at actual send time in WebRTC
  thread instead of batch-draining at snapshot time (was always 0.0ms)
- High profile via AVCodecContext.profile, veryfast preset, 5x bitrate
- Stats drain via sent_gap channel with record_send_from_thread()
- Shutdown: drop input_tx → join encode → drop webrtc_tx → join webrtc
2026-06-07 18:30:09 +08:00
dailz
caccfec44e fix(portal): compositor stall detection + filler frames + PipeWire state logging
P0: Detect compositor frame delivery stalls (>100ms no frames) and log
    stall/resume events with duration. Rate-limited to 1 warn/sec.

P1: Insert duplicate raw CpuNv12Frame filler during stalls at target fps.
    Keeps WebRTC stream smooth (sent_fps 20-40 instead of 3-5 during
    compositor pauses). Stops after 2s max stale. WebRTC mode only.

P2: Replace silent _ => {} in PipeWire state_changed callback with
    explicit Paused/Streaming/Connecting log messages.

P4: Add PwCtrlEvent::FormatChanged for mid-stream dimension changes.
    param_changed detects resolution renegotiation (skips first call).
    Logs warning in poll_and_encode; full encoder reinit deferred.

Verified: cargo check 0 errors, 70/70 tests, release build, --stats live.
2026-06-07 17:20:54 +08:00
dailz
826f544569 feat(portal): async encode pipeline - decouple capture from encoding
Split synchronous encode pipeline so sws_scale + libx264 runs on a
dedicated thread, leaving only VAAPI import + GPU scale + GPU→CPU
transfer on the main capture thread.

Problem: encode_p95 occasionally hit 74ms, blocking the entire capture
pipeline and causing capture_gap_max=356ms stutter.

Solution:
- avhw.rs: Split SwEncState into SwEncImport (main thread: VAAPI import,
  filter_graph scale, GPU→CPU transfer) and SwEncEncode (encode thread:
  sws_scale NV12→YUV420P, libx264 encode). New CpuNv12Frame struct
  carries owned pixel data across threads via crossbeam channel.
  SwEncState wraps both for backward compat (MP4/sync path untouched).
- state_portal.rs: WebRTC portal path spawns 'wl-webrtc-encode' thread
  with bounded(2) input channel (drop-newest backpressure) and separate
  timing channel. Graceful shutdown: drop webrtc_rx → drop input_tx →
  join encode thread → flush sync encoder.
- stats.rs: Add record_import() + record_encode_thread() for async timing.

Results: encode_p95 stable at 2.9-4.2ms (was 11-74ms), capture_fps
stable 59-60fps, cap_gap_p95 17-19ms. Remaining capture stalls traced
to PipeWire compositor frame delivery (external, not our code).
2026-06-07 16:55:28 +08:00
dailz
aae030f309 fix(webrtc): SO_SNDBUF 2MB + VBV rate limiting + stats integration
P0 - UDP send buffer: set SO_SNDBUF=2MB to prevent EAGAIN on large IDR
frames (218KB/256KB keyframes caused 18+ EAGAIN bursts). Actual Linux
buffer 4096KB confirmed.

P1 - VBV rate limiting: cap rc_max_rate=bitrate and rc_buffer_size=
bitrate/4 for WebRTC encode path, preventing oversized IDR frames.

Stats: integrate PipelineStats into cap_portal (dropped_count), state.rs
(wlroots path), webrtc.rs (browser getStats enhancement + stats panel).
2026-06-07 16:55:07 +08:00
dailz
029fe13e37 feat(stats): add --stats flag and PipelineStats windowed diagnostics
Add lightweight per-second pipeline statistics for stutter diagnosis:
- --stats CLI flag enables structured stats logging
- PipelineStats tracks capture/encode/send timing with p95/pmax
- FrameTimings records import/scale/transfer/sws/encode per-frame
- StatsSnapshot produces one structured log line per second
2026-06-07 16:54:45 +08:00
dailz
f3da1e4e6c fix(webrtc): propagate poll_output error as cleanup signal to prevent zombie state (closes #14) 2026-06-06 21:48:38 +08:00
dailz
e6e05fb44a fix(webrtc): fix is_idr_nalu boundary bug missing tail NAL units (closes #13) 2026-06-06 21:34:22 +08:00
dailz
8b04893ceb fix(security): remove error details from HTTP 500 response (#12)
The 500 error response previously included the raw error message {e}
in the body, potentially leaking internal implementation details (SDP
parse errors, ICE candidate info) to clients.

The detailed error is already logged server-side via tracing::error!,
so the response body is now a fixed generic string with a proper
HTTP/1.1 status line.
2026-06-06 21:22:57 +08:00
dailz
1beaea8088 fix(webrtc): use MediaAdded event to discover video mid instead of hardcoded iteration (closes #11) 2026-06-06 21:16:55 +08:00
dailz
fc4733ffe8 fix: return Ok(true) on ICE Disconnected to prevent resource leak
poll_rtc() always returned Ok(false), preventing WebRtcState from
clearing self.inner on disconnect. This leaked the UDP socket, Rtc
instance, and 65KB buffer permanently if the client never reconnected.

Closes #10
2026-06-06 20:57:25 +08:00
dailz
d5679be3a4 fix(state_portal): replace expect() with bail-style error propagation (closes #9) 2026-06-06 20:19:51 +08:00
dailz
36f07c92e9 fix(state_portal): prevent shutdown deadlock on full bounded channel (closes #8)
shutdown() calls enc.flush() → drain_encoder() → tx.send() on a
crossbeam bounded(32) channel.  If the channel is full and the
receiver (webrtc_rx) is alive but not being drained, send() blocks
forever — a self-deadlock since both ends belong to the same struct.

Two-layer fix:
- avhw.rs: replace tx.send() with tx.try_send(); handle Full (drop
  frame) and Disconnected (set flag) separately.
- state_portal.rs: drop webrtc_rx before flushing in shutdown() so
  try_send returns Disconnected immediately.

Regression tests added for the channel semantics.
2026-06-06 20:02:09 +08:00
dailz
7c1c9b2e19 fix(avhw): add SAFETY comments to all undocumented unsafe blocks
Close #7

- Add // SAFETY: comments to 19 undocumented unsafe blocks and impls
- Add nb_streams/null guard on stream array dereference (drain_encoder)
- Add clippy undocumented_unsafe_blocks = warn lint to prevent regression

avhw.rs now has 0 clippy unsafe documentation warnings.
2026-06-06 15:54:09 +08:00
dailz
226768c3e3 fix(avhw): handle tx.send() failure and pause encoding on WebRTC disconnect (closes #6)
- Replace 'let _ = tx.send()' with proper error handling: log warning,
  set webrtc_disconnected flag, and break drain loop on SendError
- Add Arc<AtomicBool> webrtc_paused shared between State/StatePortal
  and SwEncState, synced from wrtc.is_connected() in poll_webrtc()
- Skip encoding in encode_filtered_frame() when paused or disconnected
- Drain and discard stale channel frames on disconnect
- Resume encoding automatically on WebRTC reconnection
2026-06-06 15:12:49 +08:00
dailz
fd170b66d9 fix(unsafe): add SAFETY comment and runtime guards for from_raw_parts in drain_encoder
Issue: #5

- Read AVPacket fields into local variable to avoid repeated pointer deref
- Guard against size <= 0 (prevents c_int negative wrap to huge usize)
- Guard against null data pointer (from_raw_parts(null, 0) is UB in Rust)
- Add SAFETY comment matching existing codebase convention (30+ instances)
2026-06-06 11:56:47 +08:00
dailz
9a5b09cd7f fix(security): harden token file permissions (closes #2)
- save_restore_token: use create_new(true) + mode(0o600) for exclusive
  atomic file creation, preventing symlink attacks and predictable
  temp file exploitation
- token_path: return Option, eliminate insecure /tmp fallback
- load_restore_token: reject insecure files (symlinks, wrong owner,
  group/world-readable permissions)
- Directory creation uses DirBuilderExt::mode(0o700) bypassing umask
- Added verify_secure_dir and ensure_secure_parent with full metadata
  validation (owner, permissions, symlink rejection)
- Added 11 regression tests covering all security scenarios
2026-06-06 11:05:00 +08:00
dailz
46367ef6b5 fix(state): add WebRTC support to wlr-screencopy backend
Fixes #1 -- --port mode with wlr-screencopy backend caused panic at
negotiate_format() because self.args.output is None and .expect() was
called unconditionally.

Changes:
- Introduce StreamingEncoder enum wrapping EncState (MP4) and
  SwEncState (WebRTC) with unified frames_rgb/encode_frame/flush API
- Add WebRTC fields to State<S> (webrtc, webrtc_tx, webrtc_rx,
  webrtc_frames_sent) matching Portal backend pattern
- State::new() returns Result<Self> for clean WebRtcState init failure
- negotiate_format() branches on webrtc_tx: WebRTC path uses
  SwEncState::new_webrtc(), MP4 path unchanged (hardware VAAPI)
- Add poll_webrtc() method to drive signaling + channel drain
- Event loop calls poll_webrtc() each iteration
- Fix pre-existing test/bench Args construction (Option<String> output,
  missing no_persist field)
2026-06-04 22:10:46 +08:00
15 changed files with 2486 additions and 528 deletions

4
.gitignore vendored
View File

@@ -17,3 +17,7 @@ Thumbs.db
# Sisyphus orchestration artifacts # Sisyphus orchestration artifacts
.sisyphus/ .sisyphus/
.omo/
.playwright-mcp/
wl-webrtc.log
webrtc-p0-success.png

14
Cargo.lock generated
View File

@@ -1126,6 +1126,15 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.8.0" version = "2.8.0"
@@ -2025,10 +2034,14 @@ version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [ dependencies = [
"matchers",
"nu-ansi-term", "nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
] ]
@@ -2505,6 +2518,7 @@ dependencies = [
"signal-hook", "signal-hook",
"signal-hook-mio", "signal-hook-mio",
"str0m", "str0m",
"tempfile",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View File

@@ -14,7 +14,7 @@ signal-hook = "0.3"
signal-hook-mio = { version = "0.2", features = ["support-v1_0"] } signal-hook-mio = { version = "0.2", features = ["support-v1_0"] }
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1" anyhow = "1"
drm = "0.12" drm = "0.12"
drm-fourcc = "2" drm-fourcc = "2"
@@ -28,3 +28,9 @@ crossbeam-channel = "0.5"
str0m = "0.20" str0m = "0.20"
serde_json = "1" serde_json = "1"
dirs = "6" dirs = "6"
[dev-dependencies]
tempfile = "3.27.0"
[lints.clippy]
undocumented_unsafe_blocks = "warn"

View File

@@ -50,4 +50,8 @@ pub struct Args {
/// Force re-authorization dialog (ignore saved portal restore token) /// Force re-authorization dialog (ignore saved portal restore token)
#[arg(long)] #[arg(long)]
pub no_persist: bool, pub no_persist: bool,
/// Enable per-second pipeline statistics output for stutter diagnosis
#[arg(long)]
pub stats: bool,
} }

File diff suppressed because it is too large Load Diff

View File

@@ -178,7 +178,7 @@ mod tests {
// 测试辅助函数:构造指定后端参数的 Args 实例 // 测试辅助函数:构造指定后端参数的 Args 实例
fn make_args(backend: Option<&str>) -> Args { fn make_args(backend: Option<&str>) -> Args {
Args { Args {
output: "test.mp4".to_string(), output: Some("test.mp4".to_string()),
output_name: None, output_name: None,
fps: 30, fps: 30,
codec: "h264".to_string(), codec: "h264".to_string(),
@@ -189,6 +189,8 @@ mod tests {
verbose: false, verbose: false,
backend: backend.map(String::from), backend: backend.map(String::from),
port: 0, port: 0,
no_persist: false,
stats: false,
} }
} }

View File

@@ -66,6 +66,7 @@ fn receive_first_frame(cap: &CapPortal) -> Result<wl_webrtc::cap_portal::PwDmaBu
if let Ok(ctrl) = cap.event_receiver().try_recv() { if let Ok(ctrl) = cap.event_receiver().try_recv() {
match ctrl { match ctrl {
PwCtrlEvent::StreamEnded => bail!("PipeWire stream ended before first frame"), PwCtrlEvent::StreamEnded => bail!("PipeWire stream ended before first frame"),
PwCtrlEvent::FormatChanged { .. } => {}
PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"), PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"),
} }
} }
@@ -102,7 +103,7 @@ fn main() -> Result<()> {
println!(" (Select a screen to share in the portal dialog)"); println!(" (Select a screen to share in the portal dialog)");
let portal_args = Args { let portal_args = Args {
output: bench_args.output.clone(), output: Some(bench_args.output.clone()),
output_name: None, output_name: None,
fps: 60, fps: 60,
codec: "h264".to_string(), codec: "h264".to_string(),
@@ -113,6 +114,8 @@ fn main() -> Result<()> {
verbose: false, verbose: false,
backend: Some("portal".to_string()), backend: Some("portal".to_string()),
port: 0, port: 0,
no_persist: false,
stats: false,
}; };
let cap = CapPortal::new(&portal_args)?; let cap = CapPortal::new(&portal_args)?;
@@ -326,6 +329,7 @@ fn main() -> Result<()> {
eprintln!("PipeWire error after {} frames: {}", frames_encoded, e); eprintln!("PipeWire error after {} frames: {}", frames_encoded, e);
break; break;
} }
PwCtrlEvent::FormatChanged { .. } => {}
} }
} }

View File

@@ -138,6 +138,7 @@ fn receive_first_frame(cap: &CapPortal) -> Result<wl_webrtc::cap_portal::PwDmaBu
if let Ok(ctrl) = cap.event_receiver().try_recv() { if let Ok(ctrl) = cap.event_receiver().try_recv() {
match ctrl { match ctrl {
PwCtrlEvent::StreamEnded => bail!("PipeWire stream ended before first frame"), PwCtrlEvent::StreamEnded => bail!("PipeWire stream ended before first frame"),
PwCtrlEvent::FormatChanged { .. } => {}
PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"), PwCtrlEvent::Error(e) => bail!("PipeWire error: {e}"),
} }
} }
@@ -519,6 +520,7 @@ fn run_cpu_pipeline(
"PipeWire error after {} CPU frames: {e}", "PipeWire error after {} CPU frames: {e}",
stats.frames_encoded stats.frames_encoded
), ),
PwCtrlEvent::FormatChanged { .. } => {}
} }
} }
@@ -659,6 +661,7 @@ fn run_gpu_pipeline(
"PipeWire error after {} GPU frames: {e}", "PipeWire error after {} GPU frames: {e}",
stats.frames_encoded stats.frames_encoded
), ),
PwCtrlEvent::FormatChanged { .. } => {}
} }
} }
@@ -871,7 +874,7 @@ fn main() -> Result<()> {
println!(" (Select a screen to share in the portal dialog)"); println!(" (Select a screen to share in the portal dialog)");
let portal_args = Args { let portal_args = Args {
output: bench_args.output.clone(), output: Some(bench_args.output.clone()),
output_name: None, output_name: None,
fps: 60, fps: 60,
codec: "h264".to_string(), codec: "h264".to_string(),
@@ -882,6 +885,8 @@ fn main() -> Result<()> {
verbose: false, verbose: false,
backend: Some("portal".to_string()), backend: Some("portal".to_string()),
port: 0, port: 0,
no_persist: false,
stats: false,
}; };
let cap = CapPortal::new(&portal_args)?; let cap = CapPortal::new(&portal_args)?;

View File

@@ -54,6 +54,8 @@ pub struct PwDmaBufFrame {
pub enum PwCtrlEvent { pub enum PwCtrlEvent {
/// 流已结束PipeWire 流断开连接或进入错误状态) /// 流已结束PipeWire 流断开连接或进入错误状态)
StreamEnded, StreamEnded,
/// Format/dimensions changed mid-stream
FormatChanged { width: u32, height: u32 },
/// 发生错误,包含错误描述信息 /// 发生错误,包含错误描述信息
Error(String), Error(String),
} }
@@ -73,6 +75,7 @@ pub struct CapPortal {
event_rx: Receiver<PwCtrlEvent>, event_rx: Receiver<PwCtrlEvent>,
pw_thread: Option<JoinHandle<()>>, pw_thread: Option<JoinHandle<()>>,
rt: Runtime, rt: Runtime,
pw_dropped: Arc<AtomicU64>,
} }
/// PipeWire 捕获线程的上下文数据 /// PipeWire 捕获线程的上下文数据
@@ -95,7 +98,7 @@ impl CapPortal {
/// 执行流程: /// 执行流程:
/// 1. 创建 Tokio 运行时(用于异步 Portal 调用) /// 1. 创建 Tokio 运行时(用于异步 Portal 调用)
/// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id /// 2. 通过 XDG Desktop Portal 请求屏幕录制权限,获取 PipeWire fd 和 node_id
/// 3. 创建有界通道(容量 16)用于帧传递 /// 3. 创建有界通道(容量 1用于帧传递(最新帧优先,避免队列积压延迟)
/// 4. 创建 eventfd 对,用于线程安全的关闭信号传递 /// 4. 创建 eventfd 对,用于线程安全的关闭信号传递
/// 5. 启动 PipeWire 捕获线程 /// 5. 启动 PipeWire 捕获线程
pub fn new(args: &Args) -> Result<Self> { pub fn new(args: &Args) -> Result<Self> {
@@ -104,7 +107,7 @@ impl CapPortal {
let no_persist = args.no_persist; let no_persist = args.no_persist;
let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal(no_persist).await })?; let (pw_fd, node_id) = rt.block_on(async { Self::setup_portal(no_persist).await })?;
let (frame_tx, frame_rx) = bounded(16); let (frame_tx, frame_rx) = bounded(1);
let (event_tx, event_rx) = bounded(8); let (event_tx, event_rx) = bounded(8);
let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
@@ -149,6 +152,7 @@ impl CapPortal {
event_rx, event_rx,
pw_thread: Some(pw_thread), pw_thread: Some(pw_thread),
rt, rt,
pw_dropped,
}) })
} }
@@ -160,6 +164,16 @@ impl CapPortal {
&self.event_rx &self.event_rx
} }
/// Returns the total number of PipeWire frames dropped due to channel backlog.
pub fn dropped_count(&self) -> u64 {
self.pw_dropped.load(Ordering::Relaxed)
}
/// Returns the number of frames currently waiting in the capture channel.
pub fn capture_queue_depth(&self) -> usize {
self.frame_rx.len()
}
/// 通过 XDG Desktop Portal 建立屏幕录制会话 /// 通过 XDG Desktop Portal 建立屏幕录制会话
/// ///
/// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。 /// 与桌面环境的 D-Bus 服务交互,请求用户授权屏幕录制。
@@ -246,27 +260,162 @@ impl CapPortal {
} }
} }
fn token_path() -> PathBuf { fn token_path() -> Option<PathBuf> {
let base = dirs::cache_dir() dirs::cache_dir().map(|base| base.join("wl-webrtc").join("portal-restore-token"))
.unwrap_or_else(|| PathBuf::from("/tmp")); }
base.join("wl-webrtc").join("portal-restore-token")
/// Verify that `path` is a directory owned by the current user with no group/other permissions.
/// Rejects symlinks at the path itself (but allows the resolved target to be a real dir).
fn verify_secure_dir(path: &std::path::Path) -> bool {
use std::os::unix::fs::{MetadataExt, PermissionsExt};
match std::fs::symlink_metadata(path) {
Ok(meta) => {
if meta.file_type().is_symlink() {
tracing::warn!("Token parent dir is a symlink, rejecting: {}", path.display());
return false;
}
// Must be a directory
if !meta.is_dir() {
tracing::warn!("Token parent path is not a directory: {}", path.display());
return false;
}
// Must be owned by current user
if meta.uid() != unsafe { libc::getuid() } {
tracing::warn!("Token parent dir not owned by current user: {}", path.display());
return false;
}
// No group or other permissions (mode must be 0o700 exactly within the 0o777 mask)
let mode = meta.permissions().mode() & 0o777;
if mode != 0o700 {
tracing::warn!(
"Token parent dir has insecure permissions {:o}, expected 0700: {}",
mode,
path.display()
);
return false;
}
true
}
Err(e) => {
tracing::warn!("Failed to stat token parent dir: {e}");
false
}
}
}
/// Ensure the parent directory exists with restrictive permissions (0o700).
/// Returns false if the directory could not be created or is insecure.
fn ensure_secure_parent(parent: &std::path::Path) -> bool {
use std::os::unix::fs::{DirBuilderExt, PermissionsExt};
if parent.exists() {
// Directory exists — try to tighten permissions, then verify.
// set_permissions follows symlinks, which is fine here since
// we verify with symlink_metadata in verify_secure_dir.
if let Err(e) = std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700)) {
tracing::warn!("Failed to set directory permissions: {e}");
return false;
}
return verify_secure_dir(parent);
}
// Create with restrictive mode — DirBuilderExt::mode bypasses umask.
let mut builder = std::fs::DirBuilder::new();
builder.recursive(true);
builder.mode(0o700);
if let Err(e) = builder.create(parent) {
tracing::warn!("Failed to create token directory: {e}");
return false;
}
// Verify after creation (belt-and-suspenders)
verify_secure_dir(parent)
} }
fn load_restore_token() -> Option<String> { fn load_restore_token() -> Option<String> {
let path = token_path(); load_restore_token_from(token_path()?)
}
fn load_restore_token_from(path: PathBuf) -> Option<String> {
use std::os::unix::fs::{MetadataExt, PermissionsExt};
let meta = match std::fs::symlink_metadata(&path) {
Ok(m) => m,
Err(_) => return None,
};
if meta.file_type().is_symlink() {
tracing::warn!("Token file is a symlink, refusing to read: {}", path.display());
return None;
}
if !meta.is_file() {
tracing::warn!("Token path is not a regular file: {}", path.display());
return None;
}
if meta.uid() != unsafe { libc::getuid() } {
tracing::warn!("Token file not owned by current user: {}", path.display());
return None;
}
let mode = meta.permissions().mode() & 0o777;
if mode & 0o077 != 0 {
tracing::warn!(
"Token file has insecure permissions {:o}, refusing to read: {}",
mode,
path.display()
);
return None;
}
let token = std::fs::read_to_string(&path).ok()?; let token = std::fs::read_to_string(&path).ok()?;
let trimmed = token.trim().to_string(); let trimmed = token.trim().to_string();
if trimmed.is_empty() { None } else { Some(trimmed) } if trimmed.is_empty() { None } else { Some(trimmed) }
} }
fn save_restore_token(token: &str) { fn save_restore_token(token: &str) {
let path = token_path(); let Some(path) = token_path() else {
if let Some(parent) = path.parent() { tracing::warn!("No secure cache directory available, skipping token save");
let _ = std::fs::create_dir_all(parent); return;
};
save_restore_token_to(token, &path);
}
fn save_restore_token_to(token: &str, path: &std::path::Path) {
use std::fs::OpenOptions;
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
let Some(parent) = path.parent() else {
tracing::warn!("Token path has no parent directory");
return;
};
if !ensure_secure_parent(parent) {
tracing::warn!("Parent directory is insecure, refusing to save token");
return;
} }
match std::fs::write(&path, token) {
// Use a unique temp file to prevent symlink attacks.
// create_new(true) guarantees exclusive creation — fails if file already exists,
// and does NOT follow existing symlinks.
let tmp_path = path.with_extension(format!("{}.tmp", std::process::id()));
let result = (|| -> std::io::Result<()> {
let mut f = OpenOptions::new()
.write(true)
.create_new(true)
.mode(0o600)
.open(&tmp_path)?;
f.write_all(token.as_bytes())?;
f.sync_all()?;
std::fs::rename(&tmp_path, path)?;
Ok(())
})();
match result {
Ok(()) => tracing::info!("Saved portal restore token"), Ok(()) => tracing::info!("Saved portal restore token"),
Err(e) => tracing::warn!("Failed to save restore token: {e}"), Err(e) => {
let _ = std::fs::remove_file(&tmp_path);
tracing::warn!("Failed to save restore token: {e}");
}
} }
} }
@@ -341,7 +490,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let mainloop = match pw::main_loop::MainLoopBox::new(None) { let mainloop = match pw::main_loop::MainLoopBox::new(None) {
Ok(ml) => ml, Ok(ml) => ml,
Err(e) => { Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}"))); if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("MainLoop::new failed: {e}"))) {
tracing::error!("MainLoop::new failed and error channel also failed: {e}");
}
return; return;
} }
}; };
@@ -349,7 +500,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let context = match pw::context::ContextBox::new(mainloop.loop_(), None) { let context = match pw::context::ContextBox::new(mainloop.loop_(), None) {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}"))); if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("Context::new failed: {e}"))) {
tracing::error!("Context::new failed and error channel also failed: {e}");
}
return; return;
} }
}; };
@@ -357,7 +510,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let core = match context.connect_fd(pw_fd, None) { let core = match context.connect_fd(pw_fd, None) {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("connect_fd failed: {e}"))); if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("connect_fd failed: {e}"))) {
tracing::error!("connect_fd failed and error channel also failed: {e}");
}
return; return;
} }
}; };
@@ -379,7 +534,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
) { ) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}"))); if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("Stream::new failed: {e}"))) {
tracing::error!("Stream::new failed and error channel also failed: {e}");
}
return; return;
} }
}; };
@@ -399,7 +556,13 @@ fn pipewire_thread(ctx: PwThreadCtx) {
pw::stream::StreamState::Unconnected => { pw::stream::StreamState::Unconnected => {
let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded); let _ = event_tx_state.try_send(PwCtrlEvent::StreamEnded);
} }
_ => {} pw::stream::StreamState::Paused => {
tracing::warn!("PipeWire stream paused (compositor may be switching content)");
}
pw::stream::StreamState::Streaming => {
tracing::info!("PipeWire stream (re)started");
}
pw::stream::StreamState::Connecting => {}
} }
}) })
// 参数变化回调(格式协商) // 参数变化回调(格式协商)
@@ -407,6 +570,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
// id 为参数类型param 包含具体的格式参数(分辨率、像素格式等) // id 为参数类型param 包含具体的格式参数(分辨率、像素格式等)
.param_changed({ .param_changed({
let format_info = format_info.clone(); let format_info = format_info.clone();
let event_tx = event_tx.clone();
move |_, _, id, param| { move |_, _, id, param| {
// 仅处理 Format 类型的参数变化 // 仅处理 Format 类型的参数变化
let Some(param) = param else { return }; let Some(param) = param else { return };
@@ -428,7 +592,18 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let framerate = info.framerate(); let framerate = info.framerate();
let max_framerate = info.max_framerate(); let max_framerate = info.max_framerate();
// 保存协商后的格式信息,供 process 回调读取 // 保存协商后的格式信息,供 process 回调读取
let previous_format = format_info.get();
format_info.set(Some((width, height, drm_format, modifier))); format_info.set(Some((width, height, drm_format, modifier)));
if let Some((previous_width, previous_height, _, _)) = previous_format {
if width != previous_width || height != previous_height {
tracing::warn!(
"PipeWire dimensions changed: {}x{} (format renegotiation)",
width,
height
);
let _ = event_tx.try_send(PwCtrlEvent::FormatChanged { width, height });
}
}
tracing::info!( tracing::info!(
"PipeWire format negotiated: {width}x{height}, \ "PipeWire format negotiated: {width}x{height}, \
drm_format={drm_format:#010x}, modifier={modifier:#x}, \ drm_format={drm_format:#010x}, modifier={modifier:#x}, \
@@ -449,12 +624,14 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let raw_buf = unsafe { stream.dequeue_raw_buffer() }; let raw_buf = unsafe { stream.dequeue_raw_buffer() };
if raw_buf.is_null() { if raw_buf.is_null() {
tracing::trace!("process: null raw_buf");
return; return;
} }
// 获取 SPA buffer 结构体,包含数据数组、元数据等 // 获取 SPA buffer 结构体,包含数据数组、元数据等
let spa_buf = unsafe { (*raw_buf).buffer }; let spa_buf = unsafe { (*raw_buf).buffer };
if spa_buf.is_null() { if spa_buf.is_null() {
tracing::trace!("process: null spa_buf");
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
return; return;
} }
@@ -464,6 +641,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
let n_datas = unsafe { (*spa_buf).n_datas }; let n_datas = unsafe { (*spa_buf).n_datas };
let datas_ptr = unsafe { (*spa_buf).datas }; let datas_ptr = unsafe { (*spa_buf).datas };
if n_datas == 0 || datas_ptr.is_null() { if n_datas == 0 || datas_ptr.is_null() {
tracing::trace!("process: no data (n_datas={n_datas})");
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
return; return;
} }
@@ -474,11 +652,13 @@ fn pipewire_thread(ctx: PwThreadCtx) {
unsafe { &*(datas_ptr as *const pw::spa::buffer::Data) }; unsafe { &*(datas_ptr as *const pw::spa::buffer::Data) };
let fd = data_ref.fd(); let fd = data_ref.fd();
if fd < 0 { if fd < 0 {
tracing::trace!("process: invalid fd={fd}");
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
return; return;
} }
if data_ref.as_raw().chunk.is_null() { if data_ref.as_raw().chunk.is_null() {
tracing::trace!("process: null chunk");
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
return; return;
} }
@@ -516,6 +696,7 @@ fn pipewire_thread(ctx: PwThreadCtx) {
return; return;
}; };
if width == 0 || height == 0 || format == 0 { if width == 0 || height == 0 || format == 0 {
tracing::trace!("process: invalid dimensions {width}x{height} format={format}");
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
return; return;
} }
@@ -541,11 +722,12 @@ fn pipewire_thread(ctx: PwThreadCtx) {
pts, pts,
}; };
if let Err(crossbeam_channel::TrySendError::Full(_)) = frame_tx.try_send(frame) { match frame_tx.try_send(frame) {
let prev = dropped.fetch_add(1, Ordering::Relaxed); Ok(()) => {}
if prev > 0 && prev % 30 == 0 { Err(crossbeam_channel::TrySendError::Full(_)) => {
tracing::warn!("dropped {prev} frames total: encoder backlog"); dropped.fetch_add(1, Ordering::Relaxed);
} }
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {}
} }
unsafe { stream.queue_raw_buffer(raw_buf) }; unsafe { stream.queue_raw_buffer(raw_buf) };
} }
@@ -560,7 +742,9 @@ fn pipewire_thread(ctx: PwThreadCtx) {
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS, StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
&mut params, &mut params,
) { ) {
let _ = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}"))); if let Err(e) = event_tx.try_send(PwCtrlEvent::Error(format!("stream.connect failed: {e}"))) {
tracing::error!("stream.connect failed and error channel also failed: {e}");
}
return; return;
} }
@@ -645,6 +829,7 @@ fn spa_to_drm_fourcc(format: libspa::param::video::VideoFormat) -> u32 {
mod tests { mod tests {
use super::*; use super::*;
use drm_fourcc::DrmFourcc; use drm_fourcc::DrmFourcc;
use std::os::unix::fs::PermissionsExt;
#[test] #[test]
fn spa_to_drm_fourcc_all_32bit() { fn spa_to_drm_fourcc_all_32bit() {
@@ -688,4 +873,160 @@ mod tests {
use libspa::param::video::VideoFormat; use libspa::param::video::VideoFormat;
assert_eq!(spa_to_drm_fourcc(VideoFormat::NV12), 0); assert_eq!(spa_to_drm_fourcc(VideoFormat::NV12), 0);
} }
#[test]
fn token_path_never_uses_tmp() {
assert!(token_path().is_some(), "token_path should resolve on Linux");
let path = token_path().unwrap();
assert!(!path.starts_with("/tmp"), "must not fallback to /tmp");
}
#[test]
fn verify_secure_dir_rejects_wrong_permissions() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
// 0o700 should pass
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o700)).unwrap();
assert!(verify_secure_dir(path));
// 0o755 should fail
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).unwrap();
assert!(!verify_secure_dir(path));
// 0o777 should fail
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o777)).unwrap();
assert!(!verify_secure_dir(path));
}
#[test]
fn verify_secure_dir_rejects_non_directory() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("not-a-dir");
std::fs::write(&file_path, b"test").unwrap();
assert!(!verify_secure_dir(&file_path));
}
#[test]
fn ensure_secure_parent_creates_with_0700() {
let base = tempfile::tempdir().unwrap();
let new_dir = base.path().join("wl-test-new-dir");
assert!(!new_dir.exists());
assert!(ensure_secure_parent(&new_dir));
assert!(new_dir.is_dir());
let meta = std::fs::symlink_metadata(&new_dir).unwrap();
let mode = meta.permissions().mode() & 0o777;
assert_eq!(mode, 0o700, "created directory should be 0700, got {mode:o}");
}
#[test]
fn ensure_secure_parent_tightens_existing_dir() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
// Simulate an existing directory with loose permissions
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).unwrap();
assert!(ensure_secure_parent(path));
let meta = std::fs::symlink_metadata(path).unwrap();
let mode = meta.permissions().mode() & 0o777;
assert_eq!(mode, 0o700, "tightened directory should be 0700, got {mode:o}");
}
#[test]
fn save_creates_file_with_0600() {
let dir = tempfile::tempdir().unwrap();
let token_path = dir.path().join("portal-restore-token");
save_restore_token_to("secret-token-123", &token_path);
assert!(token_path.exists());
let meta = std::fs::symlink_metadata(&token_path).unwrap();
let mode = meta.permissions().mode() & 0o777;
assert_eq!(mode, 0o600, "token file should be 0600, got {mode:o}");
assert_eq!(std::fs::read_to_string(&token_path).unwrap(), "secret-token-123");
}
#[test]
fn load_reads_secure_file() {
let dir = tempfile::tempdir().unwrap();
let token_path = dir.path().join("portal-restore-token");
// Write a valid 0o600 token file
use std::os::unix::fs::OpenOptionsExt;
let mut f = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.mode(0o600)
.open(&token_path)
.unwrap();
std::io::Write::write_all(&mut f, b"my-secret\n").unwrap();
let result = load_restore_token_from(token_path);
assert_eq!(result, Some("my-secret".to_string()));
}
#[test]
fn load_rejects_group_readable_file() {
let dir = tempfile::tempdir().unwrap();
let token_path = dir.path().join("portal-restore-token");
// Write with 0o640 (group readable) — should be rejected
use std::os::unix::fs::OpenOptionsExt;
let mut f = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.mode(0o640)
.open(&token_path)
.unwrap();
std::io::Write::write_all(&mut f, b"leaked-token\n").unwrap();
let result = load_restore_token_from(token_path);
assert!(result.is_none(), "should reject group-readable token file");
}
#[test]
fn load_rejects_world_readable_file() {
let dir = tempfile::tempdir().unwrap();
let token_path = dir.path().join("portal-restore-token");
use std::os::unix::fs::OpenOptionsExt;
let mut f = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.mode(0o604)
.open(&token_path)
.unwrap();
std::io::Write::write_all(&mut f, b"leaked-token\n").unwrap();
let result = load_restore_token_from(token_path);
assert!(result.is_none(), "should reject world-readable token file");
}
#[test]
fn load_rejects_symlink() {
let dir = tempfile::tempdir().unwrap();
let real_path = dir.path().join("real-file");
let link_path = dir.path().join("portal-restore-token");
std::fs::write(&real_path, b"target-content\n").unwrap();
std::os::unix::fs::symlink(&real_path, &link_path).unwrap();
let result = load_restore_token_from(link_path);
assert!(result.is_none(), "should reject symlinked token file");
}
#[test]
fn save_then_load_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let token_path = dir.path().join("portal-restore-token");
save_restore_token_to("roundtrip-token", &token_path);
let loaded = load_restore_token_from(token_path);
assert_eq!(loaded, Some("roundtrip-token".to_string()));
}
} }

View File

@@ -4,6 +4,7 @@ pub mod backend_detect;
pub mod cap_portal; pub mod cap_portal;
pub mod cap_wlr_screencopy; pub mod cap_wlr_screencopy;
pub mod fps_limit; pub mod fps_limit;
pub mod stats;
pub mod state; pub mod state;
pub mod state_portal; pub mod state_portal;
pub mod transform; pub mod transform;

View File

@@ -15,6 +15,7 @@ mod backend_detect; // 截屏后端自动检测wlroots vs Portal/PipeWire
mod cap_portal; // XDG Portal 屏幕捕获 mod cap_portal; // XDG Portal 屏幕捕获
mod cap_wlr_screencopy; // wlroots wlr-screencopy 截屏协议 mod cap_wlr_screencopy; // wlroots wlr-screencopy 截屏协议
mod fps_limit; // 帧率限制器 mod fps_limit; // 帧率限制器
mod stats; // 管道性能统计(卡顿诊断)
mod state; // wlr-screencopy 后端的主状态机 mod state; // wlr-screencopy 后端的主状态机
mod state_portal; // Portal/PipeWire 后端的主状态机 mod state_portal; // Portal/PipeWire 后端的主状态机
mod transform; // 图像变换(旋转/翻转) mod transform; // 图像变换(旋转/翻转)
@@ -43,18 +44,23 @@ fn main() -> Result<()> {
// 解析命令行参数 // 解析命令行参数
let args = Args::parse(); let args = Args::parse();
// 根据是否启用 verbose 模式设置日志级别 // 根据 verbose 模式或 RUST_LOG 环境变量设置日志级别
// 支持 RUST_LOG 粒度控制(如 RUST_LOG=wl_webrtc::webrtc=trace
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| {
if args.verbose {
tracing_subscriber::EnvFilter::new("debug")
} else {
tracing_subscriber::EnvFilter::new("info")
}
});
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_max_level(if args.verbose { .with_env_filter(env_filter)
tracing::Level::DEBUG
} else {
tracing::Level::INFO
})
.with_writer(std::io::stderr) .with_writer(std::io::stderr)
.init(); .init();
tracing::info!("wl-webrtc starting"); tracing::info!("wl-webrtc starting");
tracing::debug!("Args: {:?}", args); tracing::debug!("Args: output={:?} fps={} codec={} port={} verbose={}", args.output, args.fps, args.codec, args.port, args.verbose);
// MVP 阶段仅支持 H.264 编码,不支持 HEVC // MVP 阶段仅支持 H.264 编码,不支持 HEVC
if args.codec != "h264" { if args.codec != "h264" {
@@ -100,7 +106,7 @@ fn run_wlr_screencopy(args: Args) -> Result<()> {
let qhandle = queue.handle(); let qhandle = queue.handle();
// State 是 wlr-screencopy 后端的核心状态机, // State 是 wlr-screencopy 后端的核心状态机,
// 内部管理输出探测、截屏请求、编码器构建、帧采集等阶段 // 内部管理输出探测、截屏请求、编码器构建、帧采集等阶段
let mut state = State::new(gm, args, qhandle); let mut state = State::new(gm, args, qhandle)?;
// Extract the Wayland fd and consume any immediately-available events. // Extract the Wayland fd and consume any immediately-available events.
// prepare_read() flushes outgoing requests; read() pulls whatever the // prepare_read() flushes outgoing requests; read() pulls whatever the
@@ -246,9 +252,11 @@ fn run_wlr_screencopy(args: Args) -> Result<()> {
// - Streaming: 正常采集中,请求下一帧 // - Streaming: 正常采集中,请求下一帧
state.queue_alloc_frame(); state.queue_alloc_frame();
state.poll_webrtc()?;
// 状态机遇到致命错误时退出 // 状态机遇到致命错误时退出
if state.errored { if state.errored {
tracing::error!("Fatal error in state machine, exiting"); tracing::error!("Fatal error in state machine (check preceding error logs), exiting");
running = false; running = false;
} }
@@ -344,7 +352,7 @@ fn run_portal_pipewire(args: Args) -> Result<()> {
// Portal 状态机遇到致命错误时退出 // Portal 状态机遇到致命错误时退出
if state.is_errored() { if state.is_errored() {
tracing::error!("Fatal error in portal state machine, exiting"); tracing::error!("Fatal error in portal state machine (check preceding error logs), exiting");
running = false; running = false;
} }
} }

View File

@@ -3,6 +3,8 @@ use std::mem;
use std::os::fd::{AsFd, OwnedFd}; use std::os::fd::{AsFd, OwnedFd};
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use anyhow::Result; use anyhow::Result;
@@ -41,10 +43,12 @@ use ffmpeg_next as ff;
use ffmpeg_next::ffi; use ffmpeg_next::ffi;
use crate::args::Args; use crate::args::Args;
use crate::avhw::{AvHwDevCtx, EncState}; use crate::avhw::{AvHwDevCtx, EncState, SwEncState};
use crate::cap_wlr_screencopy::CapWlrScreencopy; use crate::cap_wlr_screencopy::CapWlrScreencopy;
use crate::fps_limit::FpsLimit; use crate::fps_limit::FpsLimit;
use crate::stats::{FrameTimings, PipelineStats};
use crate::transform::{transpose_if_transform_transposed, Transform}; use crate::transform::{transpose_if_transform_transposed, Transform};
use crate::webrtc::WebRtcState;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// CaptureSource trait // CaptureSource trait
@@ -113,6 +117,42 @@ struct WlrHeadInfo {
/// User data for XdgOutput dispatch to identify which WlOutput it belongs to. /// User data for XdgOutput dispatch to identify which WlOutput it belongs to.
pub struct OutputId(pub u32); pub struct OutputId(pub u32);
// ---------------------------------------------------------------------------
// StreamingEncoder
// ---------------------------------------------------------------------------
/// Wraps the two possible encoder backends for the streaming stage.
///
/// - `Mp4(EncState)` — hardware VAAPI encoder writing to an MP4 file
/// - `WebRtc(SwEncState)` — software encoder feeding H.264 NALUs into a WebRTC channel
pub enum StreamingEncoder {
Mp4(EncState),
WebRtc(SwEncState),
}
impl StreamingEncoder {
fn frames_rgb(&self) -> &crate::avhw::AvHwFrameCtx {
match self {
StreamingEncoder::Mp4(enc) => enc.frames_rgb(),
StreamingEncoder::WebRtc(enc) => enc.frames_rgb(),
}
}
fn encode_frame(&mut self, hw_frame: &ffmpeg_next::frame::Video) -> anyhow::Result<()> {
match self {
StreamingEncoder::Mp4(enc) => enc.encode_frame(hw_frame),
StreamingEncoder::WebRtc(enc) => enc.encode_frame(hw_frame),
}
}
pub fn flush(&mut self) -> anyhow::Result<()> {
match self {
StreamingEncoder::Mp4(enc) => enc.flush(),
StreamingEncoder::WebRtc(enc) => enc.flush(),
}
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// EncConstructionStage // EncConstructionStage
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -142,7 +182,7 @@ pub enum EncConstructionStage<S: CaptureSource> {
Streaming { Streaming {
output_info: OutputInfo, output_info: OutputInfo,
output: WlOutput, output: WlOutput,
enc: EncState, enc: StreamingEncoder,
cap: S, cap: S,
screencopy_manager: ZwlrScreencopyManagerV1, screencopy_manager: ZwlrScreencopyManagerV1,
dmabuf: ZwpLinuxDmabufV1, dmabuf: ZwpLinuxDmabufV1,
@@ -174,6 +214,9 @@ pub struct State<S: CaptureSource> {
pub stage: EncConstructionStage<S>, pub stage: EncConstructionStage<S>,
pub in_flight_surface: InFlightSurface<S>, pub in_flight_surface: InFlightSurface<S>,
pub starting_timestamp: Option<i64>, pub starting_timestamp: Option<i64>,
pub stats_start_time: Option<Instant>,
pub stats_last_time: Option<Instant>,
pub stats_frames: u64,
pub first_frame: bool, pub first_frame: bool,
pub args: Args, pub args: Args,
pub errored: bool, pub errored: bool,
@@ -182,6 +225,12 @@ pub struct State<S: CaptureSource> {
pub qhandle: QueueHandle<State<S>>, pub qhandle: QueueHandle<State<S>>,
pub drm_device: Option<PathBuf>, pub drm_device: Option<PathBuf>,
pub drm_device_from_compositor: Option<PathBuf>, pub drm_device_from_compositor: Option<PathBuf>,
pub webrtc: Option<WebRtcState>,
pub webrtc_tx: Option<crossbeam_channel::Sender<Vec<u8>>>,
webrtc_rx: Option<crossbeam_channel::Receiver<Vec<u8>>>,
webrtc_frames_sent: u64,
webrtc_paused: Option<Arc<AtomicBool>>,
stats: PipelineStats,
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -228,9 +277,20 @@ impl<S: CaptureSource> State<S> {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
impl<S: CaptureSource> State<S> { impl<S: CaptureSource> State<S> {
pub fn new(gm: GlobalList, args: Args, qhandle: QueueHandle<State<S>>) -> Self { pub fn new(gm: GlobalList, args: Args, qhandle: QueueHandle<State<S>>) -> Result<Self> {
let fps = args.fps; let fps = args.fps;
let drm_device = args.drm_device.as_ref().map(PathBuf::from); let drm_device = args.drm_device.as_ref().map(PathBuf::from);
let (webrtc, webrtc_tx, webrtc_rx, webrtc_paused) = if args.port > 0 {
let (tx, rx) = crossbeam_channel::bounded(32);
let wrtc = WebRtcState::new(args.port, args.fps)?;
// paused=true until first WebRTC client connects
let paused = Arc::new(AtomicBool::new(true));
(Some(wrtc), Some(tx), Some(rx), Some(paused))
} else {
(None, None, None, None)
};
let mut state = Self { let mut state = Self {
stage: EncConstructionStage::ProbingOutputs { stage: EncConstructionStage::ProbingOutputs {
outputs: Vec::new(), outputs: Vec::new(),
@@ -247,6 +307,9 @@ impl<S: CaptureSource> State<S> {
}, },
in_flight_surface: InFlightSurface::None, in_flight_surface: InFlightSurface::None,
starting_timestamp: None, starting_timestamp: None,
stats_start_time: None,
stats_last_time: None,
stats_frames: 0,
first_frame: true, first_frame: true,
fps_limit: FpsLimit::new(fps), fps_limit: FpsLimit::new(fps),
args, args,
@@ -255,6 +318,12 @@ impl<S: CaptureSource> State<S> {
qhandle, qhandle,
drm_device, drm_device,
drm_device_from_compositor: None, drm_device_from_compositor: None,
webrtc,
webrtc_tx,
webrtc_rx,
webrtc_frames_sent: 0,
webrtc_paused,
stats: PipelineStats::new(),
}; };
// registry_queue_init consumes registry events internally during its // registry_queue_init consumes registry events internally during its
@@ -262,7 +331,7 @@ impl<S: CaptureSource> State<S> {
// We must manually bind the initial globals here. // We must manually bind the initial globals here.
state.bind_initial_globals(); state.bind_initial_globals();
state Ok(state)
} }
/// Iterate over the GlobalList from registry_queue_init and bind all /// Iterate over the GlobalList from registry_queue_init and bind all
@@ -432,7 +501,7 @@ impl<S: CaptureSource> State<S> {
// is a freshly allocated empty Video frame. // is a freshly allocated empty Video frame.
let ret = unsafe { ffi::av_hwframe_get_buffer(frames_rgb_ctx, surface.as_mut_ptr(), 0) }; let ret = unsafe { ffi::av_hwframe_get_buffer(frames_rgb_ctx, surface.as_mut_ptr(), 0) };
if ret < 0 { if ret < 0 {
tracing::error!("av_hwframe_get_buffer failed: error {}", ret); tracing::error!("av_hwframe_get_buffer failed: {}", crate::avhw::ff_err(ret));
self.errored = true; self.errored = true;
return; return;
} }
@@ -445,7 +514,7 @@ impl<S: CaptureSource> State<S> {
} }
let ret = unsafe { ffi::av_hwframe_map(map_frame.as_mut_ptr(), surface.as_ptr(), 0) }; let ret = unsafe { ffi::av_hwframe_map(map_frame.as_mut_ptr(), surface.as_ptr(), 0) };
if ret < 0 { if ret < 0 {
tracing::error!("av_hwframe_map failed: error {}", ret); tracing::error!("av_hwframe_map failed: {}", crate::avhw::ff_err(ret));
self.errored = true; self.errored = true;
return; return;
} }
@@ -470,7 +539,7 @@ impl<S: CaptureSource> State<S> {
// takes ownership of the fd, and the original fd is owned by map_frame. // takes ownership of the fd, and the original fd is owned by map_frame.
let fd_dup = unsafe { libc::dup(obj.fd) }; let fd_dup = unsafe { libc::dup(obj.fd) };
if fd_dup < 0 { if fd_dup < 0 {
tracing::error!("failed to dup dma-buf fd"); tracing::error!("failed to dup dma-buf fd: {}", std::io::Error::last_os_error());
// wayland-client does not auto-destroy params on Drop. // wayland-client does not auto-destroy params on Drop.
params.destroy(); params.destroy();
self.errored = true; self.errored = true;
@@ -514,6 +583,8 @@ impl<S: CaptureSource> State<S> {
where where
S::Frame: Default, S::Frame: Default,
{ {
self.stats.record_capture();
let (mut surface, _drm_map, frame, buffer) = let (mut surface, _drm_map, frame, buffer) =
match mem::replace(&mut self.in_flight_surface, InFlightSurface::None) { match mem::replace(&mut self.in_flight_surface, InFlightSurface::None) {
InFlightSurface::CopyQueued { InFlightSurface::CopyQueued {
@@ -554,10 +625,29 @@ impl<S: CaptureSource> State<S> {
.is_some() .is_some()
}; };
if should_encode { if should_encode {
let encode_start = Instant::now();
if let Err(e) = enc.encode_frame(&surface) { if let Err(e) = enc.encode_frame(&surface) {
tracing::error!("encode_frame failed: {}", e); tracing::error!("encode_frame failed: {}", e);
self.errored = true; self.errored = true;
} }
let encode_elapsed = encode_start.elapsed().as_micros() as u64;
self.stats.record_encode(&FrameTimings {
total_us: encode_elapsed,
..Default::default()
});
}
self.stats_frames += 1;
if let Some(last) = self.stats_last_time {
if last.elapsed() >= std::time::Duration::from_secs(10) {
let delta = self.stats_frames;
let fps = delta as f64 / last.elapsed().as_secs_f64();
tracing::info!(frames = self.stats_frames, fps = format!("{fps:.1}"), "encoding stats");
self.stats_last_time = Some(std::time::Instant::now());
self.stats_frames = 0;
}
} else {
self.stats_start_time = Some(std::time::Instant::now());
self.stats_last_time = Some(std::time::Instant::now());
} }
} }
@@ -581,6 +671,55 @@ impl<S: CaptureSource> State<S> {
self.errored = true; self.errored = true;
} }
pub fn poll_webrtc(&mut self) -> Result<()> {
let Some(ref mut wrtc) = self.webrtc else { return Ok(()) };
wrtc.handle_signaling()?;
wrtc.poll_and_feed()?;
let connected = wrtc.is_connected();
if let Some(ref paused) = self.webrtc_paused {
let was_paused = paused.load(Ordering::Relaxed);
let now_paused = !connected;
if was_paused && !now_paused {
tracing::info!("WebRTC client connected, resuming encoding");
} else if !was_paused && now_paused {
tracing::warn!("WebRTC client disconnected, pausing encoding");
}
paused.store(now_paused, Ordering::Relaxed);
}
if let Some(ref rx) = self.webrtc_rx {
let mut count = 0u32;
while let Ok(data) = rx.try_recv() {
if !connected {
continue;
}
count += 1;
if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
self.stats.record_send(0.0, None);
self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1);
}
if count > 0 {
tracing::debug!("WebRTC forwarded {count} frames from channel");
}
}
if self.args.stats && self.stats.should_snapshot() {
self.stats.set_queue_depths(
0,
self.webrtc_rx.as_ref().map(|r| r.len()).unwrap_or(0),
);
let snap = self.stats.snapshot_and_reset();
tracing::info!("stats: {snap}");
}
Ok(())
}
pub fn negotiate_format(&mut self, format: u32, width: u32, height: u32) { pub fn negotiate_format(&mut self, format: u32, width: u32, height: u32) {
let stage_data = match mem::replace(&mut self.stage, EncConstructionStage::Intermediate) { let stage_data = match mem::replace(&mut self.stage, EncConstructionStage::Intermediate) {
EncConstructionStage::EverythingButFmt { EncConstructionStage::EverythingButFmt {
@@ -611,22 +750,49 @@ impl<S: CaptureSource> State<S> {
.args .args
.bitrate .bitrate
.unwrap_or_else(|| 2 * (width as u64) * (height as u64) * (fps as u64) / 100); .unwrap_or_else(|| 2 * (width as u64) * (height as u64) * (fps as u64) / 100);
let enc = match crate::avhw::create_encoder(
&drm_path, let enc = if let Some(ref tx) = self.webrtc_tx {
Path::new(self.args.output.as_deref().expect("output required for MP4 mode")), let (enc_w, enc_h) =
width, transpose_if_transform_transposed(output_info.transform, width as i32, height as i32);
height, let actual_gop_size = self.args.gop_size.unwrap_or((fps / 2).max(10));
fps, match SwEncState::new_webrtc(
output_info.transform, &drm_path,
self.args.bitrate, width,
self.args.gop_size, height,
Some(hw_device_ctx), enc_w as u32,
) { enc_h as u32,
Ok(enc) => enc, fps,
Err(e) => { bitrate,
tracing::error!("EncState::new failed: {}", e); actual_gop_size,
self.errored = true; tx.clone(),
return; self.webrtc_paused.as_ref().expect("webrtc_paused must exist when webrtc_tx exists").clone(),
) {
Ok(enc) => StreamingEncoder::WebRtc(enc),
Err(e) => {
tracing::error!("SwEncState::new_webrtc failed: {}", e);
self.errored = true;
return;
}
}
} else {
let output_path = self.args.output.as_deref().expect("output required for MP4 mode");
match crate::avhw::create_encoder(
&drm_path,
Path::new(output_path),
width,
height,
fps,
output_info.transform,
self.args.bitrate,
self.args.gop_size,
Some(hw_device_ctx),
) {
Ok(enc) => StreamingEncoder::Mp4(enc),
Err(e) => {
tracing::error!("EncState::new failed: {}", e);
self.errored = true;
return;
}
} }
}; };
tracing::info!( tracing::info!(
@@ -869,7 +1035,6 @@ impl<S: CaptureSource> Dispatch<WlRegistry, GlobalListContents> for State<S> {
qhandle: &QueueHandle<State<S>>, qhandle: &QueueHandle<State<S>>,
) { ) {
use wayland_client::protocol::wl_registry::Event as RegistryEvent; use wayland_client::protocol::wl_registry::Event as RegistryEvent;
tracing::debug!("Dispatch<WlRegistry>::event fired: {:?}", event);
match event { match event {
RegistryEvent::Global { RegistryEvent::Global {

View File

@@ -1,14 +1,17 @@
// 采集门户状态模块 —— 通过 PipeWire/DMA-BUF 进行屏幕采集并编码 // 采集门户状态模块 —— 通过 PipeWire/DMA-BUF 进行屏幕采集并编码
use std::os::fd::AsRawFd; use std::os::fd::AsRawFd;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{bail, Result}; use anyhow::{bail, Result}; // 错误处理工具
use crate::args::Args; use crate::args::Args; // 命令行参数
use crate::avhw::{self, SwEncState}; use crate::avhw::{self, CpuNv12Frame, SwEncEncode, SwEncImport, SwEncState}; // 软件编码器状态VAAPI 导入 + H.264 编码)
use crate::cap_portal::{CapPortal, PwCtrlEvent, PwDmaBufFrame}; use crate::cap_portal::{CapPortal, PwCtrlEvent, PwDmaBufFrame}; // PipeWire 屏幕采集端点
use crate::webrtc::WebRtcState; use crate::stats::{FrameTimings, PipelineStats}; // 管道统计(帧计时、每秒快照)
use crate::webrtc::WebRtcState; // WebRTC 信令与媒体传输
/// 门户采集的阶段状态 /// 门户采集的阶段状态
/// - WaitingForFormat: 等待接收到第一帧 DMA-BUF 以确定视频格式参数 /// - WaitingForFormat: 等待接收到第一帧 DMA-BUF 以确定视频格式参数
@@ -18,25 +21,49 @@ enum PortalStage {
Streaming, Streaming,
} }
struct EncodeThreadTiming {
sws_us: u64,
encode_us: u64,
output_bytes: usize,
}
struct EncodeThread {
handle: Option<std::thread::JoinHandle<()>>,
input_tx: crossbeam_channel::Sender<CpuNv12Frame>,
timing_rx: crossbeam_channel::Receiver<EncodeThreadTiming>,
}
struct WebrtcThread {
handle: Option<std::thread::JoinHandle<()>>,
sent_gap_rx: crossbeam_channel::Receiver<f64>,
}
/// 门户模式的主状态机 /// 门户模式的主状态机
/// ///
/// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。 /// 负责管理从 PipeWire 采集屏幕帧、通过 VAAPI 硬件编码的完整生命周期。
/// 工作流程:等待第一帧 → 创建编码器 → 持续编码帧数据。 /// 工作流程:等待第一帧 → 创建编码器 → 持续编码帧数据。
pub struct StatePortal { pub struct StatePortal {
stage: PortalStage, stage: PortalStage, // 当前采集阶段(等待首帧 / 流式编码中)
enc: Option<SwEncState>, enc: Option<SwEncState>, // 软件编码器,首帧到达后初始化
cap: CapPortal, enc_import: Option<SwEncImport>,
args: Args, enc_thread: Option<EncodeThread>,
errored: bool, cap: CapPortal, // PipeWire 屏幕采集端点
drm_device: Option<PathBuf>, args: Args, // 用户命令行参数
frames_encoded: u64, errored: bool, // 是否遇到不可恢复的错误
start_time: Option<Instant>, drm_device: Option<PathBuf>, // DRM 渲染设备路径(可自动检测)
last_stats_time: Option<Instant>, frames_encoded: u64, // 已编码帧数(用于 PTS 编号)
last_stats_frames: u64, start_time: Option<Instant>, // 编码开始时间
stats: PipelineStats, // 管道统计(窗口化帧计时 + 每秒快照)
pw_dropped_prev: u64, // 上一窗口的 PipeWire 丢弃帧数(用于增量计算)
webrtc: Option<WebRtcState>, webrtc: Option<WebRtcState>,
webrtc_tx: Option<crossbeam_channel::Sender<Vec<u8>>>, webrtc_thread: Option<WebrtcThread>,
webrtc_rx: Option<crossbeam_channel::Receiver<Vec<u8>>>, webrtc_paused: Option<Arc<AtomicBool>>,
webrtc_frames_sent: u64, last_capture_arrival: Option<Instant>, // timestamp of last real frame arrival
stall_start: Option<Instant>, // when current stall began
last_stall_log: Option<Instant>, // rate-limiting for stall warnings
last_fillable_frame: Option<CpuNv12Frame>, // cached last frame for filler duplication
next_filler_at: Option<Instant>, // when to send next filler frame
filler_frames_sent: u64,
} }
impl StatePortal { impl StatePortal {
@@ -53,41 +80,46 @@ impl StatePortal {
let cap = CapPortal::new(&args)?; let cap = CapPortal::new(&args)?;
let (webrtc, webrtc_tx, webrtc_rx) = if args.port > 0 { let (webrtc, webrtc_paused) = if args.port > 0 {
let (tx, rx) = crossbeam_channel::bounded(32);
let wrtc = WebRtcState::new(args.port, args.fps)?; let wrtc = WebRtcState::new(args.port, args.fps)?;
(Some(wrtc), Some(tx), Some(rx)) let paused = Arc::new(AtomicBool::new(true));
(Some(wrtc), Some(paused))
} else { } else {
(None, None, None) (None, None)
}; };
Ok(Self { Ok(Self {
stage: PortalStage::WaitingForFormat, stage: PortalStage::WaitingForFormat,
enc: None, enc: None,
enc_import: None,
enc_thread: None,
cap, cap,
args, args,
errored: false, errored: false,
drm_device, drm_device,
frames_encoded: 0, frames_encoded: 0,
start_time: None, start_time: None,
last_stats_time: None, stats: PipelineStats::new(),
last_stats_frames: 0, pw_dropped_prev: 0,
webrtc, webrtc,
webrtc_tx, webrtc_thread: None,
webrtc_rx, webrtc_paused,
webrtc_frames_sent: 0, last_capture_arrival: None,
stall_start: None,
last_stall_log: None,
last_fillable_frame: None,
next_filler_at: None,
filler_frames_sent: 0,
}) })
} }
/// 轮询 PipeWire 事件并编码帧 /// 轮询 PipeWire 事件并编码帧
/// ///
/// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 10ms /// `block=true` 时使用 recv_timeout 阻塞等待帧(最多 2ms
/// `block=false` 时使用 try_recv 非阻塞检查。 /// `block=false` 时使用 try_recv 非阻塞检查。
/// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。 /// 返回 `Ok(true)` 表示已处理事件,`Ok(false)` 表示暂无数据。
pub fn poll_and_encode(&mut self, block: bool) -> Result<bool> { pub fn poll_and_encode(&mut self, block: bool) -> Result<bool> {
// WebRTC: process signaling, network, and forward encoded frames // 检查 PipeWire 控制事件(流结束 / 错误)
self.poll_webrtc()?;
if let Ok(ctrl) = self.cap.event_receiver().try_recv() { if let Ok(ctrl) = self.cap.event_receiver().try_recv() {
match ctrl { match ctrl {
PwCtrlEvent::StreamEnded => { PwCtrlEvent::StreamEnded => {
@@ -100,23 +132,43 @@ impl StatePortal {
self.errored = true; self.errored = true;
return Ok(true); return Ok(true);
} }
PwCtrlEvent::FormatChanged { width, height } => {
tracing::warn!(
"PipeWire format renegotiation: new dimensions {}x{} — encoder output remains at original resolution",
width,
height
);
// No action yet — VAAPI import/scale handles the conversion.
// Full encoder reinit is a future enhancement.
}
} }
} }
// 根据阻塞模式选择不同的帧接收策略
let frame = if block { let frame = if block {
match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(10)) { // 阻塞模式:最多等待 2ms 接收帧
match self.cap.frame_receiver().recv_timeout(std::time::Duration::from_millis(2)) {
Ok(frame) => frame, Ok(frame) => frame,
Err(_) => return Ok(false), Err(_) => {
self.record_capture_timeout();
return Ok(false);
}
} }
} else { } else {
// 非阻塞模式:立即尝试接收,无数据则返回
match self.cap.frame_receiver().try_recv() { match self.cap.frame_receiver().try_recv() {
Ok(frame) => frame, Ok(frame) => frame,
Err(_) => return Ok(false), Err(_) => {
self.record_capture_timeout();
return Ok(false);
}
} }
}; };
self.record_frame_arrival();
match self.stage { match self.stage {
PortalStage::WaitingForFormat => { PortalStage::WaitingForFormat => {
// 首帧到达,记录 DMA-BUF 格式信息
tracing::info!( tracing::info!(
"First DMA-BUF frame: {}x{} format=0x{:08X} stride={} modifier=0x{:X}", "First DMA-BUF frame: {}x{} format=0x{:08X} stride={} modifier=0x{:X}",
frame.width, frame.width,
@@ -126,7 +178,9 @@ impl StatePortal {
frame.modifier frame.modifier
); );
// 自动检测或确认 DRM 设备是否支持导入该帧
let drm_path = self.resolve_drm_device_for_frame(&frame)?; let drm_path = self.resolve_drm_device_for_frame(&frame)?;
// 计算编码目标分辨率(不超过 2560x1440
let (enc_width, enc_height) = portal_encode_dimensions(frame.width, frame.height); let (enc_width, enc_height) = portal_encode_dimensions(frame.width, frame.height);
tracing::info!( tracing::info!(
"Portal software encode target: {}x{} -> {}x{} @ {} fps", "Portal software encode target: {}x{} -> {}x{} @ {} fps",
@@ -136,75 +190,233 @@ impl StatePortal {
enc_height, enc_height,
self.args.fps, self.args.fps,
); );
// 码率:未指定时按分辨率 × 帧率动态计算
let actual_bitrate = self.args.bitrate.unwrap_or_else(|| { let actual_bitrate = self.args.bitrate.unwrap_or_else(|| {
2 * (enc_width as u64) * (enc_height as u64) * (self.args.fps as u64) / 100 5 * (enc_width as u64) * (enc_height as u64) * (self.args.fps as u64) / 100
}); });
// GOP 大小WebRTC 模式使用更小的 GOPfps/2最低10MP4 模式使用 fps
let actual_gop_size = self.args.gop_size.unwrap_or_else(|| { let actual_gop_size = self.args.gop_size.unwrap_or_else(|| {
if self.webrtc_tx.is_some() { if self.webrtc.is_some() {
(self.args.fps / 2).max(10) (self.args.fps / 2).max(10)
} else { } else {
self.args.fps self.args.fps
} }
}); });
let enc = if let Some(ref tx) = self.webrtc_tx { // 根据是否启用 WebRTC 选择不同的编码器构造方式
avhw::SwEncState::new_webrtc( if self.webrtc.is_some() {
let paused = self.webrtc_paused.as_ref()
.ok_or_else(|| anyhow::anyhow!("internal invariant broken: webrtc_paused missing while WebRTC mode is active"))?;
let import = SwEncImport::new(
&drm_path, &drm_path,
frame.width, frame.width,
frame.height, frame.height,
enc_width, enc_width,
enc_height, enc_height,
self.args.fps, self.args.fps,
actual_bitrate, )?;
actual_gop_size, let (webrtc_tx, webrtc_rx) = crossbeam_channel::bounded(2);
tx.clone(), let (input_tx, input_rx) = crossbeam_channel::bounded::<CpuNv12Frame>(1);
)? let (timing_tx, timing_rx) = crossbeam_channel::bounded::<EncodeThreadTiming>(32);
} else { let encode = SwEncEncode::new_webrtc(
avhw::SwEncState::new(
&drm_path,
std::path::Path::new(self.args.output.as_deref().expect("output required for MP4 mode")),
frame.width,
frame.height,
enc_width, enc_width,
enc_height, enc_height,
self.args.fps, self.args.fps,
actual_bitrate, actual_bitrate,
actual_gop_size, actual_gop_size,
)? webrtc_tx,
}; paused.clone(),
)?;
let handle = std::thread::Builder::new()
.name("wl-webrtc-encode".into())
.spawn(move || encode_thread_loop(encode, input_rx, timing_tx))?;
self.enc_import = Some(import);
self.enc_thread = Some(EncodeThread { handle: Some(handle), input_tx, timing_rx });
self.enc = Some(enc); let wrtc = self.webrtc.take()
self.stage = PortalStage::Streaming; .ok_or_else(|| anyhow::anyhow!("internal: WebRtcState missing during init"))?;
let paused = self.webrtc_paused.as_ref()
.ok_or_else(|| anyhow::anyhow!("internal: webrtc_paused missing"))?
.clone();
let fps = self.args.fps;
let (sent_gap_tx, sent_gap_rx) = crossbeam_channel::bounded(64);
let webrtc_handle = std::thread::Builder::new()
.name("wl-webrtc-webrtc".into())
.spawn(move || webrtc_thread_loop(wrtc, webrtc_rx, fps, paused, sent_gap_tx))?;
self.webrtc_thread = Some(WebrtcThread { handle: Some(webrtc_handle), sent_gap_rx });
} else {
// MP4 模式:编码输出写入文件
let output_path = self.args.output.as_deref()
.ok_or_else(|| anyhow::anyhow!("--output is required in MP4 file output mode; use --port > 0 for WebRTC mode"))?;
let enc = avhw::SwEncState::new(
&drm_path,
std::path::Path::new(output_path),
frame.width,
frame.height,
enc_width,
enc_height,
self.args.fps,
actual_bitrate,
actual_gop_size,
)?;
self.enc = Some(enc);
};
self.stage = PortalStage::Streaming; // 切换到流式编码阶段
self.start_time = Some(Instant::now()); self.start_time = Some(Instant::now());
self.last_stats_time = Some(Instant::now());
tracing::info!("First frame processed, encoder initialized, transitioning to Streaming"); tracing::info!("First frame processed, encoder initialized, transitioning to Streaming");
drop(frame); drop(frame); // 首帧仅用于初始化,不参与编码
} }
PortalStage::Streaming => { PortalStage::Streaming => {
// 记录采集帧到达(用于 capture gap 和 capture_fps 统计)
self.stats.record_capture();
self.last_capture_arrival = Some(Instant::now());
// 流式编码阶段:直接处理帧
self.handle_pw_frame(frame)?; self.handle_pw_frame(frame)?;
} }
} }
// WebRTC: drain encoded frames produced by this poll before returning. // 每秒输出一次结构化管道统计(仅 --stats 启用时记录日志)
self.poll_webrtc()?; if self.args.stats && self.stats.should_snapshot() {
self.stats.set_pipewire_dropped(0, 0);
self.stats.set_queue_depths(0, 0);
if let Some(ref enc_thread) = self.enc_thread {
while let Ok(timing) = enc_thread.timing_rx.try_recv() {
self.stats.record_encode_thread(
timing.sws_us,
timing.encode_us,
timing.output_bytes,
);
}
}
if let Some(ref webrtc_thread) = self.webrtc_thread {
while let Ok(gap_ms) = webrtc_thread.sent_gap_rx.try_recv() {
self.stats.record_send_from_thread(gap_ms);
}
}
let snap = self.stats.snapshot_and_reset();
if self.filler_frames_sent > 0 {
tracing::info!("stats: {snap} filler_frames_sent={}", self.filler_frames_sent);
} else {
tracing::info!("stats: {snap}");
}
}
Ok(true) Ok(true)
} }
fn record_capture_timeout(&mut self) {
let Some(last_capture_arrival) = self.last_capture_arrival else {
return;
};
let now = Instant::now();
let frame_interval = Duration::from_secs_f64(1.0 / f64::from(self.args.fps.max(1)));
let stall_threshold = Duration::from_millis(100).max(frame_interval * 3);
if now.duration_since(last_capture_arrival) <= stall_threshold {
return;
}
if self.stall_start.is_none() {
self.stall_start = Some(now);
self.last_stall_log = Some(now);
tracing::warn!("compositor frame delivery stalled");
} else {
let should_log = self
.last_stall_log
.map_or(true, |last_log| now.duration_since(last_log) >= Duration::from_secs(1));
if should_log {
self.last_stall_log = Some(now);
tracing::warn!("compositor frame delivery stalled");
}
}
self.maybe_send_filler_frame();
}
fn maybe_send_filler_frame(&mut self) {
if self.webrtc_thread.is_none() || self.stall_start.is_none() {
return;
}
let Some(cached) = &self.last_fillable_frame else {
return;
};
const MAX_FILLER_DURATION: Duration = Duration::from_secs(2);
if let Some(stall_start) = self.stall_start {
if stall_start.elapsed() > MAX_FILLER_DURATION {
return;
}
}
let now = Instant::now();
let frame_interval = Duration::from_secs_f64(1.0 / f64::from(self.args.fps.max(1)));
let Some(next) = self.next_filler_at else {
self.next_filler_at = Some(now + frame_interval);
return;
};
if now < next {
return;
}
let filler = CpuNv12Frame {
y_data: cached.y_data.clone(),
uv_data: cached.uv_data.clone(),
y_stride: cached.y_stride,
uv_stride: cached.uv_stride,
pts: self.frames_encoded as i64,
};
if let Some(enc_thread) = &self.enc_thread {
match enc_thread.input_tx.try_send(filler) {
Ok(()) => {
self.frames_encoded += 1;
self.filler_frames_sent += 1;
self.next_filler_at = Some(next + frame_interval);
}
Err(crossbeam_channel::TrySendError::Full(_)) => {}
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
tracing::error!("Encode thread disconnected during filler");
self.errored = true;
}
}
}
}
fn record_frame_arrival(&mut self) {
if let Some(stall_start) = self.stall_start.take() {
tracing::info!(
"compositor frame delivery resumed after {:.0}ms",
stall_start.elapsed().as_secs_f64() * 1000.0
);
self.last_stall_log = None;
}
self.last_capture_arrival = Some(Instant::now());
self.next_filler_at = None;
}
/// 为当前帧解析可用的 DRM 渲染设备
///
/// 如果用户已通过 `--drm-device` 指定设备,直接返回;
/// 否则遍历系统中所有 DRM render node逐个尝试导入 DMA-BUF 帧来找到兼容设备。
fn resolve_drm_device_for_frame(&mut self, frame: &PwDmaBufFrame) -> Result<PathBuf> { fn resolve_drm_device_for_frame(&mut self, frame: &PwDmaBufFrame) -> Result<PathBuf> {
// 用户已显式指定 DRM 设备,直接使用
if let Some(ref drm) = self.drm_device { if let Some(ref drm) = self.drm_device {
return Ok(drm.clone()); return Ok(drm.clone());
} }
// 查找系统中所有 DRM render node如 /dev/dri/renderD128
let candidates = crate::state::find_drm_render_nodes(); let candidates = crate::state::find_drm_render_nodes();
if candidates.is_empty() { if candidates.is_empty() {
bail!("No DRM render device found. Specify --drm-device."); bail!("No DRM render device found. Specify --drm-device.");
} }
// 逐个尝试导入 DMA-BUF 帧,找到第一个兼容的设备
let mut failures = Vec::new(); let mut failures = Vec::new();
for candidate in &candidates { for candidate in &candidates {
match crate::avhw::test_dma_buf_import(candidate, frame) { match crate::avhw::test_dma_buf_import(candidate, frame) {
Ok(()) => { Ok(()) => {
// 成功导入,缓存检测结果并返回
tracing::info!( tracing::info!(
"Auto-detected DRM device: {} (tested {} candidates)", "Auto-detected DRM device: {} (tested {} candidates)",
candidate.display(), candidate.display(),
@@ -214,6 +426,7 @@ impl StatePortal {
return Ok(candidate.clone()); return Ok(candidate.clone());
} }
Err(e) => { Err(e) => {
// 导入失败,记录原因,继续尝试下一个设备
tracing::debug!( tracing::debug!(
"DRM device {} cannot import DMA-BUF: {e}", "DRM device {} cannot import DMA-BUF: {e}",
candidate.display(), candidate.display(),
@@ -223,8 +436,8 @@ impl StatePortal {
} }
} }
// 所有候选设备均失败,返回详细错误信息
bail!( bail!(
"No DRM render device can import the DMA-BUF frame. Tried: {}",
failures failures
.into_iter() .into_iter()
.map(|(p, e)| format!("{} ({e})", p.display())) .map(|(p, e)| format!("{} ({e})", p.display()))
@@ -238,44 +451,92 @@ impl StatePortal {
/// 通过 `av_hwframe_map` 零拷贝导入 VAAPI然后交给 SwEncState 完成: /// 通过 `av_hwframe_map` 零拷贝导入 VAAPI然后交给 SwEncState 完成:
/// scale_vaapi GPU 缩放、2K NV12 回读、YUV420P 格式转换、软件 H.264 编码。 /// scale_vaapi GPU 缩放、2K NV12 回读、YUV420P 格式转换、软件 H.264 编码。
fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> { fn handle_pw_frame(&mut self, frame: PwDmaBufFrame) -> Result<()> {
let enc = match self.enc.as_mut() { let t_import_start = Instant::now();
Some(enc) => enc,
None => bail!("encoder not initialized"),
};
let mut vaapi_frame = unsafe {
avhw::import_dma_buf_to_vaapi(
enc.frames_rgb().as_ptr(),
frame.fd.as_raw_fd(),
frame.width,
frame.height,
frame.format,
frame.modifier,
frame.stride,
frame.offset,
)
}?;
let pts = self.frames_encoded as i64; let pts = self.frames_encoded as i64;
unsafe {
(*vaapi_frame.as_mut_ptr()).pts = pts;
}
enc.encode_frame(&vaapi_frame)?; if let Some(enc) = self.enc.as_mut() {
self.frames_encoded += 1; // 将 DMA-BUF 帧零拷贝导入 VAAPI 硬件帧池
let mut vaapi_frame = unsafe {
avhw::import_dma_buf_to_vaapi(
enc.frames_rgb().as_ptr(),
frame.fd.as_raw_fd(),
frame.width,
frame.height,
frame.format,
frame.modifier,
frame.stride,
frame.offset,
)
}?;
if let Some(last) = self.last_stats_time { let import_us = t_import_start.elapsed().as_micros() as u64;
if last.elapsed() >= Duration::from_secs(10) { let t_encode_start = Instant::now();
let delta_frames = self.frames_encoded - self.last_stats_frames;
let delta_secs = last.elapsed().as_secs_f64(); // 设置帧的显示时间戳PTS基于已编码帧序号
let fps = delta_frames as f64 / delta_secs; unsafe {
tracing::info!( (*vaapi_frame.as_mut_ptr()).pts = pts;
"encoded={}, fps={fps:.1}",
self.frames_encoded,
);
self.last_stats_time = Some(Instant::now());
self.last_stats_frames = self.frames_encoded;
} }
// 送入编码器完成:缩放 → 回读 → 格式转换 → H.264 编码
enc.encode_frame(&vaapi_frame)?;
let total_us = t_import_start.elapsed().as_micros() as u64;
let encode_us = t_encode_start.elapsed().as_micros() as u64;
self.frames_encoded += 1;
// 记录帧计时到管道统计import + encode 内部各阶段暂不可分离,用 total 覆盖)
let timings = FrameTimings {
import_us,
encode_us,
total_us,
..Default::default()
};
self.stats.record_encode(&timings);
} else if let Some(import) = self.enc_import.as_mut() {
let mut vaapi_frame = unsafe {
avhw::import_dma_buf_to_vaapi(
import.frames_rgb().as_ptr(),
frame.fd.as_raw_fd(),
frame.width,
frame.height,
frame.format,
frame.modifier,
frame.stride,
frame.offset,
)
}?;
unsafe {
(*vaapi_frame.as_mut_ptr()).pts = pts;
}
let cpu_nv12 = import.import_and_scale(&vaapi_frame)?;
let import_us = t_import_start.elapsed().as_micros() as u64;
self.stats.record_import(import_us);
let enc_thread = self.enc_thread.as_ref()
.ok_or_else(|| anyhow::anyhow!("internal invariant broken: encode thread missing while async import is active"))?;
let fillable_frame = CpuNv12Frame {
y_data: cpu_nv12.y_data.clone(),
uv_data: cpu_nv12.uv_data.clone(),
y_stride: cpu_nv12.y_stride,
uv_stride: cpu_nv12.uv_stride,
pts: 0,
};
match enc_thread.input_tx.try_send(cpu_nv12) {
Ok(()) => {
self.frames_encoded += 1;
self.last_fillable_frame = Some(fillable_frame);
}
Err(crossbeam_channel::TrySendError::Full(_)) => {
tracing::debug!("Encode thread input full, dropping portal frame");
}
Err(crossbeam_channel::TrySendError::Disconnected(_frame)) => {
tracing::error!("Encode thread input disconnected");
self.errored = true;
}
}
} else {
bail!("encoder not initialized");
} }
Ok(()) Ok(())
@@ -285,6 +546,26 @@ impl StatePortal {
/// ///
/// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。 /// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。
pub fn shutdown(&mut self) { pub fn shutdown(&mut self) {
self.last_fillable_frame = None;
// 1. Stop encode thread (drops webrtc_tx → signals WebRTC thread to exit)
if let Some(mut enc_thread) = self.enc_thread.take() {
drop(enc_thread.input_tx);
if let Some(handle) = enc_thread.handle.take() {
if handle.join().is_err() {
tracing::error!("Encode thread panicked during shutdown");
}
}
}
self.enc_import = None;
// 2. Wait for WebRTC thread (exits when webrtc_tx is dropped by encode thread)
if let Some(mut webrtc_thread) = self.webrtc_thread.take() {
if let Some(handle) = webrtc_thread.handle.take() {
if handle.join().is_err() {
tracing::error!("WebRTC thread panicked during shutdown");
}
}
}
// 3. Flush MP4 encoder if present
if let Some(mut enc) = self.enc.take() { if let Some(mut enc) = self.enc.take() {
if let Err(e) = enc.flush() { if let Err(e) = enc.flush() {
tracing::error!("Flush error during shutdown: {e}"); tracing::error!("Flush error during shutdown: {e}");
@@ -309,49 +590,142 @@ impl StatePortal {
pub fn is_errored(&self) -> bool { pub fn is_errored(&self) -> bool {
self.errored self.errored
} }
}
fn poll_webrtc(&mut self) -> Result<()> { fn encode_thread_loop(
let Some(ref mut wrtc) = self.webrtc else { return Ok(()); }; mut encode: SwEncEncode,
input_rx: crossbeam_channel::Receiver<CpuNv12Frame>,
wrtc.handle_signaling()?; timing_tx: crossbeam_channel::Sender<EncodeThreadTiming>,
wrtc.poll_and_feed()?; ) {
loop {
if let Some(ref rx) = self.webrtc_rx { match input_rx.recv() {
let mut count = 0u32; Ok(frame) => {
while let Ok(data) = rx.try_recv() { let t_start = Instant::now();
count += 1; match encode.encode_cpu_frame(&frame) {
if let Err(e) = wrtc.write_h264_frame(&data, self.webrtc_frames_sent, self.args.fps) { Ok(()) => {
tracing::debug!("WebRTC write frame error: {e}"); let elapsed = t_start.elapsed().as_micros() as u64;
let _ = timing_tx.try_send(EncodeThreadTiming {
sws_us: 0,
encode_us: elapsed,
output_bytes: 0,
});
}
Err(e) => {
tracing::error!("Encode thread error: {e}");
break;
}
} }
self.webrtc_frames_sent = self.webrtc_frames_sent.saturating_add(1);
} }
if count > 0 { Err(_) => {
tracing::info!("WebRTC forwarded {count} frames from channel"); tracing::info!("Encode thread input closed, flushing encoder");
if let Err(e) = encode.flush() {
tracing::error!("Encode thread flush error: {e}");
}
break;
} }
} }
Ok(())
} }
tracing::info!("Encode thread exiting");
}
fn webrtc_thread_loop(
mut wrtc: WebRtcState,
webrtc_rx: crossbeam_channel::Receiver<Vec<u8>>,
fps: u32,
paused: Arc<AtomicBool>,
sent_gap_tx: crossbeam_channel::Sender<f64>,
) {
let mut frames_sent: u64 = 0;
let mut last_send: Option<std::time::Instant> = None;
let timeout = Duration::from_millis(1);
loop {
if let Err(e) = wrtc.handle_signaling() {
tracing::error!("WebRTC signaling error: {e}");
break;
}
if let Err(e) = wrtc.poll_and_feed() {
tracing::error!("WebRTC poll error: {e}");
break;
}
let connected = wrtc.is_connected();
let was_paused = paused.load(Ordering::Relaxed);
let now_paused = !connected;
if was_paused && !now_paused {
tracing::info!("WebRTC client connected, resuming encoding");
} else if !was_paused && now_paused {
tracing::warn!("WebRTC client disconnected, pausing encoding");
}
paused.store(now_paused, Ordering::Relaxed);
if connected {
while let Ok(data) = webrtc_rx.try_recv() {
if let Err(e) = wrtc.write_h264_frame(&data, frames_sent, fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
frames_sent = frames_sent.saturating_add(1);
let gap_ms = last_send
.map(|l| l.elapsed().as_secs_f64() * 1000.0)
.unwrap_or(0.0);
last_send = Some(std::time::Instant::now());
let _ = sent_gap_tx.try_send(gap_ms);
}
} else {
while webrtc_rx.try_recv().is_ok() {}
}
match webrtc_rx.recv_timeout(timeout) {
Ok(data) => {
if wrtc.is_connected() {
if let Err(e) = wrtc.write_h264_frame(&data, frames_sent, fps) {
tracing::debug!("WebRTC write frame error: {e}");
}
frames_sent = frames_sent.saturating_add(1);
let gap_ms = last_send
.map(|l| l.elapsed().as_secs_f64() * 1000.0)
.unwrap_or(0.0);
last_send = Some(std::time::Instant::now());
let _ = sent_gap_tx.try_send(gap_ms);
}
}
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
tracing::info!("WebRTC channel disconnected, exiting thread");
return;
}
}
}
tracing::info!("WebRTC thread exiting");
} }
impl Drop for StatePortal { impl Drop for StatePortal {
// 析构时自动调用 shutdown确保编码器被刷新、资源被释放
fn drop(&mut self) { fn drop(&mut self) {
self.shutdown(); self.shutdown();
} }
} }
/// 计算编码目标分辨率
///
/// 将原始分辨率等比缩放至不超过 2560×14402K并确保宽高为偶数
/// H.264 编码要求偶数尺寸)。
fn portal_encode_dimensions(width: u32, height: u32) -> (u32, u32) { fn portal_encode_dimensions(width: u32, height: u32) -> (u32, u32) {
const TARGET_W: u32 = 2560; const TARGET_W: u32 = 2560; // 目标最大宽度
const TARGET_H: u32 = 1440; const TARGET_H: u32 = 1440; // 目标最大高度
// 原始分辨率已在 2K 以内,直接对齐偶数
if width <= TARGET_W && height <= TARGET_H { if width <= TARGET_W && height <= TARGET_H {
return (width & !1, height & !1); return (width & !1, height & !1); // & !1 确保为偶数
} }
// 按宽度限制等比缩放
let width_limited_h = ((height as u64) * (TARGET_W as u64) / (width as u64)) as u32; let width_limited_h = ((height as u64) * (TARGET_W as u64) / (width as u64)) as u32;
if width_limited_h <= TARGET_H { if width_limited_h <= TARGET_H {
(TARGET_W & !1, width_limited_h & !1) (TARGET_W & !1, width_limited_h & !1)
} else { } else {
// 按高度限制等比缩放
let height_limited_w = ((width as u64) * (TARGET_H as u64) / (height as u64)) as u32; let height_limited_w = ((width as u64) * (TARGET_H as u64) / (height as u64)) as u32;
(height_limited_w & !1, TARGET_H & !1) (height_limited_w & !1, TARGET_H & !1)
} }
@@ -367,19 +741,23 @@ fn resolve_drm_device(args: &Args) -> Result<Option<PathBuf>> {
Ok(None) Ok(None)
} }
/// 构建测试用的 AVDRMFrameDescriptor仅测试用途
///
/// 将 PwDmaBufFrame 转换为 FFmpeg 的 DRM 帧描述符结构体,
/// 用于验证 DMA-BUF 元数据映射的正确性。
#[cfg(test)] #[cfg(test)]
fn build_drm_descriptor(frame: &PwDmaBufFrame) -> ffmpeg_next::ffi::AVDRMFrameDescriptor { fn build_drm_descriptor(frame: &PwDmaBufFrame) -> ffmpeg_next::ffi::AVDRMFrameDescriptor {
let mut desc: ffmpeg_next::ffi::AVDRMFrameDescriptor = unsafe { std::mem::zeroed() }; let mut desc: ffmpeg_next::ffi::AVDRMFrameDescriptor = unsafe { std::mem::zeroed() };
desc.nb_objects = 1; desc.nb_objects = 1; // 单个 DMA-BUF 对象
desc.objects[0].fd = frame.fd.as_raw_fd(); desc.objects[0].fd = frame.fd.as_raw_fd(); // DMA-BUF 文件描述符
desc.objects[0].size = 0; desc.objects[0].size = 0; // 大小设为 0内核自动确定
desc.objects[0].format_modifier = frame.modifier; desc.objects[0].format_modifier = frame.modifier; // DRM 格式修饰符如线性、tiled
desc.nb_layers = 1; desc.nb_layers = 1; // 单层
desc.layers[0].format = frame.format; desc.layers[0].format = frame.format; // 像素格式(如 XR24
desc.layers[0].nb_planes = 1; desc.layers[0].nb_planes = 1; // 单平面
desc.layers[0].planes[0].object_index = 0; desc.layers[0].planes[0].object_index = 0; // 指向第 0 个对象
desc.layers[0].planes[0].offset = frame.offset as isize; desc.layers[0].planes[0].offset = frame.offset as isize; // 帧数据偏移
desc.layers[0].planes[0].pitch = frame.stride as isize; desc.layers[0].planes[0].pitch = frame.stride as isize; // 行跨度stride
desc desc
} }
@@ -391,15 +769,16 @@ mod tests {
/// 创建测试用的 DMA-BUF 帧数据(使用 stderr fd 的副本作为占位) /// 创建测试用的 DMA-BUF 帧数据(使用 stderr fd 的副本作为占位)
fn make_test_frame() -> PwDmaBufFrame { fn make_test_frame() -> PwDmaBufFrame {
// Create a dummy fd from stderr (always valid fd 2) // Create a dummy fd from stderr (always valid fd 2)
// 使用 stderrfd 2的副本作为虚拟文件描述符
let fd = unsafe { OwnedFd::from_raw_fd(libc::dup(2)) }; let fd = unsafe { OwnedFd::from_raw_fd(libc::dup(2)) };
PwDmaBufFrame { PwDmaBufFrame {
fd, fd,
offset: 0, offset: 0,
stride: 1920 * 4, stride: 1920 * 4, // 每行 1920 像素 × 4 字节XRGB
modifier: 0, // DRM_FORMAT_MOD_LINEAR modifier: 0, // DRM_FORMAT_MOD_LINEAR(线性布局)
width: 1920, width: 1920,
height: 1080, height: 1080,
format: 0x34325258, // XR24 little-endian format: 0x34325258, // XR24 little-endianXRGB8888
pts: 12345, pts: 12345,
} }
} }
@@ -424,7 +803,7 @@ mod tests {
#[test] #[test]
fn resolve_drm_device_explicit() { fn resolve_drm_device_explicit() {
let args = Args { let args = Args {
output: "test.mp4".to_string(), output: Some("test.mp4".to_string()),
output_name: None, output_name: None,
fps: 30, fps: 30,
codec: "h264".to_string(), codec: "h264".to_string(),
@@ -435,6 +814,8 @@ mod tests {
verbose: false, verbose: false,
backend: None, backend: None,
port: 0, port: 0,
no_persist: false,
stats: false,
}; };
let result = resolve_drm_device(&args).unwrap(); let result = resolve_drm_device(&args).unwrap();
assert_eq!( assert_eq!(
@@ -446,7 +827,7 @@ mod tests {
#[test] #[test]
fn resolve_drm_device_none_when_not_specified() { fn resolve_drm_device_none_when_not_specified() {
let args = Args { let args = Args {
output: "test.mp4".to_string(), output: Some("test.mp4".to_string()),
output_name: None, output_name: None,
fps: 30, fps: 30,
codec: "h264".to_string(), codec: "h264".to_string(),
@@ -457,18 +838,21 @@ mod tests {
verbose: false, verbose: false,
backend: None, backend: None,
port: 0, port: 0,
no_persist: false,
stats: false,
}; };
let result = resolve_drm_device(&args).unwrap(); let result = resolve_drm_device(&args).unwrap();
assert_eq!(result, None); assert_eq!(result, None);
} }
/// 测试:使用自定义偏移量和 stride 构建 DRM 描述符
#[test] #[test]
fn build_drm_descriptor_custom_offset_and_stride() { fn build_drm_descriptor_custom_offset_and_stride() {
let frame = PwDmaBufFrame { let frame = PwDmaBufFrame {
fd: unsafe { OwnedFd::from_raw_fd(libc::dup(2)) }, fd: unsafe { OwnedFd::from_raw_fd(libc::dup(2)) },
offset: 4096, offset: 4096, // 4KB 对齐偏移
stride: 3840 * 4, stride: 3840 * 4, // 4K 宽度 × 4 字节
modifier: 0x0100000000000001, // AMD modifiers modifier: 0x0100000000000001, // AMD modifiers
width: 3840, width: 3840,
height: 2160, height: 2160,
format: 0x34325258, format: 0x34325258,
@@ -482,4 +866,46 @@ mod tests {
assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4); assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4);
} }
// ── issue #8 regression ──
#[test]
fn try_send_full_channel_returns_full_not_block() {
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
tx.send(vec![1]).unwrap();
tx.send(vec![2]).unwrap();
assert!(matches!(
tx.try_send(vec![3]),
Err(crossbeam_channel::TrySendError::Full(_))
));
assert_eq!(rx.len(), 2);
}
#[test]
fn try_send_after_rx_dropped_returns_disconnected() {
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
drop(rx);
assert!(matches!(
tx.try_send(vec![1]),
Err(crossbeam_channel::TrySendError::Disconnected(_))
));
}
// given: full bounded channel
// when: rx is dropped, then try_send
// expect: Disconnected, not blocking
#[test]
fn shutdown_rx_drop_prevents_deadlock_on_full_channel() {
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
tx.send(vec![1]).unwrap();
tx.send(vec![2]).unwrap();
drop(rx);
assert!(matches!(
tx.try_send(vec![3]),
Err(crossbeam_channel::TrySendError::Disconnected(_))
));
}
} }

500
src/stats.rs Normal file
View File

@@ -0,0 +1,500 @@
// stats.rs — Lightweight windowed pipeline statistics for stutter diagnosis
//
// Tracks per-second snapshots of capture/encode/send pipeline metrics.
// Designed for low overhead: only counters and timing samples are collected,
// with one structured log line emitted per second when `--stats` is enabled.
use std::time::Instant;
/// Per-stage timing for a single encode pipeline frame.
///
/// All values are in microseconds. The caller records timestamps around
/// each stage and passes the deltas to [`PipelineStats::record_frame`].
#[derive(Debug, Default)]
pub struct FrameTimings {
/// DMA-BUF import (av_hwframe_map)
pub import_us: u64,
/// GPU scale (scale_vaapi filter)
pub scale_us: u64,
/// GPU→CPU transfer (av_hwframe_transfer_data)
pub transfer_us: u64,
/// sws_scale NV12→YUV420P
pub sws_us: u64,
/// H.264 encode (avcodec_send_frame + receive_packet)
pub encode_us: u64,
/// Wall-clock total for this frame (import through encode output)
pub total_us: u64,
/// Encoded output size in bytes
pub output_bytes: usize,
}
/// Windowed statistics aggregator for the encode/send pipeline.
///
/// Collects counters and timing samples within a one-second window,
/// then computes avg/p95/max when the snapshot is taken.
pub struct PipelineStats {
// --- counters (reset each window) ---
capture_frames: u64,
encoded_frames: u64,
sent_frames: u64,
pipewire_dropped: u64,
over_budget_count: u64,
// --- queue depth at last observation ---
capture_queue_depth: usize,
encoded_queue_depth: usize,
// --- timing samples ---
capture_gaps_ms: Vec<f64>,
encoded_gaps_ms: Vec<f64>,
sent_gaps_ms: Vec<f64>,
frame_age_ms: Vec<f64>,
send_wait_ms: Vec<f64>,
// --- per-stage timing (microseconds) ---
import_us: Vec<u64>,
scale_us: Vec<u64>,
transfer_us: Vec<u64>,
sws_us: Vec<u64>,
encode_us: Vec<u64>,
total_us: Vec<u64>,
output_bytes: Vec<usize>,
// --- timing state ---
last_capture_time: Option<Instant>,
last_encode_time: Option<Instant>,
last_send_time: Option<Instant>,
window_start: Instant,
}
impl PipelineStats {
pub fn new() -> Self {
Self {
capture_frames: 0,
encoded_frames: 0,
sent_frames: 0,
pipewire_dropped: 0,
over_budget_count: 0,
capture_queue_depth: 0,
encoded_queue_depth: 0,
capture_gaps_ms: Vec::new(),
encoded_gaps_ms: Vec::new(),
sent_gaps_ms: Vec::new(),
frame_age_ms: Vec::new(),
send_wait_ms: Vec::new(),
import_us: Vec::new(),
scale_us: Vec::new(),
transfer_us: Vec::new(),
sws_us: Vec::new(),
encode_us: Vec::new(),
total_us: Vec::new(),
output_bytes: Vec::new(),
last_capture_time: None,
last_encode_time: None,
last_send_time: None,
window_start: Instant::now(),
}
}
/// Record that a capture frame was received from PipeWire.
pub fn record_capture(&mut self) {
let now = Instant::now();
if let Some(last) = self.last_capture_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.capture_gaps_ms.push(gap_ms);
}
self.last_capture_time = Some(now);
self.capture_frames += 1;
}
/// Record that a frame completed encoding with the given timings.
pub fn record_encode(&mut self, timings: &FrameTimings) {
let now = Instant::now();
if let Some(last) = self.last_encode_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.encoded_gaps_ms.push(gap_ms);
}
self.last_encode_time = Some(now);
self.encoded_frames += 1;
self.import_us.push(timings.import_us);
self.scale_us.push(timings.scale_us);
self.transfer_us.push(timings.transfer_us);
self.sws_us.push(timings.sws_us);
self.encode_us.push(timings.encode_us);
self.total_us.push(timings.total_us);
self.output_bytes.push(timings.output_bytes);
}
pub fn record_import(&mut self, import_us: u64) {
self.import_us.push(import_us);
}
pub fn record_encode_thread(&mut self, sws_us: u64, encode_us: u64, output_bytes: usize) {
let now = Instant::now();
if let Some(last) = self.last_encode_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.encoded_gaps_ms.push(gap_ms);
}
self.last_encode_time = Some(now);
self.encoded_frames += 1;
self.sws_us.push(sws_us);
self.encode_us.push(encode_us);
self.total_us.push(sws_us.saturating_add(encode_us));
self.output_bytes.push(output_bytes);
}
/// Record that a frame was sent via WebRTC.
/// `wait_ms` is time spent blocked waiting to send into the channel.
/// `capture_time` is when the frame was originally captured (for frame age).
pub fn record_send(&mut self, wait_ms: f64, capture_time: Option<Instant>) {
let now = Instant::now();
if let Some(last) = self.last_send_time {
let gap_ms = last.elapsed().as_secs_f64() * 1000.0;
self.sent_gaps_ms.push(gap_ms);
}
self.last_send_time = Some(now);
self.sent_frames += 1;
if wait_ms > 0.0 {
self.send_wait_ms.push(wait_ms);
}
if let Some(ct) = capture_time {
let age_ms = ct.elapsed().as_secs_f64() * 1000.0;
self.frame_age_ms.push(age_ms);
}
}
/// Record a frame sent from a background WebRTC thread.
/// `gap_ms` is the pre-computed time since the previous send (0.0 = first frame).
/// Unlike `record_send`, this does not sample `Instant::now()`, so it remains
/// accurate even when batch-drained at stats snapshot time.
pub fn record_send_from_thread(&mut self, gap_ms: f64) {
if gap_ms > 0.0 {
self.sent_gaps_ms.push(gap_ms);
}
self.sent_frames += 1;
}
/// Update PipeWire dropped counter (absolute value from AtomicU64).
pub fn set_pipewire_dropped(&mut self, total_dropped: u64, prev_dropped: u64) {
self.pipewire_dropped = total_dropped.saturating_sub(prev_dropped);
}
/// Update queue depth observations.
pub fn set_queue_depths(&mut self, capture: usize, encoded: usize) {
self.capture_queue_depth = capture;
self.encoded_queue_depth = encoded;
}
/// Record that a frame exceeded its time budget.
pub fn record_over_budget(&mut self) {
self.over_budget_count += 1;
}
/// Returns true if at least 1 second has elapsed since the last snapshot
/// (or since creation). If true, call `snapshot_and_reset` to get the stats.
pub fn should_snapshot(&self) -> bool {
self.window_start.elapsed().as_secs() >= 1
}
/// Compute a snapshot of the current window and reset all counters.
pub fn snapshot_and_reset(&mut self) -> StatsSnapshot {
let elapsed = self.window_start.elapsed().as_secs_f64();
let snap = StatsSnapshot {
elapsed_secs: elapsed,
capture_fps: self.capture_frames as f64 / elapsed,
encoded_fps: self.encoded_frames as f64 / elapsed,
sent_fps: self.sent_frames as f64 / elapsed,
capture_frames: self.capture_frames,
encoded_frames: self.encoded_frames,
sent_frames: self.sent_frames,
pipewire_dropped: self.pipewire_dropped,
over_budget_count: self.over_budget_count,
capture_queue_depth: self.capture_queue_depth,
encoded_queue_depth: self.encoded_queue_depth,
capture_gap_avg_ms: avg_f64(&self.capture_gaps_ms),
capture_gap_p95_ms: p95_f64(&self.capture_gaps_ms),
capture_gap_max_ms: max_f64(&self.capture_gaps_ms),
encoded_gap_avg_ms: avg_f64(&self.encoded_gaps_ms),
encoded_gap_p95_ms: p95_f64(&self.encoded_gaps_ms),
encoded_gap_max_ms: max_f64(&self.encoded_gaps_ms),
sent_gap_avg_ms: avg_f64(&self.sent_gaps_ms),
sent_gap_p95_ms: p95_f64(&self.sent_gaps_ms),
sent_gap_max_ms: max_f64(&self.sent_gaps_ms),
frame_age_avg_ms: avg_f64(&self.frame_age_ms),
frame_age_p95_ms: p95_f64(&self.frame_age_ms),
frame_age_max_ms: max_f64(&self.frame_age_ms),
send_wait_p95_ms: p95_f64(&self.send_wait_ms),
import_avg_ms: avg_ms(&self.import_us),
import_p95_ms: p95_ms(&self.import_us),
scale_avg_ms: avg_ms(&self.scale_us),
scale_p95_ms: p95_ms(&self.scale_us),
transfer_avg_ms: avg_ms(&self.transfer_us),
transfer_p95_ms: p95_ms(&self.transfer_us),
sws_avg_ms: avg_ms(&self.sws_us),
sws_p95_ms: p95_ms(&self.sws_us),
encode_avg_ms: avg_ms(&self.encode_us),
encode_p95_ms: p95_ms(&self.encode_us),
total_avg_ms: avg_ms(&self.total_us),
total_p95_ms: p95_ms(&self.total_us),
output_bytes_per_sec: sum_usize(&self.output_bytes) as f64 / elapsed,
output_frame_bytes_p95: p95_usize(&self.output_bytes),
output_frame_bytes_max: max_usize(&self.output_bytes),
};
// Reset all counters and sample buffers
self.capture_frames = 0;
self.encoded_frames = 0;
self.sent_frames = 0;
self.pipewire_dropped = 0;
self.over_budget_count = 0;
self.capture_queue_depth = 0;
self.encoded_queue_depth = 0;
self.capture_gaps_ms.clear();
self.encoded_gaps_ms.clear();
self.sent_gaps_ms.clear();
self.frame_age_ms.clear();
self.send_wait_ms.clear();
self.import_us.clear();
self.scale_us.clear();
self.transfer_us.clear();
self.sws_us.clear();
self.encode_us.clear();
self.total_us.clear();
self.output_bytes.clear();
self.window_start = Instant::now();
snap
}
}
/// A one-second snapshot of pipeline statistics.
#[derive(Debug)]
pub struct StatsSnapshot {
pub elapsed_secs: f64,
// FPS
pub capture_fps: f64,
pub encoded_fps: f64,
pub sent_fps: f64,
// Counters
pub capture_frames: u64,
pub encoded_frames: u64,
pub sent_frames: u64,
pub pipewire_dropped: u64,
pub over_budget_count: u64,
// Queue depths
pub capture_queue_depth: usize,
pub encoded_queue_depth: usize,
// Gap timing (ms)
pub capture_gap_avg_ms: f64,
pub capture_gap_p95_ms: f64,
pub capture_gap_max_ms: f64,
pub encoded_gap_avg_ms: f64,
pub encoded_gap_p95_ms: f64,
pub encoded_gap_max_ms: f64,
pub sent_gap_avg_ms: f64,
pub sent_gap_p95_ms: f64,
pub sent_gap_max_ms: f64,
// Frame age (capture → send)
pub frame_age_avg_ms: f64,
pub frame_age_p95_ms: f64,
pub frame_age_max_ms: f64,
// Send wait
pub send_wait_p95_ms: f64,
// Per-stage encode timing (ms)
pub import_avg_ms: f64,
pub import_p95_ms: f64,
pub scale_avg_ms: f64,
pub scale_p95_ms: f64,
pub transfer_avg_ms: f64,
pub transfer_p95_ms: f64,
pub sws_avg_ms: f64,
pub sws_p95_ms: f64,
pub encode_avg_ms: f64,
pub encode_p95_ms: f64,
pub total_avg_ms: f64,
pub total_p95_ms: f64,
// Output size
pub output_bytes_per_sec: f64,
pub output_frame_bytes_p95: usize,
pub output_frame_bytes_max: usize,
}
impl std::fmt::Display for StatsSnapshot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"capture_fps={:.1} encoded_fps={:.1} sent_fps={:.1} \
pw_dropped={} over_budget={} \
cap_q={} enc_q={} \
cap_gap_p95={:.1}ms cap_gap_max={:.1}ms \
enc_gap_p95={:.1}ms enc_gap_max={:.1}ms \
sent_gap_p95={:.1}ms sent_gap_max={:.1}ms \
frame_age_p95={:.1}ms frame_age_max={:.1}ms \
send_wait_p95={:.1}ms \
import_p95={:.1}ms scale_p95={:.1}ms transfer_p95={:.1}ms \
sws_p95={:.1}ms encode_p95={:.1}ms total_p95={:.1}ms \
output_bps={:.0} frame_bytes_max={}",
self.capture_fps,
self.encoded_fps,
self.sent_fps,
self.pipewire_dropped,
self.over_budget_count,
self.capture_queue_depth,
self.encoded_queue_depth,
self.capture_gap_p95_ms,
self.capture_gap_max_ms,
self.encoded_gap_p95_ms,
self.encoded_gap_max_ms,
self.sent_gap_p95_ms,
self.sent_gap_max_ms,
self.frame_age_p95_ms,
self.frame_age_max_ms,
self.send_wait_p95_ms,
self.import_p95_ms,
self.scale_p95_ms,
self.transfer_p95_ms,
self.sws_p95_ms,
self.encode_p95_ms,
self.total_p95_ms,
self.output_bytes_per_sec,
self.output_frame_bytes_max,
)
}
}
// ---------------------------------------------------------------------------
// Statistics helpers
// ---------------------------------------------------------------------------
fn avg_f64(data: &[f64]) -> f64 {
if data.is_empty() {
return 0.0;
}
data.iter().sum::<f64>() / data.len() as f64
}
fn p95_f64(data: &[f64]) -> f64 {
if data.is_empty() {
return 0.0;
}
let mut sorted: Vec<f64> = data.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64) * 0.95).floor() as usize;
sorted[idx.min(sorted.len() - 1)]
}
fn max_f64(data: &[f64]) -> f64 {
data.iter().copied().fold(0.0_f64, f64::max)
}
fn avg_ms(data: &[u64]) -> f64 {
if data.is_empty() {
return 0.0;
}
data.iter().sum::<u64>() as f64 / data.len() as f64 / 1000.0
}
fn p95_ms(data: &[u64]) -> f64 {
if data.is_empty() {
return 0.0;
}
let mut sorted = data.to_vec();
sorted.sort_unstable();
let idx = ((sorted.len() as f64) * 0.95).floor() as usize;
sorted[idx.min(sorted.len() - 1)] as f64 / 1000.0
}
fn sum_usize(data: &[usize]) -> usize {
data.iter().sum()
}
fn p95_usize(data: &[usize]) -> usize {
if data.is_empty() {
return 0;
}
let mut sorted = data.to_vec();
sorted.sort_unstable();
let idx = ((sorted.len() as f64) * 0.95).floor() as usize;
sorted[idx.min(sorted.len() - 1)]
}
fn max_usize(data: &[usize]) -> usize {
data.iter().copied().max().unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_stats_snapshot() {
let mut stats = PipelineStats::new();
let snap = stats.snapshot_and_reset();
assert_eq!(snap.capture_frames, 0);
assert_eq!(snap.encoded_frames, 0);
assert_eq!(snap.sent_frames, 0);
}
#[test]
fn record_and_snapshot_counts() {
let mut stats = PipelineStats::new();
stats.record_capture();
stats.record_capture();
stats.record_encode(&FrameTimings {
total_us: 5000,
output_bytes: 1000,
..Default::default()
});
stats.record_send(0.1, None);
let snap = stats.snapshot_and_reset();
assert_eq!(snap.capture_frames, 2);
assert_eq!(snap.encoded_frames, 1);
assert_eq!(snap.sent_frames, 1);
}
#[test]
fn p95_computation() {
// 100 values: 0.0 through 99.0
let data: Vec<f64> = (0..100).map(|i| i as f64).collect();
let result = p95_f64(&data);
assert!((result - 95.0).abs() < 1.0, "p95 of 0..100 should be ~95, got {result}");
}
#[test]
fn p95_ms_microseconds() {
let data: Vec<u64> = (0..100).map(|i| i * 1000).collect(); // 0ms..99ms
let result = p95_ms(&data);
assert!((result - 95.0).abs() < 1.0, "p95_ms should be ~95ms, got {result}");
}
#[test]
fn snapshot_resets_counters() {
let mut stats = PipelineStats::new();
stats.record_capture();
let _ = stats.snapshot_and_reset();
let snap = stats.snapshot_and_reset();
assert_eq!(snap.capture_frames, 0);
}
#[test]
fn display_format_contains_key_fields() {
let mut stats = PipelineStats::new();
stats.record_capture();
stats.record_encode(&FrameTimings {
total_us: 10000,
output_bytes: 5000,
..Default::default()
});
stats.record_send(0.5, None);
let snap = stats.snapshot_and_reset();
let text = format!("{snap}");
assert!(text.contains("capture_fps="));
assert!(text.contains("encoded_fps="));
assert!(text.contains("sent_fps="));
assert!(text.contains("total_p95="));
}
}

View File

@@ -19,11 +19,13 @@ const HTML_PAGE: &str = r#"<!DOCTYPE html>
video{max-width:90vw;max-height:80vh;border:1px solid #333} video{max-width:90vw;max-height:80vh;border:1px solid #333}
#status{margin:12px;font-size:14px;color:#aaa} #status{margin:12px;font-size:14px;color:#aaa}
#debug{position:fixed;bottom:8px;left:8px;font-size:11px;color:#666;max-width:90vw;white-space:pre-wrap} #debug{position:fixed;bottom:8px;left:8px;font-size:11px;color:#666;max-width:90vw;white-space:pre-wrap}
#stats-panel{position:fixed;top:8px;right:8px;background:rgba(0,0,0,0.7);color:#0f0;font:11px monospace;padding:6px 10px;border-radius:4px;z-index:100;pointer-events:none;max-width:90vw;white-space:pre;line-height:1.5}
</style></head> </style></head>
<body> <body>
<div id="status">Connecting...</div> <div id="status">Connecting...</div>
<video id="video" autoplay playsinline muted></video> <video id="video" autoplay playsinline muted></video>
<pre id="debug"></pre> <pre id="debug"></pre>
<div id="stats-panel"></div>
<script> <script>
const status = document.getElementById('status'); const status = document.getElementById('status');
const video = document.getElementById('video'); const video = document.getElementById('video');
@@ -51,25 +53,92 @@ function preferH264(sdp) {
} }
function installStatsLogger(peer) { function installStatsLogger(peer) {
const panel = document.getElementById('stats-panel');
let prev = null;
const intervalSecs = 1;
setInterval(() => { setInterval(() => {
if (peer !== pc) return; if (peer !== pc) return;
const v = video;
log(`video: readyState=${v.readyState} currentTime=${v.currentTime.toFixed(2)} ` +
`paused=${v.paused} width=${v.videoWidth} height=${v.videoHeight} ` +
`srcObject=${v.srcObject ? 'yes' : 'no'}`);
peer.getStats().then(stats => { peer.getStats().then(stats => {
let rtp = null, rtt = null, codecStr = '';
let freezeCount = null, totalFreezesDuration = null;
stats.forEach(report => { stats.forEach(report => {
if (report.type === 'inbound-rtp' && report.kind === 'video') { if (report.type === 'inbound-rtp' && report.kind === 'video') rtp = report;
log(`RTP-in: packetsReceived=${report.packetsReceived} packetsLost=${report.packetsLost} ` + if (report.type === 'codec' && report.mimeType && report.mimeType.includes('H264'))
`bytesReceived=${report.bytesReceived} framesDecoded=${report.framesDecoded} ` + codecStr = report.mimeType + ' ' + (report.payloadType || '');
`framesDropped=${report.framesDropped} codecId=${report.codecId}`); // candidate-pair: feature-detect 'selected' property
} if (report.type === 'candidate-pair') {
if (report.type === 'codec' && report.mimeType && report.mimeType.includes('H264')) { const isSel = ('selected' in report) ? report.selected : report.state === 'succeeded';
log(`Codec: ${report.mimeType} ${report.payloadType} sdpFmtpLine=${report.sdpFmtpLine}`); if (isSel && typeof report.currentRoundTripTime === 'number') rtt = report.currentRoundTripTime;
} }
}); });
// Freeze stats (feature-detect)
if (rtp && typeof rtp.freezeCount !== 'undefined') {
freezeCount = rtp.freezeCount;
totalFreezesDuration = rtp.totalFreezesDuration;
}
if (!rtp) return;
const cur = {
framesDecoded: rtp.framesDecoded || 0,
framesDropped: rtp.framesDropped || 0,
framesPerSecond: rtp.framesPerSecond || 0,
packetsLost: rtp.packetsLost || 0,
jitter: rtp.jitter || 0,
bytesReceived: rtp.bytesReceived || 0,
totalDecodeTime: rtp.totalDecodeTime || 0,
jitterBufferDelay: rtp.jitterBufferDelay || 0,
jitterBufferEmittedCount: rtp.jitterBufferEmittedCount || 0,
freezeCount: freezeCount,
totalFreezesDuration: totalFreezesDuration,
rtt: rtt,
};
// Raw log to debug element (backward compat)
log('RTP-in: decoded=' + cur.framesDecoded + ' lost=' + cur.packetsLost +
' bytes=' + cur.bytesReceived + ' fps=' + cur.framesPerSecond +
(codecStr ? ' codec=' + codecStr : ''));
if (!prev) { prev = cur; return; }
// Compute deltas
const dFrames = cur.framesDecoded - prev.framesDecoded;
const dDropped = cur.framesDropped - prev.framesDropped;
const dLost = cur.packetsLost - prev.packetsLost;
const dBytes = cur.bytesReceived - prev.bytesReceived;
const dDecodeTime = cur.totalDecodeTime - prev.totalDecodeTime;
const dJitterBufDelay = cur.jitterBufferDelay - prev.jitterBufferDelay;
const dJitterBufCount = cur.jitterBufferEmittedCount - prev.jitterBufferEmittedCount;
const kbps = Math.round(dBytes * 8 / intervalSecs / 1000);
const decodeMs = dFrames > 0 ? (dDecodeTime / dFrames * 1000).toFixed(1) : '—';
const jitterBufMs = dJitterBufCount > 0 ? (dJitterBufDelay / dJitterBufCount * 1000).toFixed(1) : '—';
const jitterMs = (cur.jitter * 1000).toFixed(1);
const rttMs = cur.rtt !== null ? (cur.rtt * 1000).toFixed(1) : null;
let line = 'FPS:' + cur.framesPerSecond +
' Decoded:' + cur.framesDecoded + '(+' + dFrames + ')' +
' Dropped:' + cur.framesDropped + (dDropped > 0 ? '(+' + dDropped + ')' : '') +
' Lost:' + dLost +
' Jitter:' + jitterMs + 'ms' +
(rttMs !== null ? ' RTT:' + rttMs + 'ms' : '') +
' Decode:' + decodeMs + 'ms' +
' JBuf:' + jitterBufMs + 'ms';
if (freezeCount !== null) {
const dFreeze = cur.freezeCount - (prev.freezeCount || 0);
if (cur.freezeCount > 0 || dFreeze > 0)
line += ' Freeze:' + cur.freezeCount + '(+' + dFreeze + ')';
}
line += ' ' + kbps + 'kbps';
panel.textContent = line;
prev = cur;
}).catch(() => {}); }).catch(() => {});
}, 2000); }, intervalSecs * 1000);
} }
function connect() { function connect() {
@@ -178,12 +247,16 @@ impl WebRtcState {
HTML_PAGE.len(), HTML_PAGE.len(),
HTML_PAGE HTML_PAGE
); );
let _ = stream.write_all(resp.as_bytes()); if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
} else if req_str.starts_with("POST /sdp") { } else if req_str.starts_with("POST /sdp") {
let body = extract_body(&req_str); let body = extract_body(&req_str);
if body.is_empty() { if body.is_empty() {
let resp = "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\nempty body"; let resp = "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\nempty body";
let _ = stream.write_all(resp.as_bytes()); if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
continue; continue;
} }
@@ -206,17 +279,23 @@ impl WebRtcState {
answer_json.len(), answer_json.len(),
answer_json answer_json
); );
let _ = stream.write_all(resp.as_bytes()); if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
} }
Err(e) => { Err(e) => {
tracing::error!("SDP offer handling failed: {e}"); tracing::error!("SDP offer handling failed: {e}");
let resp = format!("HTTP/1.1 500 Error\r\nConnection: close\r\n\r\n{e}"); let resp = "HTTP/1.1 500 Internal Server Error\r\nConnection: close\r\n\r\n";
let _ = stream.write_all(resp.as_bytes()); if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
} }
} }
} else { } else {
let resp = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n"; let resp = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n";
let _ = stream.write_all(resp.as_bytes()); if let Err(e) = stream.write_all(resp.as_bytes()) {
tracing::debug!("HTTP write error: {e}");
}
} }
} }
Ok(handled) Ok(handled)
@@ -225,7 +304,7 @@ impl WebRtcState {
pub fn poll_rtc(&mut self) -> Result<()> { pub fn poll_rtc(&mut self) -> Result<()> {
if let Some(inner) = self.inner.as_mut() { if let Some(inner) = self.inner.as_mut() {
if inner.poll_rtc()? { if inner.poll_rtc()? {
tracing::warn!("WebRTC connection closed/failed; clearing connection state"); tracing::info!("WebRTC connection closed; clearing connection state");
self.inner = None; self.inner = None;
} }
} }
@@ -246,8 +325,14 @@ impl WebRtcState {
} }
pub fn write_h264_frame(&mut self, data: &[u8], frame_number: u64, fps: u32) -> Result<()> { pub fn write_h264_frame(&mut self, data: &[u8], frame_number: u64, fps: u32) -> Result<()> {
if let Some(inner) = self.inner.as_mut() { let should_destroy = if let Some(inner) = self.inner.as_mut() {
inner.write_h264_frame(data, frame_number, fps)?; inner.write_h264_frame(data, frame_number, fps)?
} else {
false
};
if should_destroy {
tracing::info!("WebRTC connection failed during write; clearing connection state");
self.inner = None;
} }
Ok(()) Ok(())
} }
@@ -264,10 +349,49 @@ impl WebRtcInner {
let socket = UdpSocket::bind("0.0.0.0:0")?; let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?; socket.set_nonblocking(true)?;
// Increase UDP send buffer to absorb IDR frame bursts (256KB IDR → ~145 RTP
// packets in a single poll_rtc loop). Default Linux wmem is ~208KB which
// causes EAGAIN on large keyframes. 2MB comfortably buffers several IDRs.
const SND_BUF_REQ: usize = 2 * 1024 * 1024;
// SAFETY: fd is a valid UDP socket; setsockopt/getsockopt with SOL_SOCKET +
// SO_SNDBUF are safe on Linux. We check the return value and log the actual
// kernel-assigned buffer (Linux may cap at wmem_max and/or double the value).
unsafe {
let fd = std::os::unix::io::AsRawFd::as_raw_fd(&socket);
let val: libc::c_int = SND_BUF_REQ as libc::c_int;
let ret = libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_SNDBUF,
&val as *const libc::c_int as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
);
if ret < 0 {
tracing::warn!("setsockopt SO_SNDBUF failed (errno {})", std::io::Error::last_os_error());
}
let mut actual: libc::c_int = 0;
let mut actual_len: libc::socklen_t = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
let gret = libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_SNDBUF,
&mut actual as *mut libc::c_int as *mut libc::c_void,
&mut actual_len,
);
if gret == 0 {
tracing::info!(
"UDP send buffer: requested {}KB, actual {}KB",
SND_BUF_REQ / 1024,
actual / 1024,
);
}
}
let local_addr = socket.local_addr()?; let local_addr = socket.local_addr()?;
let lan_ip = local_ip().unwrap_or_else(|| { let lan_ip = local_ip().unwrap_or_else(|| {
tracing::warn!("Failed to detect LAN IP, falling back to 127.0.0.1"); tracing::debug!("Failed to detect LAN IP, falling back to 127.0.0.1");
"127.0.0.1".to_string() "127.0.0.1".to_string()
}); });
let candidate_addr: SocketAddr = format!("{lan_ip}:{}", local_addr.port()).parse()?; let candidate_addr: SocketAddr = format!("{lan_ip}:{}", local_addr.port()).parse()?;
@@ -311,28 +435,26 @@ impl WebRtcInner {
} }
fn discover_video_params(&mut self) { fn discover_video_params(&mut self) {
for s in ["0", "1", "2", "3"] { let mid = match self.video_mid {
let mid: Mid = s.into(); Some(m) => m,
if let Some(media) = self.rtc.media(mid) { None => {
if media.kind() == MediaKind::Video { tracing::debug!("discover_video_params: no video_mid yet");
tracing::info!("Found video media: mid={mid}"); return;
self.video_mid = Some(mid); }
};
self.video_pt = None;
if let Some(writer) = self.rtc.writer(mid) {
for pp in writer.payload_params() {
tracing::debug!("Codec: pt={:?} spec={:?}", pp.pt(), pp.spec());
if pp.spec().codec.is_video() && pp.spec().codec == Codec::H264 {
self.video_pt = Some(pp.pt());
tracing::info!("H.264 payload type: {:?}", pp.pt());
break; break;
} }
} }
} }
if self.video_pt.is_none() {
if let Some(mid) = self.video_mid { tracing::warn!("discover_video_params: no H.264 codec found for mid={mid}");
if let Some(writer) = self.rtc.writer(mid) {
for pp in writer.payload_params() {
tracing::debug!("Codec: pt={:?} spec={:?}", pp.pt(), pp.spec());
if pp.spec().codec.is_video() && pp.spec().codec == Codec::H264 {
self.video_pt = Some(pp.pt());
tracing::info!("H.264 payload type: {:?}", pp.pt());
break;
}
}
}
} }
} }
@@ -340,13 +462,20 @@ impl WebRtcInner {
loop { loop {
match self.rtc.poll_output() { match self.rtc.poll_output() {
Ok(Output::Transmit(t)) => { Ok(Output::Transmit(t)) => {
tracing::info!("TX {} bytes -> {}", t.contents.len(), t.destination); tracing::trace!("TX {} bytes -> {}", t.contents.len(), t.destination);
if let Err(e) = self.socket.send_to(&t.contents, t.destination) { if let Err(e) = self.socket.send_to(&t.contents, t.destination) {
tracing::warn!("UDP send error: {e}"); if e.kind() == std::io::ErrorKind::WouldBlock {
tracing::debug!(
"UDP send WouldBlock ({} bytes) — send buffer full",
t.contents.len(),
);
} else {
tracing::warn!("UDP send error to {}: {e}", t.destination);
}
} }
} }
Ok(Output::Event(e)) => { Ok(Output::Event(e)) => {
tracing::info!("RTC event: {e:?}"); tracing::debug!("RTC event: {e:?}");
match &e { match &e {
Event::Connected => { Event::Connected => {
tracing::info!("WebRTC connected!"); tracing::info!("WebRTC connected!");
@@ -357,9 +486,21 @@ impl WebRtcInner {
Event::IceConnectionStateChange(IceConnectionState::Disconnected) => { Event::IceConnectionStateChange(IceConnectionState::Disconnected) => {
tracing::warn!("WebRTC disconnected"); tracing::warn!("WebRTC disconnected");
self.connected = false; self.connected = false;
return Ok(true);
} }
Event::MediaAdded(ma) => { Event::MediaAdded(ma) => {
tracing::info!("Media added: mid={:?}", ma.mid); tracing::info!("Media added: mid={} kind={:?}", ma.mid, ma.kind);
if ma.kind == MediaKind::Video {
if let Some(media) = self.rtc.media(ma.mid) {
if media.direction().is_sending()
&& self.video_mid.is_none()
{
self.video_mid = Some(ma.mid);
tracing::info!("Captured video mid: {}", ma.mid);
self.discover_video_params();
}
}
}
} }
_ => { _ => {
tracing::debug!("WebRTC event: {:?}", e); tracing::debug!("WebRTC event: {:?}", e);
@@ -369,7 +510,8 @@ impl WebRtcInner {
Ok(Output::Timeout(_t)) => break, Ok(Output::Timeout(_t)) => break,
Err(e) => { Err(e) => {
tracing::error!("rtc.poll_output error: {e}"); tracing::error!("rtc.poll_output error: {e}");
break; self.connected = false;
return Ok(true);
} }
} }
} }
@@ -383,7 +525,7 @@ impl WebRtcInner {
Ok((n, source)) => { Ok((n, source)) => {
recv_count += 1; recv_count += 1;
if recv_count <= 5 { if recv_count <= 5 {
tracing::info!("UDP recv {} bytes from {}", n, source); tracing::trace!("UDP recv {} bytes from {}", n, source);
} }
let input = Input::Receive( let input = Input::Receive(
Instant::now(), Instant::now(),
@@ -413,23 +555,23 @@ impl WebRtcInner {
Ok(()) Ok(())
} }
fn write_h264_frame(&mut self, data: &[u8], frame_number: u64, fps: u32) -> Result<()> { fn write_h264_frame(&mut self, data: &[u8], frame_number: u64, fps: u32) -> Result<bool> {
if !self.connected { if !self.connected {
return Ok(()); return Ok(false);
} }
let mid = match self.video_mid { let mid = match self.video_mid {
Some(m) => m, Some(m) => m,
None => { None => {
tracing::warn!("write_h264: no video_mid"); tracing::debug!("write_h264: no video_mid");
return Ok(()); return Ok(false);
} }
}; };
let pt = match self.video_pt { let pt = match self.video_pt {
Some(p) => p, Some(p) => p,
None => { None => {
tracing::warn!("write_h264: no video_pt"); tracing::debug!("write_h264: no video_pt");
return Ok(()); return Ok(false);
} }
}; };
@@ -439,7 +581,7 @@ impl WebRtcInner {
"write_h264: skipping non-IDR frame ({} bytes), waiting for keyframe", "write_h264: skipping non-IDR frame ({} bytes), waiting for keyframe",
data.len() data.len()
); );
return Ok(()); return Ok(false);
} }
tracing::info!( tracing::info!(
"write_h264: got IDR keyframe ({} bytes), starting playback", "write_h264: got IDR keyframe ({} bytes), starting playback",
@@ -457,8 +599,8 @@ impl WebRtcInner {
let writer = match self.rtc.writer(mid) { let writer = match self.rtc.writer(mid) {
Some(w) => w, Some(w) => w,
None => { None => {
tracing::warn!("write_h264: no writer for mid={mid}"); tracing::debug!("write_h264: no writer for mid={mid}");
return Ok(()); return Ok(false);
} }
}; };
@@ -472,9 +614,9 @@ impl WebRtcInner {
.write(pt, Instant::now(), rtp_time, data) .write(pt, Instant::now(), rtp_time, data)
.map_err(|e| anyhow::anyhow!("writer.write: {e}"))?; .map_err(|e| anyhow::anyhow!("writer.write: {e}"))?;
self.poll_rtc()?; let should_destroy = self.poll_rtc()?;
Ok(()) Ok(should_destroy)
} }
fn is_connected(&self) -> bool { fn is_connected(&self) -> bool {
@@ -510,16 +652,17 @@ fn local_ip() -> Option<String> {
fn is_idr_nalu(data: &[u8]) -> bool { fn is_idr_nalu(data: &[u8]) -> bool {
let mut i = 0; let mut i = 0;
while i + 4 < data.len() { while i < data.len() {
if data[i..i + 4] == [0, 0, 0, 1] { let tail = &data[i..];
let nal_type = data[i + 4] & 0x1F; if tail.starts_with(&[0, 0, 0, 1]) {
if nal_type == 5 { let Some(&header) = tail.get(4) else { break };
if header & 0x1F == 5 {
return true; return true;
} }
i += 5; i += 5;
} else if i + 3 < data.len() && data[i..i + 3] == [0, 0, 1] { } else if tail.starts_with(&[0, 0, 1]) {
let nal_type = data[i + 3] & 0x1F; let Some(&header) = tail.get(3) else { break };
if nal_type == 5 { if header & 0x1F == 5 {
return true; return true;
} }
i += 4; i += 4;
@@ -529,3 +672,78 @@ fn is_idr_nalu(data: &[u8]) -> bool {
} }
false false
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_data() {
assert!(!is_idr_nalu(&[]));
}
#[test]
fn short_data_no_start_code() {
assert!(!is_idr_nalu(&[0]));
assert!(!is_idr_nalu(&[0, 0]));
assert!(!is_idr_nalu(&[1, 2, 3]));
}
#[test]
fn three_byte_start_code_no_nal_header() {
assert!(!is_idr_nalu(&[0, 0, 1]));
}
#[test]
fn four_byte_start_code_no_nal_header() {
assert!(!is_idr_nalu(&[0, 0, 0, 1]));
}
#[test]
fn three_byte_start_code_idr_at_tail() {
assert!(is_idr_nalu(&[0, 0, 1, 0x65]));
assert!(!is_idr_nalu(&[0, 0, 1, 0x01]));
}
#[test]
fn four_byte_start_code_idr_at_tail() {
assert!(is_idr_nalu(&[0, 0, 0, 1, 0x65]));
assert!(!is_idr_nalu(&[0, 0, 0, 1, 0x01]));
}
#[test]
fn idr_in_middle_of_frame() {
let data: Vec<u8> = [
&[0, 0, 0, 1, 0x67][..], // SPS
&[0, 0, 0, 1, 0x68][..], // PPS
&[0, 0, 0, 1, 0x65][..], // IDR
]
.concat();
assert!(is_idr_nalu(&data));
}
#[test]
fn no_idr_in_frame() {
let data: Vec<u8> = [
&[0, 0, 0, 1, 0x67][..], // SPS
&[0, 0, 0, 1, 0x68][..], // PPS
]
.concat();
assert!(!is_idr_nalu(&data));
}
#[test]
fn mixed_start_code_lengths() {
let data: Vec<u8> = [
&[0, 0, 0, 1, 0x67][..], // SPS (4-byte start code)
&[0, 0, 1, 0x65][..], // IDR (3-byte start code)
]
.concat();
assert!(is_idr_nalu(&data));
}
#[test]
fn all_zeros() {
assert!(!is_idr_nalu(&[0, 0, 0, 0, 0, 0, 0, 0]));
}
}