Files
wl-webrtc/DESIGN_CN.md
2026-02-03 11:14:25 +08:00

82 KiB
Raw Blame History

Wayland → WebRTC 远程桌面后端

技术设计文档

目录

  1. 系统架构
  2. 技术栈
  3. 核心组件设计
  4. 数据流优化
  5. 低延迟优化
  6. 实施路线图
  7. 潜在挑战与解决方案

系统架构

高层架构

┌─────────────────────────────────────────────────────────────────────┐
│                          客户端浏览器                              │
│                    (WebRTC 接收端)                                │
└─────────────────────────────┬───────────────────────────────────────┘
                               │ WebRTC (UDP/TCP)
                               │ 信令 (WebSocket/HTTP)
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                        信令服务器                                  │
│                    (WebSocket/WebSocket Secure)                     │
│                      - 会话管理                                    │
│                      - SDP 交换                                     │
│                      - ICE 候选                                    │
└─────────────────────────────┬───────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    Rust 后端服务器                                   │
├─────────────────────────────────────────────────────────────────────┤
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │   捕获       │    │   编码器     │    │  WebRTC      │          │
│  │   管理器     │───▶│  管道        │───▶│  传输层      │          │
│  └──────────────┘    └──────────────┘    └──────────────┘          │
│         │                  │                  │                      │
│         ▼                  ▼                  ▼                      │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │   PipeWire   │    │   视频       │    │  数据        │          │
│  │   Portal     │    │   编码器     │    │  通道        │          │
│  │   (xdg-      │    │   (H.264/    │    │  (输入/      │          │
│  │   desktop-   │    │   H.265/VP9) │    │   控制)      │          │
│  │   portal)    │    └──────────────┘    └──────────────┘          │
│  └──────────────┘                                                     │
│                                                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                零拷贝缓冲区管理器                           │    │
│  │           - DMA-BUF 导入/导出                              │    │
│  │           - 共享内存池                                     │    │
│  │           - 内存所有权跟踪                                 │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    Wayland 合成器                                   │
│                  (PipeWire 屏幕共享)                               │
└─────────────────────────────────────────────────────────────────────┘

组件详解

1. 捕获管理器

职责:

  • 与 PipeWire xdg-desktop-portal 接口
  • 请求屏幕捕获权限
  • 接收 DMA-BUF 帧数据
  • 管理帧缓冲区生命周期

关键技术:

  • pipewire crate 用于 PipeWire 协议
  • wayland-client 用于 Wayland 协议
  • ashpd 用于桌面门户
pub struct CaptureManager {
    pipewire_connection: Rc<PipewireConnection>,
    stream_handle: Option<StreamHandle>,
    frame_sender: async_channel::Sender<CapturedFrame>,
    config: CaptureConfig,
}

pub struct CaptureConfig {
    pub frame_rate: u32,
    pub quality: QualityLevel,
    pub screen_region: Option<ScreenRegion>,
}

pub enum QualityLevel {
    Low,
    Medium,
    High,
    Ultra,
}

pub struct CapturedFrame {
    pub dma_buf: DmaBufHandle,
    pub width: u32,
    pub height: u32,
    pub format: PixelFormat,
    pub timestamp: u64,
}

2. 编码器管道

职责:

  • 从捕获端接收原始帧
  • 编码为 H.264/H.265/VP9
  • 硬件加速VA-API、NVENC、VideoToolbox
  • 比特率自适应

零拷贝策略:

  • DMA-BUF 直接输入编码器(无 CPU 拷贝)
  • 编码器输出到内存映射缓冲区
  • WebRTC 直接消费编码缓冲区
pub struct EncoderPipeline {
    encoder: Box<dyn VideoEncoder>,
    config: EncoderConfig,
    stats: EncoderStats,
}

pub trait VideoEncoder: Send + Sync {
    fn encode_frame(
        &mut self,
        frame: CapturedFrame,
    ) -> Result<EncodedFrame, EncoderError>;

    fn set_bitrate(&mut self, bitrate: u32) -> Result<(), EncoderError>;

    fn request_keyframe(&mut self) -> Result<(), EncoderError>;
}

pub struct EncodedFrame {
    pub data: Bytes, // 零拷贝 Bytes 包装器
    pub is_keyframe: bool,
    pub timestamp: u64,
    pub sequence_number: u64,
}

3. WebRTC 传输层

职责:

  • WebRTC 对等连接管理
  • 媒体轨道(视频)和数据通道
  • RTP 打包
  • ICE/STUN/TURN 处理
  • 拥塞控制

库:

  • webrtc crate (webrtc-rs) 或自定义 WebRTC 实现
pub struct WebRtcTransport {
    peer_connection: RTCPeerConnection,
    video_track: RTCVideoTrack,
    data_channel: Option<RTCDataChannel>,
    config: WebRtcConfig,
}

pub struct WebRtcConfig {
    pub stun_servers: Vec<String>,
    pub turn_servers: Vec<TurnServer>,
    pub ice_transport_policy: IceTransportPolicy,
}

pub struct TurnServer {
    pub urls: Vec<String>,
    pub username: String,
    pub credential: String,
}

4. 零拷贝缓冲区管理器

职责:

  • 管理 DMA-BUF 生命周期
  • 预分配内存池
  • 通过 Rust 类型跟踪所有权
  • 与 PipeWire 内存池协调
pub struct BufferManager {
    dma_buf_pool: Pool<DmaBufHandle>,
    encoded_buffer_pool: Pool<Bytes>,
    max_buffers: usize,
}

impl BufferManager {
    pub fn acquire_dma_buf(&self) -> Option<DmaBufHandle> {
        self.dma_buf_pool.acquire()
    }

    pub fn release_dma_buf(&self, handle: DmaBufHandle) {
        self.dma_buf_pool.release(handle)
    }

    pub fn acquire_encoded_buffer(&self, size: usize) -> Option<Bytes> {
        self.encoded_buffer_pool.acquire_with_size(size)
    }
}

数据流

Wayland 合成器
        │
        │ DMA-BUF (GPU 内存)
        ▼
PipeWire Portal
        │
        │ DMA-BUF 文件描述符
        ▼
捕获管理器
        │
        │ CapturedFrame { dma_buf, ... }
        │ (零拷贝所有权转移)
        ▼
缓冲区管理器
        │
        │ DmaBufHandle (移动,非拷贝)
        ▼
编码器管道
        │
        │ EncodedFrame { data: Bytes, ... }
        │ (零拷贝 Bytes 包装器)
        ▼
WebRTC 传输层
        │
        │ RTP 数据包(引用 Bytes
        ▼
网络 (UDP/TCP)
        │
        ▼
客户端浏览器

技术栈

核心依赖

[dependencies]
# 异步运行时
tokio = { version = "1.35", features = ["full", "rt-multi-thread"] }
async-trait = "0.1"

# Wayland & PipeWire
wayland-client = "0.31"
wayland-protocols = "0.31"
pipewire = "0.8"
ashpd = "0.8"

# 视频编码(低延迟)
openh264 = { version = "0.6", optional = true }
x264 = { version = "0.4", optional = true }
nvenc = { version = "0.1", optional = true }
vpx = { version = "0.1", optional = true }

# 硬件加速(低延迟)
libva = { version = "0.14", optional = true }     # VA-API
nvidia-encode = { version = "0.5", optional = true }  # NVENC

# WebRTC低延迟配置
webrtc = "0.11"  # webrtc-rs

# 内存与零拷贝
bytes = "1.5"
memmap2 = "0.9"
shared_memory = "0.12"

# 无锁数据结构以最小化竞争
crossbeam = { version = "0.8", features = ["std"] }
crossbeam-channel = "0.5"
crossbeam-queue = "0.3"
parking_lot = "0.12"  # 更快的互斥锁

# 序列化
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# 日志与跟踪
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-opentelemetry = "0.22"  # 用于延迟监控

# 指标与监控
prometheus = { version = "0.13", optional = true }
metrics = "0.21"

# 错误处理
anyhow = "1.0"
thiserror = "1.0"

# 工具
regex = "1.10"
uuid = { version = "1.6", features = ["v4", "serde", "fast-rng"] }
instant = "0.1"  # 高精度计时

[features]
default = ["software-encoder", "webrtc-rs"]

# 编码器选项
software-encoder = ["x264", "openh264"]
hardware-vaapi = ["libva"]
hardware-nvenc = ["nvidia-encode"]
all-encoders = ["software-encoder", "hardware-vaapi", "hardware-nvenc"]

# WebRTC 实现
webrtc-rs = ["webrtc"]
custom-webrtc = []

# 低延迟特性
low-latency = []
ultra-low-latency = ["low-latency", "all-encoders"]

# 监控
monitoring = ["prometheus", "tracing-opentelemetry"]

# 开发
dev = ["monitoring", "all-encoders"]

编码器选项

编码器 硬件 性能 质量 许可证 用例
H.264 (x264) CPU 中等 GPL 后备方案
H.264 (VA-API) GPU 中等 开源 Linux Intel/AMD
H.264 (NVENC) GPU (NVIDIA) 很高 专有 NVIDIA GPU
H.265 (HEVC) GPU 很高 混合 带宽受限
VP9 CPU/GPU 中等 BSD 开放网络
AV1 GPU 中等 很高 开源 面向未来

推荐首选: VA-API H.264 (Linux), NVENC H.264 (NVIDIA) 推荐后备: x264 H.264 (软件)

WebRTC 库

选项 1webrtc-rs(推荐)

  • 纯 Rust 实现
  • 活跃开发
  • 良好的 WebRTC 规范兼容性
  • 媒体零拷贝支持

选项 2自定义实现

  • 使用 webrtc crate 作为基础
  • 添加专门的零拷贝优化
  • 与编码器管道更紧密集成

核心组件设计

1. Wayland 屏幕捕获模块

// src/capture/mod.rs
use pipewire as pw;
use pipewire::properties;
use pipewire::spa::param::format::Format;
use pipewire::stream::StreamFlags;
use async_channel::{Sender, Receiver};

pub struct WaylandCapture {
    core: pw::Core,
    context: pw::Context,
    main_loop: pw::MainLoop,
    stream: pw::stream::Stream,
    frame_sender: Sender<CapturedFrame>,
    frame_receiver: Receiver<CapturedFrame>,
}

impl WaylandCapture {
    pub async fn new(config: CaptureConfig) -> Result<Self, CaptureError> {
        let main_loop = pw::MainLoop::new()?;
        let context = pw::Context::new(&main_loop)?;
        let core = context.connect(None)?;

        // 通过 xdg-desktop-portal 请求屏幕捕获
        let portal = Portal::new().await?;
        let session = portal.create_session(ScreenCaptureType::Monitor).await?;
        let sources = portal.request_sources(&session).await?;

        let (sender, receiver) = async_channel::bounded(30);

        Ok(Self {
            core,
            context,
            main_loop,
            stream: Self::create_stream(&context, &session, sender.clone())?,
            frame_sender: sender,
            frame_receiver: receiver,
        })
    }

    fn create_stream(
        context: &pw::Context,
        session: &Session,
        sender: Sender<CapturedFrame>,
    ) -> Result<pw::stream::Stream, pw::Error> {
        let mut stream = pw::stream::Stream::new(
            context,
            "wl-webrtc-capture",
            properties! {
                *pw::keys::MEDIA_TYPE => "Video",
                *pw::keys::MEDIA_CATEGORY => "Capture",
                *pw::keys::MEDIA_ROLE => "Screen",
            },
        )?;

        stream.connect(
            pw::spa::direction::Direction::Input,
            None,
            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
        )?;

        // 设置新帧回调(零拷贝 DMA-BUF
        let listener = stream.add_local_listener()?;
        listener
            .register(pw::stream::events::Events::param_done, |data| {
                // 处理流参数变化
            })
            .register(pw::stream::events::Events::process, |data| {
                // 处理新帧 - DMA-BUF 已映射
                Self::process_frame(data, sender.clone());
            })?;

        Ok(stream)
    }

    fn process_frame(
        stream: &pw::stream::Stream,
        sender: Sender<CapturedFrame>,
    ) {
        // 获取缓冲区而不拷贝 - DMA-BUF 在 GPU 内存中
        let buffer = stream.dequeue_buffer().expect("no buffer");
        let datas = buffer.datas();
        let data = &datas[0];

        // 创建零拷贝帧
        let frame = CapturedFrame {
            dma_buf: DmaBufHandle::from_buffer(buffer),
            width: stream.format().unwrap().size().width,
            height: stream.format().unwrap().size().height,
            format: PixelFormat::from_spa_format(&stream.format().unwrap()),
            timestamp: timestamp_ns(),
        };

        // 发送帧(通过移动转移所有权)
        let _ = sender.try_send(frame);
    }

    pub async fn next_frame(&self) -> CapturedFrame {
        self.frame_receiver.recv().await.unwrap()
    }
}

// 零拷贝 DMA-BUF 句柄
pub struct DmaBufHandle {
    fd: RawFd,
    size: usize,
    stride: u32,
    offset: u32,
}

impl DmaBufHandle {
    pub fn from_buffer(buffer: &pw::buffer::Buffer) -> Self {
        let data = &buffer.datas()[0];
        Self {
            fd: data.fd().unwrap(),
            size: data.chunk().size() as usize,
            stride: data.chunk().stride(),
            offset: data.chunk().offset(),
        }
    }

    pub unsafe fn as_ptr(&self) -> *mut u8 {
        // 内存映射 DMA-BUF
        let ptr = libc::mmap(
            ptr::null_mut(),
            self.size,
            libc::PROT_READ,
            libc::MAP_SHARED,
            self.fd,
            self.offset as i64,
        );

        if ptr == libc::MAP_FAILED {
            panic!("Failed to mmap DMA-BUF");
        }

        ptr as *mut u8
    }
}

impl Drop for DmaBufHandle {
    fn drop(&mut self) {
        // 句柄删除时取消映射并关闭文件描述符
        unsafe {
            libc::munmap(ptr::null_mut(), self.size);
            libc::close(self.fd);
        }
    }
}

2. 帧缓冲区管理(零拷贝)

// src/buffer/mod.rs
use bytes::Bytes;
use std::sync::Arc;
use std::collections::VecDeque;

pub struct FrameBufferPool {
    dma_bufs: VecDeque<DmaBufHandle>,
    encoded_buffers: VecDeque<Bytes>,
    max_dma_bufs: usize,
    max_encoded: usize,
}

impl FrameBufferPool {
    pub fn new(max_dma_bufs: usize, max_encoded: usize) -> Self {
        Self {
            dma_bufs: VecDeque::with_capacity(max_dma_bufs),
            encoded_buffers: VecDeque::with_capacity(max_encoded),
            max_dma_bufs,
            max_encoded,
        }
    }

    pub fn acquire_dma_buf(&mut self) -> Option<DmaBufHandle> {
        self.dma_bufs.pop_front()
    }

    pub fn release_dma_buf(&mut self, buf: DmaBufHandle) {
        if self.dma_bufs.len() < self.max_dma_bufs {
            self.dma_bufs.push_back(buf);
        }
        // 否则:删除缓冲区,让操作系统回收 DMA-BUF
    }

    pub fn acquire_encoded_buffer(&mut self, size: usize) -> Bytes {
        // 尝试重用现有缓冲区
        if let Some(mut buf) = self.encoded_buffers.pop_front() {
            if buf.len() >= size {
                // 切片到请求的大小(零拷贝视图)
                return buf.split_to(size);
            }
        }

        // 需要时分配新缓冲区
        Bytes::from(vec![0u8; size])
    }

    pub fn release_encoded_buffer(&mut self, buf: Bytes) {
        if self.encoded_buffers.len() < self.max_encoded {
            self.encoded_buffers.push_back(buf);
        }
        // 否则:删除缓冲区,释放内存
    }
}

// 零拷贝帧包装器
pub struct ZeroCopyFrame {
    pub data: Bytes, // 引用计数,无拷贝
    pub metadata: FrameMetadata,
}

pub struct FrameMetadata {
    pub width: u32,
    pub height: u32,
    pub format: PixelFormat,
    pub timestamp: u64,
    pub is_keyframe: bool,
}

// DMA-BUF 智能指针
pub struct DmaBufPtr {
    ptr: *mut u8,
    len: usize,
    _marker: PhantomData<&'static mut [u8]>,
}

impl DmaBufPtr {
    pub unsafe fn new(ptr: *mut u8, len: usize) -> Self {
        Self {
            ptr,
            len,
            _marker: PhantomData,
        }
    }

    pub fn as_slice(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }
}

unsafe impl Send for DmaBufPtr {}
unsafe impl Sync for DmaBufPtr {}

impl Drop for DmaBufPtr {
    fn drop(&mut self) {
        // 内存将由 DmaBufHandle 的 Drop 取消映射
    }
}

3. 视频编码器集成

// src/encoder/mod.rs
use async_trait::async_trait;

pub enum EncoderType {
    H264_VAAPI,
    H264_NVENC,
    H264_X264,
    VP9_VAAPI,
}

pub struct EncoderConfig {
    pub encoder_type: EncoderType,
    pub bitrate: u32,
    pub keyframe_interval: u32,
    pub preset: EncodePreset,
}

pub enum EncodePreset {
    Ultrafast,
    Superfast,
    Veryfast,
    Faster,
    Fast,
    Medium,
    Slow,
    Slower,
    Veryslow,
}

#[async_trait]
pub trait VideoEncoder: Send + Sync {
    async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError>;
    async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError>;
    async fn request_keyframe(&mut self) -> Result<(), EncoderError>;
}

pub struct VaapiEncoder {
    display: va::Display,
    context: va::Context,
    config: EncoderConfig,
    sequence_number: u64,
}

impl VaapiEncoder {
    pub fn new(config: EncoderConfig) -> Result<Self, EncoderError> {
        let display = va::Display::open(None)?;
        let context = va::Context::new(&display)?;

        Ok(Self {
            display,
            context,
            config,
            sequence_number: 0,
        })
    }
}

#[async_trait]
impl VideoEncoder for VaapiEncoder {
    async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError> {
        // 零拷贝:直接将 DMA-BUF 导入到 VA-API 表面
        let surface = unsafe {
            self.context.import_dma_buf(
                frame.dma_buf.fd,
                frame.width,
                frame.height,
                frame.format.as_va_format(),
            )?
        };

        // 编码帧(硬件加速)
        let encoded_data = self.context.encode_surface(surface)?;

        // 创建零拷贝 Bytes 包装器
        let bytes = Bytes::from(encoded_data);

        self.sequence_number += 1;

        Ok(EncodedFrame {
            data: bytes,
            is_keyframe: surface.is_keyframe(),
            timestamp: frame.timestamp,
            sequence_number: self.sequence_number,
        })
    }

    async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError> {
        self.config = config;
        self.context.set_bitrate(config.bitrate)?;
        self.context.set_preset(config.preset)?;
        Ok(())
    }

    async fn request_keyframe(&mut self) -> Result<(), EncoderError> {
        self.context.force_keyframe()?;
        Ok(())
    }
}

// 后备软件编码器
pub struct X264Encoder {
    encoder: x264::Encoder,
    config: EncoderConfig,
    sequence_number: u64,
}

impl X264Encoder {
    pub fn new(config: EncoderConfig) -> Result<Self, EncoderError> {
        let params = x264::Params::default();
        params.set_width(1920);
        params.set_height(1080);
        params.set_fps(60, 1);
        params.set_bitrate(config.bitrate);
        params.set_preset(config.preset);
        params.set_tune("zerolatency");

        let encoder = x264::Encoder::open(&params)?;

        Ok(Self {
            encoder,
            config,
            sequence_number: 0,
        })
    }
}

#[async_trait]
impl VideoEncoder for X264Encoder {
    async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError> {
        // 将 DMA-BUF 映射到 CPU 内存(一次拷贝)
        let ptr = unsafe { frame.dma_buf.as_ptr() };
        let slice = unsafe { std::slice::from_raw_parts(ptr, frame.dma_buf.size) };

        // 如需要转换为 YUV
        let yuv_frame = self.convert_to_yuv(slice, frame.width, frame.height)?;

        // 编码帧
        let encoded_data = self.encoder.encode(&yuv_frame)?;

        self.sequence_number += 1;

        Ok(EncodedFrame {
            data: Bytes::from(encoded_data),
            is_keyframe: self.encoder.is_keyframe(),
            timestamp: frame.timestamp,
            sequence_number: self.sequence_number,
        })
    }

    async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError> {
        self.config = config;
        // 使用新参数重新打开编码器
        Ok(())
    }

    async fn request_keyframe(&mut self) -> Result<(), EncoderError> {
        self.encoder.force_keyframe();
        Ok(())
    }
}

4. WebRTC 信令和数据传输

// src/webrtc/mod.rs
use webrtc::{
    api::APIBuilder,
    ice_transport::ice_server::RTCIceServer,
    media_track::{track_local::track_local_static_sample::TrackLocalStaticSample, TrackLocal},
    peer_connection::{
        configuration::RTCConfiguration,
        peer_connection_state::RTCPeerConnectionState,
        sdp::session_description::RTCSessionDescription,
        RTCPeerConnection,
    },
    rtp_transceiver::rtp_codec::RTCRtpCodecCapability,
};

pub struct WebRtcServer {
    api: webrtc::API,
    peer_connections: Arc<Mutex<HashMap<String, PeerConnection>>>,
    signaling_server: SignalingServer,
}

impl WebRtcServer {
    pub async fn new(config: WebRtcConfig) -> Result<Self, WebRtcError> {
        let mut api = APIBuilder::new().build();

        let signaling_server = SignalingServer::new(config.signaling_addr).await?;

        Ok(Self {
            api,
            peer_connections: Arc::new(Mutex::new(HashMap::new())),
            signaling_server,
        })
    }

    pub async fn create_peer_connection(
        &self,
        session_id: String,
        video_track: TrackLocalStaticSample,
    ) -> Result<String, WebRtcError> {
        let config = RTCConfiguration {
            ice_servers: vec![RTCIceServer {
                urls: self.signaling_server.stun_servers(),
                ..Default::default()
            }],
            ..Default::default()
        };

        let pc = self.api.new_peer_connection(config).await?;

        // 添加视频轨道
        let rtp_transceiver = pc
            .add_track(Arc::new(video_track))
            .await?;

        // 设置 ICE 候选处理器
        let peer_connections = self.peer_connections.clone();
        pc.on_ice_candidate(Box::new(move |candidate| {
            let peer_connections = peer_connections.clone();
            Box::pin(async move {
                if let Some(candidate) = candidate {
                    // 发送候选到信令服务器
                    // ...
                }
            })
        }))
        .await;

        // 存储对等连接
        self.peer_connections
            .lock()
            .await
            .insert(session_id.clone(), PeerConnection::new(pc));

        Ok(session_id)
    }

    pub async fn send_video_frame(
        &self,
        session_id: &str,
        frame: EncodedFrame,
    ) -> Result<(), WebRtcError> {
        let peer_connections = self.peer_connections.lock().await;

        if let Some(peer) = peer_connections.get(session_id) {
            peer.video_track.write_sample(&webrtc::media::Sample {
                data: frame.data.to_vec(),
                duration: std::time::Duration::from_nanos(frame.timestamp),
                ..Default::default()
            }).await?;
        }

        Ok(())
    }
}

pub struct PeerConnection {
    pc: RTCPeerConnection,
    video_track: Arc<TrackLocalStaticSample>,
    data_channel: Option<Arc<RTCDataChannel>>,
}

impl PeerConnection {
    pub async fn create_offer(&mut self) -> Result<RTCSessionDescription, WebRtcError> {
        let offer = self.pc.create_offer(None).await?;
        self.pc.set_local_description(offer.clone()).await?;
        Ok(offer)
    }

    pub async fn set_remote_description(
        &mut self,
        desc: RTCSessionDescription,
    ) -> Result<(), WebRtcError> {
        self.pc.set_remote_description(desc).await
    }

    pub async fn create_answer(&mut self) -> Result<RTCSessionDescription, WebRtcError> {
        let answer = self.pc.create_answer(None).await?;
        self.pc.set_local_description(answer.clone()).await?;
        Ok(answer)
    }
}

// 用于输入/控制的数据通道
pub struct DataChannelManager {
    channels: HashMap<String, Arc<RTCDataChannel>>,
}

impl DataChannelManager {
    pub async fn send_input(&self, channel_id: &str, input: InputEvent) -> Result<()> {
        if let Some(channel) = self.channels.get(channel_id) {
            let data = serde_json::to_vec(&input)?;
            channel.send(&data).await?;
        }
        Ok(())
    }

    pub fn on_input<F>(&mut self, channel_id: String, callback: F)
    where
        F: Fn(InputEvent) + Send + Sync + 'static,
    {
        if let Some(channel) = self.channels.get(&channel_id) {
            channel.on_message(Box::new(move |msg| {
                if let Ok(input) = serde_json::from_slice::<InputEvent>(&msg.data) {
                    callback(input);
                }
            })).unwrap();
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum InputEvent {
    MouseMove { x: f32, y: f32 },
    MouseClick { button: MouseButton },
    KeyPress { key: String },
    KeyRelease { key: String },
}

#[derive(Debug, Serialize, Deserialize)]
pub enum MouseButton {
    Left,
    Right,
    Middle,
}

5. IPC 层(可选)

// src/ipc/mod.rs
use tokio::net::UnixListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

pub struct IpcServer {
    listener: UnixListener,
}

impl IpcServer {
    pub async fn new(socket_path: &str) -> Result<Self> {
        // 删除现有套接字(如果存在)
        let _ = std::fs::remove_file(socket_path);

        let listener = UnixListener::bind(socket_path)?;

        Ok(Self { listener })
    }

    pub async fn run(&self, sender: async_channel::Sender<IpcMessage>) {
        loop {
            match self.listener.accept().await {
                Ok((mut stream, _)) => {
                    let sender = sender.clone();
                    tokio::spawn(async move {
                        let mut buf = [0; 1024];
                        loop {
                            match stream.read(&mut buf).await {
                                Ok(0) => break,
                                Ok(n) => {
                                    if let Ok(msg) = serde_json::from_slice::<IpcMessage>(&buf[..n]) {
                                        let _ = sender.send(msg).await;
                                    }
                                }
                                Err(_) => break,
                            }
                        }
                    });
                }
                Err(_) => continue,
            }
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum IpcMessage {
    StartCapture { session_id: String },
    StopCapture { session_id: String },
    SetQuality { level: QualityLevel },
    GetStatus,
}

数据流优化

零拷贝管道阶段

阶段 1捕获
  输入Wayland 合成器GPU 内存)
  输出DMA-BUF 文件描述符
  拷贝:无(零拷贝)

阶段 2缓冲区管理器
  输入DMA-BUF 文件描述符
  输出DmaBufHandleRAII 包装器)
  拷贝:无(零拷贝所有权转移)

阶段 3编码器
  输入DmaBufHandle
  输出Bytes引用计数
  拷贝DMA-BUF 直接导入 GPU 编码器)

阶段 4WebRTC
  输入Bytes
  输出RTP 数据包(引用 Bytes
  拷贝:无(零拷贝到套接字缓冲区)

阶段 5网络
  输入RTP 数据包
  输出UDP 数据报
  拷贝:最小(仅在内核空间)

内存所有权转移

// 示例:通过管道的所有权转移
async fn process_frame_pipeline(
    mut capture: WaylandCapture,
    mut encoder: VaapiEncoder,
    mut webrtc: WebRtcServer,
) -> Result<()> {
    loop {
        // 阶段 1捕获所有权从 PipeWire 转移到我们的代码)
        let frame = capture.next_frame().await; // CapturedFrame 拥有 DmaBufHandle

        // 阶段 2编码所有权移动非拷贝
        let encoded = encoder.encode(frame).await?; // EncodedFrame 拥有 Bytes

        // 阶段 3发送Bytes 是引用计数的,无拷贝)
        webrtc.send_video_frame("session-123", encoded).await?;

        // 所有权全程转移,无拷贝
    }
}

缓冲区共享机制

1. DMA-BUF主要

  • GPU 内存缓冲区
  • 导出为文件描述符
  • 零拷贝到硬件编码器
  • 限于同一 GPU/驱动
pub fn export_dma_buf(surface: &va::Surface) -> Result<DmaBufHandle> {
    let fd = surface.export_dma_buf()?;
    Ok(DmaBufHandle {
        fd,
        size: surface.size(),
        stride: surface.stride(),
        offset: 0,
    })
}

2. 共享内存(后备)

  • POSIX 共享内存shm_open
  • 用于软件编码路径
  • 从 DMA-BUF 拷贝到共享内存
pub fn create_shared_buffer(size: usize) -> Result<SharedBuffer> {
    let name = format!("/wl-webrtc-{}", uuid::Uuid::new_v4());
    let fd = shm_open(&name, O_CREAT | O_RDWR, 0666)?;
    ftruncate(fd, size as i64)?;

    let ptr = unsafe { mmap(ptr::null_mut(), size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0) };

    Ok(SharedBuffer {
        ptr,
        size,
        fd,
        name,
    })
}

3. 内存映射文件(替代方案)

  • 用于持久缓存
  • 进程间通信
  • 用于帧缓冲
pub struct MappedFile {
    file: File,
    ptr: *mut u8,
    size: usize,
}

impl MappedFile {
    pub fn new(path: &Path, size: usize) -> Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(path)?;

        file.set_len(size as u64)?;

        let ptr = unsafe {
            mmap(
                ptr::null_mut(),
                size,
                PROT_READ | PROT_WRITE,
                MAP_SHARED,
                file.as_raw_fd(),
                0,
            )
        }?;

        Ok(Self { file, ptr, size })
    }
}

管道优化策略

1. 并行编码

// 为不同质量级别并行运行多个编码器
pub struct AdaptiveEncoder {
    encoders: Vec<Box<dyn VideoEncoder>>,
    active_encoder: usize,
    bandwidth_monitor: BandwidthMonitor,
}

impl AdaptiveEncoder {
    pub async fn encode_adaptive(&mut self, frame: CapturedFrame) -> Result<EncodedFrame> {
        let bandwidth = self.bandwidth_monitor.current_bandwidth();

        // 根据带宽切换编码器
        let new_encoder = match bandwidth {
            b if b < 500_000 => 0,  // 低比特率
            b if b < 2_000_000 => 1, // 中比特率
            _ => 2,                  // 高比特率
        };

        if new_encoder != self.active_encoder {
            self.active_encoder = new_encoder;
        }

        self.encoders[self.active_encoder].encode(frame).await
    }
}

2. 帧跳过

pub struct FrameSkipper {
    target_fps: u32,
    last_frame_time: Instant,
    skip_count: u32,
}

impl FrameSkipper {
    pub fn should_skip(&mut self) -> bool {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_frame_time).as_millis();
        let frame_interval = 1000 / self.target_fps as u128;

        if elapsed < frame_interval {
            self.skip_count += 1;
            return true;
        }

        self.last_frame_time = now;
        self.skip_count = 0;
        false
    }
}

3. 感兴趣区域ROI

pub struct RegionEncoder {
    full_encoder: Box<dyn VideoEncoder>,
    roi_encoder: Box<dyn VideoEncoder>,
    current_region: Option<ScreenRegion>,
}

impl RegionEncoder {
    pub async fn encode_roi(
        &mut self,
        frame: CapturedFrame,
        roi: Option<ScreenRegion>,
    ) -> Result<EncodedFrame> {
        if let Some(region) = roi {
            // 以更高质量仅编码 ROI
            let cropped = self.crop_frame(frame, region)?;
            self.roi_encoder.encode(cropped).await
        } else {
            // 编码完整帧
            self.full_encoder.encode(frame).await
        }
    }

    fn crop_frame(&self, mut frame: CapturedFrame, region: ScreenRegion) -> Result<CapturedFrame> {
        // 调整区域 DMA-BUF 偏移量
        frame.width = region.width;
        frame.height = region.height;
        Ok(frame)
    }
}

低延迟优化

设计理念

为了在本地网络实现 15-25ms 延迟,我们优先考虑:

  1. 速度优先于完整性:快速、低延迟交付比完美可靠性更重要
  2. 最小化缓冲:每个阶段使用小缓冲区
  3. 全程零拷贝:消除 CPU 内存拷贝
  4. 硬件加速:所有密集型操作使用 GPU
  5. 预测性时序:通过准确计时减少等待时间

1. 编码器优化

硬件编码器配置

pub struct LowLatencyEncoderConfig {
    // 编解码器设置
    pub codec: VideoCodec,

    // 低延迟特定设置
    pub gop_size: u32,           // 小 GOP8-15 帧
    pub b_frames: u32,           // 零 B 帧以最小延迟
    pub max_b_frames: u32,       // 低延迟始终为 0
    pub lookahead: u32,           // 最小前瞻0-2 帧

    // 码率控制
    pub rc_mode: RateControlMode, // CBR 或带严格约束的 VBR
    pub bitrate: u32,            // 自适应比特率
    pub max_bitrate: u32,        // 紧的最大约束
    pub min_bitrate: u32,
    pub vbv_buffer_size: u32,    // 非常小的 VBV 缓冲区
    pub vbv_max_rate: u32,       // 接近比特率

    // 时序
    pub fps: u32,                // 目标 FPS30-60
    pub intra_period: u32,       // 关键帧间隔

    // 质量与延迟权衡
    pub preset: EncoderPreset,   // Ultrafast/Fast
    pub tune: EncoderTune,       // zerolatency
    pub quality: u8,             // 恒定质量CRF或 CQ
}

pub enum VideoCodec {
    H264,    // 最佳兼容性,良好延迟
    H265,    // 更好压缩,稍高延迟
    VP9,     // 开放替代方案
}

pub enum RateControlMode {
    CBR,     // 恒定比特率 - 可预测
    VBR,     // 可变比特率 - 更好质量
    CQP,     // 恒定量化 - 最低延迟
}

pub enum EncoderPreset {
    Ultrafast,  // 最低延迟,较低质量
    Superfast,
    Veryfast,   // 推荐 15-25ms
    Faster,
}

pub enum EncoderTune {
    Zerolatency, // 低延迟必需
    Film,
    Animation,
}

推荐编码器设置

VA-API (Intel/AMD) - 用于 15-25ms 延迟:

// libva 特定的低延迟设置
VAConfigAttrib attribs[] = {
    {VAConfigAttribRTFormat, VA_RT_FORMAT_YUV420},
    {VAConfigAttribRateControl, VA_RC_CBR},
    {VAConfigAttribEncMaxRefFrames, {1, 0}},  // 最小参考帧
    {VAConfigAttribEncPackedHeaders, VA_ENC_PACKED_HEADER_SEQUENCE},
};

VAEncSequenceParameterBufferH264 seq_param = {
    .intra_period = 15,           // 短 GOP
    .ip_period = 1,               // 无 B 帧
    .bits_per_second = 4000000,
    .max_num_ref_frames = 1,     // 最小参考
    .time_scale = 90000,
    .num_units_in_tick = 1500,   // 60 FPS
};

VAEncPictureParameterBufferH264 pic_param = {
    .reference_frames = {
        {0, VA_FRAME_PICTURE},    // 单一参考
    },
    .num_ref_idx_l0_active_minus1 = 0,
    .num_ref_idx_l1_active_minus1 = 0,
    .pic_fields.bits.idr_pic_flag = 0,
    .pic_fields.bits.reference_pic_flag = 1,
};

VAEncSliceParameterBufferH264 slice_param = {
    .num_ref_idx_l0_active_minus1 = 0,
    .num_ref_idx_l1_active_minus1 = 0,
    .disable_deblocking_filter_idc = 1,  // 更快
};

NVENC (NVIDIA) - 用于 15-25ms 延迟:

// NVENC 低延迟配置
let mut create_params = NV_ENC_INITIALIZE_PARAMS::default();
create_params.encodeGUID = NV_ENC_CODEC_H264_GUID;
create_params.presetGUID = NV_ENC_PRESET_P4_GUID;  // 低延迟

let mut config = NV_ENC_CONFIG::default();
config.profileGUID = NV_ENC_H264_PROFILE_BASELINE_GUID;  // 更快编码
config.rcParams.rateControlMode = NV_ENC_PARAMS_RC_CBR;
config.rcParams.averageBitRate = 4000000;
config.rcParams.maxBitRate = 4000000;
config.rcParams.vbvBufferSize = 4000000;  // 1 秒缓冲区
config.rcParams.vbvInitialDelay = 0;      // 最小延迟

let mut h264_config = unsafe { config.encodeCodecConfig.h264Config };
h264_config.enableIntraRefresh = 1;
h264_config.idrPeriod = 30;                // 每 30 帧关键帧
h264_config.repeatSPSPPS = 1;
h264_config.enableConstrainedEncoding = 1;
h264_config.frameNumD = 0;
h264_config.sliceMode = NV_ENC_SLICE_MODE_AUTOSELECT;

// 低延迟特定
h264_config.maxNumRefFrames = 1;          // 最小参考
h264_config.idrPeriod = 15;               // 更短 GOP

x264 (软件) - 用于 50-100ms 延迟:

// x264 低延迟参数
let param = x264_param_t {
    i_width: 1920,
    i_height: 1080,
    i_fps_num: 60,
    i_fps_den: 1,

    // 码率控制
    i_bitrate: 4000,                      // 4 Mbps
    i_keyint_max: 15,                     // 短 GOP
    b_intra_refresh: 1,

    // 低延迟
    b_repeat_headers: 1,
    b_annexb: 1,
    i_scenecut_threshold = 0,             // 禁用场景检测

    // 无 B 帧以降低延迟
    i_bframe: 0,
    i_bframe_adaptive: 0,
    i_bframe_pyramid: 0,

    // 参考
    i_frame_reference: 1,                 // 最小参考

    // 预设ultrafast 或 superfast
    // 通过预设函数设置
};

// 应用预设
x264_param_apply_preset(&param, "superfast");
x264_param_apply_tune(&param, "zerolatency");

动态比特率与延迟权衡

pub struct AdaptiveBitrateController {
    target_latency_ms: u32,
    current_bitrate: u32,
    frame_rate: u32,
    network_quality: NetworkQuality,
    buffer_depth_ms: u32,
}

pub struct NetworkQuality {
    bandwidth_mbps: f64,
    latency_ms: u32,
    packet_loss_rate: f64,
    jitter_ms: u32,
}

impl AdaptiveBitrateController {
    pub fn update_target_bitrate(&mut self, measured_latency_ms: u32) -> u32 {
        let latency_ratio = measured_latency_ms as f64 / self.target_latency_ms as f64;

        if latency_ratio > 1.5 {
            // 延迟过高 - 激进降低比特率
            self.current_bitrate = (self.current_bitrate as f64 * 0.7) as u32;
        } else if latency_ratio > 1.2 {
            // 中等偏高 - 降低比特率
            self.current_bitrate = (self.current_bitrate as f64 * 0.85) as u32;
        } else if latency_ratio < 0.8 {
            // 可以增加比特率
            self.current_bitrate = (self.current_bitrate as f64 * 1.1) as u32;
        }

        // 限制在合理范围内
        self.current_bitrate = self.current_bitrate.clamp(1000000, 8000000);

        self.current_bitrate
    }
}

2. 捕获优化

PipeWire DMA-BUF 零拷贝

pub struct LowLatencyCaptureConfig {
    pub frame_rate: u32,           // 30-60 FPS
    pub zero_copy: bool,           // 始终为 true
    pub track_damage: bool,        // 启用损坏跟踪
    pub partial_updates: bool,     // 仅编码损坏区域
    pub buffer_pool_size: usize,   // 小池3-5 个缓冲区
}

pub struct DamageTracker {
    damaged_regions: VecDeque<ScreenRegion>,
    last_frame: Option<DmaBufHandle>,
    threshold: u32,                // 要编码的最小变化大小
}

impl DamageTracker {
    pub fn update(&mut self, new_frame: &CapturedFrame) -> Vec<ScreenRegion> {
        match &self.last_frame {
            Some(last) => {
                let regions = self.compute_damage_regions(last, new_frame);
                self.last_frame = Some(new_frame.dma_buf.clone());
                regions
            }
            None => {
                self.last_frame = Some(new_frame.dma_buf.clone());
                vec![ScreenRegion {
                    x: 0,
                    y: 0,
                    width: new_frame.width,
                    height: new_frame.height,
                }]
            }
        }
    }

    fn compute_damage_regions(&self, last: &DmaBufHandle, new: &CapturedFrame) -> Vec<ScreenRegion> {
        // 比较帧并查找变化的区域
        // 可以通过 GPU 高效完成
        // 对于 MVP可以使用简单的基于块的比较

        // 比较的块大小(例如 16x16 像素)
        let block_size = 16;
        let blocks_x = (new.width as usize + block_size - 1) / block_size;
        let blocks_y = (new.height as usize + block_size - 1) / block_size;

        // 将相邻的损坏块合并为区域
        // ...

        vec![]  // 占位符
    }
}

部分区域编码

pub struct RegionEncoder {
    full_encoder: Box<dyn VideoEncoder>,
    tile_encoder: Box<dyn VideoEncoder>,
    current_regions: Vec<ScreenRegion>,
}

impl RegionEncoder {
    pub async fn encode_with_regions(
        &mut self,
        frame: CapturedFrame,
        regions: Vec<ScreenRegion>,
    ) -> Result<Vec<EncodedTile>> {
        let mut encoded_tiles = Vec::new();

        if regions.is_empty() || regions.len() > 4 {
            // 区域太多或无变化 - 编码完整帧
            let encoded = self.full_encoder.encode(frame).await?;
            encoded_tiles.push(EncodedTile {
                region: ScreenRegion {
                    x: 0,
                    y: 0,
                    width: frame.width,
                    height: frame.height,
                },
                data: encoded.data,
                is_keyframe: encoded.is_keyframe,
            });
        } else {
            // 分别编码每个损坏区域
            for region in regions {
                let cropped = self.crop_frame(&frame, &region)?;
                let encoded = self.tile_encoder.encode(cropped).await?;

                encoded_tiles.push(EncodedTile {
                    region,
                    data: encoded.data,
                    is_keyframe: encoded.is_keyframe,
                });
            }
        }

        Ok(encoded_tiles)
    }

    fn crop_frame(&self, frame: &CapturedFrame, region: &ScreenRegion) -> Result<CapturedFrame> {
        // 调整区域 DMA-BUF 偏移量
        // 这是零拷贝操作 - 仅元数据更改

        Ok(CapturedFrame {
            dma_buf: DmaBufHandle::from_region(&frame.dma_buf, region)?,
            width: region.width,
            height: region.height,
            format: frame.format,
            timestamp: frame.timestamp,
        })
    }
}

3. WebRTC 传输层优化

低延迟 WebRTC 配置

pub struct LowLatencyWebRtcConfig {
    // ICE 和传输
    pub ice_transport_policy: IceTransportPolicy,
    pub ice_servers: Vec<IceServer>,

    // 媒体设置
    pub video_codecs: Vec<VideoCodecConfig>,
    pub max_bitrate: u32,
    pub min_bitrate: u32,
    pub start_bitrate: u32,

    // 缓冲 - 最小化以降低延迟
    pub playout_delay_min_ms: u16,    // 0-10ms默认 50ms
    pub playout_delay_max_ms: u16,    // 10-20ms默认 200ms

    // 打包
    pub rtp_payload_size: u16,        // 更小数据包1200 字节
    pub packetization_mode: PacketizationMode,

    // 反馈和重传
    pub nack_enabled: bool,           // 限制 NACK
    pub fec_enabled: bool,            // 禁用 FEC 以降低延迟
    pub transport_cc_enabled: bool,   // 拥塞控制

    // RTCP 设置
    pub rtcp_report_interval_ms: u32,  // 频繁50-100ms
}

pub struct VideoCodecConfig {
    pub name: String,
    pub clock_rate: u32,
    pub num_channels: u16,
    pub parameters: CodecParameters,
}

impl LowLatencyWebRtcConfig {
    pub fn for_ultra_low_latency() -> Self {
        Self {
            ice_transport_policy: IceTransportPolicy::All,
            ice_servers: vec![],

            video_codecs: vec![
                VideoCodecConfig {
                    name: "H264".to_string(),
                    clock_rate: 90000,
                    num_channels: 1,
                    parameters: CodecParameters {
                        profile_level_id: "42e01f".to_string(),  // 基线配置
                        packetization_mode: 1,
                        level_asymmetry_allowed: 1,
                    },
                },
            ],

            max_bitrate: 8000000,      // 8 Mbps 最大
            min_bitrate: 500000,       // 500 Kbps 最小
            start_bitrate: 4000000,    // 4 Mbps 起始

            // 关键:最小播放延迟
            playout_delay_min_ms: 0,   // 无最小
            playout_delay_max_ms: 20,  // 20ms 最大

            // 更小的数据包以降低序列化延迟
            rtp_payload_size: 1200,
            packetization_mode: PacketizationMode::NonInterleaved,

            // 有限重传
            nack_enabled: true,        // 但限制重传窗口
            fec_enabled: false,        // 禁用 FEC - 增加延迟
            transport_cc_enabled: true,

            // 更频繁的 RTCP 反馈
            rtcp_report_interval_ms: 50,
        }
    }
}

丢包处理策略

pub enum LossHandlingStrategy {
    PreferLatency,    // 丢弃后期帧,优先低延迟
    PreferQuality,     // 重传,优先质量
    Balanced,          // 基于网络条件自适应
}

pub struct PacketLossHandler {
    strategy: LossHandlingStrategy,
    max_retransmission_delay_ms: u32,
    nack_window_size: u32,
}

impl PacketLossHandler {
    pub fn handle_packet_loss(
        &mut self,
        sequence_number: u16,
        now_ms: u64,
    ) -> RetransmissionDecision {
        match self.strategy {
            LossHandlingStrategy::PreferLatency => {
                // 如果太旧则不重传
                if now_ms > self.max_retransmission_delay_ms as u64 {
                    RetransmissionDecision::Drop
                } else {
                    RetransmissionDecision::None
                }
            }
            LossHandlingStrategy::PreferQuality => {
                // 始终尝试重传
                RetransmissionDecision::Request(sequence_number)
            }
            LossHandlingStrategy::Balanced => {
                // 基于丢包率自适应
                RetransmissionDecision::None  // 占位符
            }
        }
    }
}

pub enum RetransmissionDecision {
    Request(u16),
    Drop,
    None,
}

NACK 与 FEC 选择

15-25ms 延迟的建议:

  • 主要:有限 NACK

    • NACK 窗口1-2 帧60fps 时 16-33ms
    • 最大重传延迟20ms
    • 仅重传关键帧或关键数据包
  • 避免 FEC

    • 前向纠错增加显著延迟
    • 在低丢包 LAN 中FEC 开销超过收益
    • 改为选择性地使用 NACK
pub struct NackController {
    window_size_ms: u32,           // 20ms 窗口
    max_nack_packets_per_second: u32,
    nack_list: VecDeque<(u16, u64)>, // (序列号时间戳_ms)
}

impl NackController {
    pub fn should_send_nack(&self, seq_num: u16, now_ms: u64) -> bool {
        // 检查数据包是否在 NACK 窗口内
        if let Some(&(_, oldest_ts)) = self.nack_list.front() {
            if now_ms - oldest_ts > self.window_size_ms as u64 {
                return false;  // 太旧
            }
        }

        true
    }
}

4. 帧率和缓冲策略

动态帧率调整

pub struct FrameRateController {
    target_fps: u32,              // 期望 FPS (30-60)
    current_fps: u32,
    frame_times: VecDeque<Instant>,
    last_frame_time: Instant,
    min_interval: Duration,       // 1 / max_fps
}

impl FrameRateController {
    pub fn new(target_fps: u32) -> Self {
        let min_interval = Duration::from_micros(1_000_000 / 60);  // 最大 60 FPS

        Self {
            target_fps,
            current_fps: 30,
            frame_times: VecDeque::with_capacity(60),
            last_frame_time: Instant::now(),
            min_interval,
        }
    }

    pub fn should_capture(&mut self) -> bool {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_frame_time);

        if elapsed < self.min_interval {
            return false;  // 太快
        }

        // 根据条件更新帧率
        self.adjust_fps_based_on_conditions();

        self.last_frame_time = now;
        true
    }

    pub fn adjust_fps_based_on_conditions(&mut self) {
        // 检查系统负载、网络条件等
        let system_load = self.get_system_load();
        let network_quality = self.get_network_quality();

        if system_load > 0.8 || network_quality.is_poor() {
            self.current_fps = 30;  // 降低帧率
        } else if system_load < 0.5 && network_quality.is_excellent() {
            self.current_fps = 60;  // 提高帧率
        } else {
            self.current_fps = 45;  // 平衡
        }
    }
}

快速帧丢弃策略

pub struct FrameDropper {
    target_fps: u32,
    adaptive_drop_threshold_ms: u32,
    consecutive_drops: u32,
    max_consecutive_drops: u32,
}

impl FrameDropper {
    pub fn should_drop(&mut self, queue_latency_ms: u32) -> bool {
        if queue_latency_ms > self.adaptive_drop_threshold_ms {
            if self.consecutive_drops < self.max_consecutive_drops {
                self.consecutive_drops += 1;
                return true;
            }
        }

        self.consecutive_drops = 0;
        false
    }

    pub fn get_drop_interval(&self) -> u32 {
        // 计算要丢弃的帧数
        match self.target_fps {
            60 => 1,   // 每 2 帧丢弃 1 帧
            30 => 1,   // 每 2 帧丢弃 1 帧
            _ => 0,
        }
    }
}

最小缓冲

发送端:

pub struct SenderBuffer {
    max_size_frames: usize,        // 非常小1-2 帧
    queue: VecDeque<EncodedFrame>,
    target_latency_ms: u32,
}

impl SenderBuffer {
    pub fn new() -> Self {
        Self {
            max_size_frames: 1,      // 单帧缓冲
            queue: VecDeque::with_capacity(2),
            target_latency_ms: 5,    // 5ms 目标
        }
    }

    pub fn push(&mut self, frame: EncodedFrame) -> Result<()> {
        if self.queue.len() >= self.max_size_frames {
            // 丢弃最旧的帧以保持低延迟
            self.queue.pop_front();
        }

        self.queue.push_back(frame);
        Ok(())
    }

    pub fn pop(&mut self) -> Option<EncodedFrame> {
        self.queue.pop_front()
    }
}

接收端(抖动缓冲):

pub struct MinimalJitterBuffer {
    target_delay_ms: u32,           // 0-10ms
    min_delay_ms: u32,             // 0ms
    max_delay_ms: u32,             // 10-20ms
    packets: VecDeque<RtpPacket>,
}

impl MinimalJitterBuffer {
    pub fn new() -> Self {
        Self {
            target_delay_ms: 5,     // 5ms 目标
            min_delay_ms: 0,        // 无最小
            max_delay_ms: 10,      // 10ms 最大
            packets: VecDeque::with_capacity(10),
        }
    }

    pub fn push(&mut self, packet: RtpPacket) {
        if self.packets.len() < self.max_delay_ms as usize / 2 {
            self.packets.push_back(packet);
        } else {
            // 缓冲区满 - 丢弃最旧的
            self.packets.pop_front();
            self.packets.push_back(packet);
        }
    }

    pub fn pop(&mut self) -> Option<RtpPacket> {
        self.packets.pop_front()
    }
}

5. 架构调整

单线程 vs 多线程

建议:混合方法

  • 捕获线程PipeWire 专用线程
  • 编码器线程:每会话编码器线程
  • 网络线程WebRTC 传输线程
  • 协调:用于数据传递的无锁通道
pub struct PipelineArchitecture {
    capture_thread: JoinHandle<()>,
    encoder_threads: Vec<JoinHandle<()>>,
    network_thread: JoinHandle<()>,

    // 无锁通信
    capture_to_encoder: async_channel::Sender<CapturedFrame>,
    encoder_to_network: async_channel::Sender<EncodedFrame>,
}

锁竞争最小化

// 尽可能使用无锁数据结构
use crossbeam::queue::SegQueue;
use crossbeam::channel::{bounded, unbounded};

pub struct LockFreeFrameQueue {
    queue: SegQueue<CapturedFrame>,
    max_size: usize,
}

impl LockFreeFrameQueue {
    pub fn push(&self, frame: CapturedFrame) -> Result<()> {
        if self.queue.len() >= self.max_size {
            return Err(Error::QueueFull);
        }
        self.queue.push(frame);
        Ok(())
    }

    pub fn pop(&self) -> Option<CapturedFrame> {
        self.queue.pop()
    }
}

异步任务调度

pub struct LowLatencyScheduler {
    capture_priority: TaskPriority,
    encode_priority: TaskPriority,
    network_priority: TaskPriority,
}

impl LowLatencyScheduler {
    pub async fn schedule_pipeline(&self) {
        tokio::spawn_with_priority(TaskPriority::High, async move {
            // 关键路径:捕获 -> 编码 -> 发送
        });

        tokio::spawn_with_priority(TaskPriority::Medium, async move {
            // 后台任务:统计、日志记录
        });
    }
}

6. 技术栈调整

延迟的编码器选择

编码器 设置延迟 每帧延迟 质量 建议
VA-API H.264 1-2ms 2-3ms 中等 主要Linux
NVENC H.264 1-2ms 1-2ms 主要NVIDIA
x264 (ultrafast) 0ms 5-8ms 后备
x264 (superfast) 0ms 8-12ms 中等 后备

建议:

  • 主要:使用 ultrafast 预设的 VA-API 或 NVENC H.264
  • 后备:使用 ultrafast 预设的 x264接受 30-50ms 延迟)

直接 Wayland vs PipeWire

使用 PipeWire推荐

  • 更好的 DMA-BUF 支持
  • 硬件加速集成
  • 通过生态系统的零拷贝

直接 Wayland如果需要

  • 更底层的控制
  • 可能更低的捕获延迟0.5-1ms
  • 更复杂的实现
  • 无门户集成(安全问题)

建议: MVP 坚持使用 PipeWire。仅当 PipeWire 延迟不可接受时才考虑直接 Wayland。

webrtc-rs 延迟特性

优点:

  • 纯 Rust行为可预测
  • 良好的零拷贝支持
  • 可自定义缓冲

缺点:

  • 可能具有为可靠性优化的默认缓冲设置
  • 需要手动配置以实现超低延迟

自定义 WebRTC 层(高级):

  • 对缓冲和时序的完全控制
  • 可以内联打包
  • 更复杂的实现

建议: 使用 webrtc-rs 配合低延迟配置。仅当 webrtc-rs 无法达到目标时才考虑自定义层。

7. 实施优先级

P0MVP 必须具备)

  1. 硬件编码器集成

    • 带低延迟设置的 VA-API H.264
    • 无 B 帧,小 GOP15 帧)
    • Ultrafast 预设
  2. DMA-BUF 零拷贝

    • PipeWire DMA-BUF 导入
    • 直接编码器输入
    • 无 CPU 拷贝
  3. 最小缓冲

    • 单帧发送缓冲
    • 0-5ms 抖动缓冲
    • 快速帧丢弃
  4. 低延迟 WebRTC 配置

    • playout_delay_min: 0ms
    • playout_delay_max: 20ms
    • 禁用 FEC

P115-25ms 必需)

  1. 损坏跟踪

    • 部分区域更新
    • 降低编码负载
  2. 动态帧率

    • 30-60 FPS 自适应
    • 网络感知
  3. NACK 控制

    • 有限重传窗口20ms
    • 选择性 NACK

P2最好具备

  1. 直接 Wayland 捕获

    • 如果 PipeWire 延迟不足
  2. 自定义 WebRTC 层

    • 如果 webrtc-rs 不足
  3. 高级拥塞控制

    • SCReAM 或 Google 拥塞控制

8. 测试和验证

端到端延迟测量

pub struct LatencyMeter {
    timestamps: VecDeque<(u64, LatencyStage)>,
}

pub enum LatencyStage {
    Capture,
    EncodeStart,
    EncodeEnd,
    Packetize,
    NetworkSend,
    NetworkReceive,
    Depacketize,
    DecodeStart,
    DecodeEnd,
    Display,
}

impl LatencyMeter {
    pub fn mark(&mut self, stage: LatencyStage) {
        let now = timestamp_ns();
        self.timestamps.push_back((now, stage));
    }

    pub fn calculate_total_latency(&self) -> Duration {
        if self.timestamps.len() < 2 {
            return Duration::ZERO;
        }

        let first = self.timestamps.front().unwrap().0;
        let last = self.timestamps.back().unwrap().0;
        Duration::from_nanos(last - first)
    }
}

测量方法:

  1. 时间戳注入

    • 在捕获时注入帧 ID屏幕上的可见时间戳
    • 用摄像头在客户端捕获
    • 比较时间戳以计算往返时间
    • 除以 2 得到单向延迟
  2. 网络时间戳

    • 在 RTP 头扩展中添加帧捕获时间
    • 在接收器测量
    • 考虑时钟偏移
  3. 硬件时间戳

    • 使用内核数据包时间戳SO_TIMESTAMPING
    • 如果可用,使用硬件 NIC 时间戳

性能基准测试

#[bench]
fn bench_full_pipeline_latency(b: &mut Bencher) {
    let mut pipeline = LowLatencyPipeline::new(config).unwrap();
    let mut latencies = Vec::new();

    b.iter(|| {
        let start = Instant::now();

        let frame = pipeline.capture().unwrap();
        let encoded = pipeline.encode(frame).unwrap();
        pipeline.send(encoded).unwrap();

        latencies.push(start.elapsed());
    });

    let avg_latency = latencies.iter().sum::<Duration>() / latencies.len() as u32;
    println!("Average latency: {:?}", avg_latency);
}

目标基准:

指标 目标 可接受
捕获延迟 2-3ms <5ms
编码延迟 3-5ms <8ms
打包延迟 1-2ms <3ms
网络 (LAN) 0.5-1ms <2ms
解码延迟 1-2ms <4ms
显示延迟 1-2ms <4ms
总计 15-25ms <30ms

调优策略

  1. 基线测量

    • 单独测量每个阶段
    • 识别瓶颈
  2. 迭代调优

    • 一次调整一个参数
    • 测量对总延迟的影响
    • 如需要,权衡质量
  3. 验证

    • 在各种网络条件下测试
    • 在系统负载下测试
    • 使用不同内容测试(静态、动态)
  4. 持续监控

    • 在生产中跟踪延迟
    • 对退化发出警报
    • 自适应调整

实施路线图(低延迟更新)

第 1 阶段MVP最小可行产品- 4-6 周

目标: 基本屏幕捕获和 WebRTC 流媒体

第 1-2 周:核心基础设施

  • 项目设置Cargo.toml、目录结构
  • Tokio 异步运行时设置
  • 错误处理框架anyhow/thiserror
  • 日志设置tracing
  • 配置管理

第 2-3 周Wayland 捕获

  • PipeWire xdg-desktop-portal 集成
  • 基本屏幕捕获(单显示器)
  • DMA-BUF 导入/导出
  • 帧接收通道

第 3-4 周:简单编码

  • x264 软件编码器(后备)
  • 基本帧管道(捕获 → 编码)
  • 帧率限制

第 4-5 周WebRTC 传输

  • webrtc-rs 集成
  • 基本对等连接
  • 视频轨道设置
  • 简单信令WebSocket

第 5-6 周:测试与集成

  • 端到端测试Wayland → WebRTC → 浏览器)
  • 性能基准测试
  • 错误修复

MVP 交付成果:

  • 工作屏幕捕获
  • WebRTC 流媒体到浏览器
  • 720p 15-30 FPS
  • x264 编码(软件)

第 2 阶段:硬件加速 - 3-4 周

目标: GPU 加速编码以获得更好性能

第 1-2 周VA-API 集成

  • VA-API 编码器实现
  • DMA-BUF 到 VA-API 表面导入
  • H.264 编码
  • Intel/AMD GPU 支持

第 2-3 周NVENC 集成

  • NVIDIA GPU 的 NVENC 编码器
  • CUDA 内存管理
  • NVENC H.264 编码

第 3-4 周:编码器选择

  • 编码器检测和选择
  • 后备链NVENC → VA-API → x264
  • 运行时编码器切换

第 2 阶段交付成果:

  • GPU 加速编码
  • 1080p 30-60 FPS
  • 更低的 CPU 使用率
  • 自适应编码器选择

第 3 阶段:低延迟优化 - 4-5 周

目标: 在本地网络实现 25-35ms 延迟

第 1 周编码器低延迟配置P0

  • 配置 VA-API/NVENC 以 <5ms 编码
  • 禁用 B 帧,设置 GOP 为 15 帧
  • 使用小 VBV 缓冲区实现 CBR 码率控制
  • 调整编码器预设ultrafast/superfast
  • 独立测量编码器延迟

第 2 周最小缓冲P0

  • 将发送缓冲减少到 1 帧
  • 实现 0-10ms 抖动缓冲
  • 配置 WebRTC 播放延迟0-20ms
  • 禁用 FEC 以降低延迟
  • 测试端到端延迟

第 3 周损坏跟踪与部分更新P1

  • 实现区域变化检测
  • 添加部分区域编码
  • 针对静态内容优化
  • 基准测试延迟改进

第 4 周动态帧率与质量P1

  • 实现自适应帧率30-60fps
  • 网络质量检测
  • 动态比特率与延迟权衡
  • 负载下快速帧丢弃

第 5 周高级优化P1/P2

  • 有限 NACK 窗口20ms
  • 选择性数据包重传
  • RTCP 微调50ms 间隔)
  • 性能分析
  • 最终延迟调优

第 3 阶段交付成果:

  • LAN 上 25-35ms 延迟
  • 零拷贝 DMA-BUF 管道
  • 低延迟配置的硬件编码器
  • 管道全程最小缓冲
  • 基于条件的自适应质量

第 4 阶段:超低延迟的生产就绪 - 5-7 周

目标: 实现 15-25ms 延迟,同时确保安全、可靠和部署就绪

第 1-2 周超低延迟调优P1/P2

  • 直接 Wayland 捕获评估(如需要)
  • 自定义 WebRTC 层评估(如需要)
  • 高级拥塞控制SCReAM/Google CC
  • 内核旁路优化(如需要 DPDK/AF_XDP
  • 最终延迟优化和调优

第 2-3 周:安全

  • 身份验证JWT、OAuth
  • 加密TLS、DTLS
  • 会话管理
  • 访问控制
  • 安全审计和渗透测试

第 3-4 周:可靠性

  • 错误恢复
  • 连接健康监控
  • 自动重连
  • 延迟感知的优雅降级
  • 故障转移机制

第 4-5 周:监控与调试

  • 实时延迟指标收集
  • 每阶段延迟跟踪
  • 日志改进
  • 带帧检查的调试模式
  • 延迟可视化的性能仪表板
  • 延迟退化警报

第 5-6 周:部署

  • Docker 容器化
  • Systemd 服务
  • 带低延迟预设的配置文件
  • 安装脚本
  • 性能调优文档

第 6-7 周:测试

  • 集成测试
  • 延迟监控的负载测试
  • 跨浏览器测试
  • 长时间稳定性测试
  • 延迟回归测试
  • 自动化性能基准

第 4 阶段交付成果:

  • LAN 上 15-25ms 延迟
  • 生产就绪部署
  • 安全功能
  • 监控和可观测性
  • 综合测试
  • 延迟回归测试

第 5 阶段:高级功能(可选)- 持续进行

潜在功能:

  • 音频捕获和流媒体
  • 双向输入(鼠标、键盘)
  • 剪贴板共享
  • 文件传输
  • 录制(保存会话)
  • 多用户会话
  • 移动客户端支持
  • 用于控制的 WebRTC 数据通道
  • WebRTC 可插入流(客户端效果)
  • 自适应分辨率
  • H.265/HEVC 编码
  • AV1 编码
  • 屏幕区域选择
  • 虚拟显示支持
  • Wayland 虚拟指针协议

测试策略

单元测试

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_dma_buf_lifecycle() {
        let handle = DmaBufHandle::new(/* ... */);
        assert_eq!(handle.ref_count(), 1);

        let handle2 = handle.clone();
        assert_eq!(handle.ref_count(), 2);

        drop(handle);
        assert_eq!(handle2.ref_count(), 1);

        drop(handle2);
        // 缓冲区释放
    }

    #[tokio::test]
    async fn test_encoder_pipeline() {
        let config = EncoderConfig {
            encoder_type: EncoderType::H264_X264,
            bitrate: 2_000_000,
            keyframe_interval: 30,
            preset: EncodePreset::Fast,
        };

        let mut encoder = X264Encoder::new(config).unwrap();

        let frame = create_test_frame(1920, 1080);
        let encoded = encoder.encode(frame).await.unwrap();

        assert!(!encoded.data.is_empty());
        assert!(encoded.is_keyframe);
    }
}

集成测试

#[tokio::test]
async fn test_full_pipeline() {
    // 设置
    let capture = WaylandCapture::new(CaptureConfig::default()).await.unwrap();
    let encoder = VaapiEncoder::new(EncoderConfig::default()).unwrap();
    let webrtc = WebRtcServer::new(WebRtcConfig::default()).await.unwrap();

    // 运行 100 帧管道
    for _ in 0..100 {
        let frame = capture.next_frame().await;
        let encoded = encoder.encode(frame).await.unwrap();
        webrtc.send_video_frame("test-session", encoded).await.unwrap();
    }

    // 验证
    assert_eq!(webrtc.frames_sent(), 100);
}

负载测试

# 模拟 10 个并发连接
for i in {1..10}; do
    cargo test test_full_pipeline --release &
done
wait

性能基准

#[bench]
fn bench_encode_frame(b: &mut Bencher) {
    let mut encoder = X264Encoder::new(config).unwrap();
    let frame = create_test_frame(1920, 1080);

    b.iter(|| {
        encoder.encode(frame.clone()).unwrap()
    });
}

潜在挑战与解决方案

1. Wayland 协议限制

挑战: Wayland 的安全模型限制屏幕捕获

解决方案:

  • 使用 xdg-desktop-portal 进行权限管理
  • 实现用于捕获授权的用户提示
  • 支持多个门户后端GNOME、KDE 等)
pub async fn request_capture_permission() -> Result<bool> {
    let portal = Portal::new().await?;
    let session = portal.create_session(ScreenCaptureType::Monitor).await?;

    // 用户将看到请求权限的对话框
    let sources = portal.request_sources(&session).await?;

    Ok(!sources.is_empty())
}

替代方案: 直接使用 PipeWire 配合适当的身份验证

2. 硬件加速兼容性

挑战: 不同的 GPU 需要不同的 APIVA-API、NVENC 等)

解决方案:

  • 实现多个编码器后端
  • 运行时检测可用编码器
  • 优雅地回退到软件编码
pub fn detect_best_encoder() -> EncoderType {
    // 先尝试 NVENC (NVIDIA)
    if nvenc::is_available() {
        return EncoderType::H264_NVENC;
    }

    // 尝试 VA-API (Intel/AMD)
    if vaapi::is_available() {
        return EncoderType::H264_VAAPI;
    }

    // 回退到软件
    EncoderType::H264_X264
}

3. 跨浏览器 WebRTC 兼容性

挑战: 不同的浏览器具有不同的 WebRTC 实现

解决方案:

  • 使用标准化编解码器H.264、VP8、VP9
  • 实现编解码器协商
  • 提供后备选项
pub fn get_supported_codecs() -> Vec<RTCRtpCodecCapability> {
    vec![
        RTCRtpCodecCapability {
            mime_type: "video/H264".to_string(),
            clock_rate: 90000,
            ..Default::default()
        },
        RTCRtpCodecCapability {
            mime_type: "video/VP9".to_string(),
            clock_rate: 90000,
            ..Default::default()
        },
    ]
}

4. 安全和身份验证

挑战: 安全的远程访问而不将桌面暴露给未经授权的用户

解决方案:

  • 实现 JWT 身份验证
  • 使用 DTLS 进行媒体加密
  • 添加速率限制和访问控制
pub struct AuthManager {
    secret: String,
    sessions: Arc<Mutex<HashMap<String, Session>>>,
}

impl AuthManager {
    pub fn create_token(&self, user_id: &str) -> Result<String> {
        let claims = Claims {
            sub: user_id.to_string(),
            exp: Utc::now() + chrono::Duration::hours(1),
        };

        encode(&Header::default(), &claims, &EncodingKey::from_secret(self.secret.as_ref()))
    }

    pub fn validate_token(&self, token: &str) -> Result<Claims> {
        decode::<Claims>(
            token,
            &DecodingKey::from_secret(self.secret.as_ref()),
            &Validation::default(),
        )
        .map(|data| data.claims)
        .map_err(|_| AuthError::InvalidToken)
    }
}

5. 内存管理

挑战: 避免使用 DMA-BUF 和共享内存时发生内存泄漏

解决方案:

  • 使用 Rust 的所有权系统
  • 用于资源清理的 RAII 模式
  • 带限制的缓冲区池
pub struct ScopedDmaBuf {
    handle: DmaBufHandle,
}

impl Drop for ScopedDmaBuf {
    fn drop(&mut self) {
        // 自动释放 DMA-BUF
        // 文件描述符关闭
        // GPU 内存释放
    }
}

// 使用确保清理
{
    let buf = ScopedDmaBuf::new(/* ... */);
    // 使用缓冲区
} // 自动在此处删除

6. 延迟优化

挑战: 最小化端到端延迟

解决方案:

  • 零拷贝管道
  • 硬件加速
  • 自适应质量
  • 帧跳过
pub struct LatencyOptimizer {
    target_latency_ms: u32,
    current_latency_ms: u32,
}

impl LatencyOptimizer {
    pub fn adjust_parameters(&mut self) {
        if self.current_latency_ms > self.target_latency_ms {
            // 降低质量以改善延迟
            self.reduce_bitrate();
            self.increase_frame_skipping();
        } else {
            // 提高质量
            self.increase_bitrate();
        }
    }
}

7. 网络条件

挑战: 不同的带宽和网络条件

解决方案:

  • 自适应比特率流媒体
  • 多个质量预设
  • 拥塞控制
pub struct BandwidthMonitor {
    measurements: VecDeque<u32>,
    window_size: usize,
}

impl BandwidthMonitor {
    pub fn update(&mut self, bytes_sent: u32, duration: Duration) {
        let bandwidth = bytes_sent * 8 / duration.as_secs() as u32;
        self.measurements.push_back(bandwidth);

        if self.measurements.len() > self.window_size {
            self.measurements.pop_front();
        }
    }

    pub fn average_bandwidth(&self) -> u32 {
        if self.measurements.is_empty() {
            return 0;
        }

        self.measurements.iter().sum::<u32>() / self.measurements.len() as u32
    }
}

8. 跨平台兼容性

挑战: 支持不同的 Linux 发行版和桌面环境

解决方案:

  • 容器化应用程序
  • 运行时检测可用技术
  • 提供后备选项
pub fn detect_desktop_environment() -> DesktopEnvironment {
    if std::path::Path::new("/usr/bin/gnome-shell").exists() {
        DesktopEnvironment::GNOME
    } else if std::path::Path::new("/usr/bin/plasmashell").exists() {
        DesktopEnvironment::KDE
    } else {
        DesktopEnvironment::Other
    }
}

pub fn configure_portal_for_env(env: DesktopEnvironment) -> PortalConfig {
    match env {
        DesktopEnvironment::GNOME => PortalConfig::gnome(),
        DesktopEnvironment::KDE => PortalConfig::kde(),
        DesktopEnvironment::Other => PortalConfig::generic(),
    }
}

9. 调试和故障排除

挑战: 调试具有多个组件的复杂管道

解决方案:

  • 综合日志记录
  • 指标收集
  • 带帧检查的调试模式
pub struct DebugLogger {
    enabled: bool,
    output: DebugOutput,
}

pub enum DebugOutput {
    Console,
    File(PathBuf),
    Both,
}

impl DebugLogger {
    pub fn log_frame(&self, frame: &CapturedFrame) {
        if !self.enabled {
            return;
        }

        tracing::debug!(
            "Frame: {}x{}, format: {:?}, timestamp: {}",
            frame.width,
            frame.height,
            frame.format,
            frame.timestamp
        );
    }

    pub fn log_encoding(&self, encoded: &EncodedFrame) {
        if !self.enabled {
            return;
        }

        tracing::debug!(
            "Encoded: {} bytes, keyframe: {}, seq: {}",
            encoded.data.len(),
            encoded.is_keyframe,
            encoded.sequence_number
        );
    }
}

10. 资源限制

挑战: 防止资源耗尽CPU、内存、GPU

解决方案:

  • 限制并发会话
  • 监控资源使用
  • 实现优雅降级
pub struct ResourceManager {
    max_sessions: usize,
    active_sessions: Arc<Mutex<HashSet<String>>>,
    cpu_threshold: f32,
    memory_threshold: u64,
}

impl ResourceManager {
    pub async fn can_create_session(&self) -> bool {
        let sessions = self.active_sessions.lock().await;

        if sessions.len() >= self.max_sessions {
            return false;
        }

        if self.cpu_usage() > self.cpu_threshold {
            return false;
        }

        if self.memory_usage() > self.memory_threshold {
            return false;
        }

        true
    }
}

代码示例

主应用程序入口点

// src/main.rs
mod capture;
mod encoder;
mod webrtc;
mod buffer;
mod ipc;
mod config;

use anyhow::Result;
use tracing::{info, error};
use tracing_subscriber;

#[tokio::main]
async fn main() -> Result<()> {
    // 初始化日志记录
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .init();

    // 加载配置
    let config = config::load_config("config.toml")?;

    info!("Starting Wayland WebRTC Backend");
    info!("Configuration: {:?}", config);

    // 初始化组件
    let capture = capture::WaylandCapture::new(config.capture).await?;
    let encoder = encoder::create_encoder(config.encoder)?;
    let webrtc = webrtc::WebRtcServer::new(config.webrtc).await?;

    // 创建视频轨道
    let video_track = webrtc::create_video_track()?;

    // 运行捕获管道
    let session_id = uuid::Uuid::new_v4().to_string();
    webrtc.create_peer_connection(session_id.clone(), video_track).await?;

    // 主循环
    loop {
        match capture.next_frame().await {
            Ok(frame) => {
                match encoder.encode(frame).await {
                    Ok(encoded) => {
                        if let Err(e) = webrtc.send_video_frame(&session_id, encoded).await {
                            error!("Failed to send frame: {}", e);
                        }
                    }
                    Err(e) => {
                        error!("Failed to encode frame: {}", e);
                    }
                }
            }
            Err(e) => {
                error!("Failed to capture frame: {}", e);
            }
        }
    }
}

配置示例

# config.toml - 低延迟配置

[server]
address = "0.0.0.0:8080"
max_sessions = 10
log_level = "info"

[capture]
frame_rate = 60
quality = "high"
screen_region = null
# screen_region = { x = 0, y = 0, width = 1920, height = 1080 }

# 低延迟捕获设置
zero_copy = true  # 始终使用 DMA-BUF 零拷贝
track_damage = true  # 启用损坏跟踪
partial_updates = true  # 仅编码损坏区域
buffer_pool_size = 3  # 低延迟的小缓冲池

[encoder]
type = "auto"
# 选项: "vaapi", "nvenc", "x264", "auto"
bitrate = 4000000
keyframe_interval = 15  # 低延迟短 GOP
preset = "ultrafast"  # Ultrafast 最小延迟

# 低延迟特定设置
[encoder.low_latency]
gop_size = 15  # 短 GOP
b_frames = 0  # 低延迟无 B 帧
lookahead = 0  # 最小前瞻
rc_mode = "CBR"  # 恒定比特率可预测延迟
vbv_buffer_size = 4000000  # 1 秒 VBV 缓冲区
max_bitrate = 4000000  # 紧最大约束
intra_period = 15  # 关键帧间隔

# 质量与延迟预设
[encoder.quality_presets]
low = { bitrate = 1000000, preset = "ultrafast", gop_size = 30 }
medium = { bitrate = 2000000, preset = "ultrafast", gop_size = 20 }
high = { bitrate = 4000000, preset = "ultrafast", gop_size = 15 }
ultra = { bitrate = 8000000, preset = "veryfast", gop_size = 15 }

[webrtc]
stun_servers = ["stun:stun.l.google.com:19302"]
turn_servers = []

[webrtc.low_latency]
# 关键:最小播放延迟
playout_delay_min_ms = 0  # 无最小延迟
playout_delay_max_ms = 20  # 20ms 最大延迟

# 比特率设置
max_bitrate = 8000000  # 8 Mbps 最大
min_bitrate = 500000  # 500 Kbps 最小
start_bitrate = 4000000  # 4 Mbps 起始

# 打包
rtp_payload_size = 1200  # 更小数据包降低延迟
packetization_mode = "non_interleaved"

# 重传设置
nack_enabled = true  # 启用 NACK
nack_window_size_ms = 20  # 仅请求重传最近数据包
max_nack_packets_per_second = 50
fec_enabled = false  # 禁用 FEC 降低延迟

# 拥塞控制
transport_cc_enabled = true  # 启用传输宽拥塞控制

# RTCP 设置
rtcp_report_interval_ms = 50  # 频繁 RTCP 反馈

[webrtc.ice]
transport_policy = "all"
candidate_network_types = ["udp", "tcp"]

[buffer]
# 低延迟最小缓冲
dma_buf_pool_size = 3  # 小池
encoded_buffer_pool_size = 5  # 非常小池
sender_buffer_max_frames = 1  # 单帧发送缓冲
jitter_buffer_target_delay_ms = 5  # 5ms 目标抖动缓冲
jitter_buffer_max_delay_ms = 10  # 10ms 最大抖动缓冲

[latency]
# 延迟目标
target_latency_ms = 25  # 目标端到端延迟
max_acceptable_latency_ms = 50  # 最大可接受延迟

# 自适应设置
adaptive_frame_rate = true
adaptive_bitrate = true
fast_frame_drop = true

# 帧丢弃策略
consecutive_drop_limit = 3  # 最大连续丢弃
drop_threshold_ms = 5  # 如果队列延迟超过此值则丢弃

[monitoring]
enable_metrics = true
enable_latency_tracking = true
metrics_port = 9090

性能目标

延迟目标

本地网络 (LAN)

场景 目标 可接受
核心目标 25-35ms 15-25ms优秀
最低 <50ms 15-25ms优秀
用户体验 <16ms: 感觉不到 16-33ms: 非常流畅
用户体验 33-50ms: 良好 50-100ms: 可接受

互联网

场景 目标 可接受
优秀 40-60ms <80ms
良好 60-80ms <100ms

各阶段性能指标

指标 MVP 第 2 阶段 第 3 阶段 第 4 阶段
FPS (LAN) 30 60 60 60
FPS (互联网) 15-20 30 30-60 60
分辨率 720p 1080p 1080p/4K 1080p/4K
延迟 (LAN) <100ms <50ms 25-35ms 15-25ms
延迟 (互联网) <200ms <100ms 60-80ms 40-60ms
CPU 使用率 20-30% 10-15% 5-10% <5%
内存使用 150MB 250MB 400MB <400MB
比特率 2-4 Mbps 4-8 Mbps 自适应 自适应
并发会话 1 3-5 5-10 10+

延迟预算分配15-25ms 目标)

组件 时间 (ms) 百分比 优化策略
Wayland 捕获 2-3 12-15% DMA-BUF 零拷贝、部分更新
编码器 3-5 20-25% 硬件编码器、无 B 帧
打包 1-2 6-10% 内联 RTP、最小缓冲
网络 (LAN) 0.5-1 3-5% UDP 直接路径、内核旁路
抖动缓冲 0-2 0-10% 最小缓冲、预测性抖动
解码器 1-2 6-10% 硬件加速
显示 1-2 6-10% vsync 旁路、直接扫描
总计 15-25 100%

结论

本设计提供了在 Rust 中构建超低延迟 Wayland → WebRTC 远程桌面后端的全面蓝图。关键亮点:

  1. 零拷贝架构:通过 DMA-BUF 和引用计数缓冲区最小化 CPU 拷贝,实现 <5ms 拷贝开销
  2. 硬件加速:配置为 <5ms 编码延迟的 VA-API/NVENC 编码器
  3. 最小缓冲:单帧发送缓冲和管道全程 0-10ms 抖动缓冲
  4. 低延迟 WebRTC自定义配置0-20ms 播放延迟、无 FEC、有限 NACK
  5. 性能:本地网络 60 FPS 下 15-25ms 延迟目标
  6. 自适应质量基于网络条件动态帧率30-60fps和比特率调整
  7. 损坏跟踪:针对静态内容的部分区域更新以减少编码负载

延迟预算分解15-25ms 目标):

  • 捕获2-3msDMA-BUF 零拷贝)
  • 编码器3-5ms硬件、无 B 帧)
  • 打包1-2ms内联 RTP
  • 网络0.5-1msLAN
  • 抖动缓冲0-2ms最小
  • 解码器1-2ms硬件
  • 显示1-2msvsync 旁路)

分阶段实施方法允许增量开发和测试:

  • 第 1 阶段4-6 周)MVP<100ms 延迟
  • 第 2 阶段3-4 周):硬件加速,<50ms 延迟
  • 第 3 阶段4-5 周)低延迟优化25-35ms 延迟
  • 第 4 阶段5-7 周)超低延迟调优15-25ms 延迟

实现 15-25ms 延迟的关键 P0 优化:

  1. 零 B 帧、15 帧 GOP 的硬件编码器
  2. DMA-BUF 零拷贝捕获管道
  3. 最小缓冲1 帧发送、0-10ms 抖动)
  4. 低延迟 WebRTC 配置0-20ms 播放延迟)

附加资源

Wayland & PipeWire

WebRTC

视频编码

Rust