# wl-webrtc Phase 1 Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a working Wayland screen → browser streaming pipeline with <50ms LAN latency.

**Architecture:** A mio event loop on the main thread handles Wayland capture and GPU encoding. A tokio runtime handles WebTransport and the Web UI. An `async_channel` bridges the two (synchronous send on the mio side, async `recv()` on the tokio side). The browser uses WebCodecs for direct decode.

**Tech Stack:** Rust, wayland-client, ffmpeg-next (VAAPI/Vulkan), wtransport (WebTransport over HTTP/3), axum + rust-embed (Web UI), WebCodecs (browser)

**Reference:** Design spec at `docs/superpowers/specs/2026-04-03-wl-webrtc-architecture-design.md`. Source analysis at `analysis.md`.

---

## File Structure

```
wl-webrtc/
├── Cargo.toml
├── build.rs                   # FFmpeg library detection
├── src/
│   ├── main.rs                # CLI + startup + mio event loop
│   ├── args.rs                # CLI argument definitions (clap)
│   ├── cap_ext_image_copy.rs  # ext-image-copy-capture backend
│   ├── cap_wlr_screencopy.rs  # wlr-screencopy backend
│   ├── avhw.rs                # FFmpeg HW device/frame contexts
│   ├── filter.rs              # FFmpeg video filter graph
│   ├── transform.rs           # Coordinate transforms
│   ├── fps_limit.rs           # VRR-aware frame rate limiter
│   ├── state.rs               # State machine + CaptureSource trait
│   ├── transport.rs           # QUIC/WebTransport server (new)
│   ├── signaling.rs           # axum HTTP + static files (new)
│   └── nalu.rs                # Annex B NAL unit framing (new)
├── static/
│   ├── index.html             # Web UI shell
│   ├── player.js              # WebCodecs decoder + Canvas renderer
│   └── style.css              # Minimal styling
└── tests/
    ├── integration_test.rs    # End-to-end test harness
    └── nalu_test.rs           # NAL unit framing tests
```

---

## Task 1: Project Scaffold

**Files:**
- Create: `Cargo.toml`
- Create: `build.rs`
- Create: `src/main.rs`
- Create: `src/args.rs`

- [ ] **Step 1: Create Cargo.toml with all dependencies**

```toml
[package]
name = "wl-webrtc"
version = "0.1.0"
edition = "2021"
description = "Low-latency Wayland screen sharing via WebTransport"

[dependencies]
# Wayland screen capture
wayland-client = "0.31"
wayland-protocols = { version = "0.32", features = ["client", "unstable", "staging"] }
wayland-protocols-wlr = { version = "0.3", features = ["client"] }
drm-fourcc = "2"

# GPU encoding — WARNING: safe API does NOT wrap HW contexts.
# Use ffmpeg_next::ffi directly for AVBufferRef, AVHWFramesContext.
# See wl-screenrec/src/avhw.rs for reference pattern.
ffmpeg-next = "8"

# WebTransport over HTTP/3 (built on quinn + rustls, self-signed cert support)
wtransport = { version = "0.7", features = ["self-signed"] }

# Web UI
axum = { version = "0.8", features = ["ws"] }
tower-http = { version = "0.6", features = ["cors"] }
rust-embed = { version = "8", features = ["mime-guess"] }

# Async runtime
tokio = { version = "1", features = ["full"] }

# Sync/async bridge (sync try_send()/send_blocking() on mio thread, async recv() on tokio)
async-channel = "2"

# Event loop: mio 1.x has no default features; "os-poll" provides Poll,
# "os-ext" provides SourceFd for registering the Wayland socket fd
mio = { version = "1", features = ["os-poll", "os-ext"] }

# Utilities
clap = { version = "4", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1"
bytes = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
signal-hook = { version = "0.3", features = ["iterator"] }
base64 = "0.22"
mime_guess = "2"
```

- [ ] **Step 2: Create build.rs**

```rust
fn main() {
    // No custom build steps needed.
    // ffmpeg-next's own build.rs handles library detection via pkg-config.
}
```

- [ ] **Step 3: Create src/args.rs with CLI argument definitions**

```rust
use clap::Parser;

/// Low-latency Wayland screen sharing via WebTransport
#[derive(Parser, Debug)]
#[command(name = "wl-webrtc", version, about)]
pub struct Args {
    /// Display output to capture (e.g. "DP-1"). Defaults to first output.
    #[arg(short, long)]
    pub output: Option<String>,

    /// Region of interest: x,y,w,h (e.g. "100,100,800,600")
    #[arg(short, long)]
    pub roi: Option<String>,

    /// Target framerate
    #[arg(short, long, default_value = "60")]
    pub fps: u32,

    /// Port for WebTransport + Web UI
    #[arg(short, long, default_value = "8443")]
    pub port: u16,

    /// Bind address
    #[arg(long, default_value = "0.0.0.0")]
    pub bind: String,

    /// Video codec: h264 or hevc
    #[arg(long, default_value = "h264")]
    pub codec: String,

    /// Hardware encoding backend: vaapi or vulkan
    #[arg(long, default_value = "vaapi")]
    pub hw_accel: String,

    /// DRM device path (e.g. /dev/dri/renderD128)
    #[arg(long)]
    pub drm_device: Option<String>,

    /// Target bitrate in bits per second
    #[arg(long, default_value = "8000000")]
    pub bitrate: u64,

    /// GOP size (keyframe interval in frames)
    #[arg(long, default_value = "30")]
    pub gop_size: u32,

    /// Enable verbose logging
    #[arg(short, long)]
    pub verbose: bool,
}
```

- [ ] **Step 4: Create minimal src/main.rs that parses args and exits**

This is a minimal stub. The full implementation is in Task 11. Do NOT add Wayland, transport, or nalu references here — those modules don't exist yet.

```rust
mod args;

use args::Args;
use clap::Parser;

fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    if args.verbose {
        tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
    } else {
        tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
    }

    tracing::info!("wl-webrtc starting");
    tracing::info!("codec={}, hw={}, fps={}, bitrate={}", args.codec, args.hw_accel, args.fps, args.bitrate);

    // Full implementation is in Task 11 (main event loop wiring).
    // This stub only verifies args parsing and logging work.
    tracing::warn!("Stub mode — event loop not yet implemented. See Task 11.");

    Ok(())
}
```

- [ ] **Step 5: Verify project compiles**

Run: `cargo build`

Expected: Compiles successfully. The first build may take a while to download dependencies, and requires the FFmpeg development packages plus pkg-config, since `ffmpeg-next` links against the system FFmpeg.

- [ ] **Step 6: Commit**

```bash
git init && git add -A && git commit -m "feat: project scaffold with Cargo.toml, CLI args, main stub"
```

---

## Task 2: Leaf Modules — transform.rs

**Files:**
- Create: `src/transform.rs`

Port the coordinate transformation module. This module handles Wayland output transforms (rotation/flip) and ROI clipping.

**Reference:** The Wayland `wl_output` transform enum defines 8 orientations. The transform logic is standard 2D affine math (rotation + reflection matrices). See `wl_output::Transform` in the Wayland protocol spec.

- [ ] **Step 1: Create src/transform.rs**

Implement the following coordinate transformation utilities:
- `Transform` enum: the 8 Wayland output transform variants, mirroring `wl_output::Transform` (its `_90`, `_180`, `_270` are named `Normal90`, `Normal180`, `Normal270` here; the others are `Normal`, `Flipped`, `Flipped90`, `Flipped180`, `Flipped270`)
- `transform_basis()`: maps a `Transform` to a 2x2 matrix (a, b, c, d)
- `screen_to_frame()`: transforms a rectangle from screen space to frame space using the transform matrix
- `transpose_if_transform_transposed()`: swaps width/height for 90/270 degree rotations
- `fit_inside_bounds()`: clips the ROI rectangle to stay within output bounds

The module is self-contained with no internal dependencies. It uses only standard library types, plus `drm_fourcc::DrmFourcc` if needed for format handling.
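The two purely dimensional helpers are simple enough to sketch directly. This is a minimal sketch, not the ported code. It reuses the `Transform`/`Rect` shapes from this step's key types, and it assumes that `fit_inside_bounds` clips the rectangle, rather than shifting it, and returns a zero-sized `Rect` when there is no overlap. `transform_basis()` and `screen_to_frame()` are left out because their matrix conventions depend on the source being ported.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transform {
    Normal, Normal90, Normal180, Normal270,
    Flipped, Flipped90, Flipped180, Flipped270,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Rect {
    pub x: i32,
    pub y: i32,
    pub w: i32,
    pub h: i32,
}

/// 90 and 270 degree rotations (flipped or not) swap width and height.
pub fn transpose_if_transform_transposed(transform: Transform, w: i32, h: i32) -> (i32, i32) {
    match transform {
        Transform::Normal90 | Transform::Normal270 | Transform::Flipped90 | Transform::Flipped270 => (h, w),
        Transform::Normal | Transform::Normal180 | Transform::Flipped | Transform::Flipped180 => (w, h),
    }
}

/// Clip `rect` to the box [0, bounds_w] x [0, bounds_h].
/// Assumption: no overlap yields a zero-width/height rect rather than an error.
pub fn fit_inside_bounds(rect: Rect, bounds_w: i32, bounds_h: i32) -> Rect {
    // i32::clamp panics if min > max, so guard against negative bounds first
    let (bw, bh) = (bounds_w.max(0), bounds_h.max(0));
    let x0 = rect.x.clamp(0, bw);
    let y0 = rect.y.clamp(0, bh);
    let x1 = rect.x.saturating_add(rect.w).clamp(0, bw);
    let y1 = rect.y.saturating_add(rect.h).clamp(0, bh);
    Rect { x: x0, y: y0, w: (x1 - x0).max(0), h: (y1 - y0).max(0) }
}
```

For example, an ROI of `x: -10, y: 100, w: 500, h: 2000` on a 1920x1080 output is clipped to `x: 0, y: 100, w: 490, h: 980`.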

Key types:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transform {
    Normal,
    Normal90,
    Normal180,
    Normal270,
    Flipped,
    Flipped90,
    Flipped180,
    Flipped270,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Rect {
    pub x: i32,
    pub y: i32,
    pub w: i32,
    pub h: i32,
}

// Signatures only; implement the bodies as part of this step.
pub fn screen_to_frame(transform: Transform, rect: Rect, frame_w: i32, frame_h: i32) -> Rect;
pub fn transpose_if_transform_transposed(transform: Transform, w: i32, h: i32) -> (i32, i32);
pub fn fit_inside_bounds(rect: Rect, bounds_w: i32, bounds_h: i32) -> Rect;
```

- [ ] **Step 2: Add `mod transform;` to src/main.rs**

- [ ] **Step 3: Verify compilation**

Run: `cargo build`

- [ ] **Step 4: Commit**

```bash
git add -A && git commit -m "feat: coordinate transform module for Wayland output transforms"
```

---

## Task 3: Leaf Modules — fps_limit.rs

**Files:**
- Create: `src/fps_limit.rs`

Implement a VRR-aware frame rate limiter. It uses a one-frame-buffer delay strategy to make correct frame-drop decisions on variable refresh rate displays.

**Reference:** The one-frame-buffer approach is a standard technique for VRR displays. Buffer the current frame, and decide about the previous frame only once the next one arrives. Output it if its output slot has come up, otherwise discard it.

- [ ] **Step 1: Create src/fps_limit.rs**

Implement the `FpsLimit` struct:

```rust
use std::time::{Duration, Instant};

pub struct FpsLimit<T> {
    on_deck: Option<(T, Instant)>,
    /// Earliest timestamp at which the buffered frame may be output
    next_due: Option<Instant>,
    min_interval: Duration,
}

impl<T> FpsLimit<T> {
    pub fn new(fps: u32) -> Self {
        let min_interval = Duration::from_secs_f64(1.0 / f64::from(fps.max(1)));
        Self { on_deck: None, next_due: None, min_interval }
    }

    /// Feed a new frame. Returns:
    /// - Some(output_frame) if the buffered frame should be output
    /// - None if nothing is ready yet, or the buffered frame was dropped
    /// The returned frame is the PREVIOUS frame (on_deck), not the current one.
    /// The current frame is stored in on_deck for the next call.
    pub fn on_new_frame(&mut self, frame: T, timestamp: Instant) -> Option<T> {
        let (prev, prev_ts) = self.on_deck.replace((frame, timestamp))?;
        let due = *self.next_due.get_or_insert(prev_ts + self.min_interval);
        if timestamp < due {
            return None; // replaced before its slot came up: drop it
        }
        // Advance one interval, but never lag more than one interval behind `timestamp`
        let floor = timestamp.checked_sub(self.min_interval).unwrap_or(timestamp);
        self.next_due = Some((due + self.min_interval).max(floor));
        Some(prev)
    }

    /// Call at end of stream to flush the last buffered frame
    pub fn flush(&mut self) -> Option<T> {
        self.on_deck.take().map(|(frame, _)| frame)
    }
}
```

Logic:
- First frame: return None and store it in on_deck.
- Each later frame: the on-deck frame's output slot is `next_due`, initially the first frame's timestamp + `min_interval`.
  - If the new frame's timestamp is >= `next_due`: output the on-deck frame, then store the new frame in on_deck. Advance `next_due` by `min_interval`, clamped to no earlier than `timestamp - min_interval` so that after an idle period the limiter only briefly catches up.
  - Otherwise: discard the on-deck frame (it was replaced before its slot) and store the new frame in on_deck.
- Do not compare only consecutive timestamps (on-deck vs. new frame). That rule drops every frame whenever the capture rate exceeds the target. For example, 60 Hz capture at `--fps 30` gives ~16.7 ms gaps, all below 33.3 ms.

- [ ] **Step 2: Add `mod fps_limit;` to src/main.rs**

- [ ] **Step 3: Append unit tests to the bottom of src/fps_limit.rs**

The `use super::*` pattern only works inside the module. A file under `tests/` also cannot reach a private module of a binary crate.

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    #[test]
    fn first_frame_is_buffered() {
        let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
        let now = Instant::now();
        let result = limiter.on_new_frame(1u32, now);
        assert!(result.is_none());
    }

    #[test]
    fn frames_too_close_drops_old() {
        let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
        let now = Instant::now();
        limiter.on_new_frame(1, now);
        // Send second frame almost immediately
        let result = limiter.on_new_frame(2, now + Duration::from_millis(1));
        // Too close → old frame dropped, new frame buffered
        assert!(result.is_none());
    }

    #[test]
    fn frames_far_enough_output_old() {
        let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
        let now = Instant::now();
        limiter.on_new_frame(1, now);
        // Wait long enough (33ms for 30fps)
        let result = limiter.on_new_frame(2, now + Duration::from_millis(40));
        assert_eq!(result, Some(1));
    }

    #[test]
    fn flush_returns_last_buffered() {
        let mut limiter: FpsLimit<u32> = FpsLimit::new(30);
        let now = Instant::now();
        limiter.on_new_frame(1, now);
        assert_eq!(limiter.flush(), Some(1));
        assert_eq!(limiter.flush(), None);
    }
}
```

- [ ] **Step 4: Run tests**

Run: `cargo test fps_limit`

Expected: All 4 tests pass

- [ ] **Step 5: Commit**

```bash
git add -A && git commit -m "feat: VRR-aware FPS limiter with unit tests"
```

---

## Task 4: NAL Unit Framing — nalu.rs

**Files:**
- Create: `src/nalu.rs`
- Create: `tests/nalu_test.rs`
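Step 1 of this task defines a `NalUnit` type, but nothing in the task produces one. A minimal splitter is sketched below. It repeats the start-code scan used by `find_annex_b_start_codes` so that it stays self-contained. The name `split_annex_b` and its `(nal_type, &[u8])` return shape are hypothetical. Wrapping each slice with `Bytes::copy_from_slice` would yield `NalUnit { nal_type, data }`.

```rust
/// Hypothetical helper: split an Annex B buffer into (nal_type, nal_bytes) pairs,
/// with start codes stripped. `is_hevc` selects the NAL header layout.
pub fn split_annex_b(data: &[u8], is_hevc: bool) -> Vec<(u8, &[u8])> {
    // 1. Locate start codes as (offset, length). A NAL unit never ends in 0x00,
    //    so a zero right before 00 00 01 belongs to a 4-byte start code.
    let mut codes: Vec<(usize, usize)> = Vec::new();
    let mut i = 0;
    while i + 3 <= data.len() {
        if data[i..i + 3] == [0u8, 0, 1] {
            if i > 0 && data[i - 1] == 0 {
                codes.push((i - 1, 4));
            } else {
                codes.push((i, 3));
            }
            i += 3;
        } else {
            i += 1;
        }
    }

    // 2. Each NAL runs from the end of its start code to the next start code (or EOF)
    let mut nals = Vec::new();
    for (k, &(pos, len)) in codes.iter().enumerate() {
        let start = pos + len;
        let end = codes.get(k + 1).map_or(data.len(), |&(next, _)| next);
        if start >= end {
            continue; // empty NAL (back-to-back start codes)
        }
        let nal = &data[start..end];
        // H.264: type is the low 5 bits; HEVC: the 6 bits after forbidden_zero_bit
        let nal_type = if is_hevc { (nal[0] >> 1) & 0x3f } else { nal[0] & 0x1f };
        nals.push((nal_type, nal));
    }
    nals
}
```

In `nalu.rs` such a helper would sit next to `find_annex_b_start_codes` and could call it instead of duplicating the scan.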

New code (not ported). Handles Annex B NAL unit splitting and the framing protocol for QUIC transport.

- [ ] **Step 1: Create src/nalu.rs with core types**

```rust
use bytes::Bytes;

/// A single NAL unit extracted from an Annex B bitstream
pub struct NalUnit {
    pub nal_type: u8,
    pub data: Bytes, // NAL unit data WITHOUT start codes
}

/// Identifies frame type for transport framing
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FrameType {
    Keyframe,
    Delta,
}

/// A complete encoded frame ready for transport
#[derive(Clone)]
pub struct EncodedFrame {
    pub data: Bytes,
    pub pts_us: i64,
    pub duration: std::time::Duration,
    pub frame_type: FrameType,
    pub width: u32,
    pub height: u32,
}

/// Fragment header for QUIC datagram framing (17 bytes, big-endian)
#[derive(Debug, Clone)]
pub struct FragmentHeader {
    pub frame_type: u8,   // 0x01-0x04, 0x10
    pub frame_id: u32,
    pub pts_us: i64,      // Presentation timestamp in microseconds (for WebCodecs)
    pub seq_num: u16,
    pub total_frags: u16,
}

impl FragmentHeader {
    pub const SIZE: usize = 17; // 1 + 4 + 8 + 2 + 2

    pub const TYPE_KEYFRAME_FRAG: u8 = 0x01;
    pub const TYPE_DELTA_FRAG: u8 = 0x02;
    pub const TYPE_KEYFRAME_COMPLETE: u8 = 0x03;
    pub const TYPE_DELTA_COMPLETE: u8 = 0x04;
    pub const TYPE_CODEC_CONFIG: u8 = 0x10;

    pub fn encode(&self) -> [u8; Self::SIZE] {
        let mut buf = [0u8; Self::SIZE];
        buf[0] = self.frame_type;
        buf[1..5].copy_from_slice(&self.frame_id.to_be_bytes());
        buf[5..13].copy_from_slice(&self.pts_us.to_be_bytes());
        buf[13..15].copy_from_slice(&self.seq_num.to_be_bytes());
        buf[15..17].copy_from_slice(&self.total_frags.to_be_bytes());
        buf
    }

    pub fn decode(data: &[u8]) -> anyhow::Result<Self> {
        if data.len() < Self::SIZE {
            return Err(anyhow::anyhow!("fragment header too short"));
        }
        Ok(Self {
            frame_type: data[0],
            frame_id: u32::from_be_bytes([data[1], data[2], data[3], data[4]]),
            pts_us: i64::from_be_bytes([
                data[5], data[6], data[7], data[8], data[9], data[10], data[11], data[12],
            ]),
            seq_num: u16::from_be_bytes([data[13], data[14]]),
            total_frags: u16::from_be_bytes([data[15], data[16]]),
        })
    }
}

/// Splits an encoded frame into fragments suitable for QUIC datagrams
pub struct FrameFragmenter {
    max_payload_size: usize, // MTU - header size
    frame_counter: u32,
}

impl FrameFragmenter {
    pub fn new(mtu: usize) -> Self {
        Self {
            max_payload_size: mtu.saturating_sub(FragmentHeader::SIZE),
            frame_counter: 0,
        }
    }

    /// Get the current frame counter value (next frame_id that will be assigned)
    pub fn current_frame_id(&self) -> u32 {
        self.frame_counter
    }

    /// Fragment an encoded frame into QUIC-datagram-sized chunks.
    /// Returns Vec of (FragmentHeader, payload) pairs.
    /// Keyframes should be sent via reliable stream instead;
    /// this method is primarily for delta frames.
    pub fn fragment(&mut self, frame: &EncodedFrame) -> Vec<(FragmentHeader, Bytes)> {
        let frame_id = self.frame_counter;
        self.frame_counter += 1;

        let frag_type = match frame.frame_type {
            FrameType::Keyframe => FragmentHeader::TYPE_KEYFRAME_FRAG,
            FrameType::Delta => FragmentHeader::TYPE_DELTA_FRAG,
        };

        if frame.data.len() <= self.max_payload_size {
            // Fits in a single datagram
            let complete_type = match frame.frame_type {
                FrameType::Keyframe => FragmentHeader::TYPE_KEYFRAME_COMPLETE,
                FrameType::Delta => FragmentHeader::TYPE_DELTA_COMPLETE,
            };
            return vec![(
                FragmentHeader {
                    frame_type: complete_type,
                    frame_id,
                    pts_us: frame.pts_us,
                    seq_num: 0,
                    total_frags: 1,
                },
                frame.data.clone(),
            )];
        }

        // Need fragmentation
        let total_frags = (frame.data.len() + self.max_payload_size - 1) / self.max_payload_size;
        let total_frags = total_frags as u16;

        frame.data
            .chunks(self.max_payload_size)
            .enumerate()
            .map(|(i, chunk)| {
                (
                    FragmentHeader {
                        frame_type: frag_type,
                        frame_id,
                        pts_us: frame.pts_us,
                        seq_num: i as u16,
                        total_frags,
                    },
                    Bytes::copy_from_slice(chunk),
                )
            })
            .collect()
    }
}

/// H.264 NAL unit type constants
pub mod h264 {
    pub const IDR: u8 = 5;
    pub const SPS: u8 = 7;
    pub const PPS: u8 = 8;
}

/// HEVC NAL unit type constants
pub mod hevc {
    pub const VPS: u8 = 32;
    pub const SPS: u8 = 33;
    pub const PPS: u8 = 34;
    pub const IDR_W_RADL: u8 = 19;
    pub const IDR_N_LP: u8 = 20;
}

/// Find Annex B start code positions in a byte buffer.
/// Returns positions and lengths of start codes (3 or 4 bytes).
pub fn find_annex_b_start_codes(data: &[u8]) -> Vec<(usize, usize)> {
    let mut positions = Vec::new();
    let mut i = 0;
    while i + 3 <= data.len() {
        if data[i..i + 3] == [0, 0, 1] {
            if i > 0 && data[i - 1] == 0 {
                positions.push((i - 1, 4)); // 4-byte start code
            } else {
                positions.push((i, 3)); // 3-byte start code
            }
            i += 3;
        } else {
            i += 1;
        }
    }
    positions
}

/// Determine if an Annex B encoded frame is a keyframe by checking NAL types.
/// For H.264: checks for IDR (type 5) NAL units.
/// For HEVC: checks for IDR_W_RADL (19) or IDR_N_LP (20) NAL types.
pub fn is_keyframe(data: &[u8], is_hevc: bool) -> bool {
    for (pos, len) in find_annex_b_start_codes(data) {
        let nal_start = pos + len;
        if nal_start >= data.len() {
            continue;
        }
        let is_idr = if is_hevc {
            let nal_type = (data[nal_start] >> 1) & 0x3f;
            nal_type == hevc::IDR_W_RADL || nal_type == hevc::IDR_N_LP
        } else {
            (data[nal_start] & 0x1f) == h264::IDR
        };
        if is_idr {
            return true;
        }
    }
    false
}
```

- [ ] **Step 2: Add `mod nalu;` to src/main.rs, and create `src/lib.rs` containing `pub mod nalu;`**

Integration tests under `tests/` can only import from a library target. Without `src/lib.rs`, `use wl_webrtc::nalu::*;` below fails to resolve.

- [ ] **Step 3: Create tests/nalu_test.rs**

```rust
use bytes::Bytes;
use wl_webrtc::nalu::*;

#[test]
fn test_find_start_codes_single_nal() {
    // 4-byte start code + 1 byte NAL
    let data: &[u8] = &[0x00, 0x00, 0x00, 0x01, 0x65, 0xFF, 0xFF];
    let sc = find_annex_b_start_codes(data);
    assert_eq!(sc.len(), 1);
    assert_eq!(sc[0], (0, 4));
}

#[test]
fn test_find_start_codes_multiple_nals() {
    // SPS + PPS + IDR
    let data: Vec<u8> = [
        &[0x00, 0x00, 0x00, 0x01][..], // start code
        &[0x67][..],                   // SPS (type 7)
        &[0x00, 0x00, 0x00, 0x01][..], // start code
        &[0x68][..],                   // PPS (type 8)
        &[0x00, 0x00, 0x00, 0x01][..], // start code
        &[0x65, 0xFF][..],             // IDR (type 5)
    ].concat();
    let sc = find_annex_b_start_codes(&data);
    assert_eq!(sc.len(), 3);
}

#[test]
fn test_is_keyframe_h264_idr() {
    let data: Vec<u8> = [
        &[0x00, 0x00, 0x00, 0x01][..],
        &[0x65, 0xFF][..], // IDR NAL type 5
    ].concat();
    assert!(is_keyframe(&data, false));
}

#[test]
fn test_is_keyframe_h264_non_idr() {
    let data: Vec<u8> = [
        &[0x00, 0x00, 0x00, 0x01][..],
        &[0x41, 0xFF][..], // Non-IDR slice (type 1)
    ].concat();
    assert!(!is_keyframe(&data, false));
}

#[test]
fn test_fragment_header_roundtrip() {
    let header = FragmentHeader {
        frame_type: FragmentHeader::TYPE_DELTA_FRAG,
        frame_id: 42,
        pts_us: 12345678,
        seq_num: 3,
        total_frags: 7,
    };
    let encoded = header.encode();
    let decoded = FragmentHeader::decode(&encoded).unwrap();
    assert_eq!(decoded.frame_type, header.frame_type);
    assert_eq!(decoded.frame_id, header.frame_id);
    assert_eq!(decoded.seq_num, header.seq_num);
    assert_eq!(decoded.total_frags, header.total_frags);
    assert_eq!(decoded.pts_us, header.pts_us);
}

#[test]
fn test_fragmenter_small_frame() {
    let mut fragger = FrameFragmenter::new(1200);
    let frame = EncodedFrame {
        data: Bytes::from(vec![0u8; 500]),
        pts_us: 0,
        duration: std::time::Duration::from_secs_f64(1.0 / 60.0),
        frame_type: FrameType::Delta,
        width: 1920,
        height: 1080,
    };
    let frags = fragger.fragment(&frame);
    assert_eq!(frags.len(), 1);
    assert_eq!(frags[0].0.frame_type, FragmentHeader::TYPE_DELTA_COMPLETE);
    assert_eq!(frags[0].0.total_frags, 1);
}

#[test]
fn test_fragmenter_large_frame() {
    let mut fragger = FrameFragmenter::new(1200);
    let frame = EncodedFrame {
        data: Bytes::from(vec![0u8; 5000]),
        pts_us: 0,
        duration: std::time::Duration::from_secs_f64(1.0 / 60.0),
        frame_type: FrameType::Delta,
        width: 1920,
        height: 1080,
    };
    let frags = fragger.fragment(&frame);
    assert!(frags.len() > 1);
    // All fragments should have the same frame_id
    let fid = frags[0].0.frame_id;
    for (h, _) in &frags {
        assert_eq!(h.frame_id, fid);
    }
}
```

- [ ] **Step 4: (Optional) Declare the test target in Cargo.toml**

Cargo auto-discovers files under `tests/`, so this is only needed for an explicit name or path. Add to Cargo.toml:

```toml
[[test]]
name = "nalu_test"
path = "tests/nalu_test.rs"
```

- [ ] **Step 5: Run tests**

Run: `cargo test --test nalu_test`

A plain `cargo test nalu_test` would treat `nalu_test` as a test-name filter. No test function name contains that string, so zero tests would run.

Expected: All 7 tests pass

- [ ] **Step 6: Commit**

```bash
git add -A && git commit -m "feat: NAL unit framing protocol with Annex B parser and fragmenter"
```

---

## Task 5: Transport Layer — transport.rs

**Files:**
- Create: `src/transport.rs`

New code. A WebTransport server built on the `wtransport` crate, which provides full HTTP/3 + WebTransport protocol support. The browser's `WebTransport` API requires HTTP/3 with the WebTransport extension. `wtransport` handles all of this internally, including TLS certificate generation and the HTTP/3 handshake.

**Key decisions:**
1. `wtransport` replaces `quinn` + `h3` + `h3-quinn`: it bundles the full HTTP/3 + WebTransport server stack.
2. `async_channel::Receiver` replaces `crossbeam::channel::Receiver`: its async `recv()` method eliminates the sync→async bridge problem.
3. TLS is handled internally by `wtransport` (self-signed cert auto-generated), so no manual `TlsConfig` or `rustls` config is needed.
4. Keyframes → reliable WebTransport stream (`open_uni()`); delta frames → unreliable datagrams (`send_datagram()`).
5. Single client session in Phase 1.

**Port architecture:** WebTransport runs over HTTP/3 (UDP). The HTTP static file server (Task 6, axum on TCP) is separate. `player.js` connects to the WebTransport port. Both can share the same port number if desired (HTTP/3 is UDP, axum is TCP), but `player.js` must configure `SERVER_URL` to point to the WebTransport server's address.

- [ ] **Step 1: Verify transport dependencies in Cargo.toml**

Dependencies (`wtransport`, `async-channel`) should already be in Cargo.toml from Task 1. If not, add:

```toml
wtransport = { version = "0.7", features = ["self-signed"] }
async-channel = "2"
```

These replace the `quinn`, `rustls`, and `rcgen` deps (remove those if present — `wtransport` handles TLS internally).
The `self-signed` feature enables `Identity::self_signed()` for automatic self-signed certificate generation.

- [ ] **Step 2: Create src/transport.rs**

```rust
use anyhow::Result;
use async_channel::Receiver;
use std::net::SocketAddr;

use crate::nalu::{EncodedFrame, FrameFragmenter, FragmentHeader, FrameType};

/// Codec configuration extracted from encoder SPS/PPS.
/// Sent to new clients upon session establishment.
pub struct CodecConfig {
    pub codec: String, // e.g. "avc1.42E01F"
    pub width: u32,
    pub height: u32,
    pub framerate: u32,
}

/// Manages WebTransport connections and frame distribution.
/// Uses wtransport crate for full HTTP/3 + WebTransport protocol support.
///
/// CRITICAL API NOTES (wtransport 0.7):
/// - `wtransport::Endpoint::server(config)` returns `Result<Endpoint<Server>>`: use `?`
/// - `endpoint.accept().await` returns `IncomingSession` (NOT a Connection)
/// - `incoming_session.await` returns `SessionRequest` (second await)
/// - `session_request.accept().await` returns `Connection` (third await)
/// - `connection.open_uni().await?.await?`: DOUBLE await for unidirectional streams
/// - `connection.send_datagram(data)`: takes `impl AsRef<[u8]>` (`Vec<u8>` works directly)
/// - `connection.receive_datagram().await`: receive datagram
/// - TLS requires `Identity::self_signed()` with `self-signed` feature enabled
pub struct TransportServer {
    endpoint: wtransport::endpoint::Endpoint<wtransport::endpoint::endpoint_side::Server>,
    frame_rx: Receiver<EncodedFrame>,
    codec_config: Option<CodecConfig>,
    last_keyframe: Option<EncodedFrame>,
}

impl TransportServer {
    /// Create a new WebTransport server.
    ///
    /// Uses Identity::self_signed() for auto-generated TLS certificate.
    /// The `self-signed` feature must be enabled in Cargo.toml.
    pub fn new(addr: SocketAddr, frame_rx: Receiver<EncodedFrame>) -> Result<Self> {
        let identity = wtransport::Identity::self_signed(["localhost", "127.0.0.1", "::1"])
            .map_err(|e| anyhow::anyhow!("Failed to generate self-signed certificate: {}", e))?;

        let config = wtransport::ServerConfig::builder()
            .with_bind_address(addr)
            .with_identity(identity)
            .keep_alive_interval(Some(std::time::Duration::from_secs(3)))
            .build();

        let endpoint = wtransport::Endpoint::server(config)?;

        Ok(Self {
            endpoint,
            frame_rx,
            codec_config: None,
            last_keyframe: None,
        })
    }

    /// Run the transport server loop until the frame channel closes.
    /// Should be spawned on the tokio runtime.
    pub async fn run(mut self) -> Result<()> {
        tracing::info!("WebTransport server listening on {}", self.endpoint.local_addr()?);

        // Datagram payloads must fit `connection.max_datagram_size()`, which is below
        // 1200 bytes at QUIC's default initial MTU; larger datagrams fail with a
        // `TooLarge` error. 1100 (17-byte header + 1083 payload) leaves room for overhead.
        let mut fragmenter = FrameFragmenter::new(1100);
        let mut session: Option<Session> = None;

        loop {
            tokio::select! {
                // Accept new WebTransport connections
                // 3-step chain: accept() -> IncomingSession.await -> SessionRequest.accept().await
                incoming = self.endpoint.accept() => {
                    match Self::accept_session(incoming).await {
                        Ok(connection) => {
                            tracing::info!("New WebTransport session from {:?}", connection.remote_address());

                            // Send codec config as first message if available
                            if let Some(ref config) = self.codec_config {
                                if let Err(e) = Self::send_codec_config(&connection, config).await {
                                    tracing::warn!("Failed to send codec config: {}", e);
                                }
                            }

                            // Resend last keyframe if available (so new client can decode immediately)
                            if let Some(ref keyframe) = self.last_keyframe {
                                if let Err(e) = Self::send_keyframe_data(&connection, keyframe, &mut fragmenter).await {
                                    tracing::warn!("Failed to send last keyframe to new client: {}", e);
                                }
                            }

                            session = Some(Session::new(connection));
                        }
                        Err(e) => {
                            tracing::warn!("Session accept failed: {}", e);
                        }
                    }
                }

                // Receive encoded frames from capture pipeline (async_channel)
                frame = self.frame_rx.recv() => {
                    match frame {
                        Ok(frame) => {
                            // Cache keyframes for new client delivery (EncodedFrame derives Clone)
                            if frame.frame_type == FrameType::Keyframe {
                                self.last_keyframe = Some(frame.clone());
                            }

                            if let Some(s) = &mut session {
                                if let Err(e) = s.send_frame(&frame, &mut fragmenter).await {
                                    tracing::error!("Frame send error: {}", e);
                                    session = None;
                                }
                            }
                        }
                        Err(_) => break, // Channel closed: shut down
                    }
                }
            }
        }

        Ok(())
    }

    /// Accept an incoming session: 3-step async chain
    async fn accept_session(
        incoming: wtransport::endpoint::IncomingSession,
    ) -> Result<wtransport::Connection> {
        // Step 1: incoming_session.await -> SessionRequest
        let session_request = incoming.await?;
        tracing::info!(
            "Session request: authority='{}', path='{}'",
            session_request.authority(),
            session_request.path()
        );

        // Step 2: session_request.accept().await -> Connection
        let connection = session_request.accept().await?;
        Ok(connection)
    }

    /// Send codec configuration to a client via reliable stream.
    /// In Annex B mode, we only send codec name and dimensions.
    /// SPS/PPS must arrive in-band with each keyframe (e.g. via FFmpeg's `dump_extra`
    /// bitstream filter with `freq=keyframe`).
    async fn send_codec_config(
        connection: &wtransport::Connection,
        config: &CodecConfig,
    ) -> Result<()> {
        let mut stream = connection.open_uni().await?.await?;

        // Build JSON config message: NO AVCC description needed in Annex B mode.
        // The browser decoder is configured WITHOUT description, which activates Annex B mode.
        let config_json = serde_json::json!({
            "type": "codec_config",
            "codec": config.codec,
            "width": config.width,
            "height": config.height,
            "framerate": config.framerate,
        });

        // Write fragment header + JSON payload
        let header = FragmentHeader {
            frame_type: FragmentHeader::TYPE_CODEC_CONFIG,
            frame_id: 0,
            pts_us: 0,
            seq_num: 0,
            total_frags: 1,
        };
        stream.write_all(&header.encode()).await?;
        stream.write_all(config_json.to_string().as_bytes()).await?;
        stream.finish().await?;
        Ok(())
    }

    /// Send a cached keyframe to a new client
    async fn send_keyframe_data(
        connection: &wtransport::Connection,
        frame: &EncodedFrame,
        fragmenter: &mut FrameFragmenter,
    ) -> Result<()> {
        let mut stream = connection.open_uni().await?.await?;
        let header = FragmentHeader {
            frame_type: FragmentHeader::TYPE_KEYFRAME_COMPLETE,
            frame_id: fragmenter.current_frame_id(),
            pts_us: frame.pts_us,
            seq_num: 0,
            total_frags: 1,
        };
        stream.write_all(&header.encode()).await?;
        stream.write_all(&frame.data).await?;
        stream.finish().await?;
        Ok(())
    }

    /// Set the codec configuration sent to new clients.
    /// Call before `run()`: `run` takes `self` by value, so changes after startup
    /// (e.g. an encoder format change) would need a channel instead.
    pub fn update_codec_config(&mut self, config: CodecConfig) {
        self.codec_config = Some(config);
    }
}

/// A single connected client session
struct Session {
    conn: wtransport::Connection,
    needs_keyframe: bool,
}

impl Session {
    fn new(conn: wtransport::Connection) -> Self {
        Self { conn, needs_keyframe: false }
    }

    async fn send_frame(
        &mut self,
        frame: &EncodedFrame,
        fragmenter: &mut FrameFragmenter,
    ) -> Result<()> {
        match frame.frame_type {
            FrameType::Keyframe => {
                // Send keyframes via reliable WebTransport stream
                // NOTE: open_uni().await?.await? (double await!)
                let mut stream = self.conn.open_uni().await?.await?;
                // Write fragment header + full data
                let header = FragmentHeader {
                    frame_type: FragmentHeader::TYPE_KEYFRAME_COMPLETE,
                    frame_id: fragmenter.current_frame_id(),
                    pts_us: frame.pts_us,
                    seq_num: 0,
                    total_frags: 1,
                };
                stream.write_all(&header.encode()).await?;
                stream.write_all(&frame.data).await?;
                stream.finish().await?;
            }
            FrameType::Delta => {
                // Send delta frames via datagrams (unreliable, low latency)
                let fragments = fragmenter.fragment(frame);
                for (header, payload) in fragments {
                    let mut datagram = Vec::with_capacity(FragmentHeader::SIZE + payload.len());
                    datagram.extend_from_slice(&header.encode());
                    datagram.extend_from_slice(&payload);
                    // send_datagram is generic over AsRef<[u8]>, so pass the Vec directly
                    // (a bare `.into()` here would leave the target type ambiguous)
                    self.conn.send_datagram(datagram)?;
                }
            }
        }
        Ok(())
    }
}
```

**API notes for the implementer (wtransport 0.7, verified against source):**
- `wtransport::Identity::self_signed(["localhost", "127.0.0.1", "::1"])` generates a self-signed TLS certificate (requires the `self-signed` feature)
- `wtransport::ServerConfig::builder().with_bind_address(addr).with_identity(identity).build()` creates the server config
- `wtransport::Endpoint::server(config)` returns `Result<Endpoint<Server>>`: use the `?` operator
- `endpoint.accept().await` returns `IncomingSession`: this is the FIRST step
- `incoming_session.await` returns `SessionRequest`: this is the SECOND step
- `session_request.accept().await` returns `Connection`: this is the THIRD step
- `connection.open_uni().await?.await?` opens a unidirectional stream with a **DOUBLE await** (the first opens the QUIC stream, the second waits for readiness)
- `connection.send_datagram(payload)` sends an unreliable datagram (takes `impl AsRef<[u8]>`, so `Vec<u8>` works directly; the payload must not exceed `connection.max_datagram_size()`)
- `connection.receive_datagram().await` receives a datagram
- `stream.write_all(&data).await?` writes data to a stream
- `stream.finish().await?` closes the stream gracefully
- `self.frame_rx.recv()` is async (from `async_channel`), so no sync→async bridge is needed

- [ ] **Step 3: Add `mod transport;` to src/main.rs**

- [ ] **Step 4: Verify
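compilation**

Run: `cargo build`

- [ ] **Step 5: Commit**

```bash
git add -A && git commit -m "feat: WebTransport server with wtransport for browser streaming"
```

---

## Task 6: Web UI — signaling.rs + static/*

**Files:**
- Create: `src/signaling.rs`
- Create: `static/index.html`
- Create: `static/player.js`
- Create: `static/style.css`

- [ ] **Step 1: Create static/index.html**

```html
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>wl-webrtc</title>
  <link rel="stylesheet" href="style.css">
</head>
<body>
  <div id="app">
    <div id="video-container">
      <canvas id="video-canvas"></canvas>
      <div id="status-overlay"><span id="status">Connecting...</span></div>
    </div>
    <div id="controls">
      <span id="info">--</span>
      <button id="fullscreen-btn">Fullscreen</button>
    </div>
  </div>
  <script src="player.js"></script>
</body>
</html>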
```

- [ ] **Step 2: Create static/style.css**

```css
* { margin: 0; padding: 0; box-sizing: border-box; }
body { background: #000; color: #fff; font-family: monospace; }
#app { display: flex; flex-direction: column; height: 100vh; }
#video-container {
  flex: 1;
  position: relative;
  display: flex;
  align-items: center;
  justify-content: center;
}
#video-canvas { max-width: 100%; max-height: 100%; }
#status-overlay {
  position: absolute;
  top: 0;
  left: 0;
  right: 0;
  padding: 8px;
  background: rgba(0,0,0,0.7);
  text-align: center;
}
#controls {
  display: flex;
  justify-content: space-between;
  align-items: center;
  padding: 8px;
  background: #111;
}
#info { font-size: 12px; color: #aaa; }
button { background: #333; color: #fff; border: 1px solid #555; padding: 6px 16px; cursor: pointer; }
button:hover { background: #444; }
```

- [ ] **Step 3: Create static/player.js**

```javascript
const canvas = document.getElementById('video-canvas');
const ctx = canvas.getContext('2d');
const statusEl = document.getElementById('status');
const infoEl = document.getElementById('info');
const fullscreenBtn = document.getElementById('fullscreen-btn');

// WebTransport server runs on HTTP/3 (UDP) on args.port.
// HTTP static file server runs on TCP on args.port + 1.
// When the page is loaded from the HTTP server, location.port = args.port + 1,
// so the WebTransport port is one less than the page's port.
const WT_PORT = parseInt(location.port, 10) - 1;
const SERVER_URL = `https://${location.hostname}:${WT_PORT}/wt`;

let decoder = null;
let transport = null;
let frameIdCounter = 0;
let fragmentBuffer = new Map(); // frame_id -> { total, fragments: Map<seqNum, Uint8Array> }
let stats = { frames: 0, startTime: 0, latency: 0 };
let codecConfigured = false;

// --- Fragment reassembly ---

const HEADER_SIZE = 17;
const TYPE_KEYFRAME_FRAG = 0x01;
const TYPE_DELTA_FRAG = 0x02;
const TYPE_KEYFRAME_COMPLETE = 0x03;
const TYPE_DELTA_COMPLETE = 0x04;
const TYPE_CODEC_CONFIG = 0x10;

function parseHeader(data) {
  // data: Uint8Array of at least HEADER_SIZE bytes; DataView reads big-endian by default
  const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
  return {
    frameType: data[0],
    frameId: view.getUint32(1),
    // pts_us: bytes 5-12, big-endian i64. getBigInt64() returns a BigInt; convert to
    // Number because EncodedVideoChunk's `timestamp` rejects BigInt (exact below 2^53 us).
    ptsUs: Number(view.getBigInt64(5)),
    seqNum: view.getUint16(13),
    totalFrags: view.getUint16(15),
  };
}

function reassembleFrame(header, payload) {
  if (header.totalFrags === 1) {
    // Complete frame in single packet
    return payload;
  }

  // Buffer fragment
  if (!fragmentBuffer.has(header.frameId)) {
    fragmentBuffer.set(header.frameId, {
      total: header.totalFrags,
      fragments: new Map(),
    });
  }
  const buf = fragmentBuffer.get(header.frameId);
  buf.fragments.set(header.seqNum, payload);

  // Check if complete
  if (buf.fragments.size === buf.total) {
    const sorted = [...buf.fragments.entries()].sort((a, b) => a[0] - b[0]);
    let totalLen = 0;
    for (const [, d] of sorted) totalLen += d.byteLength;
    const result = new Uint8Array(totalLen);
    let offset = 0;
    for (const [, d] of sorted) {
      result.set(d, offset);
      offset += d.byteLength;
    }
    fragmentBuffer.delete(header.frameId);
    return result;
  }

  // Cleanup: evict old incomplete frames to prevent memory leak
  const MAX_BUFFER_SIZE = 32;
  if (fragmentBuffer.size > MAX_BUFFER_SIZE) {
    // Delete oldest entries (lowest frameId)
    const sortedIds = [...fragmentBuffer.keys()].sort((a, b) => a - b);
    const toDelete = sortedIds.slice(0, fragmentBuffer.size - MAX_BUFFER_SIZE);
    for (const id of toDelete) {
      fragmentBuffer.delete(id);
    }
  }

  // Not complete yet
  return null;
}

// --- WebCodecs Decoder ---

function initDecoder(config) {
  decoder = new VideoDecoder({
    output: (frame) => {
      // Resize only when dimensions change: assigning canvas.width/height clears the canvas.
      // displayWidth/Height exclude any encoder padding included in codedWidth/Height.
      if (canvas.width !== frame.displayWidth || canvas.height !== frame.displayHeight) {
        canvas.width = frame.displayWidth;
        canvas.height = frame.displayHeight;
      }
      ctx.drawImage(frame, 0, 0);
      frame.close();

      stats.frames++;
      if (stats.startTime === 0) stats.startTime = performance.now();
      const elapsed = (performance.now() - stats.startTime) / 1000;
      if (elapsed > 1) {
        const fps = (stats.frames / elapsed).toFixed(1);
        infoEl.textContent = `${canvas.width}x${canvas.height} | ${fps} fps`;
      }
    },
    error: (e) => {
      console.error('Decoder error:', e);
      statusEl.textContent = `Decoder error: ${e.message}`;
    },
  });

  // CRITICAL: Configure WITHOUT description → Annex B mode.
  // Per the W3C AVC WebCodecs Registration, providing description forces AVC (length-prefixed)
  // mode for ALL frames. Since our server sends Annex B (start-code-prefixed),
  // we must omit description and rely on in-band SPS/PPS (e.g. inserted by FFmpeg's
  // `dump_extra` bitstream filter with `freq=keyframe`).
  decoder.configure({
    codec: config.codec,
    codedWidth: config.width,
    codedHeight: config.height,
    optimizeForLatency: true, // ask the decoder to emit frames as early as possible
    // NO description field: Annex B mode. SPS/PPS arrive in-band with each keyframe.
  });
  codecConfigured = true;
  statusEl.textContent = 'Streaming';
}

// --- Connection ---

async function connect() {
  statusEl.textContent = 'Connecting...';
  try {
    // Browsers reject self-signed certificates for WebTransport unless the constructor is
    // given `serverCertificateHashes` (SHA-256 of the certificate, which must be valid for
    // at most 14 days) or the certificate is otherwise trusted by the browser.
    transport = new WebTransport(SERVER_URL);
    await transport.ready;
    statusEl.textContent = 'Connected, waiting for stream...';

    // Read from datagrams for delta frames
    readDatagrams();
    // Read from unidirectional streams for keyframes and codec config
    readStreams();
  } catch (e) {
    console.error('Connection failed:', e);
    statusEl.textContent = `Connection failed: ${e.message}. Retrying in 3s...`;
    setTimeout(connect, 3000);
  }
}

async function readDatagrams() {
  const reader = transport.datagrams.readable.getReader();
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      // value is a Uint8Array; subarray() creates views without copying
      if (value.byteLength < HEADER_SIZE) continue;
      const header = parseHeader(value.subarray(0, HEADER_SIZE));
      const payload = value.subarray(HEADER_SIZE);
      handleFrame(header, payload);
    }
  } catch (e) {
    console.error('Datagram read error:', e);
  }
}

async function readStreams() {
  try {
    // Each incoming unidirectional stream is itself a ReadableStream of Uint8Array chunks
    const reader = transport.incomingUnidirectionalStreams.getReader();
    while (true) {
      const { value: stream, done } = await reader.read();
      if (done) break;
      const data = await readAll(stream);
      if (data.byteLength < HEADER_SIZE) continue;
      const header = parseHeader(data.subarray(0, HEADER_SIZE));
      const payload = data.subarray(HEADER_SIZE);
      handleFrame(header, payload);
    }
  } catch (e) {
    console.error('Stream read error:', e);
  }
}

// Read a whole receive stream into one Uint8Array
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  let totalLen = 0;
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
    totalLen += value.byteLength;
  }
  const result = new Uint8Array(totalLen);
  let offset = 0;
  for (const chunk of chunks) {
    result.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return result;
}

function handleFrame(header, payload) {
  if (header.frameType === TYPE_CODEC_CONFIG) {
    // Codec config message: contains codec name and dimensions only.
    // No AVCC description needed: we configure in Annex B mode (no `description` field).
    // SPS/PPS arrive in-band with each keyframe (e.g. via FFmpeg's `dump_extra` BSF).
    const config = JSON.parse(new TextDecoder().decode(payload));
    initDecoder({
      codec: config.codec,
      codedWidth: config.width,
      codedHeight: config.height,
    });
    return;
  }

  if (!codecConfigured || !decoder || decoder.state === 'closed') return;

  const frameData = reassembleFrame(header, payload);
  if (!frameData) return;

  const isKeyframe =
    header.frameType === TYPE_KEYFRAME_COMPLETE || header.frameType === TYPE_KEYFRAME_FRAG;

  const chunk = new EncodedVideoChunk({
    type: isKeyframe ? 'key' : 'delta',
    // i64 → Number is exact for values below 2^53. In microseconds that is about 285 years
    // of continuous streaming from PTS 0, so precision loss is not a practical concern.
    timestamp: Number(header.ptsUs),
    data: frameData,
  });

  if (decoder.state === 'configured') {
    decoder.decode(chunk);
  }
}

// --- Fullscreen ---
fullscreenBtn.addEventListener('click', () => {
  if (document.fullscreenElement) {
    document.exitFullscreen();
  } else {
    document.getElementById('app').requestFullscreen();
  }
});

// --- Start ---
connect();
```

- [ ] **Step 4: Create src/signaling.rs**

```rust
use anyhow::Result;
use axum::{
    body::Body,
    http::{header, StatusCode, Uri},
    response::{IntoResponse, Response},
    routing::get,
    Router,
};
use rust_embed::Embed;
use std::net::SocketAddr;

#[derive(Embed)]
#[folder = "static/"]
struct Asset;

/// Create the axum router for serving the embedded static files
pub fn create_router() -> Router {
    Router::new()
        .route("/", get(index_handler))
        .fallback(static_handler)
}

async fn index_handler() -> Response {
    serve_asset("index.html")
}

/// Fallback handler: map the request path onto an embedded asset.
/// (A fallback has no path parameters, so the path is taken from the `Uri`.)
async fn static_handler(uri: Uri) -> Response {
    let path = uri.path().trim_start_matches('/');
    let path = if path.is_empty() { "index.html" } else { path };
    serve_asset(path)
}

fn serve_asset(path: &str) -> Response {
    match Asset::get(path) {
        Some(file) => {
            let mime = mime_guess::from_path(path).first_or_octet_stream();
            Response::builder()
                .header(header::CONTENT_TYPE, mime.as_ref())
                .body(Body::from(file.data.to_vec()))
                .unwrap()
                .into_response()
        }
        None => StatusCode::NOT_FOUND.into_response(),
    }
}

/// Run the HTTP server for static files on the given address.
/// This should be spawned on the tokio runtime.
pub async fn serve(addr: SocketAddr) -> Result<()> {
    let app = create_router();
    let listener = tokio::net::TcpListener::bind(addr).await?;
    tracing::info!("HTTP server listening on {}", addr);
    axum::serve(listener, app).await?;
    Ok(())
}
```

- [ ] **Step 5: Add `mod signaling;` to src/main.rs**

- [ ] **Step 6: Verify compilation**

Run: `cargo build`

- [ ] **Step 7: Commit**

```bash
git add -A && git commit -m "feat: Web UI with WebCodecs player and static file serving"
```

---

## Task 7: FFmpeg Hardware Context — avhw.rs

**Files:**
- Create: `src/avhw.rs`

Manages FFmpeg hardware device and frame contexts for VAAPI/Vulkan encoding.

**CRITICAL WARNING:** The `ffmpeg-next` crate (v8.x) does NOT provide safe wrappers for hardware device contexts (an `AVBufferRef` wrapping `AVHWDeviceContext`) or hardware frame contexts (`AVHWFramesContext`). The safe API covers codec lookup, encoder configuration, filter graphs, and the encode/decode flow. ALL hardware context operations MUST use raw FFI via `ffmpeg_next::ffi`.

**Reference implementation:** See `wl-screenrec/src/avhw.rs` (https://github.com/russelltg/wl-screenrec/blob/main/src/avhw.rs), the closest real-world reference for this exact pattern.

**API References:**
- ffmpeg-next FFI: `ffmpeg_next::ffi::*` — raw C API bindings
- Key FFI functions:
  - `ffi::av_hwdevice_ctx_create(&mut ptr, device_type, device_path, opts, flags)` — create HW device
  - `ffi::av_hwdevice_ctx_create_derived(&mut dst, type, src, flags)` — derive HW device (e.g. DRM → VAAPI)
  - `ffi::av_hwframe_ctx_alloc(device_ref)` — allocate frame context on device
  - `ffi::av_hwframe_ctx_init(frames_ref)` — commit frame context parameters
  - `ffi::av_buffer_ref(ptr)` — increment reference count
  - `ffi::av_buffer_unref(&mut ptr)` — decrement reference count (for Drop)
  - `ffi::av_hwframe_get_buffer(frames_ref, frame, 0)` — allocate HW surface from pool
- FFmpeg VAAPI encoding: https://ffmpeg.org/ffmpeg-codecs.html#vaapi
- VA-API specification: https://01.org/linuxmedia/vaapi

- [ ] **Step 1: Create src/avhw.rs**

Implement hardware device and frame context management using ffmpeg-next FFI.

Key types:

```rust
use ffmpeg_next as ff;
use ffmpeg_next::ffi;
use std::ptr;

/// Hardware encoding backend selection
pub enum HwAccel {
    Vaapi,
    Vulkan,
}

/// Encode pixel format — drives three-way dispatch
pub enum EncodePixelFormat {
    Vaapi(ff::format::Pixel),
    Vulkan(ff::format::Pixel),
    Sw(ff::format::Pixel),
}

/// Holds an FFmpeg hardware device context.
///
/// Wraps a raw `*mut AVBufferRef`. The ffmpeg-next safe API does NOT expose
/// this type, so we use raw FFI pointers with a manual `Drop`.
///
/// Reference: wl-screenrec/src/avhw.rs (same pattern)
pub struct AvHwDevCtx {
    ptr: *mut ffi::AVBufferRef,
    fmt: ff::format::Pixel,
}

// SAFETY: AVBufferRef reference counting is atomic, so the handle can move between threads.
unsafe impl Send for AvHwDevCtx {}

impl AvHwDevCtx {
    /// Create VAAPI device context from DRM device path.
    ///
    /// Uses raw FFI: `ffi::av_hwdevice_ctx_create()`.
    /// Internally calls vaGetDisplayDRM → vaInitialize.
    pub fn new_vaapi(drm_device: &std::path::Path) -> anyhow::Result<Self> {
        let path = drm_device
            .to_str()
            .ok_or_else(|| anyhow::anyhow!("DRM device path is not valid UTF-8"))?;
        let device_cstr = std::ffi::CString::new(path)?;
        let mut ptr: *mut ffi::AVBufferRef = ptr::null_mut();
        let ret = unsafe {
            ffi::av_hwdevice_ctx_create(
                &mut ptr,
                ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VAAPI,
                device_cstr.as_ptr(),
                ptr::null_mut(),
                0,
            )
        };
        if ret < 0 {
            anyhow::bail!("Failed to create VAAPI device context: error code {}", ret);
        }
        Ok(Self {
            ptr,
            fmt: ff::format::Pixel::VAAPI,
        })
    }

    /// Create Vulkan device context from DRM device path.
    ///
    /// Three raw-FFI steps:
    /// 1. `ffi::av_hwdevice_ctx_create(DRM, path)` → drm_ctx
    /// 2. `ffi::av_hwdevice_ctx_create_derived(VULKAN, drm_ctx)` → vulkan_ctx
    /// 3. Release drm_ctx (the derived context shares the GPU device)
    ///
    /// For self-referential FFmpeg internal structs, Vulkan requires
    /// `Pin<Box<...>>` for stable addresses (see the wl-screenrec reference).
    pub fn new_vulkan(drm_device: &std::path::Path) -> anyhow::Result<Self> {
        let path = drm_device
            .to_str()
            .ok_or_else(|| anyhow::anyhow!("DRM device path is not valid UTF-8"))?;
        let device_cstr = std::ffi::CString::new(path)?;

        // Step 1: Create DRM device context
        let mut drm_ptr: *mut ffi::AVBufferRef = ptr::null_mut();
        let ret = unsafe {
            ffi::av_hwdevice_ctx_create(
                &mut drm_ptr,
                ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_DRM,
                device_cstr.as_ptr(),
                ptr::null_mut(),
                0,
            )
        };
        if ret < 0 {
            anyhow::bail!("Failed to create DRM device context: error code {}", ret);
        }

        // Step 2: Derive Vulkan context from DRM
        let mut vk_ptr: *mut ffi::AVBufferRef = ptr::null_mut();
        let ret = unsafe {
            ffi::av_hwdevice_ctx_create_derived(
                &mut vk_ptr,
                ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VULKAN,
                drm_ptr,
                0,
            )
        };

        // Step 3: Release DRM context regardless of the derive result
        unsafe { ffi::av_buffer_unref(&mut drm_ptr) };

        if ret < 0 {
            anyhow::bail!("Failed to derive Vulkan device context: error code {}", ret);
        }
        Ok(Self {
            ptr: vk_ptr,
            fmt: ff::format::Pixel::VULKAN,
        })
    }

    /// Get raw pointer for use in FFI calls (encoder, filter setup)
    pub fn as_ptr(&self) -> *mut ffi::AVBufferRef {
        self.ptr
    }

    /// Create a new reference (increments the refcount); the caller owns it
    pub fn ref_clone(&self) -> *mut ffi::AVBufferRef {
        unsafe { ffi::av_buffer_ref(self.ptr) }
    }
}

impl Drop for AvHwDevCtx {
    fn drop(&mut self) {
        if !self.ptr.is_null() {
            unsafe { ffi::av_buffer_unref(&mut self.ptr) };
        }
    }
}

/// Holds an FFmpeg hardware frame context (used for both capture and encoding).
///
/// Wraps a raw `*mut AVBufferRef` pointing to an `AVHWFramesContext`.
/// Its fields must be configured via a raw pointer cast before calling `av_hwframe_ctx_init()`.
///
/// Usage pattern:
/// 1. `ffi::av_hwframe_ctx_alloc(device_ctx.as_ptr())` → frames_ref
/// 2. Cast the frames_ref data to `*mut AVHWFramesContext` and set its fields
/// 3. `ffi::av_hwframe_ctx_init(frames_ref)` → commit
pub struct AvHwFrameCtx {
    ptr: *mut ffi::AVBufferRef,
}

unsafe impl Send for AvHwFrameCtx {}

impl AvHwFrameCtx {
    /// Create a hardware frame context for DMA-BUF capture surfaces.
    ///
    /// Uses raw FFI:
    /// 1. `ffi::av_hwframe_ctx_alloc(hw_device_ctx.as_ptr())` → frames_ref
    /// 2. Dereference frames_ref → AVHWFramesContext and set:
    ///    - format = AV_PIX_FMT_VAAPI (or VULKAN)
    ///    - sw_format = AV_PIX_FMT_NV12 (or equivalent)
    ///    - width, height
    ///    - initial_pool_size = 4 (double buffer + headroom)
    /// 3. `ffi::av_hwframe_ctx_init(frames_ref)` → commit
    pub fn for_capture(
        hw_device_ctx: &AvHwDevCtx,
        width: u32,
        height: u32,
        sw_format: ff::format::Pixel,
    ) -> anyhow::Result<Self> {
        // `mut` is required so the pointer can be unref'd on the error path below
        let mut ptr = unsafe { ffi::av_hwframe_ctx_alloc(hw_device_ctx.as_ptr()) };
        if ptr.is_null() {
            anyhow::bail!("av_hwframe_ctx_alloc returned null");
        }

        // Configure the frames context via the raw pointer
        unsafe {
            let frames_ctx = (*ptr).data as *mut ffi::AVHWFramesContext;
            (*frames_ctx).format = hw_device_ctx.fmt.into();
            (*frames_ctx).sw_format = sw_format.into();
            (*frames_ctx).width = width as i32;
            (*frames_ctx).height = height as i32;
            (*frames_ctx).initial_pool_size = 4;
        }

        let ret = unsafe { ffi::av_hwframe_ctx_init(ptr) };
        if ret < 0 {
            unsafe { ffi::av_buffer_unref(&mut ptr) };
            anyhow::bail!("av_hwframe_ctx_init failed: error code {}", ret);
        }
        Ok(Self { ptr })
    }

    /// Create a hardware frame context for encoder input surfaces.
    /// Same as `for_capture` but with an encoder-compatible format (typically NV12).
    pub fn for_encode(
        hw_device_ctx: &AvHwDevCtx,
        width: u32,
        height: u32,
        sw_format: ff::format::Pixel,
    ) -> anyhow::Result<Self> {
        // Same pattern as for_capture, possibly with a different pool size
        Self::for_capture(hw_device_ctx, width, height, sw_format)
    }

    /// Get raw pointer for assignment to the encoder/filter `hw_frames_ctx`
    pub fn as_ptr(&self) -> *mut ffi::AVBufferRef {
        self.ptr
    }
}

impl Drop for AvHwFrameCtx {
    fn drop(&mut self) {
        if !self.ptr.is_null() {
            unsafe { ffi::av_buffer_unref(&mut self.ptr) };
        }
    }
}

/// Encapsulates the full encoding state
pub struct EncState {
    pub enc_video: ff::codec::encoder::Video,
    pub frames_rgb: AvHwFrameCtx, // capture format
    pub frames_yuv: AvHwFrameCtx, // encode format
    pub video_filter: ff::filter::Graph,
    pub hw_device_ctx: AvHwDevCtx,
    pub starting_timestamp: Option<i64>,
}

impl EncState {
    /// Create the full encoding pipeline.
    ///
    /// Implementation steps:
    /// 1. Create AvHwDevCtx (VAAPI or Vulkan, based on the --hw-accel flag)
    /// 2. Create the capture frame context (AvHwFrameCtx::for_capture)
    /// 3. Create the encode frame context (AvHwFrameCtx::for_encode)
    /// 4. Create the H.264 encoder (configure it, but do not open it yet):
    ///    - Find codec: `ff::encoder::find_by_name("h264_vaapi")` or `"h264_vulkan"`
    ///    - Create the encoder from the codec and attach the hardware device context
    ///    - Set parameters: width, height, pixel_format (VAAPI/Vulkan), bit_rate, gop_size, time_base
    /// 5. Build the video filter graph (see filter.rs, Task 8)
    /// 6. Set the encoder's `hw_frames_ctx` to the filter graph's output frames context,
    ///    then open the encoder (hardware encoders need the frames context before opening)
    /// 7. Wrap everything in EncState
    pub fn new(
        hw_accel: HwAccel,
        drm_device: &std::path::Path,
        width: u32,
        height: u32,
        bitrate: u64,
        gop_size: u32,
        fps: u32,
    ) -> anyhow::Result<Self> {
        todo!("step-by-step construction as described above")
    }
}
```

The module must handle:
- VAAPI device creation from a DRM render node (`/dev/dri/renderD128`)
- Vulkan device creation (DRM → Vulkan derivation, then release the DRM context)
- Hardware frame context creation with the correct parameters (capture vs encode)
- The Vulkan self-referential `Pin<Box<...>>` pattern for stable FFmpeg struct addresses
- `EncodePixelFormat` three-way dispatch for codec selection, filter building, and frame context creation
- DRM device discovery from Wayland protocol data (the DRM device associated with the captured output)

**Implementation notes:**
- Use raw FFI via `ffmpeg_next::ffi` for ALL hardware context operations:
  - `ffi::av_hwdevice_ctx_create()` for VAAPI/DRM device creation
  - `ffi::av_hwframe_ctx_alloc()` + `ffi::av_hwframe_ctx_init()` for frame pool creation
  - `ffi::av_buffer_ref()` / `ffi::av_buffer_unref()` for reference counting
  - `(*encoder.as_mut_ptr()).hw_device_ctx = ffi::av_buffer_ref(...)` to assign the HW device context
- Use `ff::codec::encoder::find_by_name("h264_vaapi")` to find the VAAPI H.264 encoder
- For the encoder, set `pixel_format` to the hardware format (e.g. `ff::format::Pixel::VAAPI`) and set `hw_frames_ctx` on the encoder's codec context
- **IMPORTANT**: The browser decodes in Annex B mode, so SPS/PPS must be in-band in every IDR frame. `h264_metadata` has no `repeat_sps`/`repeat_pps` options; use the `dump_extra` bitstream filter with `freq=keyframe`, which prepends the encoder's extradata (SPS/PPS, populated when the encoder is opened with `AV_CODEC_FLAG_GLOBAL_HEADER`) to each keyframe packet. Note: `repeat-headers` is an x264 option (set via `x264-params`) and does not apply to `h264_vaapi`.
- The DRM device path can be discovered from the compositor's `zwp_linux_dmabuf_feedback_v1` (`main_device` event) or defaulted to `/dev/dri/renderD128`

- [ ] **Step 2: Add `mod avhw;` to src/main.rs**

- [ ] **Step 3: Verify compilation**

Run: `cargo build`

Note: FFmpeg linking errors are expected if the FFmpeg development libraries aren't installed. Ensure `libavcodec-dev`, `libavformat-dev`, `libavfilter-dev`, `libavutil-dev`, and `libswscale-dev` are installed, plus `libavdevice-dev` and `libswresample-dev` if ffmpeg-next's default features are enabled.

- [ ] **Step 4: Commit**

```bash
git add -A && git commit -m "feat: FFmpeg hardware device and frame context management"
```

---

## Task 8: FFmpeg Filter Graph — filter.rs

**Files:**
- Create: `src/filter.rs`

Builds the GPU video filter pipeline (crop → scale → transpose) using FFmpeg's libavfilter via ffmpeg-next.

**API References:**
- ffmpeg-next filter API: https://docs.rs/ffmpeg-next — `filter::Graph`, `filter::Context`
- FFmpeg libavfilter docs: https://ffmpeg.org/ffmpeg-filters.html — `crop`, `scale_vaapi`, `scale_vulkan`, `transpose_vaapi`, `transpose_vulkan`

- [ ] **Step 1: Create src/filter.rs**

Implement video filter graph construction:

```rust
use crate::avhw::{AvHwFrameCtx, EncodePixelFormat};
use ffmpeg_next as ff;

/// Build the video filter graph for the capture → encode pipeline.
///
/// Pipeline: buffersrc (HW) → crop → scale → [transpose] → buffersink
///
/// Implementation:
/// 1. Create `ff::filter::Graph::new()`
/// 2. Add the `buffer` source filter:
///    - `graph.add(&ff::filter::find("buffer").unwrap(), "in", &args)?`
///    - Args: `video_size`, `pix_fmt`, `time_base`, `frame_rate`
///    - Critical: attach the capture frame context by setting `hw_frames_ctx` in an
///      `AVBufferSrcParameters` and calling `ffi::av_buffersrc_parameters_set()`;
///      this enables HW-accelerated filtering
/// 3. Add a `crop` filter if an ROI is specified:
///    - `graph.add(&ff::filter::find("crop").unwrap(), "crop", &args)?`
///    - Args: `x`, `y`, `w`, `h`
///    - Use `exact=1` to avoid rounding issues at edges
/// 4. Add the `scale` filter (GPU-accelerated):
///    - VAAPI: `ff::filter::find("scale_vaapi")` scales in GPU memory
///    - Vulkan: `ff::filter::find("scale_vulkan")` scales in GPU memory
///    - Set `w` and `h` to the encoder input dimensions
/// 5. Add a `transpose` filter if the output is rotated 90/270 degrees:
///    - VAAPI: `ff::filter::find("transpose_vaapi")`
///    - Vulkan: `ff::filter::find("transpose_vulkan")`
///    - Set the `dir` parameter based on the transform type
/// 6. Add the `buffersink` output filter:
///    - `graph.add(&ff::filter::find("buffersink").unwrap(), "out", "")?`
///    - buffersink takes no `hw_frames_ctx`; after the graph is configured, read the output
///      frames context with `ffi::av_buffersink_get_hw_frames_ctx()` and give the encoder
///      a new reference to it
/// 7. Link all filters in sequence: `src → crop → scale → [transpose] → sink`
/// 8. Call `graph.validate()` to configure and check the graph
pub fn build_video_filter(
    enc: &mut ff::codec::encoder::Video,
    frames_rgb: &AvHwFrameCtx,
    frames_yuv: &AvHwFrameCtx,
    crop_rect: Option<(i32, i32, i32, i32)>,
    needs_transpose: bool,
    pixel_format: &EncodePixelFormat,
) -> anyhow::Result<ff::filter::Graph> {
    todo!("build the graph as described above")
}
```

**Implementation notes:**
- The filter graph operates entirely on GPU memory; no CPU copies are involved
- `hw_frames_ctx` must be set on the buffersrc for hardware filtering; the output frames context is produced by the last GPU filter and read back from the buffersink
- Scale filter names are backend-specific (`scale_vaapi` vs `scale_vulkan`); dispatch via `EncodePixelFormat`
- The `crop` filter with `exact=1` avoids pixel alignment issues at ROI boundaries
- Transpose is only needed for 90/270 degree rotations (check via the `transform.rs` helpers)
- After building, call `graph.validate()` to ensure correct linkage

- [ ] **Step 2: Add `mod filter;` to src/main.rs**

- [ ] **Step 3: Verify compilation**

Run: `cargo build`

- [ ] **Step 4: Commit**

```bash
git add -A && git commit -m "feat: GPU video filter graph for crop/scale/transpose"
```

---

## Task 9: Capture Backends — cap_*.rs

**Files:**
- Create: `src/cap_ext_image_copy.rs`
- Create: `src/cap_wlr_screencopy.rs`
- Create: `src/state.rs` (minimal; expanded in Task 10)

Implement two Wayland screen capture backends using their respective protocol specifications. Both implement the `CaptureSource` trait.

**API References:**
- Wayland client crate: https://docs.rs/wayland-client — `Connection`, `GlobalList`, `Dispatch` trait
- wlr-screencopy protocol: `wayland-protocols-wlr` crate — `zwlr_screencopy_manager_v1`, `zwlr_screencopy_frame_v1`
- ext-image-copy-capture protocol: `wayland-protocols` crate (with `staging` feature) — `ext_image_copy_capture_manager_v1`, `ext_image_copy_capture_session_v1`, `ext_image_copy_capture_frame_v1`, plus `ext_output_image_capture_source_manager_v1` (ext-image-capture-source) for turning a `wl_output` into a capture source
- DMA-BUF protocol: `zwp_linux_dmabuf_v1` (in `wayland-protocols` crate) — buffer creation and format negotiation

- [ ] **Step 1: Define CaptureSource trait in src/state.rs (create minimal file)**

```rust
use wayland_client::protocol::wl_output::WlOutput;

/// Unified interface for screen capture backends
pub trait CaptureSource: Sized + 'static {
    /// Frame type specific to this backend
    type Frame;

    /// Create a new capture source from Wayland globals and the target output.
    ///
    /// Implementation: use `wayland_client::globals::GlobalList` to bind the protocol global,
    /// then create a capture session targeting the specified `WlOutput`.
    fn new(
        gm: &wayland_client::globals::GlobalList,
        output: &WlOutput,
        // `OutputInfo` lives in this module (full definition in Task 10);
        // add it here first if you want Step 5 to compile.
        output_info: &OutputInfo,
    ) -> anyhow::Result<Self>;

    /// Allocate a frame for capture. Returns Some(frame) for synchronous
    /// backends (ext-image-copy), None for asynchronous ones (wlr-screencopy).
    fn alloc_frame(&mut self) -> Option<Self::Frame>;

    /// Submit a DMA-BUF buffer to the compositor for capture.
    ///
    /// Implementation: create a `wl_buffer` from the DMA-BUF fd using `zwp_linux_dmabuf_v1`,
    /// then pass it to the capture protocol's capture request.
    fn queue_copy(&self, buffer: wayland_client::protocol::wl_buffer::WlBuffer);

    /// Callback when the frame is no longer in use
    fn on_done_with_frame(&self, frame: Self::Frame);
}
```

- [ ] **Step 2: Create src/cap_ext_image_copy.rs**

Implement the ext-image-copy-capture backend (~240 lines).

**Protocol overview** (ext-image-copy-capture-v1):
- `ext_output_image_capture_source_manager_v1`: global that turns a `wl_output` into an `ext_image_capture_source_v1`
- `ext_image_copy_capture_manager_v1` — global for creating sessions
- `ext_image_copy_capture_session_v1` — session object, created for a capture source
- `ext_image_copy_capture_frame_v1` — per-frame capture object
- Session-level format negotiation (not per-frame): the format is negotiated once and reused
- Synchronous allocation: `alloc_frame()` returns `Some(frame)` immediately
- Supports a real modifier list (advertised by the session's `dmabuf_format` event) instead of hardcoded LINEAR

**Implementation steps:**
1. Bind `ext_output_image_capture_source_manager_v1` and `ext_image_copy_capture_manager_v1` from `GlobalList`
2. Create a capture source for the target output (`source_manager.create_source(output)`), then a session: `manager.create_session(source, options)`
3. Implement the `Dispatch` trait for `ext_image_copy_capture_session_v1`:
   - Handle `buffer_size`: the required buffer dimensions
   - Handle `dmabuf_device` and `dmabuf_format`: the DRM device and each supported DMA-BUF format with its modifiers
   - Handle `done`: all constraints have been sent, so buffers can be allocated
   - Handle `stopped`: the session ended (e.g. the output went away)
4. Implement the `Dispatch` trait for `ext_image_copy_capture_frame_v1`:
   - Handle `ready`: frame data has been written to the DMA-BUF
   - Handle `failed`: capture failed
5. `alloc_frame()`: allocate from the hardware frame pool and return `Some(frame)`
6. `queue_copy()`: create a `wl_buffer` via `zwp_linux_dmabuf_v1::create_params` with the DMA-BUF fd, then `session.create_frame()`, `frame.attach_buffer(buffer)`, `frame.damage_buffer(...)`, and `frame.capture()`

Key types:
- `CapExtImageCopy` struct holding the session, format state, and constraints
- `ExtImageCopyFrame` — the frame type for this backend

- [ ] **Step 3: Create src/cap_wlr_screencopy.rs**

Implement the wlr-screencopy backend (~165 lines).
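Because wlr-screencopy creates a fresh `zwlr_screencopy_frame_v1` for every capture, the backend's per-frame logic is effectively a small state machine. On the DMA-BUF path (protocol version 3+) the compositor sends `linux_dmabuf` and then `buffer_done`, the client answers with `copy`, and the compositor finishes with `ready` (timestamp split into `tv_sec_hi`/`tv_sec_lo`/`tv_nsec`) or `failed`. The std-only sketch below models that sequence as a pure function; the names `FrameEvent`, `FrameState`, `Action`, and `on_event` are hypothetical, and the real backend would drive these transitions from its `Dispatch` impl instead.

```rust
/// Simplified view of the events a zwlr_screencopy_frame_v1 delivers on the DMA-BUF path.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FrameEvent {
    LinuxDmabuf { fourcc: u32, width: u32, height: u32 },
    BufferDone,
    Ready { tv_sec_hi: u32, tv_sec_lo: u32, tv_nsec: u32 },
    Failed,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum FrameState {
    AwaitingFormat,
    FormatKnown { fourcc: u32, width: u32, height: u32 },
    CopyQueued,
    Done { pts_ns: u64 },
    Failed,
}

/// What the backend should do in response to an event.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Action {
    Nothing,
    /// Allocate a DMA-BUF of this format/size and call `frame.copy(buffer)`.
    AllocateAndCopy { fourcc: u32, width: u32, height: u32 },
    /// Hand the captured surface to the encoder.
    Deliver { pts_ns: u64 },
    Abort,
}

fn on_event(state: FrameState, ev: FrameEvent) -> (FrameState, Action) {
    match (state, ev) {
        // A failure ends the frame in any state.
        (_, FrameEvent::Failed) => (FrameState::Failed, Action::Abort),
        (FrameState::AwaitingFormat, FrameEvent::LinuxDmabuf { fourcc, width, height }) => {
            (FrameState::FormatKnown { fourcc, width, height }, Action::Nothing)
        }
        // All buffer types advertised: allocate and queue the copy.
        (FrameState::FormatKnown { fourcc, width, height }, FrameEvent::BufferDone) => {
            (FrameState::CopyQueued, Action::AllocateAndCopy { fourcc, width, height })
        }
        // buffer_done without any DMA-BUF offer: this backend cannot proceed.
        (FrameState::AwaitingFormat, FrameEvent::BufferDone) => (FrameState::Failed, Action::Abort),
        (FrameState::CopyQueued, FrameEvent::Ready { tv_sec_hi, tv_sec_lo, tv_nsec }) => {
            // Reassemble the 64-bit seconds value from its two 32-bit halves.
            let secs = ((tv_sec_hi as u64) << 32) | tv_sec_lo as u64;
            let pts_ns = secs.saturating_mul(1_000_000_000).saturating_add(tv_nsec as u64);
            (FrameState::Done { pts_ns }, Action::Deliver { pts_ns })
        }
        // Anything else (duplicates, shm-only events) leaves the state unchanged.
        (s, _) => (s, Action::Nothing),
    }
}
```

Keeping the transitions in one pure function makes the per-frame protocol testable without a compositor; the `Dispatch` impl then only translates Wayland events into `FrameEvent` values and executes the returned `Action`.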
**Protocol overview** (zwlr-screencopy-unstable-v1):
- `zwlr_screencopy_manager_v1` — global for creating frame captures
- `zwlr_screencopy_frame_v1` — per-frame capture object (created fresh each frame)
- Per-frame format negotiation: the format is negotiated for each capture
- Asynchronous allocation: `alloc_frame()` returns `None`; the frame object's events trigger allocation
- Uses hardcoded `DrmModifier::LINEAR` for buffer creation

**Implementation steps:**
1. Bind `zwlr_screencopy_manager_v1` from `GlobalList` (version 3 or later is needed for DMA-BUF)
2. Implement the `Dispatch` trait for `zwlr_screencopy_frame_v1`:
   - Handle `linux_dmabuf`: receive the DMA-BUF format, width, and height (the `buffer` event describes the shm alternative, including stride)
   - Handle `buffer_done`: all supported buffer types have been advertised
   - Handle `ready`: frame data has been written to the buffer; the timestamp arrives as `tv_sec_hi`/`tv_sec_lo`/`tv_nsec`
   - Handle `failed`: capture failed
3. For each frame:
   - Create a new `zwlr_screencopy_frame_v1` via `manager.capture_output(overlay_cursor, output)`
   - On `buffer_done`: pick the DMA-BUF format and create the DMA-BUF buffer
   - On `ready`: the frame is complete
4. `alloc_frame()`: returns `None` (async; the frame object is the trigger)
5. `queue_copy()`: submit the DMA-BUF buffer with `frame.copy(buffer)`

Key types:
- `CapWlrScreencopy` struct holding the manager and frame state
- `WlrScreencopyFrame` — the frame type

- [ ] **Step 4: Add modules to src/main.rs**

```rust
mod cap_ext_image_copy;
mod cap_wlr_screencopy;
mod state;
```

- [ ] **Step 5: Verify compilation**

Run: `cargo build`

Note: the `wayland-protocols` and `wayland-protocols-wlr` crates bundle the protocol XML; enable their `client` feature, plus `staging` on `wayland-protocols` for ext-image-copy-capture.

- [ ] **Step 6: Commit**

```bash
git add -A && git commit -m "feat: Wayland capture backends (ext-image-copy + wlr-screencopy)"
```

---

## Task 10: State Machine — state.rs

**Files:**
- Modify: `src/state.rs` (expand from Task 9 minimal version)

Implement the full state machine with `EncConstructionStage`, `InFlightSurface`, and the `State` struct.

**Design reference:** See architecture spec §4 for state diagrams and transitions.
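The `Intermediate` variant in the code below exists so that a transition can move owned values (such as the encoder) out of the current stage without leaving `self.enc` uninitialized. A minimal std-only sketch of that `std::mem::replace` pattern follows, assuming reduced stand-in types: `Enc`, `Cap`, `Stage`, `output_went_away`, and `output_returned` are hypothetical names, not the plan's real `EncState`/`EncConstructionStage` API.

```rust
use std::mem;

/// Stand-in for `EncState` (hypothetical; only tracks a counter).
#[derive(Debug, PartialEq)]
struct Enc {
    frames_encoded: u64,
}

/// Stand-in for a capture backend such as `CapExtImageCopy` (hypothetical).
#[derive(Debug, PartialEq)]
struct Cap {
    output: String,
}

/// Reduced `EncConstructionStage` with only the variants involved here.
#[derive(Debug, PartialEq)]
enum Stage {
    Streaming { enc: Enc, cap: Cap },
    OutputWentAway { output_name: String, enc: Enc },
    Intermediate,
}

/// Streaming → OutputWentAway: keep the encoder, drop the capture object.
fn output_went_away(stage: &mut Stage) -> Result<(), String> {
    // Move the current value out, leaving `Intermediate` so `*stage` is always valid.
    match mem::replace(stage, Stage::Intermediate) {
        Stage::Streaming { enc, cap } => {
            *stage = Stage::OutputWentAway { output_name: cap.output, enc };
            Ok(())
        }
        other => {
            // Wrong state: restore the previous value unchanged.
            *stage = other;
            Err("output_went_away: not streaming".to_string())
        }
    }
}

/// OutputWentAway → Streaming: reuse the preserved encoder with a fresh capture object.
fn output_returned(stage: &mut Stage, cap: Cap) -> Result<(), String> {
    match mem::replace(stage, Stage::Intermediate) {
        Stage::OutputWentAway { enc, .. } => {
            *stage = Stage::Streaming { enc, cap };
            Ok(())
        }
        other => {
            *stage = other;
            Err("output_returned: output was not gone".to_string())
        }
    }
}
```

Every path writes a real stage back before returning, so `Intermediate` should only ever be observable if a transition panics midway; an `unreachable!` on `Intermediate` elsewhere can catch that.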
- [ ] **Step 1: Expand src/state.rs with full state machine**

```rust
// `CaptureSource` (Task 9) is defined in this same module, so it needs no import.
use crate::avhw::EncState;
use crate::nalu::EncodedFrame;
use async_channel::Sender;

/// Output information collected during probing
pub struct OutputInfo {
    pub name: String,
    pub transform: crate::transform::Transform,
    pub width: i32,
    pub height: i32,
    pub scale: f64,
}

/// Partial output info during the ProbingOutputs stage
pub struct PartialOutputInfo {
    pub name: Option<String>,
    pub transform: Option<crate::transform::Transform>,
    pub width: Option<i32>,
    pub height: Option<i32>,
    pub scale: Option<f64>,
    pub wl_output: wayland_client::protocol::wl_output::WlOutput,
}

impl PartialOutputInfo {
    pub fn is_complete(&self) -> bool {
        self.name.is_some()
            && self.transform.is_some()
            && self.width.is_some()
            && self.height.is_some()
    }

    pub fn into_output_info(self) -> Option<OutputInfo> {
        Some(OutputInfo {
            name: self.name?,
            transform: self.transform?,
            width: self.width?,
            height: self.height?,
            scale: self.scale.unwrap_or(1.0),
        })
    }
}

/// State machine for encoder construction
pub enum EncConstructionStage<S: CaptureSource> {
    /// Discovering Wayland outputs
    ProbingOutputs {
        outputs: Vec<PartialOutputInfo>,
    },
    /// Outputs known, waiting for format negotiation
    EverythingButFmt {
        output_info: OutputInfo,
        hw_device_ctx: crate::avhw::AvHwDevCtx,
        cap: S,
    },
    /// Fully operational: streaming
    Streaming {
        output_info: OutputInfo,
        enc: EncState,
        cap: S,
        frame_tx: Sender<EncodedFrame>,
    },
    /// Output disconnected, waiting for reconnection
    OutputWentAway {
        output_name: String,
        enc: EncState,
        frame_tx: Sender<EncodedFrame>,
    },
    /// Transient state during transitions (mem::replace target)
    Intermediate,
}

impl<S: CaptureSource> EncConstructionStage<S> {
    /// Extract EncState, consuming self. Used during state transitions
    /// that preserve the encoder while discarding capture objects.
    pub fn take_enc(self) -> Option<(EncState, Sender<EncodedFrame>)> {
        match self {
            Self::Streaming { enc, frame_tx, .. } => Some((enc, frame_tx)),
            Self::OutputWentAway { enc, frame_tx, .. } => Some((enc, frame_tx)),
            _ => None,
        }
    }
}

/// Frame capture lifecycle: tracks the state of the single in-flight surface.
/// At most one frame is processed at any time to prevent buffer exhaustion.
pub enum InFlightSurface<S: CaptureSource> {
    /// No frame in flight: ready to start the next capture
    None,
    /// Frame allocation requested, waiting for a GPU surface
    AllocQueued,
    /// Surface allocated from the hardware frame pool
    Allocd(S::Frame),
    /// Buffer submitted to the compositor, waiting for capture completion
    CopyQueued {
        surface: ffmpeg_next::frame::Video,
        drm_map: ffmpeg_next::ffi::AVDRMFrameDescriptor, // DMA-BUF mapping descriptor
        frame: S::Frame,
        buffer: wayland_client::protocol::wl_buffer::WlBuffer,
    },
}

/// Global state parameterized by capture backend
pub struct State<S: CaptureSource> {
    pub enc: EncConstructionStage<S>,
    pub in_flight_surface: InFlightSurface<S>,
    pub starting_timestamp: Option<i64>,
    pub args: crate::args::Args,
    pub frame_tx: Sender<EncodedFrame>,
    pub errored: bool,
    pub output_went_away: bool,
    pub format_change: bool,
    pub gm: wayland_client::globals::GlobalList,
    pub xdg_output_manager: Option<
        wayland_protocols::xdg::xdg_output::zv1::client::zxdg_output_manager_v1::ZxdgOutputManagerV1,
    >,
    // Limiter type from fps_limit.rs (type name assumed)
    pub fps_limit: Option<crate::fps_limit::FpsLimit>,
}

impl<S: CaptureSource> State<S> {
    /// Create the initial state in the ProbingOutputs stage
    pub fn new(
        gm: wayland_client::globals::GlobalList,
        args: crate::args::Args,
        frame_tx: Sender<EncodedFrame>,
        _qh: &wayland_client::QueueHandle<State<S>>,
    ) -> Self {
        Self {
            enc: EncConstructionStage::ProbingOutputs {
                outputs: Vec::new(),
            },
            in_flight_surface: InFlightSurface::None,
            starting_timestamp: None,
            args,
            frame_tx,
            errored: false,
            output_went_away: false,
            format_change: false,
            gm,
            xdg_output_manager: None,
            fps_limit: None,
        }
    }

    /// Frame captured successfully: run it through filter + encoder, send it to the transport.
    ///
    /// Pipeline: captured HW frame → video_filter → encoder → EncodedFrame → channel
    pub fn on_copy_complete(&mut self) -> anyhow::Result<()> {
        // 1. Extract the captured frame from InFlightSurface
        //    - assert!(matches!(self.in_flight_surface, InFlightSurface::CopyQueued { .. }))
        //    - Take ownership of the surface via mem::replace(.., InFlightSurface::None)
        //
        // 2. Push the captured HW frame into the filter graph (buffersrc)
        //    - enc.video_filter.get("in").unwrap().source().add(&surface)
        //
        // 3. Pull the filtered frame from the filter graph (buffersink)
        //    - enc.video_filter.get("out").unwrap().sink().frame(&mut filtered)
        //
        // 4. Send it to the encoder
        //    - enc.enc_video.send_frame(&filtered)
        //
        // 5. Receive encoded packets in a loop until receive_packet returns EAGAIN.
        //    For each packet:
        //    a. Check whether it is a keyframe via NAL type inspection (crate::nalu::is_keyframe)
        //    b. Wrap it as an EncodedFrame and send it through frame_tx
        //       - IMPORTANT: use try_send(), NOT send() or send_blocking()!
        //         try_send() is non-blocking: it returns Err(TrySendError::Full(_)) if the
        //         channel is full. On a full channel, log and drop the frame (acceptable
        //         for real-time streaming). send_blocking() would stall the mio capture pipeline.
        //       - frame_tx.try_send(EncodedFrame {
        //             data, pts_us, duration: 1_000_000 / fps μs, frame_type, width, height
        //         })
        //
        // 6. InFlightSurface is now None (ready for the next frame)
        Ok(())
    }

    /// Frame capture failed: handle errors.
    pub fn on_copy_fail(&mut self) {
        // 1. Check the output_went_away flag
        //    - If true: transition to OutputWentAway (preserve encoder + transport, drop capture)
        // 2. Check the format_change flag
        //    - If true: rebuild filter + frame contexts (preserve device ctx + transport)
        // 3. Otherwise: log the error and set the errored flag
    }

    /// Frame allocated: create the DMA-BUF buffer and queue the capture.
    pub fn on_frame_allocd(&mut self, frame: S::Frame) {
        // 1. Create a wl_buffer from the frame's DMA-BUF fd using zwp_linux_dmabuf_v1
        // 2. Queue the capture: cap.queue_copy(buffer)
        // 3. Transition InFlightSurface: Allocd → CopyQueued
    }

    /// Start a frame capture cycle; call when Streaming and no frame is in flight.
    pub fn queue_alloc_frame(&mut self) {
        // 1. assert!(matches!(self.in_flight_surface, InFlightSurface::None))
        // 2. Call cap.alloc_frame()
        //    - ext-image-copy: returns Some(frame) → immediately call on_frame_allocd
        //    - wlr-screencopy: returns None → set InFlightSurface::AllocQueued
        // 3. Transition InFlightSurface: None → AllocQueued or Allocd
    }

    /// Negotiate the DMA-BUF format with the compositor.
    /// Called during the EverythingButFmt → Streaming transition.
    pub fn negotiate_format(&mut self) -> anyhow::Result<()> {
        // 1. Determine the capture format from compositor feedback
        // 2. Create hardware frame contexts (capture + encode)
        // 3. Build the video filter graph
        // 4. Create the encoder with the negotiated parameters
        // 5. Transition EverythingButFmt → Streaming
        Ok(())
    }
}
```

- [ ] **Step 2: Verify compilation**

Run: `cargo build`

- [ ] **Step 3: Commit**

```bash
git add -A && git commit -m "feat: state machine with EncConstructionStage and InFlightSurface"
```

---

## Task 11: Main Event Loop — main.rs

**Files:**
- Modify: `src/main.rs` (replace stub with full implementation)

Wire everything together: CLI → Wayland connection → state machine → mio event loop → tokio runtime.
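The key property of the mio → tokio bridge is that the capture thread never blocks on the transport: when the bounded queue is full, the newest frame is dropped. The sketch below shows that policy with the standard library's `std::sync::mpsc::sync_channel` as a stand-in for `async_channel::bounded`, since both expose a non-blocking `try_send` with a `Full` error; `SendStats` and `offer` are hypothetical names for illustration only.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Counters for the non-blocking producer (illustrative).
#[derive(Debug, Default, PartialEq)]
struct SendStats {
    sent: u64,
    dropped: u64,
}

/// Hand one item to the consumer without ever blocking the caller.
/// Returns `false` once the consumer is gone, so the capture loop can stop.
fn offer<T>(tx: &SyncSender<T>, item: T, stats: &mut SendStats) -> bool {
    match tx.try_send(item) {
        Ok(()) => {
            stats.sent += 1;
            true
        }
        // Queue full: drop this item instead of stalling the capture thread.
        Err(TrySendError::Full(_)) => {
            stats.dropped += 1;
            true
        }
        // Receiver dropped (e.g. the tokio side shut down).
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

With `async_channel`, the equivalent calls are `Sender::try_send` on the mio thread (errors `TrySendError::Full` / `TrySendError::Closed`) and `Receiver::recv().await` on the tokio side. Note that dropping a delta frame leaves the browser decoding against a missing reference until the next keyframe, so the drop counter is a useful signal for when to force an IDR.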
- [ ] **Step 1: Replace src/main.rs with full implementation**

```rust
mod args;
mod avhw;
mod cap_ext_image_copy;
mod cap_wlr_screencopy;
mod filter;
mod fps_limit;
mod nalu;
mod signaling;
mod state;
mod transform;
mod transport;

use args::Args;
use cap_ext_image_copy::CapExtImageCopy;
use cap_wlr_screencopy::CapWlrScreencopy;
use clap::Parser;
use mio::{Events, Interest, Poll, Token};
use state::{EncConstructionStage, State};
use std::net::SocketAddr;
use std::os::unix::io::AsRawFd;

const TOKEN_WAYLAND: Token = Token(0);
const TOKEN_QUIT: Token = Token(1);

fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    // Initialize logging
    let level = if args.verbose {
        tracing::Level::DEBUG
    } else {
        tracing::Level::INFO
    };
    tracing_subscriber::fmt().with_max_level(level).init();

    // Create async_channel for encoded frames (capture pipeline → transport).
    // async_channel provides both sync try_send() and async recv(). The element
    // type (EncodedFrame) is inferred from TransportServer::new and run_event_loop.
    let (frame_tx, frame_rx) = async_channel::bounded(16);

    // Spawn tokio runtime for transport + web UI
    let tokio_rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;

    let bind_addr: SocketAddr = format!("{}:{}", args.bind, args.port).parse()?;
    let http_addr: SocketAddr = format!("{}:{}", args.bind, args.port + 1).parse()?;

    // Start transport server on tokio
    tokio_rt.spawn(async move {
        let result = async {
            let server = transport::TransportServer::new(bind_addr, frame_rx)?;
            server.run().await
        }
        .await;
        if let Err(e) = result {
            tracing::error!("Transport server error: {}", e);
        }
    });

    // Start HTTP server for Web UI on tokio
    tokio_rt.spawn(async move {
        if let Err(e) = signaling::serve(http_addr).await {
            tracing::error!("HTTP server error: {}", e);
        }
    });

    // Connect to Wayland compositor.
    // wayland-client 0.31 API: Connection (not the deprecated Display).
    let conn = wayland_client::Connection::connect_to_env()?;

    // Use registry_queue_init for initial global discovery with () state.
    // registry_queue_init::<D>() requires D: Dispatch<WlRegistry, GlobalListContents>;
    // confirm `()` satisfies that bound, otherwise probe with a small local state type.
    // This returns (GlobalList, EventQueue<()>).
    let (gm, _probe_queue) = wayland_client::globals::registry_queue_init::<()>(&conn)?;

    // Determine which capture backends are available.
    // Priority: ext-image-copy-capture > wlr-screencopy
    let has_ext = gm.contents().with_list(|globals| {
        globals
            .iter()
            .any(|g| g.interface == "ext_image_copy_capture_manager_v1")
    });
    let has_wlr = gm.contents().with_list(|globals| {
        globals
            .iter()
            .any(|g| g.interface == "zwlr_screencopy_manager_v1")
    });

    if !has_ext && !has_wlr {
        anyhow::bail!(
            "No supported screen capture protocol found. \
             Need ext-image-copy-capture-v1 or zwlr-screencopy-unstable-v1."
        );
    }

    tracing::info!(
        "Capture backends: ext-image-copy={}, wlr-screencopy={}",
        has_ext,
        has_wlr
    );

    // Set up mio poll for Wayland fd + signals
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(1024);

    // Register the Wayland socket with mio. The connection exposes its fd
    // through the backend (Backend::poll_fd).
    let wl_fd = conn.backend().poll_fd().as_raw_fd();
    poll.registry().register(
        &mut mio::unix::SourceFd(&wl_fd),
        TOKEN_WAYLAND,
        Interest::READABLE,
    )?;

    // Set up signal handling via signal-hook's mio integration.
    // signal_hook_mio::v1_0::Signals (crate `signal-hook-mio`, feature "support-v1_0"
    // for mio 1.x; pick the vX_Y module matching the mio version in use)
    // implements mio::event::Source, so it registers directly with the poll.
    let mut signals = signal_hook_mio::v1_0::Signals::new([
        signal_hook::consts::SIGINT,
        signal_hook::consts::SIGTERM,
    ])?;
    poll.registry()
        .register(&mut signals, TOKEN_QUIT, Interest::READABLE)?;

    // Monomorphized backend selection: run the event loop with the chosen backend.
    // Each branch creates a typed EventQueue<State<S>> and runs the same event loop.
    // This avoids the CaptureBackend enum dispatch problem: the Wayland Dispatch trait
    // requires compile-time monomorphization (State<CapExtImageCopy> vs State<CapWlrScreencopy>).
    if has_ext {
        run_event_loop::<CapExtImageCopy>(conn, gm, args, frame_tx, &mut poll, &mut events)?;
    } else {
        run_event_loop::<CapWlrScreencopy>(conn, gm, args, frame_tx, &mut poll, &mut events)?;
    }

    // Cleanup
    tokio_rt.shutdown_background();
    tracing::info!("wl-webrtc stopped");
    Ok(())
}

/// Generic event loop, monomorphized per capture backend.
/// Both branches compile the same event loop code with the concrete State type.
fn run_event_loop<S: state::CaptureSource>(
    conn: wayland_client::Connection,
    gm: wayland_client::globals::GlobalList,
    args: Args,
    // EncodedFrame is assumed to live in transport.rs; adjust the path if it is defined elsewhere.
    frame_tx: async_channel::Sender<transport::EncodedFrame>,
    poll: &mut Poll,
    events: &mut Events,
) -> anyhow::Result<()> {
    // Create typed event queue for the actual event loop.
    // wayland-client 0.31: Connection::new_event_queue::<T>() creates EventQueue<T>.
    let mut event_queue = conn.new_event_queue::<State<S>>();
    let qh = event_queue.handle();

    // Create initial state in ProbingOutputs stage
    let mut state = State::<S>::new(gm, args, frame_tx, &qh);

    tracing::info!(
        "wl-webrtc event loop starting (backend: {})",
        std::any::type_name::<S>()
    );

    // Main event loop
    loop {
        // Dispatch events already read into the queue. This drives output
        // probing, format negotiation, and frame capture callbacks.
        event_queue.dispatch_pending(&mut state)?;

        // After dispatch, initiate next frame capture if ready
        if matches!(state.enc, EncConstructionStage::Streaming { .. })
            && matches!(state.in_flight_surface, state::InFlightSurface::None)
        {
            state.queue_alloc_frame();
        }

        // Check for fatal errors
        if state.errored {
            tracing::error!("Fatal error in capture pipeline, exiting");
            break;
        }

        // Flush pending Wayland requests (including a capture queued above)
        event_queue.flush()?;

        // Arm a socket read before sleeping. prepare_read() returns None when
        // events are already queued; loop back and dispatch them instead.
        let Some(guard) = event_queue.prepare_read() else {
            continue;
        };

        // Poll with timeout to prevent blocking forever
        poll.poll(events, Some(std::time::Duration::from_millis(100)))?;

        for event in events.iter() {
            if event.token() == TOKEN_QUIT {
                tracing::info!("Shutdown signal received");
                return Ok(());
            }
        }

        // mio is edge-triggered, so attempt the non-blocking read after every
        // wakeup (including timeouts). WouldBlock just means nothing was pending.
        match guard.read() {
            Ok(_) => {}
            Err(wayland_client::backend::WaylandError::Io(e))
                if e.kind() == std::io::ErrorKind::WouldBlock => {}
            Err(e) => return Err(e.into()),
        }
    }

    Ok(())
}
```

**Implementation notes:**

1. **Wayland-client 0.31 API**: Uses `Connection::connect_to_env()` (not the deprecated `Display`), `registry_queue_init()` for global discovery, and `conn.new_event_queue::<State<S>>()` for typed event queues. See `wl-screenrec/src/main.rs` for the reference pattern.

2. **Monomorphized backend selection** (NOT enum dispatch): The `run_event_loop::<S>()` function is generic over the capture backend. At startup, we call it with either `CapExtImageCopy` or `CapWlrScreencopy` based on the available Wayland globals. This is the same pattern wl-screenrec uses, because the Wayland `Dispatch` trait requires compile-time monomorphization. The `CaptureBackend` enum dispatch pattern from earlier drafts was incorrect because `Dispatch` implementations are per-`State<S>` type, not per-enum-variant.

3. **Signal handling** uses the `signal-hook` crate with mio integration. SIGINT/SIGTERM trigger a clean shutdown.

4. **The event loop** follows wayland-client's flush / dispatch / prepare-read cycle:
   - `event_queue.dispatch_pending` processes all Wayland events already read into the queue (frame captures, output changes, etc.)
   - After dispatch, check whether a new frame capture should be initiated, then `flush()` outgoing requests
   - `prepare_read()` arms a socket read; `mio::poll` then blocks until the Wayland fd is readable or a signal arrives, and `guard.read()` pulls the new events into the queue
   - The 100ms timeout prevents indefinite blocking if no events arrive

5. **Dependency**: `signal-hook` is already in Cargo.toml from Task 1. Registering signals with mio goes through the companion `signal-hook-mio` crate; add it if Task 1 did not.

- [ ] **Step 2: Verify full project compiles**

Run: `cargo build`

- [ ] **Step 3: Commit**

```bash
git add -A && git commit -m "feat: main event loop with Wayland capture and tokio transport"
```

---

## Task 12: Integration Testing

**Files:**
- Create: `tests/integration_test.rs`

- [ ] **Step 1: Create integration test scaffolding**

```rust
/// Integration test: verify the full pipeline can start.
/// This test requires:
/// - A running Wayland compositor (or CI with weston headless)
/// - FFmpeg with VAAPI support (or software fallback)
/// - A display to capture
///
/// `test_binary_starts` is tagged #[ignore] for CI environments without hardware.
#[cfg(test)]
mod tests {
    use std::process::Command;

    #[test]
    #[ignore]
    fn test_binary_starts() {
        // Verify the binary starts and doesn't crash immediately.
        // Cargo builds the binary for integration tests and exposes its path via
        // CARGO_BIN_EXE_<name>, so no nested `cargo run` is needed.
        let output = Command::new(env!("CARGO_BIN_EXE_wl-webrtc"))
            .arg("--help")
            .output()
            .expect("Failed to run wl-webrtc");

        assert!(output.status.success());
        let stdout = String::from_utf8_lossy(&output.stdout);
        assert!(stdout.contains("wl-webrtc"));
        assert!(stdout.contains("port"));
        assert!(stdout.contains("codec"));
    }

    #[test]
    fn test_frame_channel_bridge() {
        // The QUIC server itself cannot be bound from here: `transport` is a module of
        // the binary crate, which integration tests cannot import. Instead, exercise the
        // capture → transport bridge: sync try_send() in, async recv() out on tokio.
        let (tx, rx) = async_channel::bounded::<Vec<u8>>(16);
        tx.try_send(vec![0, 0, 0, 1, 0x65])
            .expect("channel has spare capacity");

        let rt = tokio::runtime::Runtime::new().unwrap();
        let received = rt.block_on(async { rx.recv().await.unwrap() });
        assert_eq!(received, vec![0, 0, 0, 1, 0x65]);
    }
}
```

- [ ] **Step 2: Run unit tests**

Run: `cargo test`

Expected: fps_limit tests pass, nalu tests pass, `test_frame_channel_bridge` passes, and the hardware-dependent `test_binary_starts` is reported as ignored.

- [ ] **Step 3: Verify full build in release mode**

Run: `cargo build --release`

Expected: Compiles successfully

- [ ] **Step 4: Final commit**

```bash
git add -A && git commit -m "feat: integration test scaffolding and release build verification"
```

---

## Self-Review Checklist

- [x] **Spec coverage**: Every module in spec Section 2.3 has a corresponding task
- [x] **Placeholder scan**: No TBD/TODO in step descriptions; all steps have concrete code or explicit implementation guidance referencing official protocol specs and public API documentation
- [x] **Known gaps**:
  - Task 7 (avhw.rs) and Task 8 (filter.rs) require deep FFmpeg API knowledge. The code shown is structural, not a complete implementation. The actual implementation requires working with raw FFI bindings via `ffmpeg_next::ffi` for hardware context management (AVBufferRef, AVHWFramesContext). See `wl-screenrec/src/avhw.rs` for the reference pattern.
  - Task 11 (main.rs) uses a monomorphized `run_event_loop::<S>()` generic function, one branch per capture backend. This avoids the CaptureBackend enum dispatch problem (the Wayland `Dispatch` trait requires compile-time monomorphization).
  - The `wtransport` crate handles the HTTP/3 + WebTransport protocol internally. Browser `new WebTransport()` works directly; no manual h3 integration is needed.

**Implementation complexity notes**:

- The `ffmpeg-next` crate (v8.1.0) does NOT provide safe wrappers for `AVBufferRef` (hardware device context) or `AVHWFramesContext` (hardware frame context). These operations require the raw FFI bindings via `ffmpeg_next::ffi`, specifically:
  - `ffi::av_hwdevice_ctx_create()`, `ffi::av_hwframe_ctx_alloc()`, `ffi::av_hwframe_ctx_init()`
  - `ffi::av_hwframe_map()` / `ffi::av_hwframe_transfer_data()` to get DMA-BUF (DRM PRIME) frames onto VAAPI surfaces (mapping between HW frame contexts vs. copying to/from a HW surface)
  - `(*ctx.as_mut_ptr()).hw_device_ctx = ffi::av_buffer_ref(...)` to assign the HW context to the encoder/filter

  The safe API covers codec lookup, encoder configuration, the filter graph, and the encode/decode flow.
- **Encoder MUST use `h264_metadata` BSF** with `repeat_sps=1` and `repeat_pps=1` to guarantee SPS/PPS in every IDR frame. Note: `repeat_headers=1` is libx264-only and does NOT work with `h264_vaapi`. This is required for WebCodecs Annex B mode on the browser side.
- Task 11 (main.rs) uses a monomorphized `run_event_loop::<S>()`, NOT enum dispatch. Wayland's `Dispatch` trait requires compile-time monomorphization. See wl-screenrec for the reference pattern.
- `async_channel::bounded(16)` provides both sync `try_send()` (from the mio main thread) and async `recv()` (from the tokio transport). The main thread uses `try_send()`: if the channel is full, the frame is dropped and logged. This prevents GPU pipeline stalls. **Do NOT use `send_blocking()`**; it would stall the mio capture pipeline.
- The `wtransport` crate (v0.7) provides a complete WebTransport-over-HTTP/3 server. No raw quinn or h3 integration is needed. Browser `new WebTransport(url)` works directly.
- **WebCodecs Annex B mode**: The browser configures `VideoDecoder` WITHOUT the `description` field. Per the W3C AVC WebCodecs Registration, omitting `description` activates Annex B mode: all `EncodedVideoChunk` data must use start-code-prefixed NAL units (which our encoder outputs). Providing `description` forces AVC (length-prefixed) mode, so do NOT provide it.

## Implementation Gap Notes

> **WARNING**: The following tasks require significant implementation work not fully specified in the code blocks above. An implementing agent MUST consult the referenced source code for complete patterns.

### Task 7 (avhw.rs) — FFmpeg FFI Completeness

The `EncState::new()` method is described structurally but not fully implemented. The implementing agent MUST:

1. Provide a complete `EncState::new()` with all FFI calls verified against `wl-screenrec/src/avhw.rs`
2. Implement the `AvHwDevCtx` → encoder `hw_frames_ctx` assignment via raw FFI
3. Implement the Vulkan `Pin<Box<…>>` self-referential pattern
4. Add the `h264_metadata` BSF to the encoding pipeline for SPS/PPS injection
5. Verify VAAPI output at runtime by test-encoding one frame and inspecting NAL units

### Task 9 (cap_*.rs) — Wayland Dispatch Implementations

The capture backend descriptions omit the `Dispatch` trait implementations (~400 lines total). The implementing agent MUST:

1. Provide complete `Dispatch` implementations for all Wayland protocol objects
2. For ext-image-copy: Dispatch for `ExtImageCopyCaptureSessionV1`, `ExtImageCopyCaptureFrameV1`, `ZwpLinuxDmabufFeedbackV1`
3. For wlr-screencopy: Dispatch for `ZwlrScreencopyFrameV1`
4. Dispatch for `WlOutput` and `XdgOutputManagerV1`/`XdgOutputV1` for output probing
5. Reference `wl-screenrec` for the complete pattern

### Task Order Note

The plan implements Tasks 5-6 (transport/signaling) before Tasks 7-8 (FFmpeg). This is correct because `transport.rs` depends only on `nalu.rs`, NOT on `avhw.rs`.
The channel-based architecture decouples the two pipelines — transport and encoding can be developed in parallel.
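Because `transport.rs` depends only on `nalu.rs`, the NAL helpers can be written and unit-tested before any FFmpeg code exists. Below is a minimal sketch of the keyframe check used in step 6 of the encode path (`crate::nalu::is_keyframe`). It assumes H.264 Annex B input only (HEVC packs its NAL type differently) and a `&[u8] -> bool` signature that the plan does not specify; `nal_types` and `has_parameter_sets` are hypothetical helpers. `has_parameter_sets` is one way to carry out the Task 7 step of inspecting NAL units from a test-encoded VAAPI frame.

```rust
/// H.264 nal_unit_type values (ITU-T H.264, Table 7-1) used below.
const NAL_IDR_SLICE: u8 = 5;
const NAL_SPS: u8 = 7;
const NAL_PPS: u8 = 8;

/// Returns the nal_unit_type of every NAL unit in an Annex B buffer.
/// Emulation prevention guarantees 00 00 01 never appears inside a NAL unit,
/// so scanning for the 3-byte start code (which also matches the tail of a
/// 4-byte 00 00 00 01 code) finds every unit boundary.
fn nal_types(data: &[u8]) -> Vec<u8> {
    let mut types = Vec::new();
    let mut i = 0;
    while i + 3 < data.len() {
        if data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1 {
            // The NAL header byte follows the start code; its low 5 bits are the type.
            types.push(data[i + 3] & 0x1F);
            i += 4;
        } else {
            i += 1;
        }
    }
    types
}

/// True if the access unit contains an IDR slice.
fn is_keyframe(data: &[u8]) -> bool {
    nal_types(data).contains(&NAL_IDR_SLICE)
}

/// True if both SPS and PPS are present in the buffer.
fn has_parameter_sets(data: &[u8]) -> bool {
    let types = nal_types(data);
    types.contains(&NAL_SPS) && types.contains(&NAL_PPS)
}

fn main() {
    // SPS, then an IDR slice: a 4-byte start code followed by a 3-byte one.
    let au = [0u8, 0, 0, 1, 0x67, 0x42, 0, 0, 1, 0x65, 0x88];
    // prints: types=[7, 5] keyframe=true params=false
    println!(
        "types={:?} keyframe={} params={}",
        nal_types(&au),
        is_keyframe(&au),
        has_parameter_sets(&au)
    );
}
```

For an IDR access unit with in-band headers, the expected sequence is SPS (7), PPS (8), IDR slice (5); a P-frame typically carries only non-IDR slices (type 1), so `is_keyframe` returns false for it.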