# Wayland → WebRTC 远程桌面后端详细设计文档
## 目录
- 1. 系统概述
  - 1.1 项目背景
  - 1.2 目标和约束
  - 1.3 性能目标
- 2. 架构设计
  - 2.1 整体架构
  - 2.2 模块划分
  - 2.3 模块交互
  - 2.4 数据流
  - 2.5 时序图
- 3. 详细组件设计
  - 3.1 捕获模块
  - 3.2 编码模块
  - 3.3 WebRTC 传输模块
  - 3.4 缓冲管理模块
  - 3.5 信令模块
- 4. 数据结构设计
  - 4.1 核心数据结构
  - 4.2 内存管理
  - 4.3 状态管理
- 5. 接口设计
  - 5.1 公共 API
  - 5.2 内部接口
  - 5.3 错误处理接口
- 6. 性能优化
  - 6.1 性能指标
  - 6.2 优化策略
  - 6.3 性能监控
  - 6.4 调优指南
- 7. 并发设计
  - 7.1 线程模型
  - 7.2 同步机制
  - 7.3 任务调度
  - 7.4 锁策略
- 8. 网络设计
  - 8.1 协议栈
  - 8.2 数据格式
  - 8.3 网络优化
  - 8.4 错误处理
- 9. 安全设计
  - 9.1 认证授权
  - 9.2 数据加密
  - 9.3 安全审计
  - 9.4 防护措施
- 10. 测试策略
  - 10.1 测试层次
  - 10.2 测试用例
  - 10.3 性能测试
  - 10.4 持续集成
- 11. 部署运维
  - 11.1 部署方案
  - 11.2 配置管理
  - 11.3 监控告警
  - 11.4 日志管理
  - 11.5 故障处理
- 12. 扩展设计
  - 12.1 插件机制
  - 12.2 版本管理
  - 12.3 扩展点
- 附录
  - A. 术语表
  - B. 参考资料
  - C. 配置示例
---
## 1. 系统概述
### 1.1 项目背景
Wayland → WebRTC 远程桌面后端是一个高性能、低延迟的远程桌面解决方案,旨在通过 WebRTC 技术将 Wayland 桌面会话实时传输到 Web 浏览器。该系统充分利用现代 Linux 图形栈的优势,通过零拷贝技术实现超低延迟的桌面共享体验。
**关键特性:**
- 基于 Wayland 协议的现代图形栈支持
- 基于 PipeWire 与 xdg-desktop-portal 的屏幕捕获与权限管理
- 硬件加速的视频编码(VA-API、NVENC)
- 零拷贝的内存管理(DMA-BUF)
- WebRTC 标准协议支持
- 自适应码率和质量控制
- 损坏区域检测和部分更新编码
### 1.2 目标和约束
**功能目标:**
1. 支持 Wayland 桌面的实时屏幕捕获
2. 通过 WebRTC 实现端到端的视频传输
3. 支持多种视频编码格式(H.264、H.265、VP9)
4. 提供可靠的输入回传机制
5. 支持多客户端并发连接
**性能目标:**
- 端到端延迟:本地网络 < 25ms,广域网 < 100ms
- 帧率:支持 30-60 FPS
- 分辨率:最高支持 4K 分辨率
- 吞吐量:单会话支持最高 8 Mbps
**技术约束:**
- 操作系统:Linux(Wayland 环境)
- GPU:支持 VA-API(Intel/AMD)或 NVENC(NVIDIA)
- 浏览器:支持 WebRTC 的现代浏览器
- 网络:支持 UDP 传输和 NAT 穿透
**资源约束:**
- CPU 使用率:单核 < 30%(硬件编码)
- 内存占用:< 500MB
- GPU 显存:< 2GB
### 1.3 性能目标
**延迟目标:**
| 场景 | 目标延迟 | 优选延迟 | 最大可接受延迟 |
|------|---------|---------|--------------|
| 局域网(硬件编码) | 15-20ms | 10-15ms | 30ms |
| 局域网(软件编码) | 30-40ms | 25-30ms | 60ms |
| 广域网(低延迟模式) | 40-60ms | 30-40ms | 100ms |
| 广域网(标准模式) | 60-100ms | 50-80ms | 150ms |
**质量目标:**
| 分辨率 | 帧率 | 比特率 | 目标质量 |
|--------|------|--------|---------|
| 1920x1080 | 60 FPS | 4 Mbps | 高清 |
| 1920x1080 | 30 FPS | 2 Mbps | 标准 |
| 3840x2160 | 30 FPS | 8 Mbps | 超高清 |
| 1280x720 | 60 FPS | 1.5 Mbps | 流畅 |
**吞吐量目标:**
- 单服务器并发会话数:10-20
- 单会话最大数据包速率:2000 pps(包/秒)
- 网络带宽利用率:> 90%(在限制内)
**资源目标:**
- 帧捕获时间:< 2ms
- 编码延迟:< 8ms(硬件),< 20ms(软件)
- WebRTC 打包延迟:< 3ms
- 网络传输延迟:< 5ms(局域网)
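按上述资源目标累加,局域网 + 硬件编码路径的服务端延迟预算约为:帧捕获 2ms + 编码 8ms + RTP 打包 3ms + 网络传输 5ms ≈ 18ms,与 1.3 节"局域网(硬件编码)"15-20ms 的目标区间一致(不含客户端解码与渲染开销)。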
---
## 2. 架构设计
### 2.1 整体架构
```
┌──────────────────────────────────────────────────────────────────────────┐
│ 客户端浏览器 │
│ (WebRTC 接收端) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 视频 │ │ 音频 │ │ 输入 │ │ 控制 │ │
│ │ 解码 │ │ 解码 │ │ 处理 │ │ 管理 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────┬────────────────────────────────────────────┘
│ WebRTC 媒体流 (RTP/RTCP)
│ WebRTC 数据通道
│ WebSocket 信令
┌──────────────────────────────────────────────────────────────────────────┐
│ 信令服务器 │
│ (WebSocket/WebSocket Secure) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 会话管理 │ │ SDP交换 │ │ ICE候选 │ │ 认证授权 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────┬────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────────┐
│ Rust 后端服务器 │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ 捕获管理器 │ │ 编码器管道 │ │ WebRTC传输层 │ │
│ ├──────────────────┤ ├──────────────────┤ ├──────────────────┤ │
│ │ • PipeWire接口 │───▶│ • 视频编码器 │───▶│ • PeerConnection │ │
│ │ • DMA-BUF获取 │ │ • 码率控制 │ │ • RTP打包 │ │
│ │ • 损坏跟踪 │ │ • 帧率控制 │ │ • ICE处理 │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ 缓冲管理器 │ │ 输入处理器 │ │ 数据通道管理 │ │
│ ├──────────────────┤ ├──────────────────┤ ├──────────────────┤ │
│ │ • DMA-BUF池 │ │ • 鼠标事件 │ │ • 控制消息 │ │
│ │ • 编码缓冲池 │ │ • 键盘事件 │ │ • 输入事件 │ │
│ │ • 内存所有权 │ │ • 事件转换 │ │ • 状态同步 │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 零拷贝缓冲区管理器 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ DMA-BUF池 │ │ 共享内存池 │ │ 字节缓冲池 │ │ │
│ │ │ (GPU内存) │ │ (跨进程) │ │ (CPU内存) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────────┐
│ Wayland 合成器 │
│ (PipeWire 屏幕共享 / KDE Plasma / GNOME) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 窗口管理 │ │ 合成渲染 │ │ 输入处理 │ │ 输出管理 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└──────────────────────────────────────────────────────────────────────────┘
```
### 2.2 模块划分
#### 2.2.1 捕获管理模块(Capture Manager)
**职责:**
- 与 PipeWire 和 xdg-desktop-portal 交互
- 获取屏幕捕获权限
- 接收 DMA-BUF 格式的帧数据
- 跟踪屏幕损坏区域
- 管理捕获会话生命周期
**子模块:**
1. `PipeWireClient`: PipeWire 协议客户端
2. `PortalClient`: xdg-desktop-portal 客户端
3. `DamageTracker`: 损坏区域跟踪器
4. `FrameAcquisition`: 帧获取协调器
#### 2.2.2 编码器模块(Encoder Module)
**职责:**
- 视频帧编码H.264/H.265/VP9
- 硬件加速支持VA-API、NVENC
- 自适应码率控制
- 帧率调整
- 关键帧请求
**子模块:**
1. `EncoderFactory`: 编码器工厂
2. `VaapiEncoder`: VA-API 硬件编码器
3. `NvencEncoder`: NVENC 硬件编码器
4. `SoftwareEncoder`: 软件编码器后备
5. `BitrateController`: 码率控制器
6. `FrameRateController`: 帧率控制器
#### 2.2.3 WebRTC 传输模块(WebRTC Transport Module)
**职责:**
- WebRTC 对等连接管理
- RTP/RTCP 数据包处理
- ICE/STUN/TURN 协议处理
- 拥塞控制
- 丢包恢复
**子模块:**
1. `PeerConnectionManager`: 对等连接管理器
2. `RtpPacketizer`: RTP 打包器
3. `IceManager`: ICE 候选管理器
4. `CongestionController`: 拥塞控制器
5. `NackHandler`: NACK 处理器
6. `DataChannelManager`: 数据通道管理器
#### 2.2.4 缓冲管理模块(Buffer Management Module)
**职责:**
- DMA-BUF 生命周期管理
- 内存池分配和回收
- 零拷贝所有权转移
- 内存泄漏防护
**子模块:**
1. `DmaBufPool`: DMA-BUF 池
2. `EncodedBufferPool`: 编码缓冲池
3. `SharedMemoryPool`: 共享内存池
4. `MemoryTracker`: 内存跟踪器
#### 2.2.5 信令模块(Signaling Module)
**职责:**
- WebSocket 连接管理
- SDP 会话描述交换
- ICE 候选传输
- 会话状态管理
**子模块:**
1. `WebSocketServer`: WebSocket 服务器
2. `SdpHandler`: SDP 处理器
3. `IceCandidateHandler`: ICE 候选处理器
4. `SessionManager`: 会话管理器
### 2.3 模块交互
**主要交互流程:**
```
[Wayland Compositor]
│ DMA-BUF (屏幕内容)
[PipeWire]
│ 屏幕捕获流
[捕获管理器] ────────────▶ [缓冲管理器]
│ │
│ 损坏区域信息 │ DMA-BUF 句柄
▼ ▼
[编码器模块] ◀──────────────┘
│ 编码后的数据 (Bytes)
[WebRTC传输层]
│ RTP 数据包
[网络] ─────────────────────▶ [客户端浏览器]
│ 控制信息
[数据通道管理器]
│ 输入事件
[输入处理器] ────────────▶ [Wayland Compositor]
│ SDP/ICE 候选
[信令服务器] ◀─────────────▶ [客户端浏览器]
```
### 2.4 数据流
**详细数据流:**
```
阶段 1: 捕获
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
输入: GPU 帧缓冲区 (Wayland 合成器输出)
处理:
1. PipeWire 获取帧缓冲区
2. 创建 DMA-BUF 文件描述符
3. 设置损坏区域标记
4. 生成时间戳
输出: CapturedFrame { dma_buf, width, height, format, timestamp }
拷贝: 无 (零拷贝)
延迟: 1-2ms
阶段 2: 缓冲管理
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
输入: CapturedFrame (带 DMA-BUF FD)
处理:
1. 从 DMA-BUF 池获取空闲缓冲区
2. 验证 DMA-BUF 有效性
3. 转移所有权 (移动语义)
4. 更新内存跟踪
输出: DmaBufHandle { fd, size, stride, offset }
拷贝: 无 (所有权转移)
延迟: < 0.1ms
阶段 3: 编码
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
输入: DmaBufHandle
处理:
1. 导入 DMA-BUF 到编码器 (硬件)
2. 执行编码操作
3. 输出编码数据到映射缓冲区
4. 生成编码帧元数据
输出: EncodedFrame { data: Bytes, is_keyframe, timestamp, seq_num }
拷贝: 无 (DMA-BUF 直接导入)
延迟:
- VA-API: 3-5ms
- NVENC: 2-4ms
- x264: 15-25ms
阶段 4: RTP 打包
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
输入: EncodedFrame
处理:
1. 分片大帧到多个 RTP 包
2. 添加 RTP 头部
3. 设置时间戳和序列号
4. 关联数据包到 Bytes
输出: Vec<RtpPacket>
拷贝: 无 (Bytes 引用)
延迟: < 0.5ms
阶段 5: 网络传输
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
输入: Vec<RtpPacket>
处理:
1. UDP 套接字发送
2. 内核空间拷贝
3. 网络协议栈处理
4. 物理传输
输出: UDP 数据包
拷贝: 一次 (内核空间)
延迟:
- 局域网: < 1ms
- 广域网: 10-100ms
阶段 6: 客户端接收
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
输入: UDP 数据包
处理:
1. 接收数据包
2. RTP 解包
3. 重组帧
4. 解码播放
输出: 显示输出
延迟:
- 解码: 2-5ms (硬件), 10-20ms (软件)
- 渲染: 1-2ms
总延迟 (局域网 + 硬件编码): 10-20ms
总延迟 (局域网 + 软件编码): 30-45ms
总延迟 (广域网 + 硬件编码): 40-100ms
```
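下面给出一个将上述各阶段串联起来的帧处理主循环草图,仅用于说明数据在模块间的流向;其中 `send_packets` 回调为本节假设的简化接口,实际发送逻辑由 3.3 节的 WebRTC 传输层承担,各类型定义见第 3 章:
```rust
use async_channel::Receiver;

/// 帧处理主循环示意:捕获 → 编码 → RTP 打包 → 发送(示意性草图)
async fn frame_pipeline(
    frames: Receiver<CapturedFrame>,        // 阶段 1/2:捕获管理器输出的帧通道
    mut encoder: Box<dyn VideoEncoder>,     // 阶段 3:3.2.1 定义的编码器抽象
    mut packetizer: RtpPacketizer,          // 阶段 4:3.3.2 定义的打包器
    send_packets: impl Fn(Vec<RtpPacket>),  // 阶段 5:交给传输层发送(假设的回调)
) -> Result<(), EncoderError> {
    while let Ok(frame) = frames.recv().await {
        // 零拷贝编码:DMA-BUF 直接导入硬件编码器
        let encoded: EncodedFrame = encoder.encode(frame).await?;
        // 按 MTU 分片为 RTP 包(对 Bytes 做切片,不发生拷贝)
        if let Ok(packets) = packetizer.packetize(&encoded) {
            send_packets(packets);
        }
    }
    Ok(())
}
```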
### 2.5 时序图
#### 2.5.1 会话建立时序图
```
客户端 信令服务器 后端服务器 Wayland/编码器
│ │ │ │
│── 连接 WebSocket ──▶│ │ │
│◀─── 接受连接 ──────│ │ │
│ │ │ │
│── 开始捕获请求 ────▶│── 转发请求 ───────▶│ │
│ │ │── 请求权限 ──────▶│
│ │ │◀── 获取流句柄 ───│
│ │ │ │
│◀─── 流 ID ─────────│◀─── 流 ID ─────────│ │
│ │ │ │
│── 创建 Offer ──────▶│── 转发 Offer ─────▶│ │
│ │ │── 创建 PC ───────▶│
│ │ │◀── ICE 候选 ──────│
│◀─── Answer ────────│◀─── Answer ────────│ │
│ │ │ │
│◀── ICE 候选 ────────│◀── ICE 候选 ────────│ │
│── ICE 候选 ────────▶│── ICE 候选 ───────▶│ │
│ │ │ │
│◀─── 连接建立 ──────│◀─── 连接建立 ──────│ │
│ │ │ │
│ │ │── 开始捕获 ──────▶│
│ │ │◀── DMA-BUF 帧数据 ─│
│◀─── 视频 RTP ──────│ │── 编码 ──────────▶│
│ │ │◀── 编码数据 ──────│
│◀─── 视频 RTP ────────────────────────────│ │
│ │ │ │
│── 输入事件 ────────▶│── 转发输入 ───────▶│ │
│ │ │── 发送到 Wayland ─▶│
```
#### 2.5.2 帧处理时序图
```
PipeWire 捕获管理器 缓冲管理器 编码器 WebRTC 网络
│ │ │ │ │ │
│── DMA-BUF ──▶│ │ │ │ │
│ │ │ │ │ │
│ │── 创建帧 ─────▶│ │ │ │
│ │ │ │ │ │
│ │── 获取句柄 ───▶│ │ │ │
│ │◀── 句柄ID ────│ │ │ │
│ │ │ │ │ │
│ │── 编码请求 ───────────────▶│ │ │
│ │ │ │ │ │
│ │ │── 导入DMA ─▶│ │ │
│ │ │◀── 表面ID ─│ │ │
│ │ │ │ │ │
│ │ │ │── 编码 ──▶│ │
│ │ │ │◀── 数据 ──│ │
│ │ │ │ │ │
│ │◀── 编码帧 ──────────────────│ │ │
│ │ │ │ │
│ │── RTP 打包 ───────────────────────────▶│ │
│ │ │ │ │
│ │ │ │── 发送 ──▶│
│ │ │ │ │
│ │◀── 发送完成 ─────────────────────────────│ │
│ │ │ │ │
│ │── 释放DMA ───▶│ │ │ │
│◀── 缓冲回收 ──│ │ │ │ │
时间 (硬件编码): 5-8ms
时间 (软件编码): 20-30ms
```
#### 2.5.3 错误恢复时序图
```
客户端 WebRTC 后端 编码器 管道
│ │ │ │ │
│── 丢失帧NACK ─▶│ │ │ │
│ │── 重传请求 ──▶│ │ │
│ │◀── 重传数据 ──│ │ │
│◀── 重传数据 ──│ │ │ │
│ │ │ │ │
│ │── 编码错误 ──────────────▶│ │
│ │ │ │ │
│ │ │── 重启 ───▶│ │
│ │ │◀── 就绪 ──│ │
│ │── 请求关键帧 ─────────────────────▶│
│◀── 关键帧 ────│◀── 关键帧 ──│◀── 关键帧 ─│◀── 关键帧 │
│ │ │ │ │
│ │── 恢复流 ────────────────────────▶│
│◀── 恢复流 ────│ │ │ │
```
---
## 3. 详细组件设计
### 3.1 捕获模块
#### 3.1.1 PipeWire 客户端
**设计原理:**
PipeWire 客户端负责与 PipeWire 守护进程通信,通过 xdg-desktop-portal 获取屏幕捕获权限,并接收实时视频流。
**数据结构:**
```rust
/// PipeWire 核心连接
pub struct PipewireCore {
/// PipeWire 主循环
main_loop: MainLoop,
/// PipeWire 上下文
context: Context,
/// PipeWire 核心连接
core: Arc<Core>,
/// 事件循环线程句柄
thread_handle: JoinHandle<()>,
}
/// PipeWire 捕获流
pub struct PipewireStream {
/// 流句柄
stream: Stream,
/// 流状态
state: StreamState,
/// 帧格式
format: Option<Format>,
/// 缓冲区配置
buffer_config: BufferConfig,
/// 帧发送器
frame_sender: async_channel::Sender<CapturedFrame>,
}
/// 流状态枚举
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
Unconnected,
Connecting,
Connected,
Streaming,
Error,
}
/// 缓冲区配置
pub struct BufferConfig {
/// 缓冲区数量
pub num_buffers: usize,
/// 缓冲区大小
pub buffer_size: usize,
/// 最小缓冲区数量
pub min_buffers: usize,
/// 最大缓冲区数量
pub max_buffers: usize,
}
```
**接口定义:**
```rust
impl PipewireCore {
/// 创建新的 PipeWire 核心连接
pub fn new() -> Result<Self, PipewireError> {
let main_loop = MainLoop::new(None)?;
let context = Context::new(&main_loop)?;
let core = Arc::new(context.connect(None)?);
// 启动事件循环线程
let core_clone = core.clone();
let main_loop_clone = main_loop.clone();
let thread_handle = std::thread::spawn(move || {
main_loop_clone.run();
});
Ok(Self {
main_loop,
context,
core,
thread_handle,
})
}
/// 获取 PipeWire 核心
pub fn core(&self) -> &Arc<Core> {
&self.core
}
/// 关闭连接
pub fn shutdown(self) {
self.main_loop.quit();
self.thread_handle.join().ok();
}
}
impl PipewireStream {
/// 创建新的捕获流
pub fn new(
core: &Arc<Core>,
sender: async_channel::Sender<CapturedFrame>,
) -> Result<Self, PipewireError> {
let mut stream = Stream::new(
core,
"wl-webrtc-capture",
properties! {
*pw::keys::MEDIA_TYPE => "Video",
*pw::keys::MEDIA_CATEGORY => "Capture",
*pw::keys::MEDIA_ROLE => "Screen",
},
)?;
// 设置流监听器
let listener = stream.add_local_listener()?;
// 设置参数变更回调
listener.register(
pw::stream::events::Events::ParamChanged,
Self::on_param_changed,
)?;
// 设置处理回调
listener.register(
pw::stream::events::Events::Process,
Self::on_process,
)?;
Ok(Self {
stream,
state: StreamState::Unconnected,
format: None,
buffer_config: BufferConfig::default(),
frame_sender: sender,
})
}
/// 连接到屏幕捕获源
pub fn connect(&mut self, node_id: u32) -> Result<(), PipewireError> {
self.stream.connect(
pw::spa::direction::Direction::Input,
Some(node_id),
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
)?;
self.state = StreamState::Connected;
Ok(())
}
/// 处理参数变更
fn on_param_changed(stream: &Stream, event_id: u32, event_data: *mut c_void) {
// 获取流格式信息
if let Some(format) = stream.format() {
let video_info = format.parse::<VideoInfoRaw>().unwrap();
// 保存格式信息
}
}
/// 处理新帧
fn on_process(&self, stream: &Stream) {
// 获取缓冲区
let buffer = stream.dequeue_buffer().expect("no buffer");
// 获取 DMA-BUF 信息
let datas = buffer.datas();
let data = &datas[0];
// 提取 DMA-BUF 文件描述符
let fd = data.fd().expect("no fd");
let size = data.chunk().size() as usize;
let stride = data.chunk().stride();
// 创建捕获帧
let frame = CapturedFrame {
dma_buf: DmaBufHandle::new(fd, size, stride, 0),
width: stream.format().unwrap().size().width,
height: stream.format().unwrap().size().height,
format: PixelFormat::from_spa_format(&stream.format().unwrap()),
timestamp: timestamp_ns(),
};
// 发送帧
if let Err(e) = self.frame_sender.try_send(frame) {
warn!("Failed to send frame: {:?}", e);
}
}
}
```
#### 3.1.2 损坏跟踪器
**设计原理:**
损坏跟踪器通过比较连续帧的差异,只编码和传输屏幕上发生变化的区域,从而显著减少带宽和编码开销。
**数据结构:**
```rust
/// 损坏区域跟踪器
pub struct DamageTracker {
/// 上一帧的句柄
last_frame: Option<DmaBufHandle>,
/// 损坏区域队列
damaged_regions: VecDeque<ScreenRegion>,
/// 最小损坏阈值(像素)
min_damage_threshold: u32,
/// 最大损坏区域数
max_regions: usize,
/// 统计信息
stats: DamageStats,
}
/// 屏幕区域
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScreenRegion {
pub x: u32,
pub y: u32,
pub width: u32,
pub height: u32,
}
/// 损坏统计
pub struct DamageStats {
pub total_frames: u64,
pub damaged_frames: u64,
pub total_regions: u64,
pub avg_region_size: f32,
}
```
**状态机:**
```
初始化
[等待第一帧]
[全屏标记]
┌─────────────┐
│ 检测损坏 │◀─────────┐
└─────────────┘ │
│ │
│ 无损坏 │ 有损坏
▼ │
[跳过编码] │
│ │
▼ │
[等待下一帧]────────────┘
[输出损坏区域]
```
**接口定义:**
```rust
impl DamageTracker {
/// 创建新的损坏跟踪器
pub fn new(min_threshold: u32, max_regions: usize) -> Self {
Self {
last_frame: None,
damaged_regions: VecDeque::with_capacity(max_regions),
min_damage_threshold: min_threshold,
max_regions,
stats: DamageStats::default(),
}
}
/// 更新损坏区域
pub fn update(&mut self, new_frame: &CapturedFrame) -> Vec<ScreenRegion> {
self.stats.total_frames += 1;
match &self.last_frame {
Some(last) => {
// 比较帧差异
let regions = self.compute_damage_regions(last, new_frame);
if !regions.is_empty() {
self.stats.damaged_frames += 1;
self.stats.total_regions += regions.len() as u64;
// 更新平均区域大小
let total_pixels: u64 = regions.iter()
.map(|r| r.width as u64 * r.height as u64)
.sum();
self.stats.avg_region_size = total_pixels as f32 / regions.len() as f32;
}
self.last_frame = Some(new_frame.dma_buf.clone());
regions
}
None => {
// 第一帧,标记全屏
self.stats.damaged_frames += 1;
self.stats.total_regions += 1;
self.stats.avg_region_size =
(new_frame.width * new_frame.height) as f32;
self.last_frame = Some(new_frame.dma_buf.clone());
vec![ScreenRegion {
x: 0,
y: 0,
width: new_frame.width,
height: new_frame.height,
}]
}
}
}
/// 计算损坏区域
fn compute_damage_regions(
&self,
last: &DmaBufHandle,
new: &CapturedFrame,
) -> Vec<ScreenRegion> {
// 将帧划分为块进行比较
let block_size = 16;
let blocks_x = (new.width as usize + block_size - 1) / block_size;
let blocks_y = (new.height as usize + block_size - 1) / block_size;
let mut damaged_blocks = Vec::new();
// 比较每个块
for y in 0..blocks_y {
for x in 0..blocks_x {
let changed = self.compare_block(last, new, x, y, block_size);
if changed {
damaged_blocks.push((x, y));
}
}
}
// 合并相邻块为区域
self.merge_blocks_to_regions(damaged_blocks, block_size, new.width, new.height)
}
/// 比较块是否变化
fn compare_block(
&self,
last: &DmaBufHandle,
new: &CapturedFrame,
block_x: usize,
block_y: usize,
block_size: usize,
) -> bool {
// 映射两个 DMA-BUF
let last_ptr = unsafe { last.as_ptr() };
let new_ptr = unsafe { new.dma_buf.as_ptr() };
// 计算块偏移
let width = new.width as usize;
let block_start = (block_y * block_size * width + block_x * block_size) * 4; // RGBA
// 简单比较(实际可以使用更高效的哈希或采样)
unsafe {
for i in 0..block_size.min(new.height as usize - block_y * block_size) {
for j in 0..block_size.min(new.width as usize - block_x * block_size) {
let offset = block_start + (i * width + j) * 4;
let last_pixel = *(last_ptr.add(offset) as *const [u8; 4]);
let new_pixel = *(new_ptr.add(offset) as *const [u8; 4]);
if last_pixel != new_pixel {
return true;
}
}
}
}
false
}
/// 合并块为区域
fn merge_blocks_to_regions(
&self,
blocks: Vec<(usize, usize)>,
block_size: usize,
width: u32,
height: u32,
) -> Vec<ScreenRegion> {
if blocks.is_empty() {
return vec![];
}
// 使用并查集或扫描线算法合并块
// 简化实现:转换为区域
let mut regions = Vec::new();
for (bx, by) in blocks {
regions.push(ScreenRegion {
x: (bx * block_size) as u32,
y: (by * block_size) as u32,
width: block_size as u32,
height: block_size as u32,
});
}
// 合并相邻区域
self.merge_adjacent_regions(regions)
}
/// 合并相邻区域
fn merge_adjacent_regions(&self, mut regions: Vec<ScreenRegion>) -> Vec<ScreenRegion> {
// 简化实现:直接返回前 N 个区域
regions.truncate(self.max_regions);
regions
}
/// 获取统计信息
pub fn stats(&self) -> &DamageStats {
&self.stats
}
/// 重置跟踪器
pub fn reset(&mut self) {
self.last_frame = None;
self.damaged_regions.clear();
self.stats = DamageStats::default();
}
}
```
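以下为损坏跟踪器在捕获循环中的一个最小使用示意;阈值与区域上限为示例值,`frames` 为捕获管理器输出的帧通道,`encode_frame_with_damage` 为假设的下游编码入口:
```rust
// 示例:根据损坏检测结果决定是否提交编码
let mut tracker = DamageTracker::new(64, 16); // 最小损坏阈值 64 像素、最多 16 个区域(示例值)
while let Ok(frame) = frames.recv().await {
    let regions = tracker.update(&frame);
    if regions.is_empty() {
        // 无损坏:跳过本帧,节省编码与带宽开销
        continue;
    }
    // 有损坏:将损坏区域随帧一起交给编码器做部分更新编码(假设的入口)
    encode_frame_with_damage(frame, &regions).await;
}
```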
### 3.2 编码模块
#### 3.2.1 编码器抽象
**设计原理:**
编码器抽象定义了统一的视频编码接口,支持多种编码器实现(硬件和软件),便于动态切换和扩展。
**数据结构:**
```rust
/// 视频编码器 trait
#[async_trait]
pub trait VideoEncoder: Send + Sync {
/// 编码一帧
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError>;
/// 重新配置编码器
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError>;
/// 请求关键帧
async fn request_keyframe(&mut self) -> Result<(), EncoderError>;
/// 获取编码器统计信息
fn stats(&self) -> EncoderStats;
/// 获取支持的特性
fn capabilities(&self) -> EncoderCapabilities;
}
/// 编码器配置
#[derive(Debug, Clone)]
pub struct EncoderConfig {
/// 编码器类型
pub encoder_type: EncoderType,
/// 视频分辨率
pub width: u32,
pub height: u32,
/// 帧率
pub frame_rate: u32,
/// 目标比特率 (bps)
pub bitrate: u32,
/// 最大比特率 (bps)
pub max_bitrate: u32,
/// 最小比特率 (bps)
pub min_bitrate: u32,
/// 关键帧间隔
pub keyframe_interval: u32,
/// 编码预设
pub preset: EncodePreset,
/// 编码调优
pub tune: EncodeTune,
}
/// 编码器类型
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncoderType {
H264_VAAPI, // H.264 VA-API 硬件编码
H264_NVENC, // H.264 NVENC 硬件编码
H264_X264, // H.264 x264 软件编码
H265_VAAPI, // H.265 VA-API 硬件编码
VP9_VAAPI, // VP9 VA-API 硬件编码
}
/// 编码预设
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodePreset {
Ultrafast, // 极快,最低延迟
Superfast, // 超快
Veryfast, // 很快
Faster, // 快
Fast, // 中等
Medium, // 较慢
Slow, // 慢
Slower, // 更慢
Veryslow, // 极慢
}
/// 编码调优
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeTune {
Zerolatency, // 零延迟
Film, // 电影
Animation, // 动画
Grain, // 胶片颗粒
Stillimage, // 静态图像
}
/// 编码后的帧
#[derive(Debug, Clone)]
pub struct EncodedFrame {
/// 编码数据 (零拷贝 Bytes)
pub data: Bytes,
/// 是否为关键帧
pub is_keyframe: bool,
/// 时间戳 (ns)
pub timestamp: u64,
/// 序列号
pub sequence_number: u64,
/// RTP 时间戳
pub rtp_timestamp: u32,
}
/// 编码器错误
#[derive(Debug, thiserror::Error)]
pub enum EncoderError {
#[error("编码器初始化失败: {0}")]
InitFailed(String),
#[error("编码失败: {0}")]
EncodeFailed(String),
#[error("重新配置失败: {0}")]
ReconfigureFailed(String),
#[error("关键帧请求失败: {0}")]
KeyframeRequestFailed(String),
#[error("不支持的格式: {0}")]
UnsupportedFormat(String),
#[error("DMA-BUF 导入失败")]
DmaBufImportFailed,
}
/// 编码器统计信息
#[derive(Debug, Clone, Default)]
pub struct EncoderStats {
/// 已编码帧数
pub frames_encoded: u64,
/// 关键帧数
pub keyframes: u64,
/// 平均编码延迟 (ms)
pub avg_encode_latency_ms: f64,
/// 总输出字节数
pub total_bytes: u64,
/// 实际比特率 (bps)
pub actual_bitrate: u32,
/// 丢帧数
pub dropped_frames: u64,
}
/// 编码器能力
#[derive(Debug, Clone, Default)]
pub struct EncoderCapabilities {
/// 是否支持硬件加速
pub hardware_accelerated: bool,
/// 是否支持 DMA-BUF 导入
pub supports_dma_buf: bool,
/// 支持的最大分辨率
pub max_resolution: (u32, u32),
/// 支持的最大帧率
pub max_frame_rate: u32,
/// 支持的比特率范围
pub bitrate_range: (u32, u32),
/// 是否支持动态码率调整
pub supports_dynamic_bitrate: bool,
}
```
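2.2.2 节提到的 `EncoderFactory` 在本节尚未展开,下面给出一个按 `encoder_type` 构造编码器的最小草图,仅示意"硬件优先、失败回退软件"的选择思路;除 `VaapiH264Encoder` 外的实现与回退策略此处以注释带过:
```rust
/// 编码器工厂示意(草图):按配置选择具体实现
pub struct EncoderFactory;

impl EncoderFactory {
    pub fn create(config: EncoderConfig) -> Result<Box<dyn VideoEncoder>, EncoderError> {
        match config.encoder_type {
            EncoderType::H264_VAAPI => {
                // 硬件编码器初始化失败时,可由上层改用 EncoderType::H264_X264 重建
                //(软件编码器实现见 2.2.2 节的 SoftwareEncoder,此处省略)
                VaapiH264Encoder::new(config).map(|e| Box::new(e) as Box<dyn VideoEncoder>)
            }
            // NVENC、x264 等实现方式类似,此处省略
            other => Err(EncoderError::UnsupportedFormat(format!("{:?}", other))),
        }
    }
}
```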
#### 3.2.2 VA-API 编码器
**设计原理:**
VA-API 编码器利用 Intel/AMD GPU 的硬件视频编码能力,通过直接导入 DMA-BUF 实现零拷贝编码,提供极低的延迟。
**数据结构:**
```rust
/// VA-API H.264 编码器
pub struct VaapiH264Encoder {
/// VA 显示
display: va::Display,
/// VA 配置
config_id: va::ConfigID,
/// VA 上下文
context_id: va::ContextID,
/// 编码器配置
config: EncoderConfig,
/// 序列参数缓冲区
seq_param: VAEncSequenceParameterBufferH264,
/// 图像参数缓冲区
pic_param: VAEncPictureParameterBufferH264,
/// 切片参数缓冲区
slice_param: VAEncSliceParameterBufferH264,
/// 参考帧
reference_frames: [Option<va::SurfaceID>; 2],
/// 当前参考帧索引
current_ref_frame: usize,
/// 序列号
sequence_number: u64,
/// 统计信息
stats: EncoderStats,
/// RTP 时间戳基数
rtp_timestamp_base: u32,
/// 编码计时器
encode_timer: MovingAverage,
}
/// VA-API 编码器错误
#[derive(Debug, thiserror::Error)]
pub enum VaapiError {
#[error("VA 初始化失败: {0}")]
InitializationFailed(String),
#[error("VA 配置失败: {0}")]
ConfigurationFailed(String),
#[error("VA 表面创建失败")]
SurfaceCreationFailed,
#[error("DMA-BUF 导入失败")]
DmaBufImportFailed,
#[error("编码操作失败: {0}")]
EncodeOperationFailed(String),
}
```
**状态机:**
```
[未初始化]
│ initialize()
[已初始化]
│ configure()
[已配置]
│ encode()
┌────────────────┐
│ 编码中 │◀─────────┐
│ (每帧) │ │
└────────────────┘ │
│ │
│ keyframe │ normal frame
▼ │
[编码关键帧] │
│ │
▼ │
[完成] ──────────────┘
│ error
[错误状态]
│ reconfigure()
[已配置]
```
**接口实现:**
```rust
impl VaapiH264Encoder {
/// 创建新的 VA-API 编码器
pub fn new(config: EncoderConfig) -> Result<Self, EncoderError> {
// 打开 VA 显示
let display = va::Display::open(None)
.map_err(|e| EncoderError::InitFailed(format!("无法打开 VA 显示: {:?}", e)))?;
// 创建 VA 配置
let config_attribs = vec![
va::ConfigAttrib {
type_: va::ConfigAttribType::RTFormat,
value: VA_RT_FORMAT_YUV420 as i32,
},
va::ConfigAttrib {
type_: va::ConfigAttribType::RateControl,
value: VA_RC_CBR as i32,
},
va::ConfigAttrib {
type_: va::ConfigAttribType::EncMaxRefFrames,
value: 1, // 最小参考帧
},
];
let config_id = display.create_config(
VAProfileH264ConstrainedBaseline,
VAEntrypointEncSlice,
&config_attribs,
).map_err(|e| EncoderError::InitFailed(format!("创建 VA 配置失败: {:?}", e)))?;
// 创建 VA 上下文
let surfaces = (0..3)
.map(|_| display.create_surface(config.width, config.height, VA_RT_FORMAT_YUV420))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| EncoderError::InitFailed(format!("创建 VA 表面失败: {:?}", e)))?;
let context_id = display.create_context(config_id, surfaces)
.map_err(|e| EncoderError::InitFailed(format!("创建 VA 上下文失败: {:?}", e)))?;
// 设置序列参数
let seq_param = VAEncSequenceParameterBufferH264 {
intra_period: config.keyframe_interval,
ip_period: 1, // 无 B 帧
max_num_ref_frames: 1,
bits_per_second: config.bitrate,
time_scale: 90000,
num_units_in_tick: 90000 / config.frame_rate as u32,
};
// 设置图像参数
let pic_param = VAEncPictureParameterBufferH264 {
reference_frames: [
VAReferenceFrame {
picture_id: surfaces[0],
flags: 0,
},
VAReferenceFrame {
picture_id: surfaces[1],
flags: 0,
},
],
num_ref_idx_l0_active_minus1: 0,
num_ref_idx_l1_active_minus1: 0,
pic_fields: VAPictureH264 {
idr_pic_flag: 0,
reference_pic_flag: 1,
},
};
// 设置切片参数
let slice_param = VAEncSliceParameterBufferH264 {
macroblock_address: 0,
num_macroblocks: (config.width / 16) * (config.height / 16),
slice_type: VAEncSliceType::PSlice,
num_ref_idx_l0_active_minus1: 0,
num_ref_idx_l1_active_minus1: 0,
disable_deblocking_filter_idc: 1, // 更快
};
Ok(Self {
display,
config_id,
context_id,
config,
seq_param,
pic_param,
slice_param,
reference_frames: [None, None],
current_ref_frame: 0,
sequence_number: 0,
stats: EncoderStats::default(),
rtp_timestamp_base: 0,
encode_timer: MovingAverage::new(100),
})
}
}
#[async_trait]
impl VideoEncoder for VaapiH264Encoder {
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError> {
let start = Instant::now();
// 导入 DMA-BUF 到 VA 表面
let surface = unsafe {
self.display.import_dma_buf(
frame.dma_buf.fd,
frame.width,
frame.height,
VA_RT_FORMAT_YUV420,
).map_err(|e| EncoderError::DmaBufImportFailed)?
};
// 确定帧类型(request_keyframe 会提前置位 idr_pic_flag,此处一并识别为关键帧)
let is_keyframe = self.pic_param.pic_fields.idr_pic_flag == 1
|| self.sequence_number == 0
|| (self.sequence_number % self.config.keyframe_interval as u64 == 0);
// 更新图像参数
if is_keyframe {
self.pic_param.pic_fields.idr_pic_flag = 1;
} else {
self.pic_param.pic_fields.idr_pic_flag = 0;
}
// 编码图像
let encoded_data = self.display.encode_surface(
surface,
&self.seq_param,
&self.pic_param,
&self.slice_param,
).map_err(|e| EncoderError::EncodeFailed(format!("编码失败: {:?}", e)))?;
// 更新统计信息
let latency = start.elapsed().as_secs_f64() * 1000.0;
self.encode_timer.add_sample(latency);
self.stats.avg_encode_latency_ms = self.encode_timer.average();
self.stats.frames_encoded += 1;
self.stats.total_bytes += encoded_data.len() as u64;
if is_keyframe {
self.stats.keyframes += 1;
}
// 计算实际比特率:按平均每帧输出字节数 × 帧率估算
if self.stats.frames_encoded > 0 {
self.stats.actual_bitrate = (self.stats.total_bytes * 8
* self.config.frame_rate as u64
/ self.stats.frames_encoded) as u32;
}
// 更新序列号
self.sequence_number += 1;
// 计算 RTP 时间戳
let rtp_timestamp = self.rtp_timestamp_base
+ (self.sequence_number * 90000 / self.config.frame_rate as u64) as u32;
Ok(EncodedFrame {
data: Bytes::from(encoded_data),
is_keyframe,
timestamp: frame.timestamp,
sequence_number: self.sequence_number,
rtp_timestamp,
})
}
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError> {
// 更新序列参数
self.seq_param.intra_period = config.keyframe_interval;
self.seq_param.bits_per_second = config.bitrate;
self.seq_param.num_units_in_tick = 90000 / config.frame_rate as u32;
self.config = config;
Ok(())
}
async fn request_keyframe(&mut self) -> Result<(), EncoderError> {
// 强制下一帧为关键帧
self.pic_param.pic_fields.idr_pic_flag = 1;
Ok(())
}
fn stats(&self) -> EncoderStats {
self.stats.clone()
}
fn capabilities(&self) -> EncoderCapabilities {
EncoderCapabilities {
hardware_accelerated: true,
supports_dma_buf: true,
max_resolution: (4096, 4096),
max_frame_rate: 120,
bitrate_range: (1000000, 50000000),
supports_dynamic_bitrate: true,
}
}
}
```
#### 3.2.3 自适应码率控制器
**设计原理:**
自适应码率控制器根据网络状况和目标延迟动态调整编码比特率,在保证低延迟的同时最大化视频质量。
**数据结构:**
```rust
/// 自适应码率控制器
pub struct AdaptiveBitrateController {
/// 目标延迟 (ms)
target_latency_ms: u32,
/// 当前比特率 (bps)
current_bitrate: u32,
/// 最大比特率 (bps)
max_bitrate: u32,
/// 最小比特率 (bps)
min_bitrate: u32,
/// 帧率
frame_rate: u32,
/// 网络质量监测
network_monitor: NetworkQualityMonitor,
/// 延迟监测
latency_monitor: LatencyMonitor,
/// 比特率调整因子
adjustment_factor: f64,
/// 调整间隔
adjustment_interval: Duration,
/// 上次调整时间
last_adjustment: Instant,
}
/// 网络质量监测器
pub struct NetworkQualityMonitor {
/// 带宽 (bps)
bandwidth: u32,
/// 丢包率 (0-1)
packet_loss_rate: f64,
/// 抖动 (ms)
jitter_ms: u32,
/// 往返时间 (ms)
rtt_ms: u32,
}
/// 延迟监测器
pub struct LatencyMonitor {
/// 测量的延迟值
measurements: VecDeque<LatencyMeasurement>,
/// 平均延迟
avg_latency_ms: f64,
/// P95 延迟
p95_latency_ms: f64,
}
/// 延迟测量
struct LatencyMeasurement {
timestamp: Instant,
latency_ms: u32,
type_: LatencyType,
}
#[derive(Debug, Clone, Copy)]
pub enum LatencyType {
Capture,
Encode,
Transport,
Total,
}
```
**算法实现:**
```rust
impl AdaptiveBitrateController {
/// 创建新的自适应码率控制器
pub fn new(
target_latency_ms: u32,
initial_bitrate: u32,
min_bitrate: u32,
max_bitrate: u32,
frame_rate: u32,
) -> Self {
Self {
target_latency_ms,
current_bitrate: initial_bitrate,
max_bitrate,
min_bitrate,
frame_rate,
network_monitor: NetworkQualityMonitor::default(),
latency_monitor: LatencyMonitor::new(100),
adjustment_factor: 0.1, // 每次调整 10%
adjustment_interval: Duration::from_millis(500), // 每 500ms 调整一次
last_adjustment: Instant::now(),
}
}
/// 更新延迟测量
pub fn update_latency(&mut self, latency_ms: u32, type_: LatencyType) {
self.latency_monitor.add_measurement(LatencyMeasurement {
timestamp: Instant::now(),
latency_ms,
type_,
});
// 检查是否需要调整码率
self.check_and_adjust();
}
/// 更新网络质量
pub fn update_network_quality(&mut self, bandwidth: u32, packet_loss_rate: f64, jitter_ms: u32, rtt_ms: u32) {
self.network_monitor.bandwidth = bandwidth;
self.network_monitor.packet_loss_rate = packet_loss_rate;
self.network_monitor.jitter_ms = jitter_ms;
self.network_monitor.rtt_ms = rtt_ms;
self.check_and_adjust();
}
/// 检查并调整码率
fn check_and_adjust(&mut self) {
let now = Instant::now();
// 检查调整间隔
if now.duration_since(self.last_adjustment) < self.adjustment_interval {
return;
}
// 计算当前平均延迟
let avg_latency = self.latency_monitor.average_latency();
if avg_latency.is_none() {
return;
}
let avg_latency_ms = avg_latency.unwrap();
// 计算延迟比率
let latency_ratio = avg_latency_ms as f64 / self.target_latency_ms as f64;
// 根据延迟比率调整码率
if latency_ratio > 1.5 {
// 延迟过高 - 激进降低码率
self.current_bitrate = (self.current_bitrate as f64 * 0.7) as u32;
} else if latency_ratio > 1.2 {
// 延迟偏高 - 降低码率
self.current_bitrate = (self.current_bitrate as f64 * 0.85) as u32;
} else if latency_ratio < 0.8 && self.network_monitor.packet_loss_rate < 0.01 {
// 延迟较低且丢包率低 - 可以增加码率
self.current_bitrate = (self.current_bitrate as f64 * 1.1) as u32;
}
// 考虑网络带宽限制
if self.current_bitrate > self.network_monitor.bandwidth * 9 / 10 {
self.current_bitrate = self.network_monitor.bandwidth * 9 / 10;
}
// 考虑丢包率
if self.network_monitor.packet_loss_rate > 0.05 {
// 丢包率高,降低码率
self.current_bitrate = (self.current_bitrate as f64 * 0.8) as u32;
}
// 限制在范围内
self.current_bitrate = self.current_bitrate.clamp(self.min_bitrate, self.max_bitrate);
self.last_adjustment = now;
}
/// 获取当前码率
pub fn current_bitrate(&self) -> u32 {
self.current_bitrate
}
/// 获取推荐的编码器配置
pub fn get_encoder_config(&self, base_config: EncoderConfig) -> EncoderConfig {
let mut config = base_config.clone();
config.bitrate = self.current_bitrate;
config.max_bitrate = self.current_bitrate * 11 / 10; // +10%
config.min_bitrate = self.current_bitrate * 9 / 10; // -10%
config
}
}
impl LatencyMonitor {
/// 创建新的延迟监测器
fn new(max_samples: usize) -> Self {
Self {
measurements: VecDeque::with_capacity(max_samples),
avg_latency_ms: 0.0,
p95_latency_ms: 0.0,
}
}
/// 添加延迟测量
fn add_measurement(&mut self, measurement: LatencyMeasurement) {
self.measurements.push_back(measurement);
if self.measurements.len() > self.measurements.capacity() {
self.measurements.pop_front();
}
self.update_statistics();
}
/// 更新统计信息
fn update_statistics(&mut self) {
if self.measurements.is_empty() {
return;
}
let total: u32 = self.measurements.iter().map(|m| m.latency_ms).sum();
self.avg_latency_ms = total as f64 / self.measurements.len() as f64;
// 计算 P95
let mut sorted: Vec<u32> = self.measurements.iter().map(|m| m.latency_ms).collect();
sorted.sort();
let p95_index = (sorted.len() * 95 / 100).min(sorted.len() - 1);
self.p95_latency_ms = sorted[p95_index] as f64;
}
/// 获取平均延迟
fn average_latency(&self) -> Option<f64> {
if self.measurements.is_empty() {
None
} else {
Some(self.avg_latency_ms)
}
}
/// 获取 P95 延迟
fn p95_latency(&self) -> Option<f64> {
if self.measurements.is_empty() {
None
} else {
Some(self.p95_latency_ms)
}
}
}
```
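码率控制器通常由编码统计与 RTCP 反馈驱动,下面是一个调用方式的示意片段;数值与统计来源均为示例,`base_config`、`encoder` 为既有对象:
```rust
// 示例:以 20ms 目标延迟、4 Mbps 初始码率创建控制器
let mut abr = AdaptiveBitrateController::new(
    20,          // 目标延迟 (ms)
    4_000_000,   // 初始码率 (bps)
    1_000_000,   // 最小码率 (bps)
    8_000_000,   // 最大码率 (bps)
    60,          // 帧率 (FPS)
);

// 每帧编码完成后上报编码延迟(此处为示例值,实际取自编码器统计)
let encode_latency_ms: u32 = 6;
abr.update_latency(encode_latency_ms, LatencyType::Encode);

// 收到 RTCP 反馈后上报网络质量:带宽、丢包率、抖动、RTT(示例值)
abr.update_network_quality(6_000_000, 0.005, 3, 12);

// 码率变化时据此重新配置编码器(base_config、encoder 为既有对象)
let new_config = abr.get_encoder_config(base_config);
encoder.reconfigure(new_config).await?;
```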
### 3.3 WebRTC 传输模块
#### 3.3.1 对等连接管理器
**设计原理:**
对等连接管理器负责创建和管理 WebRTC 对等连接,处理 SDP 交换、ICE 候选、媒体轨道和数据通道。
**数据结构:**
```rust
/// 对等连接管理器
pub struct PeerConnectionManager {
/// WebRTC API
api: Arc<webrtc::API>,
/// 所有对等连接
peer_connections: Arc<RwLock<HashMap<String, PeerConnection>>>,
/// 配置
config: WebRtcConfig,
/// 统计信息
stats: ConnectionStats,
}
/// 对等连接
pub struct PeerConnection {
/// WebRTC 对等连接
pc: RTCPeerConnection,
/// 视频轨道
video_track: Arc<TrackLocalStaticSample>,
/// 数据通道
data_channel: Option<Arc<RTCDataChannel>>,
/// 连接状态
state: RTCPeerConnectionState,
/// 会话 ID
session_id: String,
}
/// WebRTC 配置
#[derive(Debug, Clone)]
pub struct WebRtcConfig {
/// ICE 服务器
pub ice_servers: Vec<IceServer>,
/// ICE 传输策略
pub ice_transport_policy: IceTransportPolicy,
/// 视频编解码器
pub video_codecs: Vec<VideoCodecConfig>,
/// 音频编解码器
pub audio_codecs: Vec<AudioCodecConfig>,
/// 最大比特率 (bps)
pub max_bitrate: u32,
/// 最小比特率 (bps)
pub min_bitrate: u32,
/// 起始比特率 (bps)
pub start_bitrate: u32,
}
/// ICE 服务器
#[derive(Debug, Clone)]
pub struct IceServer {
pub urls: Vec<String>,
pub username: Option<String>,
pub credential: Option<String>,
}
/// 视频编解码器配置
#[derive(Debug, Clone)]
pub struct VideoCodecConfig {
pub name: String,
pub clock_rate: u32,
pub num_channels: u16,
pub parameters: CodecParameters,
}
/// 编解码器参数
#[derive(Debug, Clone)]
pub struct CodecParameters {
pub profile_level_id: String,
pub packetization_mode: u8,
pub level_asymmetry_allowed: bool,
}
/// 连接统计信息
#[derive(Debug, Clone, Default)]
pub struct ConnectionStats {
/// 总连接数
pub total_connections: u64,
/// 活跃连接数
pub active_connections: u64,
/// 总发送字节数
pub total_bytes_sent: u64,
/// 总接收字节数
pub total_bytes_received: u64,
/// 总丢包数
pub total_packets_lost: u64,
}
```
**状态机:**
```
[新建]
│ createOffer()
[检查本地状态]
│ setLocalDescription()
[本地描述已设置]
│ 等待远程描述
│ setRemoteDescription()
[远程描述已设置]
│ ICE 收集
[ICE 收集中]
│ ICE 连接完成
[已连接]
│ 传输中
[活跃传输中]
│ 断开/错误
[已断开]
```
**接口实现:**
```rust
impl PeerConnectionManager {
/// 创建新的对等连接管理器
pub fn new(config: WebRtcConfig) -> Result<Self, WebRtcError> {
let mut setting_engine = webrtc::api::SettingEngine::default();
// 配置 ICE 服务器
let ice_servers: Vec<RTCIceServer> = config.ice_servers.iter()
.map(|s| RTCIceServer {
urls: s.urls.clone(),
username: s.username.clone().unwrap_or_default(),
credential: s.credential.clone().unwrap_or_default(),
..Default::default()
})
.collect();
// 创建 WebRTC API
let api = Arc::new(
webrtc::api::APIBuilder::new()
.with_setting_engine(setting_engine)
.build()
);
Ok(Self {
api,
peer_connections: Arc::new(RwLock::new(HashMap::new())),
config,
stats: ConnectionStats::default(),
})
}
/// 创建新的对等连接
pub async fn create_peer_connection(
&self,
session_id: String,
) -> Result<String, WebRtcError> {
// 配置 RTC
let rtc_config = RTCConfiguration {
ice_servers: self.config.ice_servers.iter()
.map(|s| RTCIceServer {
urls: s.urls.clone(),
username: s.username.clone().unwrap_or_default(),
credential: s.credential.clone().unwrap_or_default(),
..Default::default()
})
.collect(),
ice_transport_policy: self.config.ice_transport_policy,
..Default::default()
};
// 创建对等连接
let pc = self.api.new_peer_connection(rtc_config).await?;
// 创建视频轨道
let video_track = Arc::new(
TrackLocalStaticSample::new(
RTCRtpCodecCapability {
mime_type: "video/H264".to_string(),
clock_rate: 90000,
channels: 1,
sdp_fmtp_line: "profile-level-id=42e01f;packetization-mode=1".to_string(),
..Default::default()
},
"video".to_string(),
)
);
// 添加视频轨道
let rtp_transceiver = pc.add_track(Arc::clone(&video_track)).await?;
// 设置 ICE 候选处理
let pc_clone = pc.clone();
pc.on_ice_candidate(Box::new(move |candidate| {
let pc_clone = pc_clone.clone();
Box::pin(async move {
if let Some(candidate) = candidate {
debug!("ICE 候选: {:?}", candidate);
// 发送候选到信令服务器
}
})
})).await;
// 设置连接状态变化
let session_id_clone = session_id.clone();
pc.on_peer_connection_state_change(Box::new(move |state| {
debug!("对等连接 {} 状态变化: {:?}", session_id_clone, state);
Box::pin(async move {})
})).await;
// 存储对等连接
let peer_connection = PeerConnection::new(session_id.clone(), pc, video_track);
self.peer_connections.write().await.insert(session_id.clone(), peer_connection);
self.stats.total_connections += 1;
Ok(session_id)
}
/// 创建 Offer
pub async fn create_offer(&self, session_id: &str) -> Result<RTCSessionDescription, WebRtcError> {
let peer_connections = self.peer_connections.read().await;
let peer_connection = peer_connections.get(session_id)
.ok_or(WebRtcError::SessionNotFound(session_id.to_string()))?;
peer_connection.create_offer().await
}
/// 设置远程描述
pub async fn set_remote_description(
&self,
session_id: &str,
desc: RTCSessionDescription,
) -> Result<(), WebRtcError> {
let peer_connections = self.peer_connections.read().await;
let peer_connection = peer_connections.get(session_id)
.ok_or(WebRtcError::SessionNotFound(session_id.to_string()))?;
peer_connection.set_remote_description(desc).await
}
/// 发送视频帧
pub async fn send_video_frame(
&self,
session_id: &str,
frame: EncodedFrame,
) -> Result<(), WebRtcError> {
let peer_connections = self.peer_connections.read().await;
let peer_connection = peer_connections.get(session_id)
.ok_or(WebRtcError::SessionNotFound(session_id.to_string()))?;
let frame_len = frame.data.len() as u64;
peer_connection.write_sample(frame).await?;
self.stats.total_bytes_sent += frame_len;
Ok(())
}
/// 关闭对等连接
pub async fn close_peer_connection(&self, session_id: &str) -> Result<(), WebRtcError> {
let mut peer_connections = self.peer_connections.write().await;
if let Some(peer_connection) = peer_connections.remove(session_id) {
peer_connection.close().await;
self.stats.active_connections -= 1;
}
Ok(())
}
/// 获取统计信息
pub fn stats(&self) -> &ConnectionStats {
&self.stats
}
}
impl PeerConnection {
/// 创建新的对等连接
fn new(
session_id: String,
pc: RTCPeerConnection,
video_track: Arc<TrackLocalStaticSample>,
) -> Self {
Self {
pc,
video_track,
data_channel: None,
state: RTCPeerConnectionState::New,
session_id,
}
}
/// 创建 Offer
async fn create_offer(&self) -> Result<RTCSessionDescription, WebRtcError> {
let offer = self.pc.create_offer(None).await?;
self.pc.set_local_description(offer.clone()).await?;
Ok(offer)
}
/// 创建 Answer
async fn create_answer(&self) -> Result<RTCSessionDescription, WebRtcError> {
let answer = self.pc.create_answer(None).await?;
self.pc.set_local_description(answer.clone()).await?;
Ok(answer)
}
/// 设置远程描述
async fn set_remote_description(&self, desc: RTCSessionDescription) -> Result<(), WebRtcError> {
self.pc.set_remote_description(desc).await
}
/// 添加 ICE 候选
async fn add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> Result<(), WebRtcError> {
self.pc.add_ice_candidate(candidate).await
}
/// 写入视频样本
async fn write_sample(&self, frame: EncodedFrame) -> Result<(), WebRtcError> {
let sample = Sample {
data: frame.data, // 直接传递 Bytes,保持零拷贝
duration: Duration::from_millis(1000 / 60), // 假设 60 FPS
..Default::default()
};
self.video_track.write_sample(&sample).await
}
/// 关闭连接
async fn close(self) {
self.pc.close().await;
}
}
```
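一次典型的"建连—协商—推流"流程示意如下;`signaling`、`answer`、`encoded_frames` 均为上层信令与编码管道提供的假设对象,实际交互见 3.5 节:
```rust
// 示例:创建连接、完成 SDP 协商并持续推送编码帧
let manager = PeerConnectionManager::new(webrtc_config)?;
let session_id = manager.create_peer_connection("session-1".to_string()).await?;

// 服务端作为 Offer 方:Offer 经信令服务器转发给客户端(signaling 为假设的信令句柄)
let offer = manager.create_offer(&session_id).await?;
signaling.send_offer(&session_id, offer).await?;

// 收到客户端 Answer 后设置远程描述(answer 由信令层解析得到)
manager.set_remote_description(&session_id, answer).await?;

// 协商完成后,将编码管道输出的帧持续写入视频轨道
while let Ok(frame) = encoded_frames.recv().await {
    manager.send_video_frame(&session_id, frame).await?;
}
```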
#### 3.3.2 RTP 打包器
**设计原理:**
RTP 打包器将编码后的视频帧分片为符合 MTU 限制的 RTP 数据包,并添加必要的 RTP 头部信息。
**数据结构:**
```rust
/// RTP 打包器
pub struct RtpPacketizer {
/// 最大载荷大小
max_payload_size: usize,
/// 序列号
sequence_number: u16,
/// 时间戳基数
timestamp_base: u32,
/// SSRC
ssrc: u32,
/// 统计信息
stats: PacketizerStats,
}
/// RTP 数据包
#[derive(Debug, Clone)]
pub struct RtpPacket {
/// RTP 头部
pub header: RtpHeader,
/// 载荷
pub payload: Bytes,
}
/// RTP 头部
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct RtpHeader {
/// 版本 (2 bits) | 填充 (1 bit) | 扩展 (1 bit) | CSRC 计数 (4 bits)
pub v_p_x_cc: u8,
/// 标记 (1 bit) | 载荷类型 (7 bits)
pub m_pt: u8,
/// 序列号
pub sequence_number: u16,
/// 时间戳
pub timestamp: u32,
/// SSRC
pub ssrc: u32,
/// CSRC 列表
pub csrc: [u32; 15],
}
/// 打包器统计信息
#[derive(Debug, Clone, Default)]
pub struct PacketizerStats {
/// 总数据包数
pub total_packets: u64,
/// 总字节数
pub total_bytes: u64,
/// 最大数据包大小
pub max_packet_size: usize,
/// 最小数据包大小
pub min_packet_size: usize,
}
```
**打包算法:**
```rust
impl RtpPacketizer {
/// 创建新的 RTP 打包器
pub fn new(
max_payload_size: usize,
ssrc: u32,
timestamp_base: u32,
) -> Self {
Self {
max_payload_size: max_payload_size.min(1200), // 标准 MTU
sequence_number: rand::random(),
timestamp_base,
ssrc,
stats: PacketizerStats::default(),
}
}
/// 将编码帧打包为 RTP 数据包
pub fn packetize(&mut self, frame: &EncodedFrame) -> Result<Vec<RtpPacket>, PacketizationError> {
let mut packets = Vec::new();
// 计算需要的包数
let data_len = frame.data.len();
let num_packets = (data_len + self.max_payload_size - 1) / self.max_payload_size;
// RTP 标记位 (M):应置于一帧的最后一个分片包上,表示该访问单元结束
// 分片数据
for i in 0..num_packets {
let offset = i * self.max_payload_size;
let len = self.max_payload_size.min(data_len - offset);
let payload = frame.data.slice(offset..offset + len);
let payload_len = payload.len();
let is_last = i == num_packets - 1;
let marker: u8 = if is_last { 0x80 } else { 0x00 };
// 创建 RTP 头部
let header = RtpHeader {
v_p_x_cc: 0x80, // Version 2, no padding, no extension, no CSRC
m_pt: marker | 96, // Payload type 96 for H.264
sequence_number: self.sequence_number,
timestamp: frame.rtp_timestamp,
ssrc: self.ssrc,
csrc: [0; 15],
};
// 创建 RTP 数据包并加入结果集
packets.push(RtpPacket { header, payload });
// 更新序列号
self.sequence_number = self.sequence_number.wrapping_add(1);
// 更新统计信息
self.stats.total_packets += 1;
self.stats.total_bytes += payload_len as u64;
self.stats.max_packet_size = self.stats.max_packet_size.max(payload_len);
self.stats.min_packet_size = if self.stats.min_packet_size == 0 {
payload_len
} else {
self.stats.min_packet_size.min(payload_len)
};
}
Ok(packets)
}
/// 获取序列号
pub fn sequence_number(&self) -> u16 {
self.sequence_number
}
/// 重置序列号
pub fn reset_sequence_number(&mut self) {
self.sequence_number = rand::random();
}
/// 获取统计信息
pub fn stats(&self) -> &PacketizerStats {
&self.stats
}
}
impl RtpHeader {
/// 序列化头部为字节
pub fn to_bytes(&self) -> [u8; 12] {
let mut bytes = [0u8; 12];
bytes[0] = self.v_p_x_cc;
bytes[1] = self.m_pt;
bytes[2..4].copy_from_slice(&self.sequence_number.to_be_bytes());
bytes[4..8].copy_from_slice(&self.timestamp.to_be_bytes());
bytes[8..12].copy_from_slice(&self.ssrc.to_be_bytes());
bytes
}
/// 从字节解析头部
pub fn from_bytes(bytes: &[u8]) -> Result<Self, PacketizationError> {
if bytes.len() < 12 {
return Err(PacketizationError::InvalidHeaderLength);
}
Ok(Self {
v_p_x_cc: bytes[0],
m_pt: bytes[1],
sequence_number: u16::from_be_bytes([bytes[2], bytes[3]]),
timestamp: u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
ssrc: u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
csrc: [0; 15],
})
}
}
/// 打包错误
#[derive(Debug, thiserror::Error)]
pub enum PacketizationError {
#[error("无效的头部长度")]
InvalidHeaderLength,
#[error("载荷太大")]
PayloadTooLarge,
#[error("序列化失败")]
SerializationFailed,
}
```
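打包得到的 RTP 包在发送前需要把 12 字节固定头部与载荷拼接成一个报文;下面的片段仅示意这一序列化过程,实际系统中 RTP 包经由 WebRTC 协议栈(含 SRTP 加密)发出,`udp_socket`、`remote_addr`、`encoded_frame` 为假设对象:
```rust
// 示例:打包一帧并拼接为可发送的字节序列(直接经 UDP 发送仅为示意)
let mut packetizer = RtpPacketizer::new(1200, rand::random(), 0);
let packets = packetizer.packetize(&encoded_frame)?;
for packet in &packets {
    // 12 字节固定头部 + 载荷
    let mut datagram = Vec::with_capacity(12 + packet.payload.len());
    datagram.extend_from_slice(&packet.header.to_bytes());
    datagram.extend_from_slice(&packet.payload);
    udp_socket.send_to(&datagram, remote_addr).await?; // 假设使用 tokio 的 UdpSocket
}
```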
### 3.4 缓冲管理模块
#### 3.4.1 DMA-BUF 池
**设计原理:**
DMA-BUF 池管理 GPU 内存缓冲区的分配和回收,减少频繁分配带来的开销,并防止内存泄漏。
**数据结构:**
```rust
/// DMA-BUF 池
pub struct DmaBufPool {
/// 空闲缓冲区
idle_buffers: VecDeque<DmaBufHandle>,
/// 已分配缓冲区
allocated_buffers: HashMap<usize, DmaBufHandle>,
/// 缓冲区配置
config: PoolConfig,
/// 分配计数器
allocation_counter: usize,
/// 统计信息
stats: PoolStats,
}
/// 缓冲区池配置
pub struct PoolConfig {
/// 最大缓冲区数
pub max_buffers: usize,
/// 最小缓冲区数
pub min_buffers: usize,
/// 预分配缓冲区数
pub preallocated_buffers: usize,
/// 缓冲区大小
pub buffer_size: usize,
/// 跨距
pub stride: u32,
}
/// 池统计信息
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
/// 总分配数
pub total_allocations: u64,
/// 总释放数
pub total_frees: u64,
/// 当前使用数
pub current_usage: usize,
/// 峰值使用数
pub peak_usage: usize,
/// 分配失败数
pub allocation_failures: u64,
}
```
**池管理算法:**
```rust
impl DmaBufPool {
/// 创建新的 DMA-BUF 池
pub fn new(config: PoolConfig) -> Result<Self, PoolError> {
let mut pool = Self {
idle_buffers: VecDeque::with_capacity(config.max_buffers),
allocated_buffers: HashMap::new(),
config,
allocation_counter: 0,
stats: PoolStats::default(),
};
// 预分配缓冲区
for _ in 0..pool.config.preallocated_buffers.min(pool.config.max_buffers) {
let buffer = pool.allocate_buffer()?;
pool.idle_buffers.push_back(buffer);
}
Ok(pool)
}
/// 分配缓冲区
pub fn acquire(&mut self) -> Result<DmaBufHandle, PoolError> {
// 尝试从空闲队列获取
if let Some(buffer) = self.idle_buffers.pop_front() {
self.allocation_counter += 1;
self.stats.current_usage += 1;
self.stats.peak_usage = self.stats.peak_usage.max(self.stats.current_usage);
let id = self.allocation_counter;
self.allocated_buffers.insert(id, buffer);
// 返回带 ID 的句柄
let mut handle = self.allocated_buffers.get(&id).unwrap().clone();
handle.id = Some(id);
return Ok(handle);
}
// 检查是否达到最大限制
if self.idle_buffers.len() + self.allocated_buffers.len() >= self.config.max_buffers {
self.stats.allocation_failures += 1;
return Err(PoolError::PoolExhausted);
}
// 分配新缓冲区
let mut buffer = self.allocate_buffer()?;
self.allocation_counter += 1;
self.stats.current_usage += 1;
self.stats.peak_usage = self.stats.peak_usage.max(self.stats.current_usage);
self.stats.total_allocations += 1;
let id = self.allocation_counter;
buffer.id = Some(id);
self.allocated_buffers.insert(id, buffer.clone());
Ok(buffer)
}
/// 释放缓冲区
pub fn release(&mut self, handle: DmaBufHandle) -> Result<(), PoolError> {
let id = handle.id.ok_or(PoolError::InvalidHandle)?;
if let Some(mut buffer) = self.allocated_buffers.remove(&id) {
// 重置句柄 ID
buffer.id = None;
// 返回到空闲队列
self.idle_buffers.push_back(buffer);
self.stats.current_usage -= 1;
self.stats.total_frees += 1;
Ok(())
} else {
Err(PoolError::InvalidHandle)
}
}
/// 分配新的 DMA-BUF
fn allocate_buffer(&self) -> Result<DmaBufHandle, PoolError> {
// 创建 DMA-BUF
// 这需要调用 DRM 或 VA-API 来分配 GPU 内存
// 这里简化实现
Ok(DmaBufHandle {
fd: -1, // 需要实际分配
size: self.config.buffer_size,
stride: self.config.stride,
offset: 0,
id: None,
})
}
/// 获取统计信息
pub fn stats(&self) -> &PoolStats {
&self.stats
}
/// 清理池
pub fn cleanup(&mut self) {
// 释放所有缓冲区
self.idle_buffers.clear();
self.allocated_buffers.clear();
}
}
/// 池错误
#[derive(Debug, thiserror::Error)]
pub enum PoolError {
#[error("池已耗尽")]
PoolExhausted,
#[error("无效的句柄")]
InvalidHandle,
#[error("分配失败: {0}")]
AllocationFailed(String),
}
/// DMA-BUF 句柄
#[derive(Debug, Clone)]
pub struct DmaBufHandle {
pub fd: RawFd,
pub size: usize,
pub stride: u32,
pub offset: u32,
pub id: Option<usize>, // 池 ID
}
```
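池的典型用法是"使用前获取、用毕归还",示意如下(配置数值为 1080p RGBA 的示例值):
```rust
// 示例:从池中获取缓冲区,使用完毕后显式归还
let mut pool = DmaBufPool::new(PoolConfig {
    max_buffers: 8,
    min_buffers: 2,
    preallocated_buffers: 4,
    buffer_size: 1920 * 1080 * 4, // 1080p RGBA
    stride: 1920 * 4,
})?;

let handle = pool.acquire()?;   // 捕获前获取
// ... 将 handle 交给捕获 / 编码流程使用 ...
pool.release(handle)?;          // 编码完成后归还,避免泄漏
```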
#### 3.4.2 内存泄漏防护
**设计原理:**
通过引用计数、所有权跟踪和定期检查,防止内存泄漏。
**数据结构:**
```rust
/// 内存跟踪器
pub struct MemoryTracker {
/// 跟踪的分配
allocations: HashMap<usize, AllocationInfo>,
/// 分配计数器
counter: usize,
/// 泄漏检测阈值
leak_threshold: Duration,
/// 最后检查时间
last_check: Instant,
}
/// 分配信息
struct AllocationInfo {
/// 类型
allocation_type: AllocationType,
/// 大小
size: usize,
/// 分配时间
allocated_at: Instant,
/// 分配位置
location: &'static str,
}
/// 分配类型
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllocationType {
DmaBuf,
EncodedBuffer,
SharedMemory,
}
```
**实现:**
```rust
impl MemoryTracker {
/// 创建新的内存跟踪器
pub fn new(leak_threshold: Duration) -> Self {
Self {
allocations: HashMap::new(),
counter: 0,
leak_threshold,
last_check: Instant::now(),
}
}
/// 跟踪分配
pub fn track_allocation(
&mut self,
allocation_type: AllocationType,
size: usize,
location: &'static str,
) -> usize {
let id = self.counter;
self.counter += 1;
self.allocations.insert(id, AllocationInfo {
allocation_type,
size,
allocated_at: Instant::now(),
location,
});
id
}
/// 跟踪释放
pub fn track_free(&mut self, id: usize) -> Result<(), MemoryError> {
self.allocations.remove(&id)
.ok_or(MemoryError::InvalidAllocationId(id))?;
Ok(())
}
/// 检查泄漏
pub fn check_leaks(&mut self) -> Vec<LeakInfo> {
let now = Instant::now();
let mut leaks = Vec::new();
for (id, info) in &self.allocations {
let age = now.duration_since(info.allocated_at);
if age > self.leak_threshold {
leaks.push(LeakInfo {
id: *id,
allocation_type: info.allocation_type,
size: info.size,
age,
location: info.location,
});
}
}
leaks
}
/// 获取统计信息
pub fn stats(&self) -> MemoryTrackerStats {
let total_size: usize = self.allocations.values()
.map(|info| info.size)
.sum();
let by_type: HashMap<AllocationType, usize> = self.allocations.values()
.fold(HashMap::new(), |mut acc, info| {
*acc.entry(info.allocation_type).or_insert(0) += 1;
acc
});
MemoryTrackerStats {
total_allocations: self.allocations.len(),
total_size,
by_type,
}
}
}
/// 泄漏信息
#[derive(Debug)]
pub struct LeakInfo {
pub id: usize,
pub allocation_type: AllocationType,
pub size: usize,
pub age: Duration,
pub location: &'static str,
}
/// 内存跟踪器统计
#[derive(Debug)]
pub struct MemoryTrackerStats {
pub total_allocations: usize,
pub total_size: usize,
pub by_type: HashMap<AllocationType, usize>,
}
/// 内存错误
#[derive(Debug, thiserror::Error)]
pub enum MemoryError {
#[error("无效的分配 ID: {0}")]
InvalidAllocationId(usize),
}
```
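跟踪器应在分配与释放处成对调用,并由后台任务周期性检查泄漏;以下片段为使用示意(60 秒阈值为示例值):
```rust
// 示例:跟踪一次 DMA-BUF 分配并定期检查泄漏
let mut tracker = MemoryTracker::new(Duration::from_secs(60));

let id = tracker.track_allocation(AllocationType::DmaBuf, 1920 * 1080 * 4, "capture::acquire");
// ... 缓冲区使用 ...
tracker.track_free(id)?;

// 后台定期检查:超过阈值仍未释放的分配视为疑似泄漏
for leak in tracker.check_leaks() {
    warn!("疑似内存泄漏: {:?} {} 字节,已存活 {:?},位置 {}",
        leak.allocation_type, leak.size, leak.age, leak.location);
}
```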
### 3.5 信令模块
#### 3.5.1 WebSocket 服务器
**设计原理:**
WebSocket 服务器处理客户端连接,交换 SDP 和 ICE 候选,协调会话建立。
**数据结构:**
```rust
/// WebSocket 信令服务器
pub struct SignalingServer {
/// WebSocket 监听器
listener: WebSocketListener,
/// 活跃连接
connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
/// 会话管理器
session_manager: SessionManager,
/// 配置
config: SignalingConfig,
}
/// 连接
pub struct Connection {
/// 连接 ID
id: ConnectionId,
/// WebSocket 写入端
write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
/// 会话 ID
session_id: Option<String>,
}
/// 信令消息
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
#[serde(rename = "offer")]
Offer {
sdp: String,
session_id: String,
},
#[serde(rename = "answer")]
Answer {
sdp: String,
session_id: String,
},
#[serde(rename = "ice-candidate")]
IceCandidate {
candidate: String,
sdp_mid: String,
sdp_mline_index: u16,
session_id: String,
},
#[serde(rename = "error")]
Error {
message: String,
},
}
```
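信令消息通过 serde 的内部 tag(`type` 字段)序列化为 JSON;以下示例展示一条 Offer 消息的序列化结果(SDP 内容为截断的示例):
```rust
// 示例:序列化一条 Offer 消息
let msg = SignalingMessage::Offer {
    sdp: "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\n...".to_string(),
    session_id: "session-1".to_string(),
};
let json = serde_json::to_string(&msg)?;
// 产生形如:{"type":"offer","sdp":"v=0\r\n...","session_id":"session-1"}
```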
**实现:**
```rust
impl SignalingServer {
/// 创建新的信令服务器
pub async fn new(config: SignalingConfig) -> Result<Self, SignalingError> {
let listener = TcpListener::bind(&config.bind_addr).await?;
let connections = Arc::new(RwLock::new(HashMap::new()));
Ok(Self {
listener,
connections,
session_manager: SessionManager::new(),
config,
})
}
/// 启动服务器
pub async fn run(&self) -> Result<(), SignalingError> {
info!("信令服务器监听于 {}", self.config.bind_addr);
while let Ok((stream, addr)) = self.listener.accept().await {
info!("新连接来自 {}", addr);
let connections = self.connections.clone();
let session_manager = self.session_manager.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_connection(stream, connections, session_manager).await {
error!("连接处理错误: {:?}", e);
}
});
}
Ok(())
}
/// 处理连接
async fn handle_connection(
stream: TcpStream,
connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
session_manager: SessionManager,
) -> Result<(), SignalingError> {
let ws_stream = accept_async(stream).await?;
let (write, mut read) = ws_stream.split();
let connection_id = ConnectionId::new();
let connection = Connection {
id: connection_id,
write,
session_id: None,
};
// 存储连接
connections.write().await.insert(connection_id, connection);
// 处理消息
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
if let Err(e) = Self::handle_message(
text,
&connection_id,
&connections,
&session_manager,
).await {
error!("消息处理错误: {:?}", e);
}
}
Ok(Message::Close(_)) => {
info!("连接 {} 关闭", connection_id);
break;
}
Err(e) => {
error!("WebSocket 错误: {:?}", e);
break;
}
_ => {}
}
}
// 清理连接
connections.write().await.remove(&connection_id);
Ok(())
}
/// 处理消息
async fn handle_message(
text: String,
connection_id: &ConnectionId,
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
session_manager: &SessionManager,
) -> Result<(), SignalingError> {
// 解析消息
let msg: SignalingMessage = serde_json::from_str(&text)
.map_err(|e| SignalingError::InvalidMessage(format!("{:?}", e)))?;
match msg {
SignalingMessage::Offer { sdp, session_id } => {
// 处理 Offer
Self::handle_offer(sdp, session_id, connection_id, connections, session_manager).await?;
}
SignalingMessage::Answer { sdp, session_id } => {
// 处理 Answer
Self::handle_answer(sdp, session_id, connection_id, connections, session_manager).await?;
}
SignalingMessage::IceCandidate { candidate, sdp_mid, sdp_mline_index, session_id } => {
// 处理 ICE 候选
Self::handle_ice_candidate(candidate, sdp_mid, sdp_mline_index, session_id, connections).await?;
}
_ => {}
}
Ok(())
}
/// 处理 Offer
async fn handle_offer(
sdp: String,
session_id: String,
connection_id: &ConnectionId,
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
session_manager: &SessionManager,
) -> Result<(), SignalingError> {
// 创建会话
session_manager.create_session(session_id.clone())?;
// 更新连接的会话 ID
{
let mut connections_guard = connections.write().await;
if let Some(conn) = connections_guard.get_mut(connection_id) {
conn.session_id = Some(session_id.clone());
}
}
// 调用后端创建 Answer
let answer = session_manager.create_answer(session_id.clone(), sdp).await?;
// 发送 Answer
let response = SignalingMessage::Answer {
sdp: answer,
session_id,
};
let mut connections_guard = connections.write().await;
if let Some(conn) = connections_guard.get_mut(connection_id) {
let json = serde_json::to_string(&response)?;
conn.write.send(Message::Text(json)).await?;
}
Ok(())
}
/// 处理 Answer
async fn handle_answer(
sdp: String,
session_id: String,
connection_id: &ConnectionId,
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
session_manager: &SessionManager,
) -> Result<(), SignalingError> {
// 设置远程描述
session_manager.set_remote_description(session_id, sdp).await?;
Ok(())
}
/// 处理 ICE 候选
async fn handle_ice_candidate(
candidate: String,
sdp_mid: String,
sdp_mline_index: u16,
session_id: String,
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
) -> Result<(), SignalingError> {
// 转发 ICE 候选到后端
// ...
Ok(())
}
/// 广播消息
pub async fn broadcast(&self, msg: SignalingMessage, session_id: &str) -> Result<(), SignalingError> {
let json = serde_json::to_string(&msg)?;
let connections = self.connections.read().await;
for connection in connections.values() {
if connection.session_id.as_deref() == Some(session_id) {
connection.write.send(Message::Text(json.clone())).await?;
}
}
Ok(())
}
}
```
---
## 4. 数据结构设计
### 4.1 核心数据结构
#### 4.1.1 帧相关结构
```rust
/// 捕获的帧
#[derive(Debug, Clone)]
pub struct CapturedFrame {
/// DMA-BUF 句柄
pub dma_buf: DmaBufHandle,
/// 宽度
pub width: u32,
/// 高度
pub height: u32,
/// 像素格式
pub format: PixelFormat,
/// 时间戳 (ns)
pub timestamp: u64,
/// 帧编号
pub frame_number: u64,
/// 损坏区域
pub damaged_regions: Vec<ScreenRegion>,
}
/// 编码后的帧
#[derive(Debug, Clone)]
pub struct EncodedFrame {
/// 编码数据 (零拷贝 Bytes)
pub data: Bytes,
/// 是否为关键帧
pub is_keyframe: bool,
/// 时间戳 (ns)
pub timestamp: u64,
/// 序列号
pub sequence_number: u64,
/// RTP 时间戳
pub rtp_timestamp: u32,
/// 帧类型
pub frame_type: FrameType,
/// 编码参数
pub encoding_params: EncodingParams,
}
/// 帧类型
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameType {
I, // I 帧 (关键帧)
P, // P 帧 (预测帧)
B, // B 帧 (双向预测帧)
}
/// 编码参数
#[derive(Debug, Clone)]
pub struct EncodingParams {
/// 使用的比特率
pub bitrate: u32,
/// 量化参数
pub qp: u8,
/// 编码延迟 (ms)
pub encode_latency_ms: f64,
}
/// 像素格式
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PixelFormat {
RGBA,
RGB,
YUV420,
YUV422,
YUV444,
NV12,
}
impl PixelFormat {
/// 获取每个像素的平均字节数(YUV420/NV12 为亚采样格式,平均 1.5 字节)
pub fn bytes_per_pixel(&self) -> f32 {
match self {
PixelFormat::RGBA => 4.0,
PixelFormat::RGB => 3.0,
PixelFormat::YUV420 => 1.5,
PixelFormat::YUV422 => 2.0,
PixelFormat::YUV444 => 3.0,
PixelFormat::NV12 => 1.5,
}
}
}
```
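基于上述每像素字节数,可以估算单帧缓冲区大小;下面的小例子(`frame_size` 为假设的辅助函数)给出 1080p NV12 的估算:
```rust
// 示例:估算一帧所需的缓冲区大小
fn frame_size(width: u32, height: u32, format: PixelFormat) -> usize {
    (width as f32 * height as f32 * format.bytes_per_pixel()) as usize
}

// 1920 × 1080 × 1.5 ≈ 3.1 MB
let size = frame_size(1920, 1080, PixelFormat::NV12);
```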
#### 4.1.2 会话相关结构
```rust
/// 会话信息
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// 会话 ID
pub session_id: String,
/// 客户端 ID
pub client_id: String,
/// 会话状态
pub state: SessionState,
/// 创建时间
pub created_at: Instant,
/// 最后活动时间
pub last_activity: Instant,
/// 视频配置
pub video_config: VideoConfig,
/// 网络配置
pub network_config: NetworkConfig,
/// 统计信息
pub stats: SessionStats,
}
/// 会话状态
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
Initializing,
Connecting,
Connected,
Streaming,
Paused,
Disconnected,
Error,
}
/// 视频配置
#[derive(Debug, Clone)]
pub struct VideoConfig {
/// 编码器类型
pub encoder_type: EncoderType,
/// 分辨率
pub resolution: (u32, u32),
/// 帧率
pub frame_rate: u32,
/// 目标比特率
pub target_bitrate: u32,
/// 关键帧间隔
pub keyframe_interval: u32,
}
/// 网络配置
#[derive(Debug, Clone)]
pub struct NetworkConfig {
/// ICE 服务器
pub ice_servers: Vec<IceServer>,
/// 最大比特率
pub max_bitrate: u32,
/// 最小比特率
pub min_bitrate: u32,
/// 拥塞控制策略
pub congestion_control: CongestionControlStrategy,
}
/// 拥塞控制策略
#[derive(Debug, Clone, Copy)]
pub enum CongestionControlStrategy {
// Google Congestion Control
GCC,
// Transport Wide Congestion Control
TWCC,
// Fixed Rate
Fixed,
}
/// 会话统计信息
#[derive(Debug, Clone, Default)]
pub struct SessionStats {
/// 总帧数
pub total_frames: u64,
/// 关键帧数
pub keyframes: u64,
/// 丢帧数
pub dropped_frames: u64,
/// 总发送字节数
pub total_bytes_sent: u64,
/// 实际比特率
pub actual_bitrate: u32,
/// 平均延迟 (ms)
pub avg_latency_ms: f64,
/// P99 延迟 (ms)
pub p99_latency_ms: f64,
/// 丢包率
pub packet_loss_rate: f64,
}
```
### 4.2 内存管理
#### 4.2.1 内存池策略
**设计原则:**
1. 预分配策略:启动时预分配大部分缓冲区
2. 对象池模式:重用对象避免频繁分配
3. 所有权管理:使用 Rust 类型系统确保安全
4. 引用计数:使用 Bytes 和 Arc 管理共享数据
**池类型:**
```rust
/// 通用对象池
pub struct ObjectPool<T> {
idle: VecDeque<T>,
factory: fn() -> T,
resetter: fn(&mut T),
max_size: usize,
min_size: usize,
}
impl<T> ObjectPool<T> {
pub fn new(
factory: fn() -> T,
resetter: fn(&mut T),
min_size: usize,
max_size: usize,
) -> Self {
let mut pool = Self {
idle: VecDeque::with_capacity(max_size),
factory,
resetter,
max_size,
min_size,
};
// 预分配到最小大小
for _ in 0..min_size {
pool.idle.push_back((pool.factory)());
}
pool
}
pub fn acquire(&mut self) -> T {
self.idle.pop_front()
.unwrap_or_else(|| (self.factory)())
}
pub fn release(&mut self, mut item: T) {
if self.idle.len() < self.max_size {
(self.resetter)(&mut item);
self.idle.push_back(item);
}
}
}
```
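以下是对象池的一个最小使用示例(以可复用的编码输出缓冲 `Vec<u8>` 为例,容量数值为示例值):
```rust
// 示例:用对象池复用编码输出缓冲区,避免热路径上的频繁分配
let mut pool: ObjectPool<Vec<u8>> = ObjectPool::new(
    || Vec::with_capacity(256 * 1024),   // factory:预分配 256 KiB 容量
    |buf: &mut Vec<u8>| buf.clear(),     // resetter:归还前清空内容、保留容量
    4,                                   // min_size:启动时预建 4 个
    16,                                  // max_size:最多缓存 16 个
);

let mut buf = pool.acquire();
buf.extend_from_slice(b"encoded bitstream ..."); // 写入编码输出(示例数据)
pool.release(buf);                                // 用毕归还
```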
#### 4.2.2 内存监控
```rust
/// 内存监控器
pub struct MemoryMonitor {
/// 分配跟踪
allocations: HashMap<usize, AllocationRecord>,
/// 统计信息
stats: MemoryStats,
/// 告警阈值
thresholds: MemoryThresholds,
}
/// 分配记录
struct AllocationRecord {
size: usize,
type_: AllocationType,
location: &'static str,
allocated_at: Instant,
}
/// 内存统计
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
/// 总分配字节数
pub total_allocated_bytes: u64,
/// 当前使用字节数
pub current_used_bytes: u64,
/// 峰值使用字节数
pub peak_used_bytes: u64,
/// 分配次数
pub allocation_count: u64,
/// 释放次数
pub free_count: u64,
}
impl MemoryMonitor {
pub fn new(thresholds: MemoryThresholds) -> Self {
Self {
allocations: HashMap::new(),
stats: MemoryStats::default(),
thresholds,
}
}
pub fn track_allocation(
&mut self,
size: usize,
type_: AllocationType,
location: &'static str,
) {
let id = self.stats.allocation_count as usize;
self.stats.allocation_count += 1;
self.stats.total_allocated_bytes += size as u64;
self.stats.current_used_bytes += size as u64;
if self.stats.current_used_bytes > self.stats.peak_used_bytes {
self.stats.peak_used_bytes = self.stats.current_used_bytes;
}
self.allocations.insert(id, AllocationRecord {
size,
type_,
location,
allocated_at: Instant::now(),
});
// 检查阈值
self.check_thresholds();
}
    pub fn track_free(&mut self, size: usize) {
        self.stats.free_count += 1;
        // 使用饱和减法, 防止重复释放统计导致 u64 下溢 panic
        self.stats.current_used_bytes =
            self.stats.current_used_bytes.saturating_sub(size as u64);
    }
fn check_thresholds(&self) {
if self.stats.current_used_bytes > self.thresholds.warning_bytes {
warn!("内存使用超过警告阈值: {} / {}",
self.stats.current_used_bytes,
self.thresholds.warning_bytes);
}
if self.stats.current_used_bytes > self.thresholds.critical_bytes {
error!("内存使用超过临界阈值: {} / {}",
self.stats.current_used_bytes,
self.thresholds.critical_bytes);
}
}
pub fn stats(&self) -> &MemoryStats {
&self.stats
}
}
```
### 4.3 状态管理
#### 4.3.1 状态机实现
```rust
/// 状态机 trait
pub trait StateMachine: Send + Sync {
type State;
type Event;
type Error;
/// 处理事件,返回新状态
fn handle_event(&self, state: Self::State, event: Self::Event)
-> Result<Self::State, Self::Error>;
/// 检查状态是否有效
fn is_valid(&self, state: &Self::State) -> bool;
/// 获取允许的转换
fn allowed_transitions(&self, state: &Self::State) -> Vec<Self::State>;
}
/// 捕获器状态机
pub struct CaptureStateMachine;
impl StateMachine for CaptureStateMachine {
type State = CaptureState;
type Event = CaptureEvent;
type Error = CaptureError;
fn handle_event(&self, state: Self::State, event: Self::Event)
-> Result<Self::State, Self::Error> {
match (state, event) {
(CaptureState::Idle, CaptureEvent::Start) => {
Ok(CaptureState::Connecting)
}
(CaptureState::Connecting, CaptureEvent::Connected) => {
Ok(CaptureState::Streaming)
}
(CaptureState::Streaming, CaptureEvent::Stop) => {
Ok(CaptureState::Stopping)
}
(CaptureState::Stopping, CaptureEvent::Stopped) => {
Ok(CaptureState::Idle)
}
            (state, CaptureEvent::Error) => {
                Ok(CaptureState::Error(Box::new(state)))
            }
_ => Err(CaptureError::InvalidTransition),
}
}
fn is_valid(&self, state: &Self::State) -> bool {
!matches!(state, CaptureState::Error(_))
}
fn allowed_transitions(&self, state: &Self::State) -> Vec<Self::State> {
        match state {
            CaptureState::Idle => vec![
                CaptureState::Connecting,
            ],
            CaptureState::Connecting => vec![
                CaptureState::Streaming,
                CaptureState::Error(Box::new(CaptureState::Connecting)),
            ],
            CaptureState::Streaming => vec![
                CaptureState::Stopping,
                CaptureState::Error(Box::new(CaptureState::Streaming)),
            ],
            CaptureState::Stopping => vec![
                CaptureState::Idle,
                CaptureState::Error(Box::new(CaptureState::Stopping)),
            ],
            CaptureState::Error(_) => vec![
                CaptureState::Idle,
            ],
        }
}
}
/// 捕获状态 (Error 携带出错前所处的状态; 递归类型需要 Box, 因此无法派生 Copy)
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CaptureState {
    Idle,
    Connecting,
    Streaming,
    Stopping,
    Error(Box<CaptureState>),
}
#[derive(Debug)]
pub enum CaptureEvent {
Start,
Connected,
Stop,
Stopped,
Error,
}
```
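状态机的用法示意(在返回 `Result<_, CaptureError>` 的函数中驱动事件):
```rust
// 示意: 按事件驱动捕获器状态迁移
fn demo_transitions(sm: &CaptureStateMachine) -> Result<(), CaptureError> {
    let mut state = CaptureState::Idle;
    state = sm.handle_event(state, CaptureEvent::Start)?;     // Idle -> Connecting
    state = sm.handle_event(state, CaptureEvent::Connected)?; // Connecting -> Streaming
    assert_eq!(state, CaptureState::Streaming);

    // 任意状态收到 Error 事件都会进入 Error, 并记录出错前的状态
    let failed = sm.handle_event(state, CaptureEvent::Error)?;
    assert!(!sm.is_valid(&failed));
    Ok(())
}
```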
## 5. 接口设计
### 5.1 公共 API
#### 5.1.1 主控制器接口
```rust
/// 远程桌面控制器
pub struct RemoteDesktopController {
/// 捕获管理器
capture_manager: Arc<Mutex<CaptureManager>>,
/// 编码器管道
encoder_pipeline: Arc<Mutex<EncoderPipeline>>,
/// WebRTC 传输
webrtc_transport: Arc<Mutex<WebRtcTransport>>,
/// 会话管理器
session_manager: Arc<Mutex<SessionManager>>,
}
impl RemoteDesktopController {
/// 创建新的控制器
pub async fn new(config: ControllerConfig) -> Result<Self, ControllerError> {
let capture_manager = Arc::new(Mutex::new(
CaptureManager::new(config.capture_config).await?
));
let encoder_pipeline = Arc::new(Mutex::new(
EncoderPipeline::new(config.encoder_config)?
));
let webrtc_transport = Arc::new(Mutex::new(
WebRtcTransport::new(config.webrtc_config).await?
));
let session_manager = Arc::new(Mutex::new(
SessionManager::new(config.session_config)
));
Ok(Self {
capture_manager,
encoder_pipeline,
webrtc_transport,
session_manager,
})
}
/// 启动会话
pub async fn start_session(&self, session_id: String) -> Result<(), ControllerError> {
// 启动捕获
self.capture_manager.lock().await.start().await?;
// 启动编码器
self.encoder_pipeline.lock().await.start()?;
// 创建 WebRTC 连接
self.webrtc_transport.lock().await
.create_peer_connection(session_id.clone()).await?;
// 启动处理循环
self.start_processing_loop(session_id).await?;
Ok(())
}
/// 停止会话
pub async fn stop_session(&self, session_id: String) -> Result<(), ControllerError> {
// 停止捕获
self.capture_manager.lock().await.stop().await?;
// 停止编码器
self.encoder_pipeline.lock().await.stop()?;
// 关闭 WebRTC 连接
self.webrtc_transport.lock().await
.close_peer_connection(&session_id).await?;
Ok(())
}
/// 更新配置
pub async fn update_config(
&self,
session_id: &str,
config: VideoConfig,
) -> Result<(), ControllerError> {
// 更新编码器配置
self.encoder_pipeline.lock().await
.reconfigure(config.into()).await?;
Ok(())
}
/// 获取会话统计
pub async fn get_session_stats(&self, session_id: &str) -> Result<SessionStats, ControllerError> {
Ok(self.session_manager.lock().await
.get_stats(session_id)?)
}
}
/// 控制器配置
#[derive(Debug, Clone)]
pub struct ControllerConfig {
pub capture_config: CaptureConfig,
pub encoder_config: EncoderConfig,
pub webrtc_config: WebRtcConfig,
pub session_config: SessionConfig,
}
```
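控制器的调用方式示意(假设各子配置类型均实现了 `Default`,实际字段以对应模块的配置定义为准):
```rust
// 示意: 构建控制器并驱动一次完整的会话生命周期
#[tokio::main]
async fn main() -> Result<(), ControllerError> {
    let config = ControllerConfig {
        capture_config: CaptureConfig::default(),
        encoder_config: EncoderConfig::default(),
        webrtc_config: WebRtcConfig::default(),
        session_config: SessionConfig::default(),
    };

    let controller = RemoteDesktopController::new(config).await?;
    controller.start_session("session-001".to_string()).await?;

    // ... 会话运行期间可随时查询统计 ...
    let stats = controller.get_session_stats("session-001").await?;
    println!("已发送帧数: {}", stats.total_frames);

    controller.stop_session("session-001".to_string()).await?;
    Ok(())
}
```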
### 5.2 内部接口
#### 5.2.1 模块间通信接口
```rust
/// 帧接收接口
#[async_trait]
pub trait FrameReceiver: Send + Sync {
async fn receive_frame(&self) -> Result<CapturedFrame, ReceiverError>;
}
/// 帧发送接口
#[async_trait]
pub trait FrameSender: Send + Sync {
async fn send_frame(&self, frame: EncodedFrame) -> Result<(), SenderError>;
}
/// 编码接口
#[async_trait]
pub trait Encoder: Send + Sync {
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError>;
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError>;
}
/// 编码后的帧接收接口
#[async_trait]
pub trait EncodedFrameReceiver: Send + Sync {
async fn receive(&self) -> Result<EncodedFrame, ReceiverError>;
}
/// 会话管理接口
#[async_trait]
pub trait SessionManager: Send + Sync {
async fn create_session(&self, id: String) -> Result<(), SessionManagerError>;
async fn get_session(&self, id: &str) -> Option<SessionInfo>;
async fn update_session(&self, id: &str, info: SessionInfo) -> Result<(), SessionManagerError>;
async fn remove_session(&self, id: &str) -> Result<(), SessionManagerError>;
}
```
### 5.3 错误处理接口
#### 5.3.1 错误类型定义
```rust
/// 控制器错误
#[derive(Debug, thiserror::Error)]
pub enum ControllerError {
#[error("捕获错误: {0}")]
Capture(#[from] CaptureError),
#[error("编码器错误: {0}")]
Encoder(#[from] EncoderError),
#[error("WebRTC 错误: {0}")]
WebRtc(#[from] WebRtcError),
#[error("会话错误: {0}")]
Session(#[from] SessionManagerError),
#[error("配置错误: {0}")]
Config(String),
#[error("内部错误: {0}")]
Internal(String),
}
/// 捕获错误
#[derive(Debug, thiserror::Error)]
pub enum CaptureError {
#[error("PipeWire 初始化失败: {0}")]
PipeWireInitFailed(String),
#[error("权限被拒绝")]
PermissionDenied,
#[error("缓冲区获取失败")]
BufferAcquisitionFailed,
#[error("流状态错误: {0:?}")]
InvalidStreamState(StreamState),
#[error("超时")]
Timeout,
}
/// 编码器错误
#[derive(Debug, thiserror::Error)]
pub enum EncoderError {
#[error("初始化失败: {0}")]
InitFailed(String),
#[error("编码失败: {0}")]
EncodeFailed(String),
#[error("重新配置失败: {0}")]
ReconfigureFailed(String),
#[error("不支持的格式: {0}")]
UnsupportedFormat(String),
#[error("硬件加速失败")]
HardwareAccelerationFailed,
}
/// WebRTC 错误
#[derive(Debug, thiserror::Error)]
pub enum WebRtcError {
#[error("对等连接创建失败")]
PeerConnectionCreationFailed,
#[error("SDP 协商失败")]
SdpNegotiationFailed,
#[error("ICE 连接失败")]
IceConnectionFailed,
#[error("会话不存在: {0}")]
SessionNotFound(String),
#[error("数据通道错误")]
DataChannelError(String),
}
/// 会话管理错误
#[derive(Debug, thiserror::Error)]
pub enum SessionManagerError {
#[error("会话已存在: {0}")]
SessionExists(String),
#[error("会话不存在: {0}")]
SessionNotFound(String),
#[error("无效的状态转换")]
InvalidStateTransition,
}
```
## 6. 性能优化
### 6.1 性能指标
#### 6.1.1 指标定义
```rust
/// 性能指标收集器
pub struct PerformanceMetrics {
/// 延迟指标
latency_metrics: LatencyMetrics,
/// 吞吐量指标
throughput_metrics: ThroughputMetrics,
/// 资源指标
resource_metrics: ResourceMetrics,
/// 质量指标
quality_metrics: QualityMetrics,
}
/// 延迟指标
#[derive(Debug, Clone, Default)]
pub struct LatencyMetrics {
/// 端到端延迟 (P50)
pub end_to_end_p50_ms: f64,
/// 端到端延迟 (P95)
pub end_to_end_p95_ms: f64,
/// 端到端延迟 (P99)
pub end_to_end_p99_ms: f64,
/// 捕获延迟
pub capture_latency_ms: f64,
/// 编码延迟
pub encode_latency_ms: f64,
/// 传输延迟
pub transport_latency_ms: f64,
/// 渲染延迟
pub render_latency_ms: f64,
}
/// 吞吐量指标
#[derive(Debug, Clone, Default)]
pub struct ThroughputMetrics {
/// 实际比特率 (bps)
pub actual_bitrate: u32,
/// 目标比特率 (bps)
pub target_bitrate: u32,
/// 帧率 (FPS)
pub frame_rate: f64,
/// 丢包率
pub packet_loss_rate: f64,
/// 丢帧率
pub frame_drop_rate: f64,
}
/// 资源指标
#[derive(Debug, Clone, Default)]
pub struct ResourceMetrics {
/// CPU 使用率 (%)
pub cpu_usage: f64,
/// 内存使用量 (bytes)
pub memory_usage: u64,
/// GPU 使用率 (%)
pub gpu_usage: f64,
/// 网络带宽使用 (bps)
pub network_usage: u32,
}
/// 质量指标
#[derive(Debug, Clone, Default)]
pub struct QualityMetrics {
/// PSNR
pub psnr: f64,
/// SSIM
pub ssim: f64,
/// 平均量化参数
pub avg_qp: f64,
/// 关键帧间隔
pub keyframe_interval: u32,
}
impl PerformanceMetrics {
pub fn new() -> Self {
Self {
latency_metrics: LatencyMetrics::default(),
throughput_metrics: ThroughputMetrics::default(),
resource_metrics: ResourceMetrics::default(),
quality_metrics: QualityMetrics::default(),
}
}
/// 更新延迟指标
pub fn update_latency(&mut self, category: LatencyCategory, value_ms: f64) {
match category {
LatencyCategory::Capture => {
self.latency_metrics.capture_latency_ms = value_ms;
}
LatencyCategory::Encode => {
self.latency_metrics.encode_latency_ms = value_ms;
}
LatencyCategory::Transport => {
self.latency_metrics.transport_latency_ms = value_ms;
}
LatencyCategory::Render => {
self.latency_metrics.render_latency_ms = value_ms;
}
LatencyCategory::EndToEnd => {
self.latency_metrics.end_to_end_p50_ms = value_ms;
}
}
}
/// 获取总体评分
pub fn overall_score(&self) -> PerformanceScore {
let latency_score = self.calculate_latency_score();
let throughput_score = self.calculate_throughput_score();
let resource_score = self.calculate_resource_score();
let quality_score = self.calculate_quality_score();
PerformanceScore {
latency: latency_score,
throughput: throughput_score,
resource: resource_score,
quality: quality_score,
overall: (latency_score + throughput_score + resource_score + quality_score) / 4.0,
}
}
fn calculate_latency_score(&self) -> f64 {
// 目标延迟 20ms计算得分
let target = 20.0;
let current = self.latency_metrics.end_to_end_p95_ms;
(target / current).min(1.0) * 100.0
}
fn calculate_throughput_score(&self) -> f64 {
let efficiency = self.throughput_metrics.actual_bitrate as f64
/ self.throughput_metrics.target_bitrate as f64;
efficiency.min(1.0) * 100.0
}
fn calculate_resource_score(&self) -> f64 {
// CPU 和内存使用率的倒数
let cpu_score = (1.0 - self.resource_metrics.cpu_usage / 100.0) * 100.0;
let memory_score = (1.0 - self.resource_metrics.memory_usage as f64 / 512_000_000.0) * 100.0;
(cpu_score + memory_score) / 2.0
}
fn calculate_quality_score(&self) -> f64 {
self.quality_metrics.ssim * 100.0
}
}
#[derive(Debug, Clone)]
pub struct PerformanceScore {
pub latency: f64,
pub throughput: f64,
pub resource: f64,
pub quality: f64,
pub overall: f64,
}
#[derive(Debug, Clone, Copy)]
pub enum LatencyCategory {
Capture,
Encode,
Transport,
Render,
EndToEnd,
}
```
### 6.2 优化策略
#### 6.2.1 延迟优化策略
```rust
/// 延迟优化器
pub struct LatencyOptimizer {
/// 当前策略
strategy: LatencyOptimizationStrategy,
/// 性能监控
monitor: PerformanceMonitor,
/// 调整间隔
adjustment_interval: Duration,
}
/// 延迟优化策略
#[derive(Debug, Clone, Copy)]
pub enum LatencyOptimizationStrategy {
/// 极低延迟模式
UltraLow,
/// 低延迟模式
Low,
/// 平衡模式
Balanced,
/// 高质量模式
HighQuality,
}
impl LatencyOptimizer {
pub fn new(strategy: LatencyOptimizationStrategy) -> Self {
Self {
strategy,
monitor: PerformanceMonitor::new(),
adjustment_interval: Duration::from_millis(500),
}
}
/// 根据性能调整策略
pub async fn optimize(&mut self, metrics: &PerformanceMetrics) -> OptimizationAction {
let current_latency = metrics.latency_metrics.end_to_end_p95_ms;
match self.strategy {
LatencyOptimizationStrategy::UltraLow => {
if current_latency > 25.0 {
OptimizationAction::ReduceBuffering
} else if current_latency < 15.0 {
OptimizationAction::IncreaseQuality
} else {
OptimizationAction::None
}
}
LatencyOptimizationStrategy::Low => {
if current_latency > 40.0 {
OptimizationAction::ReduceFrameRate
} else if current_latency < 30.0 {
OptimizationAction::IncreaseFrameRate
} else {
OptimizationAction::None
}
}
LatencyOptimizationStrategy::Balanced => {
if current_latency > 60.0 {
OptimizationAction::ReduceBitrate
} else if current_latency < 40.0 {
OptimizationAction::IncreaseBitrate
} else {
OptimizationAction::None
}
}
LatencyOptimizationStrategy::HighQuality => {
if current_latency > 100.0 {
OptimizationAction::ReduceQuality
} else {
OptimizationAction::None
}
}
}
}
}
/// 优化动作
#[derive(Debug, Clone, Copy)]
pub enum OptimizationAction {
None,
ReduceBuffering,
IncreaseQuality,
ReduceFrameRate,
IncreaseFrameRate,
ReduceBitrate,
IncreaseBitrate,
ReduceQuality,
SwitchCodec,
}
```
#### 6.2.2 缓冲优化
```rust
/// 自适应缓冲控制器
pub struct AdaptiveBufferController {
/// 当前缓冲区大小
current_buffer_size: usize,
/// 最小缓冲区大小
min_buffer_size: usize,
/// 最大缓冲区大小
max_buffer_size: usize,
/// 延迟历史
latency_history: VecDeque<f64>,
}
impl AdaptiveBufferController {
pub fn new(initial_size: usize, min_size: usize, max_size: usize) -> Self {
Self {
current_buffer_size: initial_size,
min_buffer_size: min_size,
max_buffer_size: max_size,
latency_history: VecDeque::with_capacity(100),
}
}
/// 根据延迟调整缓冲区大小
pub fn adjust_buffer_size(&mut self, current_latency_ms: f64) -> BufferAdjustment {
self.latency_history.push_back(current_latency_ms);
if self.latency_history.len() > 100 {
self.latency_history.pop_front();
}
let avg_latency: f64 = self.latency_history.iter().sum::<f64>()
/ self.latency_history.len() as f64;
let target_latency = 25.0;
let adjustment_factor = (avg_latency / target_latency).ln();
let new_size = if adjustment_factor > 0.5 {
// 延迟太高,减少缓冲
self.current_buffer_size.saturating_sub(1).max(self.min_buffer_size)
} else if adjustment_factor < -0.5 {
// 延迟较低,可以增加缓冲
self.current_buffer_size.saturating_add(1).min(self.max_buffer_size)
} else {
self.current_buffer_size
};
        let old_size = self.current_buffer_size;
        // 实际应用新的缓冲区大小, 否则控制器会一直停留在初始值
        self.current_buffer_size = new_size;
        BufferAdjustment {
            old_size,
            new_size,
            action: if new_size > old_size {
                BufferAction::Increase
            } else if new_size < old_size {
                BufferAction::Decrease
            } else {
                BufferAction::Maintain
            },
        }
}
}
/// 缓冲调整
#[derive(Debug, Clone)]
pub struct BufferAdjustment {
pub old_size: usize,
pub new_size: usize,
pub action: BufferAction,
}
#[derive(Debug, Clone, Copy)]
pub enum BufferAction {
Increase,
Decrease,
Maintain,
}
```
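缓冲控制器的用法示意:每获得一个端到端延迟样本就驱动一次调整,并把结果应用到发送/编码队列:
```rust
// 示意: 每个延迟样本驱动一次缓冲调整
let mut controller = AdaptiveBufferController::new(3, 2, 10);

let adjustment = controller.adjust_buffer_size(45.0); // 平均延迟明显高于 25ms 目标
match adjustment.action {
    BufferAction::Decrease => {
        // 将发送 / 编码队列容量收缩到 adjustment.new_size
    }
    BufferAction::Increase | BufferAction::Maintain => { /* 扩大或保持不变 */ }
}
```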
### 6.3 性能监控
#### 6.3.1 实时监控
```rust
/// 性能监控器
pub struct PerformanceMonitor {
/// 延迟跟踪器
latency_tracker: LatencyTracker,
/// 吞吐量跟踪器
throughput_tracker: ThroughputTracker,
/// 资源跟踪器
resource_tracker: ResourceTracker,
}
/// 延迟跟踪器
pub struct LatencyTracker {
/// 延迟样本
samples: VecDeque<LatencySample>,
/// 最大样本数
max_samples: usize,
}
#[derive(Debug, Clone)]
struct LatencySample {
timestamp: Instant,
latency_ms: f64,
category: LatencyCategory,
}
impl LatencyTracker {
pub fn new(max_samples: usize) -> Self {
Self {
samples: VecDeque::with_capacity(max_samples),
max_samples,
}
}
pub fn record_latency(&mut self, latency_ms: f64, category: LatencyCategory) {
self.samples.push_back(LatencySample {
timestamp: Instant::now(),
latency_ms,
category,
});
if self.samples.len() > self.max_samples {
self.samples.pop_front();
}
}
pub fn get_percentile(&self, percentile: f64) -> Option<f64> {
if self.samples.is_empty() {
return None;
}
let mut latencies: Vec<f64> = self.samples.iter()
.map(|s| s.latency_ms)
.collect();
latencies.sort_by(|a, b| a.partial_cmp(b).unwrap());
        // 将索引钳制在有效范围内, 避免 percentile = 100 时越界返回 None
        let index = ((latencies.len() as f64 * percentile / 100.0) as usize)
            .min(latencies.len() - 1);
        latencies.get(index).copied()
}
pub fn get_average(&self) -> Option<f64> {
if self.samples.is_empty() {
return None;
}
let sum: f64 = self.samples.iter()
.map(|s| s.latency_ms)
.sum();
Some(sum / self.samples.len() as f64)
}
}
```
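延迟跟踪器的用法示意:
```rust
// 示意: 记录端到端延迟样本并读取平均值 / P95
let mut tracker = LatencyTracker::new(1000);
for latency in [18.0, 21.5, 19.2, 35.0, 22.1] {
    tracker.record_latency(latency, LatencyCategory::EndToEnd);
}
let avg = tracker.get_average().unwrap_or(0.0);
let p95 = tracker.get_percentile(95.0).unwrap_or(0.0);
info!("端到端延迟 avg={:.1}ms p95={:.1}ms", avg, p95);
```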
### 6.4 调优指南
#### 6.4.1 调优参数
```rust
/// 调优参数集合
pub struct TuningParameters {
/// 编码器参数
pub encoder: EncoderTuningParams,
/// 网络参数
pub network: NetworkTuningParams,
/// 缓冲参数
pub buffer: BufferTuningParams,
}
/// 编码器调优参数
#[derive(Debug, Clone)]
pub struct EncoderTuningParams {
/// 比特率调整步长 (bps)
pub bitrate_step: u32,
/// 最小比特率
pub min_bitrate: u32,
/// 最大比特率
pub max_bitrate: u32,
/// GOP 大小
pub gop_size: u32,
/// B 帧数量
pub b_frames: u32,
/// 预设
pub preset: EncodePreset,
/// 调优
pub tune: EncodeTune,
}
/// 网络调优参数
#[derive(Debug, Clone)]
pub struct NetworkTuningParams {
/// NACK 窗口大小 (ms)
pub nack_window_ms: u32,
/// FEC 开关
pub fec_enabled: bool,
/// 最大重传次数
pub max_retransmissions: u32,
/// 拥塞控制算法
pub congestion_algorithm: CongestionControlAlgorithm,
}
/// 缓冲调优参数
#[derive(Debug, Clone)]
pub struct BufferTuningParams {
/// 最小缓冲区大小
pub min_buffer_size: usize,
/// 最大缓冲区大小
pub max_buffer_size: usize,
/// 初始缓冲区大小
pub initial_buffer_size: usize,
}
impl TuningParameters {
/// 根据场景创建调优参数
pub fn for_scenario(scenario: Scenario) -> Self {
match scenario {
Scenario::LowLatency => Self {
encoder: EncoderTuningParams {
bitrate_step: 500_000,
min_bitrate: 1_000_000,
max_bitrate: 8_000_000,
gop_size: 15,
b_frames: 0,
preset: EncodePreset::Ultrafast,
tune: EncodeTune::Zerolatency,
},
network: NetworkTuningParams {
nack_window_ms: 20,
fec_enabled: false,
max_retransmissions: 1,
congestion_algorithm: CongestionControlAlgorithm::GCC,
},
buffer: BufferTuningParams {
min_buffer_size: 2,
max_buffer_size: 5,
initial_buffer_size: 3,
},
},
Scenario::HighQuality => Self {
encoder: EncoderTuningParams {
bitrate_step: 1_000_000,
min_bitrate: 2_000_000,
max_bitrate: 15_000_000,
gop_size: 30,
b_frames: 2,
preset: EncodePreset::Faster,
tune: EncodeTune::Film,
},
network: NetworkTuningParams {
nack_window_ms: 100,
fec_enabled: true,
max_retransmissions: 3,
congestion_algorithm: CongestionControlAlgorithm::TWCC,
},
buffer: BufferTuningParams {
min_buffer_size: 5,
max_buffer_size: 10,
initial_buffer_size: 7,
},
            },
            // Balanced 场景的参数有待压测后确定, 这里暂以低延迟配置作为占位, 保证 match 穷尽
            Scenario::Balanced => Self::for_scenario(Scenario::LowLatency),
        }
}
}
#[derive(Debug, Clone, Copy)]
pub enum Scenario {
LowLatency,
HighQuality,
Balanced,
}
#[derive(Debug, Clone, Copy)]
pub enum CongestionControlAlgorithm {
GCC,
TWCC,
}
```
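调优参数的选择示意(`apply_tuning` 为假设的下发接口,实际以编码器管道暴露的方法为准):
```rust
// 示意: 按场景选择调优参数
let params = TuningParameters::for_scenario(Scenario::LowLatency);
assert_eq!(params.encoder.b_frames, 0);   // 低延迟场景禁用 B 帧
assert!(!params.network.fec_enabled);     // 关闭 FEC, 依赖 NACK 快速重传

// encoder_pipeline.apply_tuning(&params.encoder); // 假设的下发接口, 以实际管道 API 为准
```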
## 7. 并发设计
### 7.1 线程模型
#### 7.1.1 线程架构
```
主线程 (Tokio Runtime)
├── 网络线程池 (处理 I/O)
│ ├── WebSocket 接收
│ ├── WebSocket 发送
│ ├── UDP 接收
│ └── UDP 发送
├── 编码线程池 (处理编码)
│ ├── 编码线程 1
│ ├── 编码线程 2
│ └── 编码线程 3
├── 处理线程 (处理帧)
│ ├── 帧处理任务
│   ├── 统计收集
│ └── 配置更新
└── 监控线程 (后台任务)
├── 性能监控
├── 健康检查
└── 日志轮转
```
#### 7.1.2 线程分配
```rust
/// 线程池配置
pub struct ThreadPoolConfig {
/// 网络线程数
pub network_threads: usize,
/// 编码线程数
pub encoder_threads: usize,
/// 处理线程数
pub processing_threads: usize,
/// 监控线程数
pub monitoring_threads: usize,
}
impl ThreadPoolConfig {
    /// 根据系统 CPU 核心数自动分配各线程池大小
pub fn auto() -> Self {
let num_cpus = num_cpus::get();
Self {
network_threads: 2,
encoder_threads: (num_cpus / 2).max(1),
processing_threads: (num_cpus / 4).max(1),
monitoring_threads: 1,
}
}
/// 创建 Tokio 运行时
pub fn create_runtime(&self) -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.encoder_threads + self.processing_threads)
.thread_name("wl-webrtc-worker")
.enable_all()
.build()
.expect("Failed to create runtime")
}
}
```
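用法示意:自动探测线程配置并创建运行时:
```rust
// 示意: 自动探测线程配置并创建 Tokio 运行时
let pool_config = ThreadPoolConfig::auto();
let runtime = pool_config.create_runtime();
runtime.block_on(async {
    // 在运行时内启动捕获 / 编码 / 传输等异步任务
});
```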
### 7.2 同步机制
#### 7.2.1 锁的选择
| 场景 | 锁类型 | 原因 |
|------|--------|------|
| 配置读取 | `RwLock` | 读取频繁,写入很少 |
| 会话映射 | `RwLock` | 需要并发读取 |
| 编码器状态 | `Mutex` | 临界区短,简单 |
| 统计数据 | `RwLock` | 读取频繁 |
| 缓冲区池 | `Mutex` | 需要原子操作 |
| 帧通道 | `crossbeam::channel` | 无锁、高吞吐的跨线程通道 |
```rust
/// 使用 RwLock 的配置管理
pub struct ConfigManager {
config: Arc<RwLock<Config>>,
}
impl ConfigManager {
pub fn read_config(&self) -> Config {
self.config.read().unwrap().clone()
}
pub fn update_config(&self, new_config: Config) {
let mut config = self.config.write().unwrap();
*config = new_config;
}
}
/// 使用 Mutex 的编码器状态
pub struct EncoderState {
state: Arc<Mutex<EncoderStateInner>>,
}
struct EncoderStateInner {
is_encoding: bool,
current_bitrate: u32,
frame_count: u64,
}
impl EncoderState {
pub async fn update_state(&self, update: StateUpdate) {
let mut state = self.state.lock().await;
match update {
StateUpdate::SetEncoding(v) => state.is_encoding = v,
StateUpdate::SetBitrate(b) => state.current_bitrate = b,
StateUpdate::IncrementFrameCount => state.frame_count += 1,
}
}
}
#[derive(Debug)]
pub enum StateUpdate {
SetEncoding(bool),
SetBitrate(u32),
IncrementFrameCount,
}
```
#### 7.2.2 无锁数据结构
```rust
/// 使用无锁队列的帧通道
pub use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
/// 无锁帧通道
pub type FrameSender = Sender<CapturedFrame>;
pub type FrameReceiver = Receiver<CapturedFrame>;
/// 创建帧通道
pub fn create_frame_channel(capacity: usize) -> (FrameSender, FrameReceiver) {
bounded(capacity)
}
/// 使用无锁队列的统计收集
pub struct LockFreeMetrics {
/// 帧计数
frame_count: AtomicU64,
/// 字节计数
byte_count: AtomicU64,
/// 延迟总和
latency_sum: AtomicU64,
/// 样本计数
sample_count: AtomicU64,
}
impl LockFreeMetrics {
pub fn new() -> Self {
Self {
frame_count: AtomicU64::new(0),
byte_count: AtomicU64::new(0),
latency_sum: AtomicU64::new(0),
sample_count: AtomicU64::new(0),
}
}
pub fn record_frame(&self, frame_size: usize, latency_ms: u64) {
self.frame_count.fetch_add(1, Ordering::Relaxed);
self.byte_count.fetch_add(frame_size as u64, Ordering::Relaxed);
self.latency_sum.fetch_add(latency_ms, Ordering::Relaxed);
self.sample_count.fetch_add(1, Ordering::Relaxed);
}
pub fn get_metrics(&self) -> MetricsSnapshot {
MetricsSnapshot {
frame_count: self.frame_count.load(Ordering::Relaxed),
byte_count: self.byte_count.load(Ordering::Relaxed),
avg_latency_ms: {
let sum = self.latency_sum.load(Ordering::Relaxed);
let count = self.sample_count.load(Ordering::Relaxed);
if count > 0 {
sum / count
} else {
0
}
},
}
}
}
```
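无锁统计的用法示意:编码线程记录、监控线程读取,二者之间不需要任何互斥锁:
```rust
// 示意: 编码线程无锁记录, 监控线程读取快照
let metrics = Arc::new(LockFreeMetrics::new());

let encoder_metrics = Arc::clone(&metrics);
std::thread::spawn(move || {
    // 每编码完一帧记录一次: 帧大小 120 KiB, 本帧耗时 6 ms
    encoder_metrics.record_frame(120 * 1024, 6);
});

// 监控线程 (或定时任务) 中:
let snapshot = metrics.get_metrics();
info!("帧数: {}, 平均延迟: {} ms", snapshot.frame_count, snapshot.avg_latency_ms);
```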
### 7.3 任务调度
#### 7.3.1 优先级调度
```rust
/// 优先级任务调度器
pub struct PriorityScheduler {
/// 高优先级队列
high_priority: tokio::sync::mpsc::Sender<Task>,
/// 中优先级队列
medium_priority: tokio::sync::mpsc::Sender<Task>,
/// 低优先级队列
low_priority: tokio::sync::mpsc::Sender<Task>,
}
#[derive(Debug)]
pub struct Task {
pub id: TaskId,
pub priority: TaskPriority,
pub operation: Operation,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
High,
Medium,
Low,
}
impl PriorityScheduler {
pub fn new() -> Self {
let (high_tx, mut high_rx) = tokio::sync::mpsc::channel(100);
let (medium_tx, mut medium_rx) = tokio::sync::mpsc::channel(100);
let (low_tx, mut low_rx) = tokio::sync::mpsc::channel(100);
// 启动调度循环
tokio::spawn(async move {
loop {
tokio::select! {
biased;
// 高优先级
Some(task) = high_rx.recv() => {
Self::execute_task(task).await;
}
// 中优先级
Some(task) = medium_rx.recv() => {
Self::execute_task(task).await;
}
// 低优先级
Some(task) = low_rx.recv() => {
Self::execute_task(task).await;
}
}
}
});
Self {
high_priority: high_tx,
medium_priority: medium_tx,
low_priority: low_tx,
}
}
pub async fn schedule(&self, task: Task) -> Result<(), ScheduleError> {
match task.priority {
TaskPriority::High => {
self.high_priority.send(task).await?;
}
TaskPriority::Medium => {
self.medium_priority.send(task).await?;
}
TaskPriority::Low => {
self.low_priority.send(task).await?;
}
}
Ok(())
}
async fn execute_task(task: Task) {
debug!("执行任务 {:?}, 优先级: {:?}", task.id, task.priority);
// 执行任务...
}
}
```
### 7.4 锁策略
#### 7.4.1 死锁预防
```rust
/// 锁顺序管理器
/// 通过强制锁获取顺序来预防死锁
pub struct LockOrderManager;
impl LockOrderManager {
/// 定义锁顺序
/// 较小的数字应该先获取
pub const LOCK_ORDER: &[LockId] = &[
LockId::Config,
LockId::Session,
LockId::Encoder,
LockId::Buffer,
];
/// 检查锁顺序是否正确
pub fn validate_order(held_locks: &[LockId], requested_lock: LockId) -> bool {
if held_locks.is_empty() {
return true;
}
let held_max = held_locks.iter().max().unwrap();
*held_max < requested_lock
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LockId {
Config = 1,
Session = 2,
Encoder = 3,
Buffer = 4,
}
/// RAII 锁保护
pub struct LockGuard<'a, T> {
data: &'a T,
acquired: bool,
}
impl<'a, T> LockGuard<'a, T> {
pub fn new(data: &'a T) -> Self {
Self {
data,
acquired: true,
}
}
pub fn get(&self) -> &T {
self.data
}
}
impl<'a, T> Drop for LockGuard<'a, T> {
fn drop(&mut self) {
if self.acquired {
// 自动释放锁
}
}
}
```
## 8. 网络设计
### 8.1 协议栈
#### 8.1.1 WebRTC 协议层次
```
┌─────────────────────────────────────┐
│ 应用层 │
│ (视频/音频/数据通道) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ RTP/RTCP 层 │
│ (数据包/控制包) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ DTLS/SRTP 层 │
│ (加密/认证) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ ICE 层 │
│ (连接建立/候选选择) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ UDP 层 │
│ (传输层) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ IP 层 │
│ (网络层) │
└─────────────────────────────────────┘
```
### 8.2 数据格式
#### 8.2.1 RTP 数据包格式
```rust
/// RTP 数据包
#[derive(Debug, Clone)]
pub struct RtpPacket {
/// RTP 头部
pub header: RtpHeader,
/// 扩展头 (可选)
pub extension: Option<RtpExtension>,
/// 载荷
pub payload: Bytes,
}
/// RTP 头部 (12 字节)
#[derive(Debug, Clone, Copy)]
#[repr(C, packed)]
pub struct RtpHeader {
/// V:2, P, X, CC:4
pub byte0: u8,
/// M, PT:7
pub byte1: u8,
pub sequence_number: u16,
pub timestamp: u32,
pub ssrc: u32,
}
impl RtpHeader {
pub fn new(sequence_number: u16, timestamp: u32, ssrc: u32) -> Self {
Self {
byte0: 0x80, // Version 2
byte1: 0x00, // No marker, default PT
sequence_number,
timestamp,
ssrc,
}
}
pub fn version(&self) -> u8 {
(self.byte0 & 0xC0) >> 6
}
pub fn padding(&self) -> bool {
(self.byte0 & 0x20) != 0
}
pub fn extension(&self) -> bool {
(self.byte0 & 0x10) != 0
}
pub fn csrc_count(&self) -> u8 {
self.byte0 & 0x0F
}
pub fn marker(&self) -> bool {
(self.byte1 & 0x80) != 0
}
pub fn payload_type(&self) -> u8 {
self.byte1 & 0x7F
}
pub fn set_marker(&mut self, marker: bool) {
if marker {
self.byte1 |= 0x80;
} else {
self.byte1 &= 0x7F;
}
}
pub fn set_payload_type(&mut self, pt: u8) {
self.byte1 = (self.byte1 & 0x80) | (pt & 0x7F);
}
pub fn to_bytes(&self) -> [u8; 12] {
[
self.byte0,
self.byte1,
(self.sequence_number >> 8) as u8,
self.sequence_number as u8,
(self.timestamp >> 24) as u8,
(self.timestamp >> 16) as u8,
(self.timestamp >> 8) as u8,
self.timestamp as u8,
(self.ssrc >> 24) as u8,
(self.ssrc >> 16) as u8,
(self.ssrc >> 8) as u8,
self.ssrc as u8,
]
}
}
/// RTP 扩展头
#[derive(Debug, Clone)]
pub struct RtpExtension {
/// 扩展类型
pub extension_type: u16,
/// 扩展数据
pub data: Bytes,
}
/// RTP 载荷类型
pub const RTP_PAYLOAD_TYPE_H264: u8 = 96;
pub const RTP_PAYLOAD_TYPE_H265: u8 = 97;
pub const RTP_PAYLOAD_TYPE_VP9: u8 = 98;
pub const RTP_PAYLOAD_TYPE_AV1: u8 = 99;
```
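RTP 头构造与序列化的示意,可用于验证字段位布局:
```rust
// 示意: 构造 RTP 头并检查序列化后的位布局
let mut header = RtpHeader::new(1000, 90_000, 0x1234_5678);
header.set_payload_type(RTP_PAYLOAD_TYPE_H264);
header.set_marker(true);

let bytes = header.to_bytes();
assert_eq!(bytes.len(), 12);
assert_eq!(bytes[0], 0x80);                          // V=2, 无 P/X/CC
assert_eq!(bytes[1], 0x80 | RTP_PAYLOAD_TYPE_H264);  // M=1, PT=96
assert_eq!(header.version(), 2);
assert_eq!(header.payload_type(), RTP_PAYLOAD_TYPE_H264);
```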
#### 8.2.2 H.264 RTP 打包
```rust
/// H.264 打包器
pub struct H264RtpPacketizer {
/// 最大载荷大小
max_payload_size: usize,
/// 序列号
sequence_number: u16,
}
impl H264RtpPacketizer {
pub fn new(max_payload_size: usize) -> Self {
Self {
max_payload_size: max_payload_size - 2, // 留出 FU 头
sequence_number: rand::random(),
}
}
/// 打包 NAL 单元
pub fn packetize(&mut self, nalu: &[u8], timestamp: u32, is_keyframe: bool)
-> Vec<RtpPacket>
{
        // NALU 太大,需要 FU-A 分片
        if nalu.len() > self.max_payload_size {
            self.packetize_fu_a(nalu, timestamp, is_keyframe)
        } else {
            // 单 NAL 单元包, 同样需要递增序列号
            let packet = self.single_nalu_packet(nalu, timestamp, is_keyframe);
            self.sequence_number = self.sequence_number.wrapping_add(1);
            vec![packet]
        }
}
/// 单 NAL 单元包
fn single_nalu_packet(&self, nalu: &[u8], timestamp: u32, is_keyframe: bool)
-> RtpPacket
{
let mut header = RtpHeader::new(self.sequence_number, timestamp, 0);
header.set_marker(true);
header.set_payload_type(RTP_PAYLOAD_TYPE_H264);
RtpPacket {
header,
extension: None,
payload: Bytes::copy_from_slice(nalu),
}
}
/// FU-A 分片打包
fn packetize_fu_a(&mut self, nalu: &[u8], timestamp: u32, is_keyframe: bool)
-> Vec<RtpPacket>
{
let nalu_type = nalu[0] & 0x1F;
let nalu_payload = &nalu[1..];
let payload_size = nalu_payload.len();
let num_packets = (payload_size + self.max_payload_size - 1) / self.max_payload_size;
let mut packets = Vec::new();
for i in 0..num_packets {
let offset = i * self.max_payload_size;
let size = self.max_payload_size.min(payload_size - offset);
let payload = &nalu_payload[offset..offset + size];
let mut header = RtpHeader::new(self.sequence_number, timestamp, 0);
// 只有最后一个包设置 marker
header.set_marker(i == num_packets - 1);
header.set_payload_type(RTP_PAYLOAD_TYPE_H264);
            // 创建 FU-A 头 (RFC 6184)
            // FU indicator: 继承原 NALU 的 F/NRI 位, 类型字段固定为 28 (FU-A)
            let fu_indicator = (nalu[0] & 0xE0) | 28;
            // FU header: S/E 标志位 + 原始 NALU 类型
            let fu_header = {
                let mut h = nalu_type;
                if i == 0 {
                    h |= 0x80; // Start bit
                }
                if i == num_packets - 1 {
                    h |= 0x40; // End bit
                }
                h
            };
let mut packet_payload = Vec::with_capacity(2 + size);
packet_payload.push(fu_indicator);
packet_payload.push(fu_header);
packet_payload.extend_from_slice(payload);
packets.push(RtpPacket {
header,
extension: None,
payload: Bytes::from(packet_payload),
});
self.sequence_number = self.sequence_number.wrapping_add(1);
}
packets
}
}
```
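打包器的用法示意NALU 内容为占位数据,实际应为去掉 Annex-B 起始码的编码器输出:
```rust
// 示意: 打包一个超过 MTU 的 IDR NALU
let mut packetizer = H264RtpPacketizer::new(1200);
let nalu = vec![0x65u8; 4096]; // 0x65: NRI=3, type=5 (IDR slice), 其余字节为占位载荷
let packets = packetizer.packetize(&nalu, 90_000, true);

assert!(packets.len() > 1);                        // 超过最大载荷, 被 FU-A 分片
assert!(packets.last().unwrap().header.marker());  // 仅最后一个分片携带 marker
```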
### 8.3 网络优化
#### 8.3.1 NAT 穿透策略
```rust
/// NAT 穿透管理器
pub struct NatTraversalManager {
/// STUN 服务器
stun_servers: Vec<String>,
/// TURN 服务器
turn_servers: Vec<TurnServerConfig>,
/// ICE 候选收集状态
ice_state: IceState,
}
#[derive(Debug, Clone)]
pub struct TurnServerConfig {
pub url: String,
pub username: String,
pub credential: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IceState {
Gathering,
    GatheringComplete,
Checking,
Connected,
Failed,
}
impl NatTraversalManager {
pub fn new(
stun_servers: Vec<String>,
turn_servers: Vec<TurnServerConfig>,
) -> Self {
Self {
stun_servers,
turn_servers,
ice_state: IceState::Gathering,
}
}
/// 获取 ICE 配置
pub fn get_ice_servers(&self) -> Vec<RTCIceServer> {
let mut servers = Vec::new();
// STUN 服务器
for stun in &self.stun_servers {
servers.push(RTCIceServer {
urls: vec![stun.clone()],
..Default::default()
});
}
// TURN 服务器
for turn in &self.turn_servers {
servers.push(RTCIceServer {
urls: vec![turn.url.clone()],
username: turn.username.clone(),
credential: turn.credential.clone(),
..Default::default()
});
}
servers
}
/// 选择最佳候选
pub fn select_best_candidate(&self, candidates: &[RTCIceCandidate]) -> Option<RTCIceCandidate> {
// 优先选择中继候选
if let Some(relay) = candidates.iter()
.find(|c| c.candidate_type == Some("relay".to_string()))
{
return Some(relay.clone());
}
// 其次选择 srflx (server reflexive)
if let Some(srflx) = candidates.iter()
.find(|c| c.candidate_type == Some("srflx".to_string()))
{
return Some(srflx.clone());
}
// 最后选择 host 候选
candidates.iter().find(|c| c.candidate_type == Some("host".to_string())).cloned()
}
}
```
#### 8.3.2 拥塞控制
```rust
/// 拥塞控制器
pub trait CongestionController: Send + Sync {
/// 更新丢包信息
fn on_packet_loss(&mut self, loss_rate: f64);
/// 更新往返时间
fn on_rtt_update(&mut self, rtt_ms: u32);
/// 获取推荐比特率
fn get_target_bitrate(&self) -> u32;
/// 是否应该减少比特率
fn should_reduce_bitrate(&self) -> bool;
}
/// Google 拥塞控制 (GCC)
pub struct GccCongestionController {
/// 当前比特率
current_bitrate: u32,
/// 最小比特率
min_bitrate: u32,
/// 最大比特率
max_bitrate: u32,
/// 最近丢包率
recent_loss_rate: f64,
/// 最近 RTT
recent_rtt_ms: u32,
}
impl CongestionController for GccCongestionController {
fn on_packet_loss(&mut self, loss_rate: f64) {
self.recent_loss_rate = loss_rate;
// 丢包率 > 10%,显著降低比特率
if loss_rate > 0.10 {
self.current_bitrate = (self.current_bitrate as f64 * 0.8) as u32;
} else if loss_rate > 0.02 {
// 丢包率 2-10%,适度降低
self.current_bitrate = (self.current_bitrate as f64 * 0.9) as u32;
}
}
fn on_rtt_update(&mut self, rtt_ms: u32) {
self.recent_rtt_ms = rtt_ms;
// RTT > 200ms降低比特率
if rtt_ms > 200 {
self.current_bitrate = (self.current_bitrate as f64 * 0.9) as u32;
}
}
fn get_target_bitrate(&self) -> u32 {
self.current_bitrate.clamp(self.min_bitrate, self.max_bitrate)
}
fn should_reduce_bitrate(&self) -> bool {
self.recent_loss_rate > 0.05 || self.recent_rtt_ms > 150
}
}
```
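GCC 控制器的反馈回路示意(此处假设在同一模块内直接构造,实际实现应提供构造函数):
```rust
// 示意: 拥塞控制反馈回路 (同一模块内直接构造, 实际实现应提供构造函数)
let mut cc = GccCongestionController {
    current_bitrate: 4_000_000,
    min_bitrate: 1_000_000,
    max_bitrate: 8_000_000,
    recent_loss_rate: 0.0,
    recent_rtt_ms: 0,
};

cc.on_packet_loss(0.12); // 丢包 12% > 10%: 比特率 × 0.8 -> 3.2 Mbps
cc.on_rtt_update(250);   // RTT > 200ms: 再 × 0.9 -> 2.88 Mbps
assert_eq!(cc.get_target_bitrate(), 2_880_000);
assert!(cc.should_reduce_bitrate());

// 将 cc.get_target_bitrate() 周期性下发给编码器, 驱动码率自适应
```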
### 8.4 错误处理
#### 8.4.1 网络错误恢复
```rust
/// 网络错误处理器
pub struct NetworkErrorHandler {
/// 最大重试次数
max_retries: u32,
/// 重试延迟
retry_delay: Duration,
/// 当前重试次数
retry_count: AtomicU32,
}
impl NetworkErrorHandler {
pub fn new(max_retries: u32, retry_delay: Duration) -> Self {
Self {
max_retries,
retry_delay,
retry_count: AtomicU32::new(0),
}
}
/// 处理网络错误
pub async fn handle_error<T, F, Fut>(&self, operation: F) -> Result<T, NetworkError>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, NetworkError>>,
{
loop {
match operation().await {
Ok(result) => {
// 成功,重置重试计数
self.retry_count.store(0, Ordering::Relaxed);
return Ok(result);
}
Err(e) => {
let count = self.retry_count.fetch_add(1, Ordering::Relaxed);
if count >= self.max_retries {
error!("超过最大重试次数: {}", count);
return Err(e);
}
warn!("网络错误,重试 ({}/{}): {:?}", count + 1, self.max_retries, e);
tokio::time::sleep(self.retry_delay).await;
}
}
}
}
/// 判断错误是否可恢复
pub fn is_recoverable(&self, error: &NetworkError) -> bool {
match error {
NetworkError::ConnectionReset => true,
NetworkError::Timeout => true,
NetworkError::TemporaryFailure => true,
_ => false,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum NetworkError {
#[error("连接重置")]
ConnectionReset,
#[error("连接超时")]
Timeout,
#[error("临时故障")]
TemporaryFailure,
#[error("永久故障")]
PermanentFailure,
#[error("解析失败: {0}")]
ParseError(String),
}
```
## 9. 安全设计
### 9.1 认证授权
#### 9.1.1 会话认证
```rust
/// 认证管理器
pub struct AuthenticationManager {
/// 密钥管理
key_manager: KeyManager,
/// 会话令牌
tokens: Arc<RwLock<HashMap<String, SessionToken>>>,
}
impl AuthenticationManager {
pub fn new(key_manager: KeyManager) -> Self {
Self {
key_manager,
tokens: Arc::new(RwLock::new(HashMap::new())),
}
}
/// 生成会话令牌
pub async fn generate_token(&self, session_id: &str, ttl: Duration)
-> Result<String, AuthError>
{
let secret = self.key_manager.generate_secret()?;
let token = SessionToken {
session_id: session_id.to_string(),
secret,
issued_at: Instant::now(),
expires_at: Instant::now() + ttl,
};
let token_id = Uuid::new_v4().to_string();
self.tokens.write().await.insert(token_id.clone(), token);
Ok(token_id)
}
/// 验证令牌
pub async fn verify_token(&self, token_id: &str) -> Result<SessionToken, AuthError> {
let tokens = self.tokens.read().await;
let token = tokens.get(token_id)
.ok_or(AuthError::InvalidToken)?;
if token.expires_at < Instant::now() {
return Err(AuthError::TokenExpired);
}
Ok(token.clone())
}
/// 撤销令牌
pub async fn revoke_token(&self, token_id: &str) {
self.tokens.write().await.remove(token_id);
}
}
/// 会话令牌
#[derive(Debug, Clone)]
pub struct SessionToken {
pub session_id: String,
pub secret: Vec<u8>,
pub issued_at: Instant,
pub expires_at: Instant,
}
/// 密钥管理器
pub struct KeyManager {
master_key: Vec<u8>,
}
impl KeyManager {
pub fn new() -> Result<Self, CryptoError> {
let master_key = Self::generate_key()?;
Ok(Self { master_key })
}
fn generate_key() -> Result<Vec<u8>, CryptoError> {
let mut key = vec![0u8; 32];
getrandom::getrandom(&mut key)
.map_err(|e| CryptoError::KeyGenerationFailed(e.to_string()))?;
Ok(key)
}
pub fn generate_secret(&self) -> Result<Vec<u8>, CryptoError> {
Self::generate_key()
}
}
#[derive(Debug, thiserror::Error)]
pub enum AuthError {
#[error("无效的令牌")]
InvalidToken,
#[error("令牌已过期")]
TokenExpired,
#[error("加密错误: {0}")]
CryptoError(#[from] CryptoError),
}
#[derive(Debug, thiserror::Error)]
pub enum CryptoError {
#[error("密钥生成失败: {0}")]
KeyGenerationFailed(String),
#[error("加密失败: {0}")]
EncryptionFailed(String),
#[error("解密失败: {0}")]
DecryptionFailed(String),
}
```
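认证流程的用法示意:信令握手时签发令牌,客户端回连时校验:
```rust
// 示意: 信令握手时签发并校验会话令牌
async fn issue_and_verify() -> Result<(), AuthError> {
    let key_manager = KeyManager::new()?;
    let auth = AuthenticationManager::new(key_manager);

    // 签发 24 小时有效的令牌, token_id 通过信令下发给客户端
    let token_id = auth
        .generate_token("session-001", Duration::from_secs(24 * 3600))
        .await?;

    // 客户端回连时携带 token_id, 服务端校验
    let token = auth.verify_token(&token_id).await?;
    assert_eq!(token.session_id, "session-001");
    Ok(())
}
```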
### 9.2 数据加密
#### 9.2.1 SRTP 加密
```rust
/// SRTP 加密器
pub struct SrtpEncryptor {
/// 加密密钥
key: Vec<u8>,
/// 盐
salt: Vec<u8>,
/// 序列号
sequence_number: u32,
}
impl SrtpEncryptor {
pub fn new(key: Vec<u8>, salt: Vec<u8>) -> Self {
Self {
key,
salt,
sequence_number: 0,
}
}
    /// 加密 RTP 数据包 (使用 aes-gcm crate 的 Aes256Gcm)
    pub fn encrypt(&mut self, packet: &mut RtpPacket) -> Result<(), CryptoError> {
        use aes_gcm::aead::{Aead, KeyInit};
        let cipher = aes_gcm::Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| CryptoError::EncryptionFailed(e.to_string()))?;
        let nonce_bytes = self.generate_nonce(packet.header.sequence_number);
        let nonce = aes_gcm::Nonce::from_slice(&nonce_bytes);
        let ciphertext = cipher.encrypt(nonce, packet.payload.as_ref())
            .map_err(|e| CryptoError::EncryptionFailed(e.to_string()))?;
        packet.payload = Bytes::from(ciphertext);
        Ok(())
    }
    /// 解密 RTP 数据包
    pub fn decrypt(&mut self, packet: &mut RtpPacket) -> Result<(), CryptoError> {
        use aes_gcm::aead::{Aead, KeyInit};
        let cipher = aes_gcm::Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| CryptoError::DecryptionFailed(e.to_string()))?;
        let nonce_bytes = self.generate_nonce(packet.header.sequence_number);
        let nonce = aes_gcm::Nonce::from_slice(&nonce_bytes);
        let plaintext = cipher.decrypt(nonce, packet.payload.as_ref())
            .map_err(|e| CryptoError::DecryptionFailed(e.to_string()))?;
        packet.payload = Bytes::from(plaintext);
        Ok(())
    }
fn generate_nonce(&self, sequence_number: u16) -> Vec<u8> {
// 结合盐和序列号生成 nonce
let mut nonce = self.salt.clone();
let seq_bytes = sequence_number.to_be_bytes();
nonce.extend_from_slice(&seq_bytes);
nonce.truncate(12); // AES-GCM 需要 12 字节 nonce
nonce
}
}
```
### 9.3 安全审计
#### 9.3.1 审计日志
```rust
/// 审计日志记录器
pub struct AuditLogger {
/// 日志写入器
writer: AuditLogWriter,
/// 配置
config: AuditConfig,
}
/// 审计日志条目
#[derive(Debug, Serialize)]
pub struct AuditEntry {
/// 时间戳
pub timestamp: DateTime<Utc>,
/// 会话 ID
pub session_id: String,
/// 用户 ID
pub user_id: String,
/// 事件类型
pub event_type: AuditEventType,
/// 事件详情
pub details: serde_json::Value,
/// IP 地址
pub ip_address: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditEventType {
/// 会话创建
SessionCreated,
/// 会话结束
SessionEnded,
/// 认证成功
AuthenticationSuccess,
/// 认证失败
AuthenticationFailure,
/// 权限拒绝
AccessDenied,
/// 配置更改
ConfigChanged,
/// 错误
Error,
}
impl AuditLogger {
pub fn new(config: AuditConfig) -> Result<Self, AuditError> {
let writer = AuditLogWriter::new(&config.log_path)?;
Ok(Self { writer, config })
}
/// 记录审计事件
pub fn log(&self, entry: AuditEntry) -> Result<(), AuditError> {
let json = serde_json::to_string(&entry)?;
self.writer.write(&json)?;
// 同时输出到标准错误
eprintln!("[AUDIT] {}", json);
Ok(())
}
/// 记录会话创建
pub fn log_session_created(
&self,
session_id: String,
user_id: String,
ip_address: Option<String>,
) {
let entry = AuditEntry {
timestamp: Utc::now(),
session_id,
user_id,
event_type: AuditEventType::SessionCreated,
details: serde_json::json!({}),
ip_address,
};
if let Err(e) = self.log(entry) {
error!("审计日志写入失败: {:?}", e);
}
}
}
```
### 9.4 防护措施
#### 9.4.1 速率限制
```rust
/// 速率限制器
pub struct RateLimiter {
/// 令牌桶
bucket: TokenBucket,
/// 最大突发
max_burst: u32,
}
/// 令牌桶
struct TokenBucket {
/// 容量
capacity: u32,
/// 令牌数
tokens: u32,
/// 速率 (tokens/秒)
rate: u32,
/// 最后更新
last_update: Instant,
}
impl RateLimiter {
pub fn new(rate: u32, max_burst: u32) -> Self {
Self {
bucket: TokenBucket {
capacity: max_burst,
tokens: max_burst,
rate,
last_update: Instant::now(),
},
max_burst,
}
}
/// 检查是否允许
pub fn check(&mut self) -> bool {
self.bucket.try_consume(1)
}
/// 检查是否允许消耗多个令牌
pub fn check_n(&mut self, n: u32) -> bool {
self.bucket.try_consume(n)
}
}
impl TokenBucket {
fn try_consume(&mut self, n: u32) -> bool {
// 更新令牌数
self.refill();
if self.tokens >= n {
self.tokens -= n;
true
} else {
false
}
}
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_update).as_secs_f64();
        let new_tokens = (elapsed * self.rate as f64) as u32;
        // 只有在真正补充到整数个令牌时才推进时间戳,
        // 避免高频调用下小数部分被反复截断、令牌永远无法恢复
        if new_tokens > 0 {
            self.last_update = now;
            self.tokens = (self.tokens + new_tokens).min(self.capacity);
        }
    }
}
```
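速率限制器在信令入口处的用法示意:
```rust
// 示意: 对信令请求做 100 req/s、突发 20 的限流
let mut limiter = RateLimiter::new(100, 20);

if limiter.check() {
    // 处理本次信令请求
} else {
    // 超出速率, 拒绝请求 (例如返回 429)
}
```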
## 10. 测试策略
### 10.1 测试层次
#### 10.1.1 测试金字塔
```
/\
/ \
/ E2E \ (端到端测试, 10%)
/--------\
/ 集成 \ (集成测试, 20%)
/------------\
/ 单元 \ (单元测试, 70%)
/----------------\
```
### 10.2 测试用例
#### 10.2.1 单元测试示例
```rust
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dma_buf_handle_creation() {
let handle = DmaBufHandle {
fd: 42,
size: 1920 * 1080 * 4,
stride: 1920 * 4,
offset: 0,
id: None,
};
assert_eq!(handle.fd, 42);
assert_eq!(handle.size, 1920 * 1080 * 4);
}
#[test]
fn test_damage_tracker_full_frame() {
let mut tracker = DamageTracker::new(100, 10);
let frame = create_test_frame(1920, 1080);
let regions = tracker.update(&frame);
assert_eq!(regions.len(), 1);
assert_eq!(regions[0].width, 1920);
assert_eq!(regions[0].height, 1080);
}
#[test]
fn test_frame_pool_allocation() {
let mut pool = ObjectPool::new(
|| Vec::with_capacity(1024),
|v: &mut Vec<u8>| v.clear(),
10,
20,
);
let vec1 = pool.acquire();
let vec2 = pool.acquire();
pool.release(vec1);
pool.release(vec2);
        // min_size = 10 会预分配 10 个对象; 取出 2 个再归还后, 空闲数量回到 10
        assert_eq!(pool.idle.len(), 10);
}
fn create_test_frame(width: u32, height: u32) -> CapturedFrame {
CapturedFrame {
dma_buf: DmaBufHandle {
fd: 0,
size: (width * height * 4) as usize,
stride: width * 4,
offset: 0,
id: None,
},
width,
height,
format: PixelFormat::RGBA,
timestamp: 0,
frame_number: 0,
damaged_regions: vec![],
}
}
}
```
### 10.3 性能测试
#### 10.3.1 基准测试
```rust
#[cfg(test)]
mod benchmarks {
use super::*;
use std::time::Instant;
#[bench]
fn bench_encode_frame(b: &mut Bencher) {
let mut encoder = VaapiEncoder::new(create_test_config()).unwrap();
let frame = create_test_frame(1920, 1080);
b.iter(|| {
encoder.encode(frame.clone()).unwrap()
});
}
#[bench]
fn bench_rtp_packetize(b: &mut Bencher) {
let mut packetizer = H264RtpPacketizer::new(1200);
let frame = create_encoded_frame(50000);
b.iter(|| {
packetizer.packetize(&frame.data, 0, false)
});
}
fn create_test_config() -> EncoderConfig {
EncoderConfig {
encoder_type: EncoderType::H264_VAAPI,
width: 1920,
height: 1080,
frame_rate: 60,
bitrate: 4000000,
max_bitrate: 8000000,
min_bitrate: 1000000,
keyframe_interval: 30,
preset: EncodePreset::Ultrafast,
tune: EncodeTune::Zerolatency,
}
}
fn create_encoded_frame(size: usize) -> EncodedFrame {
EncodedFrame {
data: Bytes::from(vec![0u8; size]),
is_keyframe: false,
timestamp: 0,
sequence_number: 0,
rtp_timestamp: 0,
frame_type: FrameType::P,
encoding_params: EncodingParams {
bitrate: 4000000,
qp: 26,
encode_latency_ms: 5.0,
},
}
}
}
```
### 10.4 持续集成
#### 10.4.1 CI 配置示例
```yaml
# .github/workflows/ci.yml
name: CI
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libva-dev vainfo
- name: Cache Rust
uses: actions/cache@v3
with:
path: ~/.cargo/registry
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
- name: Run tests
run: cargo test --all --verbose
- name: Run benchmarks
run: cargo bench --no-run
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run Clippy
run: cargo clippy -- -D warnings
- name: Format check
run: cargo fmt -- --check
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build release
run: cargo build --release --all
```
## 11. 部署运维
### 11.1 部署方案
#### 11.1.1 生产环境架构
```
┌────────────────┐
│ 负载均衡器 │
│ (Nginx/HAProxy)│
└────────┬───────┘
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 实例 1 │ │ 实例 2 │ │ 实例 3 │
│ (Rust后端) │ │ (Rust后端) │ │ (Rust后端) │
└───────────┘ └───────────┘ └───────────┘
│ │ │
└────────────────────┼────────────────────┘
┌────────▼───────┐
│ Redis 缓存 │
│ (会话状态) │
└────────────────┘
┌────────▼───────┐
│ PostgreSQL │
│ (持久化数据) │
└────────────────┘
```
### 11.2 配置管理
#### 11.2.1 配置文件格式
```toml
# config.toml
[server]
bind_addr = "0.0.0.0:8080"
signaling_addr = "0.0.0.0:8443"
max_sessions = 20
[encoder]
default_encoder_type = "h264_vaapi"
default_bitrate = 4000000
max_bitrate = 8000000
min_bitrate = 1000000
keyframe_interval = 30
[webrtc]
stun_servers = ["stun:stun.l.google.com:19302"]
# turn_servers = []
max_bitrate = 8000000
min_bitrate = 500000
start_bitrate = 4000000
[performance]
target_latency_ms = 25
max_buffer_size = 10
min_buffer_size = 2
[logging]
level = "info"
file = "/var/log/wl-webrtc/app.log"
max_size = "100MB"
max_files = 10
[monitoring]
enabled = true
metrics_port = 9090
health_check_port = 8081
[security]
enable_authentication = true
token_ttl_hours = 24
enable_audit_log = true
audit_log_path = "/var/log/wl-webrtc/audit.log"
```
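配置文件的加载可以用 `serde` + `toml` 完成,下面是一个最小示意(只展示 `[server]` 段,其余段结构同理;字段需与上述 TOML 保持一致):
```rust
// 示意: 用 serde + toml 加载配置 (仅演示 [server] 段)
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct ServerSection {
    pub bind_addr: String,
    pub signaling_addr: String,
    pub max_sessions: u32,
}

#[derive(Debug, Deserialize)]
pub struct AppConfig {
    pub server: ServerSection,
    // encoder / webrtc / performance / logging ... 其余段按相同方式声明
}

pub fn load_config(path: &str) -> Result<AppConfig, Box<dyn std::error::Error>> {
    let text = std::fs::read_to_string(path)?;
    Ok(toml::from_str(&text)?)
}
```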
### 11.3 监控告警
#### 11.3.1 Prometheus 指标
```rust
/// 指标收集器
pub struct MetricsCollector {
/// HTTP 请求计数器
http_requests_total: IntCounterVec,
/// 请求延迟直方图
request_duration_seconds: HistogramVec,
/// 活跃会话数
active_sessions: IntGauge,
/// 编码帧计数器
frames_encoded_total: IntCounterVec,
/// 延迟指标
latency_seconds: HistogramVec,
}
impl MetricsCollector {
    pub fn new() -> Self {
        // register_int_counter_vec!(name, help, labels) 形式; 两个计数器必须使用不同的指标名
        let http_requests_total = register_int_counter_vec!(
            "http_requests_total",
            "Total HTTP requests",
            &["method", "endpoint", "status"]
        ).unwrap();
        let request_duration_seconds = register_histogram_vec!(
            histogram_opts!("request_duration_seconds", "Request duration in seconds"),
            &["method", "endpoint"]
        ).unwrap();
        let active_sessions = register_int_gauge!(
            "active_sessions",
            "Number of active sessions"
        ).unwrap();
        let frames_encoded_total = register_int_counter_vec!(
            "frames_encoded_total",
            "Total frames encoded",
            &["encoder_type", "frame_type"]
        ).unwrap();
        let latency_seconds = register_histogram_vec!(
            histogram_opts!("latency_seconds", "Latency in seconds"),
            &["category"]
        ).unwrap();
Self {
http_requests_total,
request_duration_seconds,
active_sessions,
frames_encoded_total,
latency_seconds,
}
}
pub fn increment_active_sessions(&self) {
self.active_sessions.inc();
}
pub fn decrement_active_sessions(&self) {
self.active_sessions.dec();
}
pub fn record_frame_encoded(&self, encoder_type: &str, frame_type: &str) {
self.frames_encoded_total
.with_label_values(&[encoder_type, frame_type])
.inc();
}
pub fn record_latency(&self, category: &str, seconds: f64) {
self.latency_seconds
.with_label_values(&[category])
.observe(seconds);
}
}
```
### 11.4 日志管理
#### 11.4.1 日志配置
```rust
/// 初始化日志系统
/// 返回的 WorkerGuard 必须由调用方持有; 它被 drop 后异步文件日志会停止写入
pub fn init_logging(config: &LoggingConfig) -> Result<WorkerGuard, LogError> {
    let level = config.level.parse::<LevelFilter>()?;
    // 全局按配置级别过滤, 性能指标 target 单独放行到 DEBUG
    let targets = tracing_subscriber::filter::Targets::new()
        .with_default(level)
        .with_target("wl_webrtc::performance", Level::DEBUG);
    // 控制台输出
    let console_layer = fmt::layer()
        .with_target(false)
        .with_level(true)
        .with_filter(targets.clone());
    // 文件输出 (tracing-appender 需要 "目录 + 文件名前缀", 而不是完整路径)
    let log_dir = config.log_path.parent().unwrap_or_else(|| Path::new("."));
    let file_name = config.log_path
        .file_name()
        .map(|n| n.to_os_string())
        .unwrap_or_else(|| "app.log".into());
    let file_appender = RollingFileAppender::new(Rotation::DAILY, log_dir, file_name);
    let (non_blocking, guard) = non_blocking(file_appender);
    let file_layer = fmt::layer()
        .with_writer(non_blocking)
        .with_filter(targets);
    // 初始化全局订阅者
    Registry::default()
        .with(console_layer)
        .with(file_layer)
        .init();
    Ok(guard)
}
```
### 11.5 故障处理
#### 11.5.1 常见问题排查
````markdown
## 常见问题排查手册
### 1. 高延迟问题
**症状:** 端到端延迟超过 100ms
**可能原因:**
- 网络带宽不足
- 编码器负载过高
- 缓冲区过大
**排查步骤:**
1. 检查网络带宽使用情况
```bash
iftop -i eth0
```
2. 查看编码器统计信息
```bash
curl http://localhost:9090/metrics | grep encode_latency
```
3. 调整缓冲区大小
```toml
[performance]
max_buffer_size = 5 # 从 10 减少到 5
```
### 2. 画面卡顿
**症状:** 帧率不稳定,画面卡顿
**可能原因:**
- 丢包率过高
- CPU/GPU 负载过高
- 编码器配置不当
**排查步骤:**
1. 检查丢包率
```bash
curl http://localhost:9090/metrics | grep packet_loss_rate
```
2. 检查系统资源
```bash
top -p $(pgrep wl-webrtc)
```
3. 检查编码器状态
```bash
journalctl -u wl-webrtc | grep encoder
```
### 3. 无法建立连接
**症状:** WebSocket 连接失败
**可能原因:**
- 端口被占用
- 防火墙阻止
- 证书问题
**排查步骤:**
1. 检查端口监听
```bash
netstat -tlnp | grep 8443
```
2. 检查防火墙
```bash
sudo iptables -L -n | grep 8443
```
3. 查看应用日志
```bash
journalctl -u wl-webrtc -f
```
````
## 12. 扩展设计
### 12.1 插件机制
#### 12.1.1 插件接口
```rust
/// 插件 trait
pub trait Plugin: Send + Sync {
/// 插件名称
fn name(&self) -> &str;
/// 插件版本
fn version(&self) -> &str;
/// 初始化插件
fn init(&mut self, context: &PluginContext) -> Result<(), PluginError>;
/// 关闭插件
fn shutdown(&mut self) -> Result<(), PluginError>;
/// 处理帧前回调
fn on_frame_before_encode(&self, frame: &mut CapturedFrame) -> Result<(), PluginError>;
/// 处理帧后回调
fn on_frame_after_encode(&self, frame: &EncodedFrame) -> Result<(), PluginError>;
}
/// 插件上下文
pub struct PluginContext {
/// 配置
pub config: Arc<RwLock<Config>>,
/// 会话管理器
pub session_manager: Arc<SessionManager>,
}
/// 插件管理器
pub struct PluginManager {
/// 加载的插件
plugins: Vec<Box<dyn Plugin>>,
/// 上下文
context: PluginContext,
}
impl PluginManager {
pub fn new(context: PluginContext) -> Self {
Self {
plugins: Vec::new(),
context,
}
}
/// 加载插件
    pub fn load_plugin(&mut self, mut plugin: Box<dyn Plugin>) -> Result<(), PluginError> {
plugin.init(&self.context)?;
self.plugins.push(plugin);
Ok(())
}
/// 处理帧前回调
pub fn on_frame_before_encode(&self, frame: &mut CapturedFrame) -> Result<(), PluginError> {
for plugin in &self.plugins {
plugin.on_frame_before_encode(frame)?;
}
Ok(())
}
/// 处理帧后回调
pub fn on_frame_after_encode(&self, frame: &EncodedFrame) -> Result<(), PluginError> {
for plugin in &self.plugins {
plugin.on_frame_after_encode(frame)?;
}
Ok(())
}
}
```
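一个最简单的插件实现示意:在编码完成后记录帧大小:
```rust
/// 示例插件: 在编码完成后记录帧大小 (仅为示意)
pub struct FrameSizeLoggerPlugin;

impl Plugin for FrameSizeLoggerPlugin {
    fn name(&self) -> &str {
        "frame-size-logger"
    }
    fn version(&self) -> &str {
        "0.1.0"
    }
    fn init(&mut self, _context: &PluginContext) -> Result<(), PluginError> {
        Ok(())
    }
    fn shutdown(&mut self) -> Result<(), PluginError> {
        Ok(())
    }
    fn on_frame_before_encode(&self, _frame: &mut CapturedFrame) -> Result<(), PluginError> {
        Ok(())
    }
    fn on_frame_after_encode(&self, frame: &EncodedFrame) -> Result<(), PluginError> {
        debug!("插件 {}: 编码后帧大小 {} 字节", self.name(), frame.data.len());
        Ok(())
    }
}

// 注册到插件管理器:
// plugin_manager.load_plugin(Box::new(FrameSizeLoggerPlugin))?;
```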
### 12.2 版本管理
#### 12.2.1 版本兼容性
```rust
/// 版本信息
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionInfo {
/// 主版本
pub major: u8,
/// 次版本
pub minor: u8,
/// 补丁版本
pub patch: u8,
/// 预发布标识
pub prerelease: Option<String>,
/// 构建元数据
pub build_metadata: Option<String>,
}
impl VersionInfo {
pub fn current() -> Self {
Self {
major: env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
minor: env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
patch: env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(),
prerelease: None,
build_metadata: None,
}
}
/// 检查兼容性
pub fn is_compatible_with(&self, other: &VersionInfo) -> bool {
// 主版本相同,次版本 >=
self.major == other.major && self.minor >= other.minor
}
}
/// 版本协商
pub struct VersionNegotiator;
impl VersionNegotiator {
/// 选择最佳版本
pub fn select_version(
client_versions: &[VersionInfo],
server_version: &VersionInfo,
) -> Option<VersionInfo> {
client_versions
.iter()
.filter(|v| server_version.is_compatible_with(v))
.max_by_key(|v| (v.major, v.minor, v.patch))
.cloned()
}
}
```
### 12.3 扩展点
#### 12.3.1 自定义编码器
```rust
/// 自定义编码器工厂
pub trait EncoderFactory: Send + Sync {
/// 创建编码器实例
fn create_encoder(&self, config: EncoderConfig) -> Result<Box<dyn VideoEncoder>, EncoderError>;
/// 检查是否支持
fn is_supported(&self) -> bool;
}
/// 自定义编码器示例
pub struct CustomEncoder {
// ...
}
#[async_trait]
impl VideoEncoder for CustomEncoder {
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError> {
// 自定义编码逻辑
Ok(EncodedFrame {
data: Bytes::new(),
is_keyframe: false,
timestamp: frame.timestamp,
sequence_number: 0,
rtp_timestamp: 0,
frame_type: FrameType::P,
encoding_params: EncodingParams {
bitrate: 0,
qp: 0,
encode_latency_ms: 0.0,
},
})
}
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError> {
Ok(())
}
fn stats(&self) -> EncoderStats {
EncoderStats::default()
}
fn capabilities(&self) -> EncoderCapabilities {
EncoderCapabilities::default()
}
}
```
## 附录
### A. 术语表
| 术语 | 全称 | 说明 |
|------|------|------|
| DMA-BUF | Direct Memory Access Buffer | Linux 内核提供的零拷贝缓冲区机制 |
| VA-API | Video Acceleration API | 视频加速 API用于硬件加速 |
| NVENC | NVIDIA Encoder | NVIDIA GPU 硬件编码器 |
| WebRTC | Web Real-Time Communication | Web 实时通信标准 |
| SDP | Session Description Protocol | 会话描述协议 |
| ICE | Interactive Connectivity Establishment | 交互式连接建立 |
| STUN | Session Traversal Utilities for NAT | NAT 穿透工具 |
| TURN | Traversal Using Relays around NAT | 使用中继的 NAT 穿透 |
| RTP | Real-time Transport Protocol | 实时传输协议 |
| RTCP | RTP Control Protocol | RTP 控制协议 |
| DTLS | Datagram Transport Layer Security | 数据报传输层安全 |
| SRTP | Secure Real-time Transport Protocol | 安全实时传输协议 |
| NACK | Negative Acknowledgment | 否定确认 |
| FEC | Forward Error Correction | 前向纠错 |
| GOP | Group of Pictures | 图像组 |
| PSNR | Peak Signal-to-Noise Ratio | 峰值信噪比 |
| SSIM | Structural Similarity Index | 结构相似性指数 |
### B. 参考资料
#### 技术规范
- [WebRTC 规范](https://www.w3.org/TR/webrtc/)
- [RTP/RTCP 规范](https://tools.ietf.org/html/rfc3550)
- [H.264 规范](https://www.itu.int/rec/T-REC-H.264)
- [Wayland 协议](https://wayland.freedesktop.org/)
- [PipeWire 文档](https://pipewire.org/)
#### 库和框架
- [webrtc-rs](https://github.com/webrtc-rs/webrtc)
- [tokio](https://tokio.rs/)
- [tracing](https://docs.rs/tracing)
- [bytes](https://docs.rs/bytes)
#### 相关项目
- [Deskreen](https://deskreen.com/)
- [RustDesk](https://rustdesk.com/)
- [Sunshine](https://github.com/LizardByte/Sunshine)
### C. 配置示例
#### C.1 完整配置文件
```toml
# wl-webrtc.toml
[server]
# 服务器绑定地址
bind_addr = "0.0.0.0:8080"
# 信令服务器地址
signaling_addr = "0.0.0.0:8443"
# 最大并发会话数
max_sessions = 20
# 会话超时时间 (秒)
session_timeout = 300
[capture]
# 目标帧率
target_frame_rate = 60
# 损坏跟踪
enable_damage_tracking = true
# 损坏阈值 (像素)
damage_threshold = 100
# 最大损坏区域数
max_damaged_regions = 4
[encoder]
# 默认编码器: h264_vaapi, h264_nvenc, h264_x264
default_encoder_type = "h264_vaapi"
# 默认比特率 (bps)
default_bitrate = 4000000
# 最大比特率 (bps)
max_bitrate = 8000000
# 最小比特率 (bps)
min_bitrate = 1000000
# 关键帧间隔
keyframe_interval = 30
# 编码预设: ultrafast, superfast, veryfast
preset = "veryfast"
# 编码调优: zerolatency, film, animation
tune = "zerolatency"
[webrtc]
# STUN 服务器
stun_servers = [
"stun:stun.l.google.com:19302",
]
# TURN 服务器 (可选)
turn_servers = []
# 最大比特率
max_bitrate = 8000000
# 最小比特率
min_bitrate = 500000
# 起始比特率
start_bitrate = 4000000
# 播放延迟最小值 (ms)
playout_delay_min_ms = 0
# 播放延迟最大值 (ms)
playout_delay_max_ms = 20
# 启用 NACK
nack_enabled = true
# 启用 FEC
fec_enabled = false
# 拥塞控制: gcc, twcc
congestion_control = "gcc"
[performance]
# 目标延迟 (ms)
target_latency_ms = 25
# 最大缓冲区大小
max_buffer_size = 10
# 最小缓冲区大小
min_buffer_size = 2
# 初始缓冲区大小
initial_buffer_size = 3
# 性能监控间隔 (ms)
monitoring_interval_ms = 500
[logging]
# 日志级别: trace, debug, info, warn, error
level = "info"
# 日志文件路径
file = "/var/log/wl-webrtc/app.log"
# 最大日志文件大小
max_size = "100MB"
# 最大日志文件数量
max_files = 10
# 控制台输出
console = true
[monitoring]
# 启用监控
enabled = true
# Prometheus 指标端口
metrics_port = 9090
# 健康检查端口
health_check_port = 8081
# 性能分析端口
profiling_port = 0 # 0 表示禁用
[security]
# 启用认证
enable_authentication = true
# 令牌 TTL (小时)
token_ttl_hours = 24
# 启用审计日志
enable_audit_log = true
# 审计日志路径
audit_log_path = "/var/log/wl-webrtc/audit.log"
# 启用速率限制
enable_rate_limit = true
# 每秒最大请求数
max_requests_per_second = 100
[hardware]
# 启用硬件加速
hardware_acceleration = true
# VA-API 设备
va_device = "/dev/dri/renderD128"
# 首选编码器优先级
encoder_priority = ["h264_nvenc", "h264_vaapi", "h264_x264"]
[experimental]
# 启用实验性功能
enable_experimental = false
# 使用自定义 WebRTC 实现
use_custom_webrtc = false
# 启用性能分析
enable_profiling = false
```
---
**文档版本**: 1.0.0
**最后更新**: 2026-02-02
**维护者**: wl-webrtc 团队