6037 lines
166 KiB
Plaintext
6037 lines
166 KiB
Plaintext
# Wayland → WebRTC 远程桌面后端详细设计文档
|
||
|
||
## 目录
|
||
|
||
## 1. 系统概述
|
||
### 1.1 项目背景
|
||
### 1.2 目标和约束
|
||
### 1.3 性能目标
|
||
|
||
## 2. 架构设计
|
||
### 2.1 整体架构
|
||
### 2.2 模块划分
|
||
### 2.3 模块交互
|
||
### 2.4 数据流
|
||
### 2.5 时序图
|
||
|
||
## 3. 详细组件设计
|
||
### 3.1 捕获模块
|
||
### 3.2 编码模块
|
||
### 3.3 WebRTC 传输模块
|
||
### 3.4 缓冲管理模块
|
||
### 3.5 信令模块
|
||
|
||
## 4. 数据结构设计
|
||
### 4.1 核心数据结构
|
||
### 4.2 内存管理
|
||
### 4.3 状态管理
|
||
|
||
## 5. 接口设计
|
||
### 5.1 公共 API
|
||
### 5.2 内部接口
|
||
### 5.3 错误处理接口
|
||
|
||
## 6. 性能优化
|
||
### 6.1 性能指标
|
||
### 6.2 优化策略
|
||
### 6.3 性能监控
|
||
### 6.4 调优指南
|
||
|
||
## 7. 并发设计
|
||
### 7.1 线程模型
|
||
### 7.2 同步机制
|
||
### 7.3 任务调度
|
||
### 7.4 锁策略
|
||
|
||
## 8. 网络设计
|
||
### 8.1 协议栈
|
||
### 8.2 数据格式
|
||
### 8.3 网络优化
|
||
### 8.4 错误处理
|
||
|
||
## 9. 安全设计
|
||
### 9.1 认证授权
|
||
### 9.2 数据加密
|
||
### 9.3 安全审计
|
||
### 9.4 防护措施
|
||
|
||
## 10. 测试策略
|
||
### 10.1 测试层次
|
||
### 10.2 测试用例
|
||
### 10.3 性能测试
|
||
### 10.4 持续集成
|
||
|
||
## 11. 部署运维
|
||
### 11.1 部署方案
|
||
### 11.2 配置管理
|
||
### 11.3 监控告警
|
||
### 11.4 日志管理
|
||
### 11.5 故障处理
|
||
|
||
## 12. 扩展设计
|
||
### 12.1 插件机制
|
||
### 12.2 版本管理
|
||
### 12.3 扩展点
|
||
|
||
## 附录
|
||
### A. 术语表
|
||
### B. 参考资料
|
||
### C. 配置示例
|
||
|
||
---
|
||
|
||
## 1. 系统概述
|
||
|
||
### 1.1 项目背景
|
||
|
||
Wayland → WebRTC 远程桌面后端是一个高性能、低延迟的远程桌面解决方案,旨在通过 WebRTC 技术将 Wayland 桌面会话实时传输到 Web 浏览器。该系统充分利用现代 Linux 图形栈的优势,通过零拷贝技术实现超低延迟的桌面共享体验。
|
||
|
||
**关键特性:**
|
||
- 基于 Wayland 协议的现代图形栈支持
|
||
- PipeWire xdg-desktop-portal 权限管理
|
||
- 硬件加速的视频编码(VA-API、NVENC)
|
||
- 零拷贝的内存管理(DMA-BUF)
|
||
- WebRTC 标准协议支持
|
||
- 自适应码率和质量控制
|
||
- 损坏区域检测和部分更新编码
|
||
|
||
### 1.2 目标和约束
|
||
|
||
**功能目标:**
|
||
1. 支持 Wayland 桌面的实时屏幕捕获
|
||
2. 通过 WebRTC 实现端到端的视频传输
|
||
3. 支持多种视频编码格式(H.264、H.265、VP9)
|
||
4. 提供可靠的输入回传机制
|
||
5. 支持多客户端并发连接
|
||
|
||
**性能目标:**
|
||
- 端到端延迟:本地网络 < 25ms,广域网 < 100ms
|
||
- 帧率:支持 30-60 FPS
|
||
- 分辨率:最高支持 4K 分辨率
|
||
- 吞吐量:单会话支持最高 8 Mbps
|
||
|
||
**技术约束:**
|
||
- 操作系统:Linux(Wayland 环境)
|
||
- GPU:支持 VA-API(Intel/AMD)或 NVENC(NVIDIA)
|
||
- 浏览器:支持 WebRTC 的现代浏览器
|
||
- 网络:支持 UDP 传输和 NAT 穿透
|
||
|
||
**资源约束:**
|
||
- CPU 使用率:单核 < 30%(硬件编码)
|
||
- 内存占用:< 500MB
|
||
- GPU 显存:< 2GB
|
||
|
||
### 1.3 性能目标
|
||
|
||
**延迟目标:**
|
||
|
||
| 场景 | 目标延迟 | 优选延迟 | 最大可接受延迟 |
|
||
|------|---------|---------|--------------|
|
||
| 局域网(硬件编码) | 15-20ms | 10-15ms | 30ms |
|
||
| 局域网(软件编码) | 30-40ms | 25-30ms | 60ms |
|
||
| 广域网(低延迟模式) | 40-60ms | 30-40ms | 100ms |
|
||
| 广域网(标准模式) | 60-100ms | 50-80ms | 150ms |
|
||
|
||
**质量目标:**
|
||
|
||
| 分辨率 | 帧率 | 比特率 | 目标质量 |
|
||
|--------|------|--------|---------|
|
||
| 1920x1080 | 60 FPS | 4 Mbps | 高清 |
|
||
| 1920x1080 | 30 FPS | 2 Mbps | 标准 |
|
||
| 3840x2160 | 30 FPS | 8 Mbps | 超高清 |
|
||
| 1280x720 | 60 FPS | 1.5 Mbps | 流畅 |
|
||
|
||
**吞吐量目标:**
|
||
- 单服务器并发会话数:10-20
|
||
- 单会话最大数据包速率:2000 pps(包/秒)
|
||
- 网络带宽利用率:> 90%(在限制内)
|
||
|
||
**资源目标:**
|
||
- 帧捕获时间:< 2ms
|
||
- 编码延迟:< 8ms(硬件),< 20ms(软件)
|
||
- WebRTC 打包延迟:< 3ms
|
||
- 网络传输延迟:< 5ms(局域网)
|
||
|
||
---
|
||
|
||
## 2. 架构设计
|
||
|
||
### 2.1 整体架构
|
||
|
||
```
|
||
┌──────────────────────────────────────────────────────────────────────────┐
|
||
│ 客户端浏览器 │
|
||
│ (WebRTC 接收端) │
|
||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||
│ │ 视频 │ │ 音频 │ │ 输入 │ │ 控制 │ │
|
||
│ │ 解码 │ │ 解码 │ │ 处理 │ │ 管理 │ │
|
||
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
|
||
└─────────────────────────────┬────────────────────────────────────────────┘
|
||
│ WebRTC 媒体流 (RTP/RTCP)
|
||
│ WebRTC 数据通道
|
||
│ WebSocket 信令
|
||
▼
|
||
┌──────────────────────────────────────────────────────────────────────────┐
|
||
│ 信令服务器 │
|
||
│ (WebSocket/WebSocket Secure) │
|
||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||
│ │ 会话管理 │ │ SDP交换 │ │ ICE候选 │ │ 认证授权 │ │
|
||
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
|
||
└─────────────────────────────┬────────────────────────────────────────────┘
|
||
│
|
||
▼
|
||
┌──────────────────────────────────────────────────────────────────────────┐
|
||
│ Rust 后端服务器 │
|
||
├──────────────────────────────────────────────────────────────────────────┤
|
||
│ │
|
||
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
|
||
│ │ 捕获管理器 │ │ 编码器管道 │ │ WebRTC传输层 │ │
|
||
│ ├──────────────────┤ ├──────────────────┤ ├──────────────────┤ │
|
||
│ │ • PipeWire接口 │───▶│ • 视频编码器 │───▶│ • PeerConnection │ │
|
||
│ │ • DMA-BUF获取 │ │ • 码率控制 │ │ • RTP打包 │ │
|
||
│ │ • 损坏跟踪 │ │ • 帧率控制 │ │ • ICE处理 │ │
|
||
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
|
||
│ │ │ │ │
|
||
│ ▼ ▼ ▼ │
|
||
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
|
||
│ │ 缓冲管理器 │ │ 输入处理器 │ │ 数据通道管理 │ │
|
||
│ ├──────────────────┤ ├──────────────────┤ ├──────────────────┤ │
|
||
│ │ • DMA-BUF池 │ │ • 鼠标事件 │ │ • 控制消息 │ │
|
||
│ │ • 编码缓冲池 │ │ • 键盘事件 │ │ • 输入事件 │ │
|
||
│ │ • 内存所有权 │ │ • 事件转换 │ │ • 状态同步 │ │
|
||
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
|
||
│ │
|
||
│ ┌────────────────────────────────────────────────────────────────┐ │
|
||
│ │ 零拷贝缓冲区管理器 │ │
|
||
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
|
||
│ │ │ DMA-BUF池 │ │ 共享内存池 │ │ 字节缓冲池 │ │ │
|
||
│ │ │ (GPU内存) │ │ (跨进程) │ │ (CPU内存) │ │ │
|
||
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
|
||
│ └────────────────────────────────────────────────────────────────┘ │
|
||
└─────────────────────────────┬────────────────────────────────────────────┘
|
||
│
|
||
▼
|
||
┌──────────────────────────────────────────────────────────────────────────┐
|
||
│ Wayland 合成器 │
|
||
│ (PipeWire 屏幕共享 / KDE Plasma / GNOME) │
|
||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||
│ │ 窗口管理 │ │ 合成渲染 │ │ 输入处理 │ │ 输出管理 │ │
|
||
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
|
||
└──────────────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
### 2.2 模块划分
|
||
|
||
#### 2.2.1 捕获管理模块(Capture Manager)
|
||
|
||
**职责:**
|
||
- 与 PipeWire 和 xdg-desktop-portal 交互
|
||
- 获取屏幕捕获权限
|
||
- 接收 DMA-BUF 格式的帧数据
|
||
- 跟踪屏幕损坏区域
|
||
- 管理捕获会话生命周期
|
||
|
||
**子模块:**
|
||
1. `PipeWireClient`: PipeWire 协议客户端
|
||
2. `PortalClient`: xdg-desktop-portal 客户端
|
||
3. `DamageTracker`: 损坏区域跟踪器
|
||
4. `FrameAcquisition`: 帧获取协调器
|
||
|
||
#### 2.2.2 编码器模块(Encoder Module)
|
||
|
||
**职责:**
|
||
- 视频帧编码(H.264/H.265/VP9)
|
||
- 硬件加速支持(VA-API、NVENC)
|
||
- 自适应码率控制
|
||
- 帧率调整
|
||
- 关键帧请求
|
||
|
||
**子模块:**
|
||
1. `EncoderFactory`: 编码器工厂
|
||
2. `VaapiEncoder`: VA-API 硬件编码器
|
||
3. `NvencEncoder`: NVENC 硬件编码器
|
||
4. `SoftwareEncoder`: 软件编码器后备
|
||
5. `BitrateController`: 码率控制器
|
||
6. `FrameRateController`: 帧率控制器
|
||
|
||
#### 2.2.3 WebRTC 传输模块(WebRTC Transport Module)
|
||
|
||
**职责:**
|
||
- WebRTC 对等连接管理
|
||
- RTP/RTCP 数据包处理
|
||
- ICE/STUN/TURN 协议处理
|
||
- 拥塞控制
|
||
- 丢包恢复
|
||
|
||
**子模块:**
|
||
1. `PeerConnectionManager`: 对等连接管理器
|
||
2. `RtpPacketizer`: RTP 打包器
|
||
3. `IceManager`: ICE 候选管理器
|
||
4. `CongestionController`: 拥塞控制器
|
||
5. `NackHandler`: NACK 处理器
|
||
6. `DataChannelManager`: 数据通道管理器
|
||
|
||
#### 2.2.4 缓冲管理模块(Buffer Management Module)
|
||
|
||
**职责:**
|
||
- DMA-BUF 生命周期管理
|
||
- 内存池分配和回收
|
||
- 零拷贝所有权转移
|
||
- 内存泄漏防护
|
||
|
||
**子模块:**
|
||
1. `DmaBufPool`: DMA-BUF 池
|
||
2. `EncodedBufferPool`: 编码缓冲池
|
||
3. `SharedMemoryPool`: 共享内存池
|
||
4. `MemoryTracker`: 内存跟踪器
|
||
|
||
#### 2.2.5 信令模块(Signaling Module)
|
||
|
||
**职责:**
|
||
- WebSocket 连接管理
|
||
- SDP 会话描述交换
|
||
- ICE 候选传输
|
||
- 会话状态管理
|
||
|
||
**子模块:**
|
||
1. `WebSocketServer`: WebSocket 服务器
|
||
2. `SdpHandler`: SDP 处理器
|
||
3. `IceCandidateHandler`: ICE 候选处理器
|
||
4. `SessionManager`: 会话管理器
|
||
|
||
### 2.3 模块交互
|
||
|
||
**主要交互流程:**
|
||
|
||
```
|
||
[Wayland Compositor]
|
||
│
|
||
│ DMA-BUF (屏幕内容)
|
||
▼
|
||
[PipeWire]
|
||
│
|
||
│ 屏幕捕获流
|
||
▼
|
||
[捕获管理器] ────────────▶ [缓冲管理器]
|
||
│ │
|
||
│ 损坏区域信息 │ DMA-BUF 句柄
|
||
▼ ▼
|
||
[编码器模块] ◀──────────────┘
|
||
│
|
||
│ 编码后的数据 (Bytes)
|
||
▼
|
||
[WebRTC传输层]
|
||
│
|
||
│ RTP 数据包
|
||
▼
|
||
[网络] ─────────────────────▶ [客户端浏览器]
|
||
▲
|
||
│ 控制信息
|
||
│
|
||
[数据通道管理器]
|
||
│
|
||
│ 输入事件
|
||
▼
|
||
[输入处理器] ────────────▶ [Wayland Compositor]
|
||
▲
|
||
│ SDP/ICE 候选
|
||
│
|
||
[信令服务器] ◀─────────────▶ [客户端浏览器]
|
||
```
|
||
|
||
### 2.4 数据流
|
||
|
||
**详细数据流:**
|
||
|
||
```
|
||
阶段 1: 捕获
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
输入: GPU 帧缓冲区 (Wayland 合成器输出)
|
||
处理:
|
||
1. PipeWire 获取帧缓冲区
|
||
2. 创建 DMA-BUF 文件描述符
|
||
3. 设置损坏区域标记
|
||
4. 生成时间戳
|
||
输出: CapturedFrame { dma_buf, width, height, format, timestamp }
|
||
拷贝: 无 (零拷贝)
|
||
延迟: 1-2ms
|
||
|
||
阶段 2: 缓冲管理
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
输入: CapturedFrame (带 DMA-BUF FD)
|
||
处理:
|
||
1. 从 DMA-BUF 池获取空闲缓冲区
|
||
2. 验证 DMA-BUF 有效性
|
||
3. 转移所有权 (移动语义)
|
||
4. 更新内存跟踪
|
||
输出: DmaBufHandle { fd, size, stride, offset }
|
||
拷贝: 无 (所有权转移)
|
||
延迟: < 0.1ms
|
||
|
||
阶段 3: 编码
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
输入: DmaBufHandle
|
||
处理:
|
||
1. 导入 DMA-BUF 到编码器 (硬件)
|
||
2. 执行编码操作
|
||
3. 输出编码数据到映射缓冲区
|
||
4. 生成编码帧元数据
|
||
输出: EncodedFrame { data: Bytes, is_keyframe, timestamp, seq_num }
|
||
拷贝: 无 (DMA-BUF 直接导入)
|
||
延迟:
|
||
- VA-API: 3-5ms
|
||
- NVENC: 2-4ms
|
||
- x264: 15-25ms
|
||
|
||
阶段 4: RTP 打包
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
输入: EncodedFrame
|
||
处理:
|
||
1. 分片大帧到多个 RTP 包
|
||
2. 添加 RTP 头部
|
||
3. 设置时间戳和序列号
|
||
4. 关联数据包到 Bytes
|
||
输出: Vec<RtpPacket>
|
||
拷贝: 无 (Bytes 引用)
|
||
延迟: < 0.5ms
|
||
|
||
阶段 5: 网络传输
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
输入: Vec<RtpPacket>
|
||
处理:
|
||
1. UDP 套接字发送
|
||
2. 内核空间拷贝
|
||
3. 网络协议栈处理
|
||
4. 物理传输
|
||
输出: UDP 数据包
|
||
拷贝: 一次 (内核空间)
|
||
延迟:
|
||
- 局域网: < 1ms
|
||
- 广域网: 10-100ms
|
||
|
||
阶段 6: 客户端接收
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
输入: UDP 数据包
|
||
处理:
|
||
1. 接收数据包
|
||
2. RTP 解包
|
||
3. 重组帧
|
||
4. 解码播放
|
||
输出: 显示输出
|
||
延迟:
|
||
- 解码: 2-5ms (硬件), 10-20ms (软件)
|
||
- 渲染: 1-2ms
|
||
|
||
总延迟 (局域网 + 硬件编码): 10-20ms
|
||
总延迟 (局域网 + 软件编码): 30-45ms
|
||
总延迟 (广域网 + 硬件编码): 40-100ms
|
||
```
|
||
|
||
### 2.5 时序图
|
||
|
||
#### 2.5.1 会话建立时序图
|
||
|
||
```
|
||
客户端 信令服务器 后端服务器 Wayland/编码器
|
||
│ │ │ │
|
||
│── 连接 WebSocket ──▶│ │ │
|
||
│◀─── 接受连接 ──────│ │ │
|
||
│ │ │ │
|
||
│── 开始捕获请求 ────▶│── 转发请求 ───────▶│ │
|
||
│ │ │── 请求权限 ──────▶│
|
||
│ │ │◀── 获取流句柄 ───│
|
||
│ │ │ │
|
||
│◀─── 流 ID ─────────│◀─── 流 ID ─────────│ │
|
||
│ │ │ │
|
||
│── 创建 Offer ──────▶│── 转发 Offer ─────▶│ │
|
||
│ │ │── 创建 PC ───────▶│
|
||
│ │ │◀── ICE 候选 ──────│
|
||
│◀─── Answer ────────│◀─── Answer ────────│ │
|
||
│ │ │ │
|
||
│◀── ICE 候选 ────────│◀── ICE 候选 ────────│ │
|
||
│── ICE 候选 ────────▶│── ICE 候选 ───────▶│ │
|
||
│ │ │ │
|
||
│◀─── 连接建立 ──────│◀─── 连接建立 ──────│ │
|
||
│ │ │ │
|
||
│ │ │── 开始捕获 ──────▶│
|
||
│ │ │◀── DMA-BUF 帧数据 ─│
|
||
│◀─── 视频 RTP ──────│ │── 编码 ──────────▶│
|
||
│ │ │◀── 编码数据 ──────│
|
||
│◀─── 视频 RTP ────────────────────────────│ │
|
||
│ │ │ │
|
||
│── 输入事件 ────────▶│── 转发输入 ───────▶│ │
|
||
│ │ │── 发送到 Wayland ─▶│
|
||
```
|
||
|
||
#### 2.5.2 帧处理时序图
|
||
|
||
```
|
||
PipeWire 捕获管理器 缓冲管理器 编码器 WebRTC 网络
|
||
│ │ │ │ │ │
|
||
│── DMA-BUF ──▶│ │ │ │ │
|
||
│ │ │ │ │ │
|
||
│ │── 创建帧 ─────▶│ │ │ │
|
||
│ │ │ │ │ │
|
||
│ │── 获取句柄 ───▶│ │ │ │
|
||
│ │◀── 句柄ID ────│ │ │ │
|
||
│ │ │ │ │ │
|
||
│ │── 编码请求 ───────────────▶│ │ │
|
||
│ │ │ │ │ │
|
||
│ │ │── 导入DMA ─▶│ │ │
|
||
│ │ │◀── 表面ID ─│ │ │
|
||
│ │ │ │ │ │
|
||
│ │ │ │── 编码 ──▶│ │
|
||
│ │ │ │◀── 数据 ──│ │
|
||
│ │ │ │ │ │
|
||
│ │◀── 编码帧 ──────────────────│ │ │
|
||
│ │ │ │ │
|
||
│ │── RTP 打包 ───────────────────────────▶│ │
|
||
│ │ │ │ │
|
||
│ │ │ │── 发送 ──▶│
|
||
│ │ │ │ │
|
||
│ │◀── 发送完成 ─────────────────────────────│ │
|
||
│ │ │ │ │
|
||
│ │── 释放DMA ───▶│ │ │ │
|
||
│◀── 缓冲回收 ──│ │ │ │ │
|
||
|
||
时间 (硬件编码): 5-8ms
|
||
时间 (软件编码): 20-30ms
|
||
```
|
||
|
||
#### 2.5.3 错误恢复时序图
|
||
|
||
```
|
||
客户端 WebRTC 后端 编码器 管道
|
||
│ │ │ │ │
|
||
│── 丢失帧NACK ─▶│ │ │ │
|
||
│ │── 重传请求 ──▶│ │ │
|
||
│ │◀── 重传数据 ──│ │ │
|
||
│◀── 重传数据 ──│ │ │ │
|
||
│ │ │ │ │
|
||
│ │── 编码错误 ──────────────▶│ │
|
||
│ │ │ │ │
|
||
│ │ │── 重启 ───▶│ │
|
||
│ │ │◀── 就绪 ──│ │
|
||
│ │── 请求关键帧 ─────────────────────▶│
|
||
│◀── 关键帧 ────│◀── 关键帧 ──│◀── 关键帧 ─│◀── 关键帧 │
|
||
│ │ │ │ │
|
||
│ │── 恢复流 ────────────────────────▶│
|
||
│◀── 恢复流 ────│ │ │ │
|
||
```
|
||
|
||
---
|
||
|
||
## 3. 详细组件设计
|
||
|
||
### 3.1 捕获模块
|
||
|
||
#### 3.1.1 PipeWire 客户端
|
||
|
||
**设计原理:**
|
||
PipeWire 客户端负责与 PipeWire 守护进程通信,通过 xdg-desktop-portal 获取屏幕捕获权限,并接收实时视频流。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// PipeWire 核心连接
|
||
pub struct PipewireCore {
|
||
/// PipeWire 主循环
|
||
main_loop: MainLoop,
|
||
/// PipeWire 上下文
|
||
context: Context,
|
||
/// PipeWire 核心连接
|
||
core: Arc<Core>,
|
||
/// 事件循环线程句柄
|
||
thread_handle: JoinHandle<()>,
|
||
}
|
||
|
||
/// PipeWire 捕获流
|
||
pub struct PipewireStream {
|
||
/// 流句柄
|
||
stream: Stream,
|
||
/// 流状态
|
||
state: StreamState,
|
||
/// 帧格式
|
||
format: Option<Format>,
|
||
/// 缓冲区配置
|
||
buffer_config: BufferConfig,
|
||
/// 帧发送器
|
||
frame_sender: async_channel::Sender<CapturedFrame>,
|
||
}
|
||
|
||
/// 流状态枚举
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum StreamState {
|
||
Unconnected,
|
||
Connecting,
|
||
Connected,
|
||
Streaming,
|
||
Error,
|
||
}
|
||
|
||
/// 缓冲区配置
|
||
pub struct BufferConfig {
|
||
/// 缓冲区数量
|
||
pub num_buffers: usize,
|
||
/// 缓冲区大小
|
||
pub buffer_size: usize,
|
||
/// 最小缓冲区数量
|
||
pub min_buffers: usize,
|
||
/// 最大缓冲区数量
|
||
pub max_buffers: usize,
|
||
}
|
||
```
|
||
|
||
**接口定义:**
|
||
|
||
```rust
|
||
impl PipewireCore {
|
||
/// 创建新的 PipeWire 核心连接
|
||
pub fn new() -> Result<Self, PipewireError> {
|
||
let main_loop = MainLoop::new(None)?;
|
||
let context = Context::new(&main_loop)?;
|
||
let core = Arc::new(context.connect(None)?);
|
||
|
||
// 启动事件循环线程
|
||
let core_clone = core.clone();
|
||
let main_loop_clone = main_loop.clone();
|
||
let thread_handle = std::thread::spawn(move || {
|
||
main_loop_clone.run();
|
||
});
|
||
|
||
Ok(Self {
|
||
main_loop,
|
||
context,
|
||
core,
|
||
thread_handle,
|
||
})
|
||
}
|
||
|
||
/// 获取 PipeWire 核心
|
||
pub fn core(&self) -> &Arc<Core> {
|
||
&self.core
|
||
}
|
||
|
||
/// 关闭连接
|
||
pub fn shutdown(self) {
|
||
self.main_loop.quit();
|
||
self.thread_handle.join().ok();
|
||
}
|
||
}
|
||
|
||
impl PipewireStream {
|
||
/// 创建新的捕获流
|
||
pub fn new(
|
||
core: &Arc<Core>,
|
||
sender: async_channel::Sender<CapturedFrame>,
|
||
) -> Result<Self, PipewireError> {
|
||
let mut stream = Stream::new(
|
||
core,
|
||
"wl-webrtc-capture",
|
||
properties! {
|
||
*pw::keys::MEDIA_TYPE => "Video",
|
||
*pw::keys::MEDIA_CATEGORY => "Capture",
|
||
*pw::keys::MEDIA_ROLE => "Screen",
|
||
},
|
||
)?;
|
||
|
||
// 设置流监听器
|
||
let listener = stream.add_local_listener()?;
|
||
|
||
// 设置参数变更回调
|
||
listener.register(
|
||
pw::stream::events::Events::ParamChanged,
|
||
Self::on_param_changed,
|
||
)?;
|
||
|
||
// 设置处理回调
|
||
listener.register(
|
||
pw::stream::events::Events::Process,
|
||
Self::on_process,
|
||
)?;
|
||
|
||
Ok(Self {
|
||
stream,
|
||
state: StreamState::Unconnected,
|
||
format: None,
|
||
buffer_config: BufferConfig::default(),
|
||
frame_sender: sender,
|
||
})
|
||
}
|
||
|
||
/// 连接到屏幕捕获源
|
||
pub fn connect(&mut self, node_id: u32) -> Result<(), PipewireError> {
|
||
self.stream.connect(
|
||
pw::spa::direction::Direction::Input,
|
||
Some(node_id),
|
||
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
|
||
)?;
|
||
|
||
self.state = StreamState::Connected;
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理参数变更
|
||
fn on_param_changed(stream: &Stream, event_id: u32, event_data: *mut c_void) {
|
||
// 获取流格式信息
|
||
if let Some(format) = stream.format() {
|
||
let video_info = format.parse::<VideoInfoRaw>().unwrap();
|
||
// 保存格式信息
|
||
}
|
||
}
|
||
|
||
/// 处理新帧
|
||
fn on_process(stream: &Stream) {
|
||
// 获取缓冲区
|
||
let buffer = stream.dequeue_buffer().expect("no buffer");
|
||
|
||
// 获取 DMA-BUF 信息
|
||
let datas = buffer.datas();
|
||
let data = &datas[0];
|
||
|
||
// 提取 DMA-BUF 文件描述符
|
||
let fd = data.fd().expect("no fd");
|
||
let size = data.chunk().size() as usize;
|
||
let stride = data.chunk().stride();
|
||
|
||
// 创建捕获帧
|
||
let frame = CapturedFrame {
|
||
dma_buf: DmaBufHandle::new(fd, size, stride, 0),
|
||
width: stream.format().unwrap().size().width,
|
||
height: stream.format().unwrap().size().height,
|
||
format: PixelFormat::from_spa_format(&stream.format().unwrap()),
|
||
timestamp: timestamp_ns(),
|
||
};
|
||
|
||
// 发送帧
|
||
if let Err(e) = self.frame_sender.try_send(frame) {
|
||
warn!("Failed to send frame: {:?}", e);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.1.2 损坏跟踪器
|
||
|
||
**设计原理:**
|
||
损坏跟踪器通过比较连续帧的差异,只编码和传输屏幕上发生变化的区域,从而显著减少带宽和编码开销。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// 损坏区域跟踪器
|
||
pub struct DamageTracker {
|
||
/// 上一帧的句柄
|
||
last_frame: Option<DmaBufHandle>,
|
||
/// 损坏区域队列
|
||
damaged_regions: VecDeque<ScreenRegion>,
|
||
/// 最小损坏阈值(像素)
|
||
min_damage_threshold: u32,
|
||
/// 最大损坏区域数
|
||
max_regions: usize,
|
||
/// 统计信息
|
||
stats: DamageStats,
|
||
}
|
||
|
||
/// 屏幕区域
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub struct ScreenRegion {
|
||
pub x: u32,
|
||
pub y: u32,
|
||
pub width: u32,
|
||
pub height: u32,
|
||
}
|
||
|
||
/// 损坏统计
|
||
pub struct DamageStats {
|
||
pub total_frames: u64,
|
||
pub damaged_frames: u64,
|
||
pub total_regions: u64,
|
||
pub avg_region_size: f32,
|
||
}
|
||
```
|
||
|
||
**状态机:**
|
||
|
||
```
|
||
初始化
|
||
│
|
||
▼
|
||
[等待第一帧]
|
||
│
|
||
▼
|
||
[全屏标记]
|
||
│
|
||
▼
|
||
┌─────────────┐
|
||
│ 检测损坏 │◀─────────┐
|
||
└─────────────┘ │
|
||
│ │
|
||
│ 无损坏 │ 有损坏
|
||
▼ │
|
||
[跳过编码] │
|
||
│ │
|
||
▼ │
|
||
[等待下一帧]────────────┘
|
||
│
|
||
▼
|
||
[输出损坏区域]
|
||
```
|
||
|
||
**接口定义:**
|
||
|
||
```rust
|
||
impl DamageTracker {
|
||
/// 创建新的损坏跟踪器
|
||
pub fn new(min_threshold: u32, max_regions: usize) -> Self {
|
||
Self {
|
||
last_frame: None,
|
||
damaged_regions: VecDeque::with_capacity(max_regions),
|
||
min_damage_threshold: min_threshold,
|
||
max_regions,
|
||
stats: DamageStats::default(),
|
||
}
|
||
}
|
||
|
||
/// 更新损坏区域
|
||
pub fn update(&mut self, new_frame: &CapturedFrame) -> Vec<ScreenRegion> {
|
||
self.stats.total_frames += 1;
|
||
|
||
match &self.last_frame {
|
||
Some(last) => {
|
||
// 比较帧差异
|
||
let regions = self.compute_damage_regions(last, new_frame);
|
||
|
||
if !regions.is_empty() {
|
||
self.stats.damaged_frames += 1;
|
||
self.stats.total_regions += regions.len() as u64;
|
||
|
||
// 更新平均区域大小
|
||
let total_pixels: u64 = regions.iter()
|
||
.map(|r| r.width * r.height)
|
||
.sum();
|
||
self.stats.avg_region_size = total_pixels as f32 / regions.len() as f32;
|
||
}
|
||
|
||
self.last_frame = Some(new_frame.dma_buf.clone());
|
||
regions
|
||
}
|
||
None => {
|
||
// 第一帧,标记全屏
|
||
self.stats.damaged_frames += 1;
|
||
self.stats.total_regions += 1;
|
||
self.stats.avg_region_size =
|
||
(new_frame.width * new_frame.height) as f32;
|
||
|
||
self.last_frame = Some(new_frame.dma_buf.clone());
|
||
vec![ScreenRegion {
|
||
x: 0,
|
||
y: 0,
|
||
width: new_frame.width,
|
||
height: new_frame.height,
|
||
}]
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 计算损坏区域
|
||
fn compute_damage_regions(
|
||
&self,
|
||
last: &DmaBufHandle,
|
||
new: &CapturedFrame,
|
||
) -> Vec<ScreenRegion> {
|
||
// 将帧划分为块进行比较
|
||
let block_size = 16;
|
||
let blocks_x = (new.width as usize + block_size - 1) / block_size;
|
||
let blocks_y = (new.height as usize + block_size - 1) / block_size;
|
||
|
||
let mut damaged_blocks = Vec::new();
|
||
|
||
// 比较每个块
|
||
for y in 0..blocks_y {
|
||
for x in 0..blocks_x {
|
||
let changed = self.compare_block(last, new, x, y, block_size);
|
||
if changed {
|
||
damaged_blocks.push((x, y));
|
||
}
|
||
}
|
||
}
|
||
|
||
// 合并相邻块为区域
|
||
self.merge_blocks_to_regions(damaged_blocks, block_size, new.width, new.height)
|
||
}
|
||
|
||
/// 比较块是否变化
|
||
fn compare_block(
|
||
&self,
|
||
last: &DmaBufHandle,
|
||
new: &CapturedFrame,
|
||
block_x: usize,
|
||
block_y: usize,
|
||
block_size: usize,
|
||
) -> bool {
|
||
// 映射两个 DMA-BUF
|
||
let last_ptr = unsafe { last.as_ptr() };
|
||
let new_ptr = unsafe { new.dma_buf.as_ptr() };
|
||
|
||
// 计算块偏移
|
||
let width = new.width as usize;
|
||
let block_start = (block_y * block_size * width + block_x * block_size) * 4; // RGBA
|
||
|
||
// 简单比较(实际可以使用更高效的哈希或采样)
|
||
unsafe {
|
||
for i in 0..block_size.min(new.height as usize - block_y * block_size) {
|
||
for j in 0..block_size.min(new.width as usize - block_x * block_size) {
|
||
let offset = block_start + (i * width + j) * 4;
|
||
let last_pixel = *(last_ptr.add(offset) as *const [u8; 4]);
|
||
let new_pixel = *(new_ptr.add(offset) as *const [u8; 4]);
|
||
if last_pixel != new_pixel {
|
||
return true;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
false
|
||
}
|
||
|
||
/// 合并块为区域
|
||
fn merge_blocks_to_regions(
|
||
&self,
|
||
blocks: Vec<(usize, usize)>,
|
||
block_size: usize,
|
||
width: u32,
|
||
height: u32,
|
||
) -> Vec<ScreenRegion> {
|
||
if blocks.is_empty() {
|
||
return vec![];
|
||
}
|
||
|
||
// 使用并查集或扫描线算法合并块
|
||
// 简化实现:转换为区域
|
||
let mut regions = Vec::new();
|
||
|
||
for (bx, by) in blocks {
|
||
regions.push(ScreenRegion {
|
||
x: (bx * block_size) as u32,
|
||
y: (by * block_size) as u32,
|
||
width: block_size as u32,
|
||
height: block_size as u32,
|
||
});
|
||
}
|
||
|
||
// 合并相邻区域
|
||
self.merge_adjacent_regions(regions)
|
||
}
|
||
|
||
/// 合并相邻区域
|
||
fn merge_adjacent_regions(&self, mut regions: Vec<ScreenRegion>) -> Vec<ScreenRegion> {
|
||
// 简化实现:直接返回前 N 个区域
|
||
regions.truncate(self.max_regions);
|
||
regions
|
||
}
|
||
|
||
/// 获取统计信息
|
||
pub fn stats(&self) -> &DamageStats {
|
||
&self.stats
|
||
}
|
||
|
||
/// 重置跟踪器
|
||
pub fn reset(&mut self) {
|
||
self.last_frame = None;
|
||
self.damaged_regions.clear();
|
||
self.stats = DamageStats::default();
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.2 编码模块
|
||
|
||
#### 3.2.1 编码器抽象
|
||
|
||
**设计原理:**
|
||
编码器抽象定义了统一的视频编码接口,支持多种编码器实现(硬件和软件),便于动态切换和扩展。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// 视频编码器 trait
|
||
#[async_trait]
|
||
pub trait VideoEncoder: Send + Sync {
|
||
/// 编码一帧
|
||
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError>;
|
||
|
||
/// 重新配置编码器
|
||
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError>;
|
||
|
||
/// 请求关键帧
|
||
async fn request_keyframe(&mut self) -> Result<(), EncoderError>;
|
||
|
||
/// 获取编码器统计信息
|
||
fn stats(&self) -> EncoderStats;
|
||
|
||
/// 获取支持的特性
|
||
fn capabilities(&self) -> EncoderCapabilities;
|
||
}
|
||
|
||
/// 编码器配置
|
||
#[derive(Debug, Clone)]
|
||
pub struct EncoderConfig {
|
||
/// 编码器类型
|
||
pub encoder_type: EncoderType,
|
||
/// 视频分辨率
|
||
pub width: u32,
|
||
pub height: u32,
|
||
/// 帧率
|
||
pub frame_rate: u32,
|
||
/// 目标比特率 (bps)
|
||
pub bitrate: u32,
|
||
/// 最大比特率 (bps)
|
||
pub max_bitrate: u32,
|
||
/// 最小比特率 (bps)
|
||
pub min_bitrate: u32,
|
||
/// 关键帧间隔
|
||
pub keyframe_interval: u32,
|
||
/// 编码预设
|
||
pub preset: EncodePreset,
|
||
/// 编码调优
|
||
pub tune: EncodeTune,
|
||
}
|
||
|
||
/// 编码器类型
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum EncoderType {
|
||
H264_VAAPI, // H.264 VA-API 硬件编码
|
||
H264_NVENC, // H.264 NVENC 硬件编码
|
||
H264_X264, // H.264 x264 软件编码
|
||
H265_VAAPI, // H.265 VA-API 硬件编码
|
||
VP9_VAAPI, // VP9 VA-API 硬件编码
|
||
}
|
||
|
||
/// 编码预设
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum EncodePreset {
|
||
Ultrafast, // 极快,最低延迟
|
||
Superfast, // 超快
|
||
Veryfast, // 很快
|
||
Faster, // 快
|
||
Fast, // 中等
|
||
Medium, // 较慢
|
||
Slow, // 慢
|
||
Slower, // 更慢
|
||
Veryslow, // 极慢
|
||
}
|
||
|
||
/// 编码调优
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum EncodeTune {
|
||
Zerolatency, // 零延迟
|
||
Film, // 电影
|
||
Animation, // 动画
|
||
Grain, // 胶片颗粒
|
||
Stillimage, // 静态图像
|
||
}
|
||
|
||
/// 编码后的帧
|
||
#[derive(Debug, Clone)]
|
||
pub struct EncodedFrame {
|
||
/// 编码数据 (零拷贝 Bytes)
|
||
pub data: Bytes,
|
||
/// 是否为关键帧
|
||
pub is_keyframe: bool,
|
||
/// 时间戳 (ns)
|
||
pub timestamp: u64,
|
||
/// 序列号
|
||
pub sequence_number: u64,
|
||
/// RTP 时间戳
|
||
pub rtp_timestamp: u32,
|
||
}
|
||
|
||
/// 编码器错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum EncoderError {
|
||
#[error("编码器初始化失败: {0}")]
|
||
InitFailed(String),
|
||
|
||
#[error("编码失败: {0}")]
|
||
EncodeFailed(String),
|
||
|
||
#[error("重新配置失败: {0}")]
|
||
ReconfigureFailed(String),
|
||
|
||
#[error("关键帧请求失败: {0}")]
|
||
KeyframeRequestFailed(String),
|
||
|
||
#[error("不支持的格式: {0}")]
|
||
UnsupportedFormat(String),
|
||
|
||
#[error("DMA-BUF 导入失败")]
|
||
DmaBufImportFailed,
|
||
}
|
||
|
||
/// 编码器统计信息
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct EncoderStats {
|
||
/// 已编码帧数
|
||
pub frames_encoded: u64,
|
||
/// 关键帧数
|
||
pub keyframes: u64,
|
||
/// 平均编码延迟 (ms)
|
||
pub avg_encode_latency_ms: f64,
|
||
/// 总输出字节数
|
||
pub total_bytes: u64,
|
||
/// 实际比特率 (bps)
|
||
pub actual_bitrate: u32,
|
||
/// 丢帧数
|
||
pub dropped_frames: u64,
|
||
}
|
||
|
||
/// 编码器能力
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct EncoderCapabilities {
|
||
/// 是否支持硬件加速
|
||
pub hardware_accelerated: bool,
|
||
/// 是否支持 DMA-BUF 导入
|
||
pub supports_dma_buf: bool,
|
||
/// 支持的最大分辨率
|
||
pub max_resolution: (u32, u32),
|
||
/// 支持的最大帧率
|
||
pub max_frame_rate: u32,
|
||
/// 支持的比特率范围
|
||
pub bitrate_range: (u32, u32),
|
||
/// 是否支持动态码率调整
|
||
pub supports_dynamic_bitrate: bool,
|
||
}
|
||
```
|
||
|
||
#### 3.2.2 VA-API 编码器
|
||
|
||
**设计原理:**
|
||
VA-API 编码器利用 Intel/AMD GPU 的硬件视频编码能力,通过直接导入 DMA-BUF 实现零拷贝编码,提供极低的延迟。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// VA-API H.264 编码器
|
||
pub struct VaapiH264Encoder {
|
||
/// VA 显示
|
||
display: va::Display,
|
||
/// VA 配置
|
||
config_id: va::ConfigID,
|
||
/// VA 上下文
|
||
context_id: va::ContextID,
|
||
/// 编码器配置
|
||
config: EncoderConfig,
|
||
/// 序列参数缓冲区
|
||
seq_param: VAEncSequenceParameterBufferH264,
|
||
/// 图像参数缓冲区
|
||
pic_param: VAEncPictureParameterBufferH264,
|
||
/// 切片参数缓冲区
|
||
slice_param: VAEncSliceParameterBufferH264,
|
||
/// 参考帧
|
||
reference_frames: [Option<va::SurfaceID>; 2],
|
||
/// 当前参考帧索引
|
||
current_ref_frame: usize,
|
||
/// 序列号
|
||
sequence_number: u64,
|
||
/// 统计信息
|
||
stats: EncoderStats,
|
||
/// RTP 时间戳基数
|
||
rtp_timestamp_base: u32,
|
||
/// 编码计时器
|
||
encode_timer: MovingAverage,
|
||
}
|
||
|
||
/// VA-API 编码器错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum VaapiError {
|
||
#[error("VA 初始化失败: {0}")]
|
||
InitializationFailed(String),
|
||
|
||
#[error("VA 配置失败: {0}")]
|
||
ConfigurationFailed(String),
|
||
|
||
#[error("VA 表面创建失败")]
|
||
SurfaceCreationFailed,
|
||
|
||
#[error("DMA-BUF 导入失败")]
|
||
DmaBufImportFailed,
|
||
|
||
#[error("编码操作失败: {0}")]
|
||
EncodeOperationFailed(String),
|
||
}
|
||
```
|
||
|
||
**状态机:**
|
||
|
||
```
|
||
[未初始化]
|
||
│
|
||
│ initialize()
|
||
▼
|
||
[已初始化]
|
||
│
|
||
│ configure()
|
||
▼
|
||
[已配置]
|
||
│
|
||
│ encode()
|
||
▼
|
||
┌────────────────┐
|
||
│ 编码中 │◀─────────┐
|
||
│ (每帧) │ │
|
||
└────────────────┘ │
|
||
│ │
|
||
│ keyframe │ normal frame
|
||
▼ │
|
||
[编码关键帧] │
|
||
│ │
|
||
▼ │
|
||
[完成] ──────────────┘
|
||
│
|
||
│ error
|
||
▼
|
||
[错误状态]
|
||
│
|
||
│ reconfigure()
|
||
▼
|
||
[已配置]
|
||
```
|
||
|
||
**接口实现:**
|
||
|
||
```rust
|
||
impl VaapiH264Encoder {
|
||
/// 创建新的 VA-API 编码器
|
||
pub fn new(config: EncoderConfig) -> Result<Self, EncoderError> {
|
||
// 打开 VA 显示
|
||
let display = va::Display::open(None)
|
||
.map_err(|e| EncoderError::InitFailed(format!("无法打开 VA 显示: {:?}", e)))?;
|
||
|
||
// 创建 VA 配置
|
||
let config_attribs = vec![
|
||
va::ConfigAttrib {
|
||
type_: va::ConfigAttribType::RTFormat,
|
||
value: VA_RT_FORMAT_YUV420 as i32,
|
||
},
|
||
va::ConfigAttrib {
|
||
type_: va::ConfigAttribType::RateControl,
|
||
value: VA_RC_CBR as i32,
|
||
},
|
||
va::ConfigAttrib {
|
||
type_: va::ConfigAttribType::EncMaxRefFrames,
|
||
value: 1, // 最小参考帧
|
||
},
|
||
];
|
||
|
||
let config_id = display.create_config(
|
||
VAProfileH264ConstrainedBaseline,
|
||
VAEntrypointEncSlice,
|
||
&config_attribs,
|
||
).map_err(|e| EncoderError::InitFailed(format!("创建 VA 配置失败: {:?}", e)))?;
|
||
|
||
// 创建 VA 上下文
|
||
let surfaces = (0..3)
|
||
.map(|_| display.create_surface(config.width, config.height, VA_RT_FORMAT_YUV420))
|
||
.collect::<Result<Vec<_>, _>>()
|
||
.map_err(|e| EncoderError::InitFailed(format!("创建 VA 表面失败: {:?}", e)))?;
|
||
|
||
let context_id = display.create_context(config_id, surfaces)
|
||
.map_err(|e| EncoderError::InitFailed(format!("创建 VA 上下文失败: {:?}", e)))?;
|
||
|
||
// 设置序列参数
|
||
let seq_param = VAEncSequenceParameterBufferH264 {
|
||
intra_period: config.keyframe_interval,
|
||
ip_period: 1, // 无 B 帧
|
||
max_num_ref_frames: 1,
|
||
bits_per_second: config.bitrate,
|
||
time_scale: 90000,
|
||
num_units_in_tick: 90000 / config.frame_rate as u32,
|
||
};
|
||
|
||
// 设置图像参数
|
||
let pic_param = VAEncPictureParameterBufferH264 {
|
||
reference_frames: [
|
||
VAReferenceFrame {
|
||
picture_id: surfaces[0],
|
||
flags: 0,
|
||
},
|
||
VAReferenceFrame {
|
||
picture_id: surfaces[1],
|
||
flags: 0,
|
||
},
|
||
],
|
||
num_ref_idx_l0_active_minus1: 0,
|
||
num_ref_idx_l1_active_minus1: 0,
|
||
pic_fields: VAPictureH264 {
|
||
idr_pic_flag: 0,
|
||
reference_pic_flag: 1,
|
||
},
|
||
};
|
||
|
||
// 设置切片参数
|
||
let slice_param = VAEncSliceParameterBufferH264 {
|
||
macroblock_address: 0,
|
||
num_macroblocks: (config.width / 16) * (config.height / 16),
|
||
slice_type: VAEncSliceType::PSlice,
|
||
num_ref_idx_l0_active_minus1: 0,
|
||
num_ref_idx_l1_active_minus1: 0,
|
||
disable_deblocking_filter_idc: 1, // 更快
|
||
};
|
||
|
||
Ok(Self {
|
||
display,
|
||
config_id,
|
||
context_id,
|
||
config,
|
||
seq_param,
|
||
pic_param,
|
||
slice_param,
|
||
reference_frames: [None, None],
|
||
current_ref_frame: 0,
|
||
sequence_number: 0,
|
||
stats: EncoderStats::default(),
|
||
rtp_timestamp_base: 0,
|
||
encode_timer: MovingAverage::new(100),
|
||
})
|
||
}
|
||
}
|
||
|
||
#[async_trait]
|
||
impl VideoEncoder for VaapiH264Encoder {
|
||
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError> {
|
||
let start = Instant::now();
|
||
|
||
// 导入 DMA-BUF 到 VA 表面
|
||
let surface = unsafe {
|
||
self.display.import_dma_buf(
|
||
frame.dma_buf.fd,
|
||
frame.width,
|
||
frame.height,
|
||
VA_RT_FORMAT_YUV420,
|
||
).map_err(|e| EncoderError::DmaBufImportFailed)?
|
||
};
|
||
|
||
// 确定帧类型
|
||
let is_keyframe = self.sequence_number == 0
|
||
|| (self.sequence_number % self.config.keyframe_interval as u64 == 0);
|
||
|
||
// 更新图像参数
|
||
if is_keyframe {
|
||
self.pic_param.pic_fields.idr_pic_flag = 1;
|
||
} else {
|
||
self.pic_param.pic_fields.idr_pic_flag = 0;
|
||
}
|
||
|
||
// 编码图像
|
||
let encoded_data = self.display.encode_surface(
|
||
surface,
|
||
&self.seq_param,
|
||
&self.pic_param,
|
||
&self.slice_param,
|
||
).map_err(|e| EncoderError::EncodeFailed(format!("编码失败: {:?}", e)))?;
|
||
|
||
// 更新统计信息
|
||
let latency = start.elapsed().as_secs_f64() * 1000.0;
|
||
self.encode_timer.add_sample(latency);
|
||
self.stats.avg_encode_latency_ms = self.encode_timer.average();
|
||
self.stats.frames_encoded += 1;
|
||
self.stats.total_bytes += encoded_data.len() as u64;
|
||
|
||
if is_keyframe {
|
||
self.stats.keyframes += 1;
|
||
}
|
||
|
||
// 计算实际比特率
|
||
let elapsed = start.elapsed();
|
||
if elapsed.as_secs() > 0 {
|
||
self.stats.actual_bitrate =
|
||
(self.stats.total_bytes * 8 / elapsed.as_secs() as u64) as u32;
|
||
}
|
||
|
||
// 更新序列号
|
||
self.sequence_number += 1;
|
||
|
||
// 计算 RTP 时间戳
|
||
let rtp_timestamp = self.rtp_timestamp_base
|
||
+ (self.sequence_number * 90000 / self.config.frame_rate as u64) as u32;
|
||
|
||
Ok(EncodedFrame {
|
||
data: Bytes::from(encoded_data),
|
||
is_keyframe,
|
||
timestamp: frame.timestamp,
|
||
sequence_number: self.sequence_number,
|
||
rtp_timestamp,
|
||
})
|
||
}
|
||
|
||
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError> {
|
||
// 更新序列参数
|
||
self.seq_param.intra_period = config.keyframe_interval;
|
||
self.seq_param.bits_per_second = config.bitrate;
|
||
self.seq_param.num_units_in_tick = 90000 / config.frame_rate as u32;
|
||
|
||
self.config = config;
|
||
Ok(())
|
||
}
|
||
|
||
async fn request_keyframe(&mut self) -> Result<(), EncoderError> {
|
||
// 强制下一帧为关键帧
|
||
self.pic_param.pic_fields.idr_pic_flag = 1;
|
||
Ok(())
|
||
}
|
||
|
||
fn stats(&self) -> EncoderStats {
|
||
self.stats.clone()
|
||
}
|
||
|
||
fn capabilities(&self) -> EncoderCapabilities {
|
||
EncoderCapabilities {
|
||
hardware_accelerated: true,
|
||
supports_dma_buf: true,
|
||
max_resolution: (4096, 4096),
|
||
max_frame_rate: 120,
|
||
bitrate_range: (1000000, 50000000),
|
||
supports_dynamic_bitrate: true,
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.2.3 自适应码率控制器
|
||
|
||
**设计原理:**
|
||
自适应码率控制器根据网络状况和目标延迟动态调整编码比特率,在保证低延迟的同时最大化视频质量。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// 自适应码率控制器
|
||
pub struct AdaptiveBitrateController {
|
||
/// 目标延迟 (ms)
|
||
target_latency_ms: u32,
|
||
/// 当前比特率 (bps)
|
||
current_bitrate: u32,
|
||
/// 最大比特率 (bps)
|
||
max_bitrate: u32,
|
||
/// 最小比特率 (bps)
|
||
min_bitrate: u32,
|
||
/// 帧率
|
||
frame_rate: u32,
|
||
/// 网络质量监测
|
||
network_monitor: NetworkQualityMonitor,
|
||
/// 延迟监测
|
||
latency_monitor: LatencyMonitor,
|
||
/// 比特率调整因子
|
||
adjustment_factor: f64,
|
||
/// 调整间隔
|
||
adjustment_interval: Duration,
|
||
/// 上次调整时间
|
||
last_adjustment: Instant,
|
||
}
|
||
|
||
/// 网络质量监测器
|
||
pub struct NetworkQualityMonitor {
|
||
/// 带宽 (bps)
|
||
bandwidth: u32,
|
||
/// 丢包率 (0-1)
|
||
packet_loss_rate: f64,
|
||
/// 抖动 (ms)
|
||
jitter_ms: u32,
|
||
/// 往返时间 (ms)
|
||
rtt_ms: u32,
|
||
}
|
||
|
||
/// 延迟监测器
|
||
pub struct LatencyMonitor {
|
||
/// 测量的延迟值
|
||
measurements: VecDeque<LatencyMeasurement>,
|
||
/// 平均延迟
|
||
avg_latency_ms: f64,
|
||
/// P95 延迟
|
||
p95_latency_ms: f64,
|
||
}
|
||
|
||
/// 延迟测量
|
||
struct LatencyMeasurement {
|
||
timestamp: Instant,
|
||
latency_ms: u32,
|
||
type_: LatencyType,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy)]
|
||
enum LatencyType {
|
||
Capture,
|
||
Encode,
|
||
Transport,
|
||
Total,
|
||
}
|
||
```
|
||
|
||
**算法实现:**
|
||
|
||
```rust
|
||
impl AdaptiveBitrateController {
|
||
/// 创建新的自适应码率控制器
|
||
pub fn new(
|
||
target_latency_ms: u32,
|
||
initial_bitrate: u32,
|
||
min_bitrate: u32,
|
||
max_bitrate: u32,
|
||
frame_rate: u32,
|
||
) -> Self {
|
||
Self {
|
||
target_latency_ms,
|
||
current_bitrate: initial_bitrate,
|
||
max_bitrate,
|
||
min_bitrate,
|
||
frame_rate,
|
||
network_monitor: NetworkQualityMonitor::default(),
|
||
latency_monitor: LatencyMonitor::new(100),
|
||
adjustment_factor: 0.1, // 每次调整 10%
|
||
adjustment_interval: Duration::from_millis(500), // 每 500ms 调整一次
|
||
last_adjustment: Instant::now(),
|
||
}
|
||
}
|
||
|
||
/// 更新延迟测量
|
||
pub fn update_latency(&mut self, latency_ms: u32, type_: LatencyType) {
|
||
self.latency_monitor.add_measurement(LatencyMeasurement {
|
||
timestamp: Instant::now(),
|
||
latency_ms,
|
||
type_,
|
||
});
|
||
|
||
// 检查是否需要调整码率
|
||
self.check_and_adjust();
|
||
}
|
||
|
||
/// 更新网络质量
|
||
pub fn update_network_quality(&mut self, bandwidth: u32, packet_loss_rate: f64, jitter_ms: u32, rtt_ms: u32) {
|
||
self.network_monitor.bandwidth = bandwidth;
|
||
self.network_monitor.packet_loss_rate = packet_loss_rate;
|
||
self.network_monitor.jitter_ms = jitter_ms;
|
||
self.network_monitor.rtt_ms = rtt_ms;
|
||
|
||
self.check_and_adjust();
|
||
}
|
||
|
||
/// 检查并调整码率
|
||
fn check_and_adjust(&mut self) {
|
||
let now = Instant::now();
|
||
|
||
// 检查调整间隔
|
||
if now.duration_since(self.last_adjustment) < self.adjustment_interval {
|
||
return;
|
||
}
|
||
|
||
// 计算当前平均延迟
|
||
let avg_latency = self.latency_monitor.average_latency();
|
||
if avg_latency.is_none() {
|
||
return;
|
||
}
|
||
|
||
let avg_latency_ms = avg_latency.unwrap();
|
||
|
||
// 计算延迟比率
|
||
let latency_ratio = avg_latency_ms as f64 / self.target_latency_ms as f64;
|
||
|
||
// 根据延迟比率调整码率
|
||
let new_bitrate = if latency_ratio > 1.5 {
|
||
// 延迟过高 - 激进降低码率
|
||
self.current_bitrate = (self.current_bitrate as f64 * 0.7) as u32;
|
||
} else if latency_ratio > 1.2 {
|
||
// 延迟偏高 - 降低码率
|
||
self.current_bitrate = (self.current_bitrate as f64 * 0.85) as u32;
|
||
} else if latency_ratio < 0.8 && self.network_monitor.packet_loss_rate < 0.01 {
|
||
// 延迟较低且丢包率低 - 可以增加码率
|
||
self.current_bitrate = (self.current_bitrate as f64 * 1.1) as u32;
|
||
}
|
||
|
||
// 考虑网络带宽限制
|
||
if self.current_bitrate > self.network_monitor.bandwidth * 9 / 10 {
|
||
self.current_bitrate = self.network_monitor.bandwidth * 9 / 10;
|
||
}
|
||
|
||
// 考虑丢包率
|
||
if self.network_monitor.packet_loss_rate > 0.05 {
|
||
// 丢包率高,降低码率
|
||
self.current_bitrate = (self.current_bitrate as f64 * 0.8) as u32;
|
||
}
|
||
|
||
// 限制在范围内
|
||
self.current_bitrate = self.current_bitrate.clamp(self.min_bitrate, self.max_bitrate);
|
||
|
||
self.last_adjustment = now;
|
||
}
|
||
|
||
/// 获取当前码率
|
||
pub fn current_bitrate(&self) -> u32 {
|
||
self.current_bitrate
|
||
}
|
||
|
||
/// 获取推荐的编码器配置
|
||
pub fn get_encoder_config(&self, base_config: EncoderConfig) -> EncoderConfig {
|
||
let mut config = base_config.clone();
|
||
config.bitrate = self.current_bitrate;
|
||
config.max_bitrate = self.current_bitrate * 11 / 10; // +10%
|
||
config.min_bitrate = self.current_bitrate * 9 / 10; // -10%
|
||
config
|
||
}
|
||
}
|
||
|
||
impl LatencyMonitor {
|
||
/// 创建新的延迟监测器
|
||
fn new(max_samples: usize) -> Self {
|
||
Self {
|
||
measurements: VecDeque::with_capacity(max_samples),
|
||
avg_latency_ms: 0.0,
|
||
p95_latency_ms: 0.0,
|
||
}
|
||
}
|
||
|
||
/// 添加延迟测量
|
||
fn add_measurement(&mut self, measurement: LatencyMeasurement) {
|
||
self.measurements.push_back(measurement);
|
||
|
||
if self.measurements.len() > self.measurements.capacity() {
|
||
self.measurements.pop_front();
|
||
}
|
||
|
||
self.update_statistics();
|
||
}
|
||
|
||
/// 更新统计信息
|
||
fn update_statistics(&mut self) {
|
||
if self.measurements.is_empty() {
|
||
return;
|
||
}
|
||
|
||
let total: u32 = self.measurements.iter().map(|m| m.latency_ms).sum();
|
||
self.avg_latency_ms = total as f64 / self.measurements.len() as f64;
|
||
|
||
// 计算 P95
|
||
let mut sorted: Vec<u32> = self.measurements.iter().map(|m| m.latency_ms).collect();
|
||
sorted.sort();
|
||
let p95_index = (sorted.len() * 95 / 100).min(sorted.len() - 1);
|
||
self.p95_latency_ms = sorted[p95_index] as f64;
|
||
}
|
||
|
||
/// 获取平均延迟
|
||
fn average_latency(&self) -> Option<f64> {
|
||
if self.measurements.is_empty() {
|
||
None
|
||
} else {
|
||
Some(self.avg_latency_ms)
|
||
}
|
||
}
|
||
|
||
/// 获取 P95 延迟
|
||
fn p95_latency(&self) -> Option<f64> {
|
||
if self.measurements.is_empty() {
|
||
None
|
||
} else {
|
||
Some(self.p95_latency_ms)
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.3 WebRTC 传输模块
|
||
|
||
#### 3.3.1 对等连接管理器
|
||
|
||
**设计原理:**
|
||
对等连接管理器负责创建和管理 WebRTC 对等连接,处理 SDP 交换、ICE 候选、媒体轨道和数据通道。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// 对等连接管理器
|
||
pub struct PeerConnectionManager {
|
||
/// WebRTC API
|
||
api: Arc<webrtc::API>,
|
||
/// 所有对等连接
|
||
peer_connections: Arc<RwLock<HashMap<String, PeerConnection>>>,
|
||
/// 配置
|
||
config: WebRtcConfig,
|
||
/// 统计信息
|
||
stats: ConnectionStats,
|
||
}
|
||
|
||
/// 对等连接
|
||
pub struct PeerConnection {
|
||
/// WebRTC 对等连接
|
||
pc: RTCPeerConnection,
|
||
/// 视频轨道
|
||
video_track: Arc<TrackLocalStaticSample>,
|
||
/// 数据通道
|
||
data_channel: Option<Arc<RTCDataChannel>>,
|
||
/// 连接状态
|
||
state: RTCPeerConnectionState,
|
||
/// 会话 ID
|
||
session_id: String,
|
||
}
|
||
|
||
/// WebRTC 配置
|
||
#[derive(Debug, Clone)]
|
||
pub struct WebRtcConfig {
|
||
/// ICE 服务器
|
||
pub ice_servers: Vec<IceServer>,
|
||
/// ICE 传输策略
|
||
pub ice_transport_policy: IceTransportPolicy,
|
||
/// 视频编解码器
|
||
pub video_codecs: Vec<VideoCodecConfig>,
|
||
/// 音频编解码器
|
||
pub audio_codecs: Vec<AudioCodecConfig>,
|
||
/// 最大比特率 (bps)
|
||
pub max_bitrate: u32,
|
||
/// 最小比特率 (bps)
|
||
pub min_bitrate: u32,
|
||
/// 起始比特率 (bps)
|
||
pub start_bitrate: u32,
|
||
}
|
||
|
||
/// ICE 服务器
|
||
#[derive(Debug, Clone)]
|
||
pub struct IceServer {
|
||
pub urls: Vec<String>,
|
||
pub username: Option<String>,
|
||
pub credential: Option<String>,
|
||
}
|
||
|
||
/// 视频编解码器配置
|
||
#[derive(Debug, Clone)]
|
||
pub struct VideoCodecConfig {
|
||
pub name: String,
|
||
pub clock_rate: u32,
|
||
pub num_channels: u16,
|
||
pub parameters: CodecParameters,
|
||
}
|
||
|
||
/// 编解码器参数
|
||
#[derive(Debug, Clone)]
|
||
pub struct CodecParameters {
|
||
pub profile_level_id: String,
|
||
pub packetization_mode: u8,
|
||
pub level_asymmetry_allowed: bool,
|
||
}
|
||
|
||
/// 连接统计信息
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct ConnectionStats {
|
||
/// 总连接数
|
||
pub total_connections: u64,
|
||
/// 活跃连接数
|
||
pub active_connections: u64,
|
||
/// 总发送字节数
|
||
pub total_bytes_sent: u64,
|
||
/// 总接收字节数
|
||
pub total_bytes_received: u64,
|
||
/// 总丢包数
|
||
pub total_packets_lost: u64,
|
||
}
|
||
```
|
||
|
||
**状态机:**
|
||
|
||
```
|
||
[新建]
|
||
│
|
||
│ createOffer()
|
||
▼
|
||
[检查本地状态]
|
||
│
|
||
│ setLocalDescription()
|
||
▼
|
||
[本地描述已设置]
|
||
│
|
||
│ 等待远程描述
|
||
│ setRemoteDescription()
|
||
▼
|
||
[远程描述已设置]
|
||
│
|
||
│ ICE 收集
|
||
▼
|
||
[ICE 收集中]
|
||
│
|
||
│ ICE 连接完成
|
||
▼
|
||
[已连接]
|
||
│
|
||
│ 传输中
|
||
▼
|
||
[活跃传输中]
|
||
│
|
||
│ 断开/错误
|
||
▼
|
||
[已断开]
|
||
```
|
||
|
||
**接口实现:**
|
||
|
||
```rust
|
||
impl PeerConnectionManager {
|
||
/// 创建新的对等连接管理器
|
||
pub fn new(config: WebRtcConfig) -> Result<Self, WebRtcError> {
|
||
let mut setting_engine = webrtc::api::SettingEngine::default();
|
||
|
||
// 配置 ICE 服务器
|
||
let ice_servers: Vec<RTCIceServer> = config.ice_servers.iter()
|
||
.map(|s| RTCIceServer {
|
||
urls: s.urls.clone(),
|
||
username: s.username.clone().unwrap_or_default(),
|
||
credential: s.credential.clone().unwrap_or_default(),
|
||
..Default::default()
|
||
})
|
||
.collect();
|
||
|
||
// 创建 WebRTC API
|
||
let api = Arc::new(
|
||
webrtc::api::APIBuilder::new()
|
||
.with_setting_engine(setting_engine)
|
||
.build()
|
||
);
|
||
|
||
Ok(Self {
|
||
api,
|
||
peer_connections: Arc::new(RwLock::new(HashMap::new())),
|
||
config,
|
||
stats: ConnectionStats::default(),
|
||
})
|
||
}
|
||
|
||
/// 创建新的对等连接
|
||
pub async fn create_peer_connection(
|
||
&self,
|
||
session_id: String,
|
||
) -> Result<String, WebRtcError> {
|
||
// 配置 RTC
|
||
let rtc_config = RTCConfiguration {
|
||
ice_servers: self.config.ice_servers.iter()
|
||
.map(|s| RTCIceServer {
|
||
urls: s.urls.clone(),
|
||
username: s.username.clone().unwrap_or_default(),
|
||
credential: s.credential.clone().unwrap_or_default(),
|
||
..Default::default()
|
||
})
|
||
.collect(),
|
||
ice_transport_policy: self.config.ice_transport_policy,
|
||
..Default::default()
|
||
};
|
||
|
||
// 创建对等连接
|
||
let pc = self.api.new_peer_connection(rtc_config).await?;
|
||
|
||
// 创建视频轨道
|
||
let video_track = Arc::new(
|
||
TrackLocalStaticSample::new(
|
||
RTCRtpCodecCapability {
|
||
mime_type: "video/H264".to_string(),
|
||
clock_rate: 90000,
|
||
channels: 1,
|
||
sdp_fmtp_line: "profile-level-id=42e01f;packetization-mode=1".to_string(),
|
||
..Default::default()
|
||
},
|
||
"video".to_string(),
|
||
)
|
||
);
|
||
|
||
// 添加视频轨道
|
||
let rtp_transceiver = pc.add_track(Arc::clone(&video_track)).await?;
|
||
|
||
// 设置 ICE 候选处理
|
||
let pc_clone = pc.clone();
|
||
pc.on_ice_candidate(Box::new(move |candidate| {
|
||
let pc_clone = pc_clone.clone();
|
||
Box::pin(async move {
|
||
if let Some(candidate) = candidate {
|
||
debug!("ICE 候选: {:?}", candidate);
|
||
// 发送候选到信令服务器
|
||
}
|
||
})
|
||
})).await;
|
||
|
||
// 设置连接状态变化
|
||
let session_id_clone = session_id.clone();
|
||
pc.on_peer_connection_state_change(Box::new(move |state| {
|
||
debug!("对等连接 {} 状态变化: {:?}", session_id_clone, state);
|
||
Box::pin(async move {})
|
||
})).await;
|
||
|
||
// 存储对等连接
|
||
let peer_connection = PeerConnection::new(session_id.clone(), pc, video_track);
|
||
self.peer_connections.write().await.insert(session_id.clone(), peer_connection);
|
||
|
||
self.stats.total_connections += 1;
|
||
|
||
Ok(session_id)
|
||
}
|
||
|
||
/// 创建 Offer
|
||
pub async fn create_offer(&self, session_id: &str) -> Result<RTCSessionDescription, WebRtcError> {
|
||
let peer_connections = self.peer_connections.read().await;
|
||
let peer_connection = peer_connections.get(session_id)
|
||
.ok_or(WebRtcError::SessionNotFound(session_id.to_string()))?;
|
||
|
||
peer_connection.create_offer().await
|
||
}
|
||
|
||
/// 设置远程描述
|
||
pub async fn set_remote_description(
|
||
&self,
|
||
session_id: &str,
|
||
desc: RTCSessionDescription,
|
||
) -> Result<(), WebRtcError> {
|
||
let peer_connections = self.peer_connections.read().await;
|
||
let peer_connection = peer_connections.get(session_id)
|
||
.ok_or(WebRtcError::SessionNotFound(session_id.to_string()))?;
|
||
|
||
peer_connection.set_remote_description(desc).await
|
||
}
|
||
|
||
/// 发送视频帧
|
||
pub async fn send_video_frame(
|
||
&self,
|
||
session_id: &str,
|
||
frame: EncodedFrame,
|
||
) -> Result<(), WebRtcError> {
|
||
let peer_connections = self.peer_connections.read().await;
|
||
let peer_connection = peer_connections.get(session_id)
|
||
.ok_or(WebRtcError::SessionNotFound(session_id.to_string()))?;
|
||
|
||
peer_connection.write_sample(frame).await?;
|
||
self.stats.total_bytes_sent += frame.data.len() as u64;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 关闭对等连接
|
||
pub async fn close_peer_connection(&self, session_id: &str) -> Result<(), WebRtcError> {
|
||
let mut peer_connections = self.peer_connections.write().await;
|
||
if let Some(peer_connection) = peer_connections.remove(session_id) {
|
||
peer_connection.close().await;
|
||
self.stats.active_connections -= 1;
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
/// 获取统计信息
|
||
pub fn stats(&self) -> &ConnectionStats {
|
||
&self.stats
|
||
}
|
||
}
|
||
|
||
impl PeerConnection {
|
||
/// 创建新的对等连接
|
||
fn new(
|
||
session_id: String,
|
||
pc: RTCPeerConnection,
|
||
video_track: Arc<TrackLocalStaticSample>,
|
||
) -> Self {
|
||
Self {
|
||
pc,
|
||
video_track,
|
||
data_channel: None,
|
||
state: RTCPeerConnectionState::New,
|
||
session_id,
|
||
}
|
||
}
|
||
|
||
/// 创建 Offer
|
||
async fn create_offer(&self) -> Result<RTCSessionDescription, WebRtcError> {
|
||
let offer = self.pc.create_offer(None).await?;
|
||
self.pc.set_local_description(offer.clone()).await?;
|
||
Ok(offer)
|
||
}
|
||
|
||
/// 创建 Answer
|
||
async fn create_answer(&self) -> Result<RTCSessionDescription, WebRtcError> {
|
||
let answer = self.pc.create_answer(None).await?;
|
||
self.pc.set_local_description(answer.clone()).await?;
|
||
Ok(answer)
|
||
}
|
||
|
||
/// 设置远程描述
|
||
async fn set_remote_description(&self, desc: RTCSessionDescription) -> Result<(), WebRtcError> {
|
||
self.pc.set_remote_description(desc).await
|
||
}
|
||
|
||
/// 添加 ICE 候选
|
||
async fn add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> Result<(), WebRtcError> {
|
||
self.pc.add_ice_candidate(candidate).await
|
||
}
|
||
|
||
/// 写入视频样本
|
||
async fn write_sample(&self, frame: EncodedFrame) -> Result<(), WebRtcError> {
|
||
let sample = Sample {
|
||
data: frame.data.to_vec(),
|
||
duration: Duration::from_millis(1000 / 60), // 假设 60 FPS
|
||
..Default::default()
|
||
};
|
||
|
||
self.video_track.write_sample(&sample).await
|
||
}
|
||
|
||
/// 关闭连接
|
||
async fn close(self) {
|
||
self.pc.close().await;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.3.2 RTP 打包器
|
||
|
||
**设计原理:**
|
||
RTP 打包器将编码后的视频帧分片为符合 MTU 限制的 RTP 数据包,并添加必要的 RTP 头部信息。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// RTP 打包器
|
||
pub struct RtpPacketizer {
|
||
/// 最大载荷大小
|
||
max_payload_size: usize,
|
||
/// 序列号
|
||
sequence_number: u16,
|
||
/// 时间戳基数
|
||
timestamp_base: u32,
|
||
/// SSRC
|
||
ssrc: u32,
|
||
/// 统计信息
|
||
stats: PacketizerStats,
|
||
}
|
||
|
||
/// RTP 数据包
|
||
#[derive(Debug, Clone)]
|
||
pub struct RtpPacket {
|
||
/// RTP 头部
|
||
pub header: RtpHeader,
|
||
/// 载荷
|
||
pub payload: Bytes,
|
||
}
|
||
|
||
/// RTP 头部
|
||
#[derive(Debug, Clone, Copy)]
|
||
#[repr(C)]
|
||
pub struct RtpHeader {
|
||
/// 版本 (2 bits) | 填充 (1 bit) | 扩展 (1 bit) | CSRC 计数 (4 bits)
|
||
pub v_p_x_cc: u8,
|
||
/// 标记 (1 bit) | 载荷类型 (7 bits)
|
||
pub m_pt: u8,
|
||
/// 序列号
|
||
pub sequence_number: u16,
|
||
/// 时间戳
|
||
pub timestamp: u32,
|
||
/// SSRC
|
||
pub ssrc: u32,
|
||
/// CSRC 列表
|
||
pub csrc: [u32; 15],
|
||
}
|
||
|
||
/// 打包器统计信息
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct PacketizerStats {
|
||
/// 总数据包数
|
||
pub total_packets: u64,
|
||
/// 总字节数
|
||
pub total_bytes: u64,
|
||
/// 最大数据包大小
|
||
pub max_packet_size: usize,
|
||
/// 最小数据包大小
|
||
pub min_packet_size: usize,
|
||
}
|
||
```
|
||
|
||
**打包算法:**
|
||
|
||
```rust
|
||
impl RtpPacketizer {
|
||
/// 创建新的 RTP 打包器
|
||
pub fn new(
|
||
max_payload_size: usize,
|
||
ssrc: u32,
|
||
timestamp_base: u32,
|
||
) -> Self {
|
||
Self {
|
||
max_payload_size: max_payload_size.min(1200), // 标准 MTU
|
||
sequence_number: rand::random(),
|
||
timestamp_base,
|
||
ssrc,
|
||
stats: PacketizerStats::default(),
|
||
}
|
||
}
|
||
|
||
/// 将编码帧打包为 RTP 数据包
|
||
pub fn packetize(&mut self, frame: &EncodedFrame) -> Result<Vec<RtpPacket>, PacketizationError> {
|
||
let mut packets = Vec::new();
|
||
|
||
// 计算需要的包数
|
||
let data_len = frame.data.len();
|
||
let num_packets = (data_len + self.max_payload_size - 1) / self.max_payload_size;
|
||
|
||
// 标记位设置
|
||
// 对于关键帧,设置 M 位
|
||
let marker = if frame.is_keyframe { 0x80 } else { 0x00 };
|
||
|
||
// 分片数据
|
||
for i in 0..num_packets {
|
||
let offset = i * self.max_payload_size;
|
||
let len = self.max_payload_size.min(data_len - offset);
|
||
let payload = frame.data.slice(offset..offset + len);
|
||
|
||
// 创建 RTP 头部
|
||
let header = RtpHeader {
|
||
v_p_x_cc: 0x80, // Version 2, no padding, no extension, no CSRC
|
||
m_pt: marker | 96, // Payload type 96 for H.264
|
||
sequence_number: self.sequence_number,
|
||
timestamp: frame.rtp_timestamp,
|
||
ssrc: self.ssrc,
|
||
csrc: [0; 15],
|
||
};
|
||
|
||
// 创建 RTP 数据包
|
||
let packet = RtpPacket {
|
||
header,
|
||
payload,
|
||
};
|
||
|
||
packets.push(packet);
|
||
|
||
// 更新序列号
|
||
self.sequence_number = self.sequence_number.wrapping_add(1);
|
||
|
||
// 更新统计信息
|
||
self.stats.total_packets += 1;
|
||
self.stats.total_bytes += packet.payload.len();
|
||
self.stats.max_packet_size = self.stats.max_packet_size.max(packet.payload.len());
|
||
self.stats.min_packet_size = if self.stats.min_packet_size == 0 {
|
||
packet.payload.len()
|
||
} else {
|
||
self.stats.min_packet_size.min(packet.payload.len())
|
||
};
|
||
}
|
||
|
||
Ok(packets)
|
||
}
|
||
|
||
/// 获取序列号
|
||
pub fn sequence_number(&self) -> u16 {
|
||
self.sequence_number
|
||
}
|
||
|
||
/// 重置序列号
|
||
pub fn reset_sequence_number(&mut self) {
|
||
self.sequence_number = rand::random();
|
||
}
|
||
|
||
/// 获取统计信息
|
||
pub fn stats(&self) -> &PacketizerStats {
|
||
&self.stats
|
||
}
|
||
}
|
||
|
||
impl RtpHeader {
|
||
/// 序列化头部为字节
|
||
pub fn to_bytes(&self) -> [u8; 12] {
|
||
let mut bytes = [0u8; 12];
|
||
|
||
bytes[0] = self.v_p_x_cc;
|
||
bytes[1] = self.m_pt;
|
||
bytes[2..4].copy_from_slice(&self.sequence_number.to_be_bytes());
|
||
bytes[4..8].copy_from_slice(&self.timestamp.to_be_bytes());
|
||
bytes[8..12].copy_from_slice(&self.ssrc.to_be_bytes());
|
||
|
||
bytes
|
||
}
|
||
|
||
/// 从字节解析头部
|
||
pub fn from_bytes(bytes: &[u8]) -> Result<Self, PacketizationError> {
|
||
if bytes.len() < 12 {
|
||
return Err(PacketizationError::InvalidHeaderLength);
|
||
}
|
||
|
||
Ok(Self {
|
||
v_p_x_cc: bytes[0],
|
||
m_pt: bytes[1],
|
||
sequence_number: u16::from_be_bytes([bytes[2], bytes[3]]),
|
||
timestamp: u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
|
||
ssrc: u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
|
||
csrc: [0; 15],
|
||
})
|
||
}
|
||
}
|
||
|
||
/// 打包错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum PacketizationError {
|
||
#[error("无效的头部长度")]
|
||
InvalidHeaderLength,
|
||
|
||
#[error("载荷太大")]
|
||
PayloadTooLarge,
|
||
|
||
#[error("序列化失败")]
|
||
SerializationFailed,
|
||
}
|
||
```
|
||
|
||
### 3.4 缓冲管理模块
|
||
|
||
#### 3.4.1 DMA-BUF 池
|
||
|
||
**设计原理:**
|
||
DMA-BUF 池管理 GPU 内存缓冲区的分配和回收,减少频繁分配带来的开销,并防止内存泄漏。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// DMA-BUF 池
|
||
pub struct DmaBufPool {
|
||
/// 空闲缓冲区
|
||
idle_buffers: VecDeque<DmaBufHandle>,
|
||
/// 已分配缓冲区
|
||
allocated_buffers: HashMap<usize, DmaBufHandle>,
|
||
/// 缓冲区配置
|
||
config: PoolConfig,
|
||
/// 分配计数器
|
||
allocation_counter: usize,
|
||
/// 统计信息
|
||
stats: PoolStats,
|
||
}
|
||
|
||
/// 缓冲区池配置
|
||
pub struct PoolConfig {
|
||
/// 最大缓冲区数
|
||
pub max_buffers: usize,
|
||
/// 最小缓冲区数
|
||
pub min_buffers: usize,
|
||
/// 预分配缓冲区数
|
||
pub preallocated_buffers: usize,
|
||
/// 缓冲区大小
|
||
pub buffer_size: usize,
|
||
/// 跨距
|
||
pub stride: u32,
|
||
}
|
||
|
||
/// 池统计信息
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct PoolStats {
|
||
/// 总分配数
|
||
pub total_allocations: u64,
|
||
/// 总释放数
|
||
pub total_frees: u64,
|
||
/// 当前使用数
|
||
pub current_usage: usize,
|
||
/// 峰值使用数
|
||
pub peak_usage: usize,
|
||
/// 分配失败数
|
||
pub allocation_failures: u64,
|
||
}
|
||
```
|
||
|
||
**池管理算法:**
|
||
|
||
```rust
|
||
impl DmaBufPool {
|
||
/// 创建新的 DMA-BUF 池
|
||
pub fn new(config: PoolConfig) -> Result<Self, PoolError> {
|
||
let mut pool = Self {
|
||
idle_buffers: VecDeque::with_capacity(config.max_buffers),
|
||
allocated_buffers: HashMap::new(),
|
||
config,
|
||
allocation_counter: 0,
|
||
stats: PoolStats::default(),
|
||
};
|
||
|
||
// 预分配缓冲区
|
||
for _ in 0..config.preallocated_buffers.min(config.max_buffers) {
|
||
let buffer = pool.allocate_buffer()?;
|
||
pool.idle_buffers.push_back(buffer);
|
||
}
|
||
|
||
Ok(pool)
|
||
}
|
||
|
||
/// 分配缓冲区
|
||
pub fn acquire(&mut self) -> Result<DmaBufHandle, PoolError> {
|
||
// 尝试从空闲队列获取
|
||
if let Some(buffer) = self.idle_buffers.pop_front() {
|
||
self.allocation_counter += 1;
|
||
self.stats.current_usage += 1;
|
||
self.stats.peak_usage = self.stats.peak_usage.max(self.stats.current_usage);
|
||
|
||
let id = self.allocation_counter;
|
||
self.allocated_buffers.insert(id, buffer);
|
||
|
||
// 返回带 ID 的句柄
|
||
let mut handle = self.allocated_buffers.get(&id).unwrap().clone();
|
||
handle.id = Some(id);
|
||
return Ok(handle);
|
||
}
|
||
|
||
// 检查是否达到最大限制
|
||
if self.idle_buffers.len() + self.allocated_buffers.len() >= self.config.max_buffers {
|
||
self.stats.allocation_failures += 1;
|
||
return Err(PoolError::PoolExhausted);
|
||
}
|
||
|
||
// 分配新缓冲区
|
||
let buffer = self.allocate_buffer()?;
|
||
self.allocation_counter += 1;
|
||
self.stats.current_usage += 1;
|
||
self.stats.peak_usage = self.stats.peak_usage.max(self.stats.current_usage);
|
||
self.stats.total_allocations += 1;
|
||
|
||
let id = self.allocation_counter;
|
||
self.allocated_buffers.insert(id, buffer.clone());
|
||
buffer.id = Some(id);
|
||
|
||
Ok(buffer)
|
||
}
|
||
|
||
/// 释放缓冲区
|
||
pub fn release(&mut self, handle: DmaBufHandle) -> Result<(), PoolError> {
|
||
let id = handle.id.ok_or(PoolError::InvalidHandle)?;
|
||
|
||
if let Some(mut buffer) = self.allocated_buffers.remove(&id) {
|
||
// 重置句柄 ID
|
||
buffer.id = None;
|
||
|
||
// 返回到空闲队列
|
||
self.idle_buffers.push_back(buffer);
|
||
|
||
self.stats.current_usage -= 1;
|
||
self.stats.total_frees += 1;
|
||
|
||
Ok(())
|
||
} else {
|
||
Err(PoolError::InvalidHandle)
|
||
}
|
||
}
|
||
|
||
/// 分配新的 DMA-BUF
|
||
fn allocate_buffer(&self) -> Result<DmaBufHandle, PoolError> {
|
||
// 创建 DMA-BUF
|
||
// 这需要调用 DRM 或 VA-API 来分配 GPU 内存
|
||
// 这里简化实现
|
||
|
||
Ok(DmaBufHandle {
|
||
fd: -1, // 需要实际分配
|
||
size: self.config.buffer_size,
|
||
stride: self.config.stride,
|
||
offset: 0,
|
||
id: None,
|
||
})
|
||
}
|
||
|
||
/// 获取统计信息
|
||
pub fn stats(&self) -> &PoolStats {
|
||
&self.stats
|
||
}
|
||
|
||
/// 清理池
|
||
pub fn cleanup(&mut self) {
|
||
// 释放所有缓冲区
|
||
self.idle_buffers.clear();
|
||
self.allocated_buffers.clear();
|
||
}
|
||
}
|
||
|
||
/// 池错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum PoolError {
|
||
#[error("池已耗尽")]
|
||
PoolExhausted,
|
||
|
||
#[error("无效的句柄")]
|
||
InvalidHandle,
|
||
|
||
#[error("分配失败: {0}")]
|
||
AllocationFailed(String),
|
||
}
|
||
|
||
/// DMA-BUF 句柄
|
||
#[derive(Debug, Clone)]
|
||
pub struct DmaBufHandle {
|
||
pub fd: RawFd,
|
||
pub size: usize,
|
||
pub stride: u32,
|
||
pub offset: u32,
|
||
pub id: Option<usize>, // 池 ID
|
||
}
|
||
```
|
||
|
||
#### 3.4.2 内存泄漏防护
|
||
|
||
**设计原理:**
|
||
通过引用计数、所有权跟踪和定期检查,防止内存泄漏。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// 内存跟踪器
|
||
pub struct MemoryTracker {
|
||
/// 跟踪的分配
|
||
allocations: HashMap<usize, AllocationInfo>,
|
||
/// 分配计数器
|
||
counter: usize,
|
||
/// 泄漏检测阈值
|
||
leak_threshold: Duration,
|
||
/// 最后检查时间
|
||
last_check: Instant,
|
||
}
|
||
|
||
/// 分配信息
|
||
struct AllocationInfo {
|
||
/// 类型
|
||
allocation_type: AllocationType,
|
||
/// 大小
|
||
size: usize,
|
||
/// 分配时间
|
||
allocated_at: Instant,
|
||
/// 分配位置
|
||
location: &'static str,
|
||
}
|
||
|
||
/// 分配类型
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum AllocationType {
|
||
DmaBuf,
|
||
EncodedBuffer,
|
||
SharedMemory,
|
||
}
|
||
```
|
||
|
||
**实现:**
|
||
|
||
```rust
|
||
impl MemoryTracker {
|
||
/// 创建新的内存跟踪器
|
||
pub fn new(leak_threshold: Duration) -> Self {
|
||
Self {
|
||
allocations: HashMap::new(),
|
||
counter: 0,
|
||
leak_threshold,
|
||
last_check: Instant::now(),
|
||
}
|
||
}
|
||
|
||
/// 跟踪分配
|
||
pub fn track_allocation(
|
||
&mut self,
|
||
allocation_type: AllocationType,
|
||
size: usize,
|
||
location: &'static str,
|
||
) -> usize {
|
||
let id = self.counter;
|
||
self.counter += 1;
|
||
|
||
self.allocations.insert(id, AllocationInfo {
|
||
allocation_type,
|
||
size,
|
||
allocated_at: Instant::now(),
|
||
location,
|
||
});
|
||
|
||
id
|
||
}
|
||
|
||
/// 跟踪释放
|
||
pub fn track_free(&mut self, id: usize) -> Result<(), MemoryError> {
|
||
self.allocations.remove(&id)
|
||
.ok_or(MemoryError::InvalidAllocationId(id))?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 检查泄漏
|
||
pub fn check_leaks(&mut self) -> Vec<LeakInfo> {
|
||
let now = Instant::now();
|
||
let mut leaks = Vec::new();
|
||
|
||
for (id, info) in &self.allocations {
|
||
let age = now.duration_since(info.allocated_at);
|
||
|
||
if age > self.leak_threshold {
|
||
leaks.push(LeakInfo {
|
||
id: *id,
|
||
allocation_type: info.allocation_type,
|
||
size: info.size,
|
||
age,
|
||
location: info.location,
|
||
});
|
||
}
|
||
}
|
||
|
||
leaks
|
||
}
|
||
|
||
/// 获取统计信息
|
||
pub fn stats(&self) -> MemoryTrackerStats {
|
||
let total_size: usize = self.allocations.values()
|
||
.map(|info| info.size)
|
||
.sum();
|
||
|
||
let by_type: HashMap<AllocationType, usize> = self.allocations.values()
|
||
.fold(HashMap::new(), |mut acc, info| {
|
||
*acc.entry(info.allocation_type).or_insert(0) += 1;
|
||
acc
|
||
});
|
||
|
||
MemoryTrackerStats {
|
||
total_allocations: self.allocations.len(),
|
||
total_size,
|
||
by_type,
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 泄漏信息
|
||
#[derive(Debug)]
|
||
pub struct LeakInfo {
|
||
pub id: usize,
|
||
pub allocation_type: AllocationType,
|
||
pub size: usize,
|
||
pub age: Duration,
|
||
pub location: &'static str,
|
||
}
|
||
|
||
/// 内存跟踪器统计
|
||
#[derive(Debug)]
|
||
pub struct MemoryTrackerStats {
|
||
pub total_allocations: usize,
|
||
pub total_size: usize,
|
||
pub by_type: HashMap<AllocationType, usize>,
|
||
}
|
||
|
||
/// 内存错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum MemoryError {
|
||
#[error("无效的分配 ID: {0}")]
|
||
InvalidAllocationId(usize),
|
||
}
|
||
```
|
||
|
||
### 3.5 信令模块
|
||
|
||
#### 3.5.1 WebSocket 服务器
|
||
|
||
**设计原理:**
|
||
WebSocket 服务器处理客户端连接,交换 SDP 和 ICE 候选,协调会话建立。
|
||
|
||
**数据结构:**
|
||
|
||
```rust
|
||
/// WebSocket 信令服务器
|
||
pub struct SignalingServer {
|
||
/// WebSocket 监听器
|
||
listener: WebSocketListener,
|
||
/// 活跃连接
|
||
connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
|
||
/// 会话管理器
|
||
session_manager: SessionManager,
|
||
/// 配置
|
||
config: SignalingConfig,
|
||
}
|
||
|
||
/// 连接
|
||
pub struct Connection {
|
||
/// 连接 ID
|
||
id: ConnectionId,
|
||
/// WebSocket 写入端
|
||
write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
|
||
/// 会话 ID
|
||
session_id: Option<String>,
|
||
}
|
||
|
||
/// 信令消息
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
#[serde(tag = "type")]
|
||
pub enum SignalingMessage {
|
||
#[serde(rename = "offer")]
|
||
Offer {
|
||
sdp: String,
|
||
session_id: String,
|
||
},
|
||
#[serde(rename = "answer")]
|
||
Answer {
|
||
sdp: String,
|
||
session_id: String,
|
||
},
|
||
#[serde(rename = "ice-candidate")]
|
||
IceCandidate {
|
||
candidate: String,
|
||
sdp_mid: String,
|
||
sdp_mline_index: u16,
|
||
session_id: String,
|
||
},
|
||
#[serde(rename = "error")]
|
||
Error {
|
||
message: String,
|
||
},
|
||
}
|
||
```
|
||
|
||
**实现:**
|
||
|
||
```rust
|
||
impl SignalingServer {
|
||
/// 创建新的信令服务器
|
||
pub async fn new(config: SignalingConfig) -> Result<Self, SignalingError> {
|
||
let listener = TcpListener::bind(&config.bind_addr).await?;
|
||
let connections = Arc::new(RwLock::new(HashMap::new()));
|
||
|
||
Ok(Self {
|
||
listener,
|
||
connections,
|
||
session_manager: SessionManager::new(),
|
||
config,
|
||
})
|
||
}
|
||
|
||
/// 启动服务器
|
||
pub async fn run(&self) -> Result<(), SignalingError> {
|
||
info!("信令服务器监听于 {}", self.config.bind_addr);
|
||
|
||
while let Ok((stream, addr)) = self.listener.accept().await {
|
||
info!("新连接来自 {}", addr);
|
||
|
||
let connections = self.connections.clone();
|
||
let session_manager = self.session_manager.clone();
|
||
|
||
tokio::spawn(async move {
|
||
if let Err(e) = Self::handle_connection(stream, connections, session_manager).await {
|
||
error!("连接处理错误: {:?}", e);
|
||
}
|
||
});
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理连接
|
||
async fn handle_connection(
|
||
stream: TcpStream,
|
||
connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
|
||
session_manager: SessionManager,
|
||
) -> Result<(), SignalingError> {
|
||
let ws_stream = accept_async(stream).await?;
|
||
let (write, mut read) = ws_stream.split();
|
||
|
||
let connection_id = ConnectionId::new();
|
||
let connection = Connection {
|
||
id: connection_id,
|
||
write,
|
||
session_id: None,
|
||
};
|
||
|
||
// 存储连接
|
||
connections.write().await.insert(connection_id, connection.clone());
|
||
|
||
// 处理消息
|
||
while let Some(msg) = read.next().await {
|
||
match msg {
|
||
Ok(Message::Text(text)) => {
|
||
if let Err(e) = Self::handle_message(
|
||
text,
|
||
&connection_id,
|
||
&connections,
|
||
&session_manager,
|
||
).await {
|
||
error!("消息处理错误: {:?}", e);
|
||
}
|
||
}
|
||
Ok(Message::Close(_)) => {
|
||
info!("连接 {} 关闭", connection_id);
|
||
break;
|
||
}
|
||
Err(e) => {
|
||
error!("WebSocket 错误: {:?}", e);
|
||
break;
|
||
}
|
||
_ => {}
|
||
}
|
||
}
|
||
|
||
// 清理连接
|
||
connections.write().await.remove(&connection_id);
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理消息
|
||
async fn handle_message(
|
||
text: String,
|
||
connection_id: &ConnectionId,
|
||
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
|
||
session_manager: &SessionManager,
|
||
) -> Result<(), SignalingError> {
|
||
// 解析消息
|
||
let msg: SignalingMessage = serde_json::from_str(&text)
|
||
.map_err(|e| SignalingError::InvalidMessage(format!("{:?}", e)))?;
|
||
|
||
match msg {
|
||
SignalingMessage::Offer { sdp, session_id } => {
|
||
// 处理 Offer
|
||
Self::handle_offer(sdp, session_id, connection_id, connections, session_manager).await?;
|
||
}
|
||
SignalingMessage::Answer { sdp, session_id } => {
|
||
// 处理 Answer
|
||
Self::handle_answer(sdp, session_id, connection_id, connections, session_manager).await?;
|
||
}
|
||
SignalingMessage::IceCandidate { candidate, sdp_mid, sdp_mline_index, session_id } => {
|
||
// 处理 ICE 候选
|
||
Self::handle_ice_candidate(candidate, sdp_mid, sdp_mline_index, session_id, connections).await?;
|
||
}
|
||
_ => {}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理 Offer
|
||
async fn handle_offer(
|
||
sdp: String,
|
||
session_id: String,
|
||
connection_id: &ConnectionId,
|
||
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
|
||
session_manager: &SessionManager,
|
||
) -> Result<(), SignalingError> {
|
||
// 创建会话
|
||
session_manager.create_session(session_id.clone())?;
|
||
|
||
// 更新连接的会话 ID
|
||
{
|
||
let mut connections_guard = connections.write().await;
|
||
if let Some(conn) = connections_guard.get_mut(connection_id) {
|
||
conn.session_id = Some(session_id.clone());
|
||
}
|
||
}
|
||
|
||
// 调用后端创建 Answer
|
||
let answer = session_manager.create_answer(session_id.clone(), sdp).await?;
|
||
|
||
// 发送 Answer
|
||
let response = SignalingMessage::Answer {
|
||
sdp: answer,
|
||
session_id,
|
||
};
|
||
|
||
let mut connections_guard = connections.write().await;
|
||
if let Some(conn) = connections_guard.get_mut(connection_id) {
|
||
let json = serde_json::to_string(&response)?;
|
||
conn.write.send(Message::Text(json)).await?;
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理 Answer
|
||
async fn handle_answer(
|
||
sdp: String,
|
||
session_id: String,
|
||
connection_id: &ConnectionId,
|
||
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
|
||
session_manager: &SessionManager,
|
||
) -> Result<(), SignalingError> {
|
||
// 设置远程描述
|
||
session_manager.set_remote_description(session_id, sdp).await?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理 ICE 候选
|
||
async fn handle_ice_candidate(
|
||
candidate: String,
|
||
sdp_mid: String,
|
||
sdp_mline_index: u16,
|
||
session_id: String,
|
||
connections: &Arc<RwLock<HashMap<ConnectionId, Connection>>>,
|
||
) -> Result<(), SignalingError> {
|
||
// 转发 ICE 候选到后端
|
||
// ...
|
||
Ok(())
|
||
}
|
||
|
||
/// 广播消息
|
||
pub async fn broadcast(&self, msg: SignalingMessage, session_id: &str) -> Result<(), SignalingError> {
|
||
let json = serde_json::to_string(&msg)?;
|
||
|
||
let connections = self.connections.read().await;
|
||
for connection in connections.values() {
|
||
if connection.session_id.as_deref() == Some(session_id) {
|
||
connection.write.send(Message::Text(json.clone())).await?;
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
由于文档篇幅较长,我将继续在下一部分完成剩余章节...
|
||
|
||
**(待续)**
|
||
## 4. 数据结构设计
|
||
|
||
### 4.1 核心数据结构
|
||
|
||
#### 4.1.1 帧相关结构
|
||
|
||
```rust
|
||
/// 捕获的帧
|
||
#[derive(Debug, Clone)]
|
||
pub struct CapturedFrame {
|
||
/// DMA-BUF 句柄
|
||
pub dma_buf: DmaBufHandle,
|
||
/// 宽度
|
||
pub width: u32,
|
||
/// 高度
|
||
pub height: u32,
|
||
/// 像素格式
|
||
pub format: PixelFormat,
|
||
/// 时间戳 (ns)
|
||
pub timestamp: u64,
|
||
/// 帧编号
|
||
pub frame_number: u64,
|
||
/// 损坏区域
|
||
pub damaged_regions: Vec<ScreenRegion>,
|
||
}
|
||
|
||
/// 编码后的帧
|
||
#[derive(Debug, Clone)]
|
||
pub struct EncodedFrame {
|
||
/// 编码数据 (零拷贝 Bytes)
|
||
pub data: Bytes,
|
||
/// 是否为关键帧
|
||
pub is_keyframe: bool,
|
||
/// 时间戳 (ns)
|
||
pub timestamp: u64,
|
||
/// 序列号
|
||
pub sequence_number: u64,
|
||
/// RTP 时间戳
|
||
pub rtp_timestamp: u32,
|
||
/// 帧类型
|
||
pub frame_type: FrameType,
|
||
/// 编码参数
|
||
pub encoding_params: EncodingParams,
|
||
}
|
||
|
||
/// 帧类型
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum FrameType {
|
||
I, // I 帧 (关键帧)
|
||
P, // P 帧 (预测帧)
|
||
B, // B 帧 (双向预测帧)
|
||
}
|
||
|
||
/// 编码参数
|
||
#[derive(Debug, Clone)]
|
||
pub struct EncodingParams {
|
||
/// 使用的比特率
|
||
pub bitrate: u32,
|
||
/// 量化参数
|
||
pub qp: u8,
|
||
/// 编码延迟 (ms)
|
||
pub encode_latency_ms: f64,
|
||
}
|
||
|
||
/// 像素格式
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum PixelFormat {
|
||
RGBA,
|
||
RGB,
|
||
YUV420,
|
||
YUV422,
|
||
YUV444,
|
||
NV12,
|
||
}
|
||
|
||
impl PixelFormat {
|
||
/// 获取每个像素的字节数
|
||
pub fn bytes_per_pixel(&self) -> u32 {
|
||
match self {
|
||
PixelFormat::RGBA => 4,
|
||
PixelFormat::RGB => 3,
|
||
PixelFormat::YUV420 => 3 / 2,
|
||
PixelFormat::YUV422 => 2,
|
||
PixelFormat::YUV444 => 3,
|
||
PixelFormat::NV12 => 3 / 2,
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 4.1.2 会话相关结构
|
||
|
||
```rust
|
||
/// 会话信息
|
||
#[derive(Debug, Clone)]
|
||
pub struct SessionInfo {
|
||
/// 会话 ID
|
||
pub session_id: String,
|
||
/// 客户端 ID
|
||
pub client_id: String,
|
||
/// 会话状态
|
||
pub state: SessionState,
|
||
/// 创建时间
|
||
pub created_at: Instant,
|
||
/// 最后活动时间
|
||
pub last_activity: Instant,
|
||
/// 视频配置
|
||
pub video_config: VideoConfig,
|
||
/// 网络配置
|
||
pub network_config: NetworkConfig,
|
||
/// 统计信息
|
||
pub stats: SessionStats,
|
||
}
|
||
|
||
/// 会话状态
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum SessionState {
|
||
Initializing,
|
||
Connecting,
|
||
Connected,
|
||
Streaming,
|
||
Paused,
|
||
Disconnected,
|
||
Error,
|
||
}
|
||
|
||
/// 视频配置
|
||
#[derive(Debug, Clone)]
|
||
pub struct VideoConfig {
|
||
/// 编码器类型
|
||
pub encoder_type: EncoderType,
|
||
/// 分辨率
|
||
pub resolution: (u32, u32),
|
||
/// 帧率
|
||
pub frame_rate: u32,
|
||
/// 目标比特率
|
||
pub target_bitrate: u32,
|
||
/// 关键帧间隔
|
||
pub keyframe_interval: u32,
|
||
}
|
||
|
||
/// 网络配置
|
||
#[derive(Debug, Clone)]
|
||
pub struct NetworkConfig {
|
||
/// ICE 服务器
|
||
pub ice_servers: Vec<IceServer>,
|
||
/// 最大比特率
|
||
pub max_bitrate: u32,
|
||
/// 最小比特率
|
||
pub min_bitrate: u32,
|
||
/// 拥塞控制策略
|
||
pub congestion_control: CongestionControlStrategy,
|
||
}
|
||
|
||
/// 拥塞控制策略
|
||
#[derive(Debug, Clone, Copy)]
|
||
pub enum CongestionControlStrategy {
|
||
// Google Congestion Control
|
||
GCC,
|
||
// Transport Wide Congestion Control
|
||
TWCC,
|
||
// Fixed Rate
|
||
Fixed,
|
||
}
|
||
|
||
/// 会话统计信息
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct SessionStats {
|
||
/// 总帧数
|
||
pub total_frames: u64,
|
||
/// 关键帧数
|
||
pub keyframes: u64,
|
||
/// 丢帧数
|
||
pub dropped_frames: u64,
|
||
/// 总发送字节数
|
||
pub total_bytes_sent: u64,
|
||
/// 实际比特率
|
||
pub actual_bitrate: u32,
|
||
/// 平均延迟 (ms)
|
||
pub avg_latency_ms: f64,
|
||
/// P99 延迟 (ms)
|
||
pub p99_latency_ms: f64,
|
||
/// 丢包率
|
||
pub packet_loss_rate: f64,
|
||
}
|
||
```
|
||
|
||
### 4.2 内存管理
|
||
|
||
#### 4.2.1 内存池策略
|
||
|
||
**设计原则:**
|
||
1. 预分配策略:启动时预分配大部分缓冲区
|
||
2. 对象池模式:重用对象避免频繁分配
|
||
3. 所有权管理:使用 Rust 类型系统确保安全
|
||
4. 引用计数:Bytes 和 Arc 管理共享数据
|
||
|
||
**池类型:**
|
||
|
||
```rust
|
||
/// 通用对象池
|
||
pub struct ObjectPool<T> {
|
||
idle: VecDeque<T>,
|
||
factory: fn() -> T,
|
||
resetter: fn(&mut T),
|
||
max_size: usize,
|
||
min_size: usize,
|
||
}
|
||
|
||
impl<T> ObjectPool<T> {
|
||
pub fn new(
|
||
factory: fn() -> T,
|
||
resetter: fn(&mut T),
|
||
min_size: usize,
|
||
max_size: usize,
|
||
) -> Self {
|
||
let mut pool = Self {
|
||
idle: VecDeque::with_capacity(max_size),
|
||
factory,
|
||
resetter,
|
||
max_size,
|
||
min_size,
|
||
};
|
||
|
||
// 预分配到最小大小
|
||
for _ in 0..min_size {
|
||
pool.idle.push_back((pool.factory)());
|
||
}
|
||
|
||
pool
|
||
}
|
||
|
||
pub fn acquire(&mut self) -> T {
|
||
self.idle.pop_front()
|
||
.unwrap_or_else(|| (self.factory)())
|
||
}
|
||
|
||
pub fn release(&mut self, mut item: T) {
|
||
if self.idle.len() < self.max_size {
|
||
(self.resetter)(&mut item);
|
||
self.idle.push_back(item);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 4.2.2 内存监控
|
||
|
||
```rust
|
||
/// 内存监控器
|
||
pub struct MemoryMonitor {
|
||
/// 分配跟踪
|
||
allocations: HashMap<usize, AllocationRecord>,
|
||
/// 统计信息
|
||
stats: MemoryStats,
|
||
/// 告警阈值
|
||
thresholds: MemoryThresholds,
|
||
}
|
||
|
||
/// 分配记录
|
||
struct AllocationRecord {
|
||
size: usize,
|
||
type_: AllocationType,
|
||
location: &'static str,
|
||
allocated_at: Instant,
|
||
}
|
||
|
||
/// 内存统计
|
||
#[derive(Debug, Clone)]
|
||
pub struct MemoryStats {
|
||
/// 总分配字节数
|
||
pub total_allocated_bytes: u64,
|
||
/// 当前使用字节数
|
||
pub current_used_bytes: u64,
|
||
/// 峰值使用字节数
|
||
pub peak_used_bytes: u64,
|
||
/// 分配次数
|
||
pub allocation_count: u64,
|
||
/// 释放次数
|
||
pub free_count: u64,
|
||
}
|
||
|
||
impl MemoryMonitor {
|
||
pub fn new(thresholds: MemoryThresholds) -> Self {
|
||
Self {
|
||
allocations: HashMap::new(),
|
||
stats: MemoryStats::default(),
|
||
thresholds,
|
||
}
|
||
}
|
||
|
||
pub fn track_allocation(
|
||
&mut self,
|
||
size: usize,
|
||
type_: AllocationType,
|
||
location: &'static str,
|
||
) {
|
||
let id = self.stats.allocation_count as usize;
|
||
self.stats.allocation_count += 1;
|
||
self.stats.total_allocated_bytes += size as u64;
|
||
self.stats.current_used_bytes += size as u64;
|
||
|
||
if self.stats.current_used_bytes > self.stats.peak_used_bytes {
|
||
self.stats.peak_used_bytes = self.stats.current_used_bytes;
|
||
}
|
||
|
||
self.allocations.insert(id, AllocationRecord {
|
||
size,
|
||
type_,
|
||
location,
|
||
allocated_at: Instant::now(),
|
||
});
|
||
|
||
// 检查阈值
|
||
self.check_thresholds();
|
||
}
|
||
|
||
pub fn track_free(&mut self, size: usize) {
|
||
self.stats.free_count += 1;
|
||
self.stats.current_used_bytes -= size as u64;
|
||
}
|
||
|
||
fn check_thresholds(&self) {
|
||
if self.stats.current_used_bytes > self.thresholds.warning_bytes {
|
||
warn!("内存使用超过警告阈值: {} / {}",
|
||
self.stats.current_used_bytes,
|
||
self.thresholds.warning_bytes);
|
||
}
|
||
|
||
if self.stats.current_used_bytes > self.thresholds.critical_bytes {
|
||
error!("内存使用超过临界阈值: {} / {}",
|
||
self.stats.current_used_bytes,
|
||
self.thresholds.critical_bytes);
|
||
}
|
||
}
|
||
|
||
pub fn stats(&self) -> &MemoryStats {
|
||
&self.stats
|
||
}
|
||
}
|
||
```
|
||
|
||
### 4.3 状态管理
|
||
|
||
#### 4.3.1 状态机实现
|
||
|
||
```rust
|
||
/// 状态机 trait
|
||
pub trait StateMachine: Send + Sync {
|
||
type State;
|
||
type Event;
|
||
type Error;
|
||
|
||
/// 处理事件,返回新状态
|
||
fn handle_event(&self, state: Self::State, event: Self::Event)
|
||
-> Result<Self::State, Self::Error>;
|
||
|
||
/// 检查状态是否有效
|
||
fn is_valid(&self, state: &Self::State) -> bool;
|
||
|
||
/// 获取允许的转换
|
||
fn allowed_transitions(&self, state: &Self::State) -> Vec<Self::State>;
|
||
}
|
||
|
||
/// 捕获器状态机
|
||
pub struct CaptureStateMachine;
|
||
|
||
impl StateMachine for CaptureStateMachine {
|
||
type State = CaptureState;
|
||
type Event = CaptureEvent;
|
||
type Error = CaptureError;
|
||
|
||
fn handle_event(&self, state: Self::State, event: Self::Event)
|
||
-> Result<Self::State, Self::Error> {
|
||
match (state, event) {
|
||
(CaptureState::Idle, CaptureEvent::Start) => {
|
||
Ok(CaptureState::Connecting)
|
||
}
|
||
(CaptureState::Connecting, CaptureEvent::Connected) => {
|
||
Ok(CaptureState::Streaming)
|
||
}
|
||
(CaptureState::Streaming, CaptureEvent::Stop) => {
|
||
Ok(CaptureState::Stopping)
|
||
}
|
||
(CaptureState::Stopping, CaptureEvent::Stopped) => {
|
||
Ok(CaptureState::Idle)
|
||
}
|
||
(state, CaptureEvent::Error) => {
|
||
Ok(CaptureState::Error(state))
|
||
}
|
||
_ => Err(CaptureError::InvalidTransition),
|
||
}
|
||
}
|
||
|
||
fn is_valid(&self, state: &Self::State) -> bool {
|
||
!matches!(state, CaptureState::Error(_))
|
||
}
|
||
|
||
fn allowed_transitions(&self, state: &Self::State) -> Vec<Self::State> {
|
||
match state {
|
||
CaptureState::Idle => vec![
|
||
CaptureState::Connecting,
|
||
],
|
||
CaptureState::Connecting => vec![
|
||
CaptureState::Streaming,
|
||
CaptureState::Error(CaptureState::Connecting),
|
||
],
|
||
CaptureState::Streaming => vec![
|
||
CaptureState::Stopping,
|
||
CaptureState::Error(CaptureState::Streaming),
|
||
],
|
||
CaptureState::Stopping => vec![
|
||
CaptureState::Idle,
|
||
CaptureState::Error(CaptureState::Stopping),
|
||
],
|
||
CaptureState::Error(_) => vec![
|
||
CaptureState::Idle,
|
||
],
|
||
}
|
||
}
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum CaptureState {
|
||
Idle,
|
||
Connecting,
|
||
Streaming,
|
||
Stopping,
|
||
Error(CaptureState),
|
||
}
|
||
|
||
#[derive(Debug)]
|
||
pub enum CaptureEvent {
|
||
Start,
|
||
Connected,
|
||
Stop,
|
||
Stopped,
|
||
Error,
|
||
}
|
||
```
|
||
|
||
## 5. 接口设计
|
||
|
||
### 5.1 公共 API
|
||
|
||
#### 5.1.1 主控制器接口
|
||
|
||
```rust
|
||
/// 远程桌面控制器
|
||
pub struct RemoteDesktopController {
|
||
/// 捕获管理器
|
||
capture_manager: Arc<Mutex<CaptureManager>>,
|
||
/// 编码器管道
|
||
encoder_pipeline: Arc<Mutex<EncoderPipeline>>,
|
||
/// WebRTC 传输
|
||
webrtc_transport: Arc<Mutex<WebRtcTransport>>,
|
||
/// 会话管理器
|
||
session_manager: Arc<Mutex<SessionManager>>,
|
||
}
|
||
|
||
impl RemoteDesktopController {
|
||
/// 创建新的控制器
|
||
pub async fn new(config: ControllerConfig) -> Result<Self, ControllerError> {
|
||
let capture_manager = Arc::new(Mutex::new(
|
||
CaptureManager::new(config.capture_config).await?
|
||
));
|
||
|
||
let encoder_pipeline = Arc::new(Mutex::new(
|
||
EncoderPipeline::new(config.encoder_config)?
|
||
));
|
||
|
||
let webrtc_transport = Arc::new(Mutex::new(
|
||
WebRtcTransport::new(config.webrtc_config).await?
|
||
));
|
||
|
||
let session_manager = Arc::new(Mutex::new(
|
||
SessionManager::new(config.session_config)
|
||
));
|
||
|
||
Ok(Self {
|
||
capture_manager,
|
||
encoder_pipeline,
|
||
webrtc_transport,
|
||
session_manager,
|
||
})
|
||
}
|
||
|
||
/// 启动会话
|
||
pub async fn start_session(&self, session_id: String) -> Result<(), ControllerError> {
|
||
// 启动捕获
|
||
self.capture_manager.lock().await.start().await?;
|
||
|
||
// 启动编码器
|
||
self.encoder_pipeline.lock().await.start()?;
|
||
|
||
// 创建 WebRTC 连接
|
||
self.webrtc_transport.lock().await
|
||
.create_peer_connection(session_id.clone()).await?;
|
||
|
||
// 启动处理循环
|
||
self.start_processing_loop(session_id).await?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 停止会话
|
||
pub async fn stop_session(&self, session_id: String) -> Result<(), ControllerError> {
|
||
// 停止捕获
|
||
self.capture_manager.lock().await.stop().await?;
|
||
|
||
// 停止编码器
|
||
self.encoder_pipeline.lock().await.stop()?;
|
||
|
||
// 关闭 WebRTC 连接
|
||
self.webrtc_transport.lock().await
|
||
.close_peer_connection(&session_id).await?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 更新配置
|
||
pub async fn update_config(
|
||
&self,
|
||
session_id: &str,
|
||
config: VideoConfig,
|
||
) -> Result<(), ControllerError> {
|
||
// 更新编码器配置
|
||
self.encoder_pipeline.lock().await
|
||
.reconfigure(config.into()).await?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 获取会话统计
|
||
pub async fn get_session_stats(&self, session_id: &str) -> Result<SessionStats, ControllerError> {
|
||
Ok(self.session_manager.lock().await
|
||
.get_stats(session_id)?)
|
||
}
|
||
}
|
||
|
||
/// 控制器配置
|
||
#[derive(Debug, Clone)]
|
||
pub struct ControllerConfig {
|
||
pub capture_config: CaptureConfig,
|
||
pub encoder_config: EncoderConfig,
|
||
pub webrtc_config: WebRtcConfig,
|
||
pub session_config: SessionConfig,
|
||
}
|
||
```
|
||
|
||
### 5.2 内部接口
|
||
|
||
#### 5.2.1 模块间通信接口
|
||
|
||
```rust
|
||
/// 帧接收接口
|
||
#[async_trait]
|
||
pub trait FrameReceiver: Send + Sync {
|
||
async fn receive_frame(&self) -> Result<CapturedFrame, ReceiverError>;
|
||
}
|
||
|
||
/// 帧发送接口
|
||
#[async_trait]
|
||
pub trait FrameSender: Send + Sync {
|
||
async fn send_frame(&self, frame: EncodedFrame) -> Result<(), SenderError>;
|
||
}
|
||
|
||
/// 编码接口
|
||
#[async_trait]
|
||
pub trait Encoder: Send + Sync {
|
||
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError>;
|
||
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError>;
|
||
}
|
||
|
||
/// 编码后的帧接收接口
|
||
#[async_trait]
|
||
pub trait EncodedFrameReceiver: Send + Sync {
|
||
async fn receive(&self) -> Result<EncodedFrame, ReceiverError>;
|
||
}
|
||
|
||
/// 会话管理接口
|
||
#[async_trait]
|
||
pub trait SessionManager: Send + Sync {
|
||
async fn create_session(&self, id: String) -> Result<(), SessionManagerError>;
|
||
async fn get_session(&self, id: &str) -> Option<SessionInfo>;
|
||
async fn update_session(&self, id: &str, info: SessionInfo) -> Result<(), SessionManagerError>;
|
||
async fn remove_session(&self, id: &str) -> Result<(), SessionManagerError>;
|
||
}
|
||
```
|
||
|
||
### 5.3 错误处理接口
|
||
|
||
#### 5.3.1 错误类型定义
|
||
|
||
```rust
|
||
/// 控制器错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum ControllerError {
|
||
#[error("捕获错误: {0}")]
|
||
Capture(#[from] CaptureError),
|
||
|
||
#[error("编码器错误: {0}")]
|
||
Encoder(#[from] EncoderError),
|
||
|
||
#[error("WebRTC 错误: {0}")]
|
||
WebRtc(#[from] WebRtcError),
|
||
|
||
#[error("会话错误: {0}")]
|
||
Session(#[from] SessionManagerError),
|
||
|
||
#[error("配置错误: {0}")]
|
||
Config(String),
|
||
|
||
#[error("内部错误: {0}")]
|
||
Internal(String),
|
||
}
|
||
|
||
/// 捕获错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum CaptureError {
|
||
#[error("PipeWire 初始化失败: {0}")]
|
||
PipeWireInitFailed(String),
|
||
|
||
#[error("权限被拒绝")]
|
||
PermissionDenied,
|
||
|
||
#[error("缓冲区获取失败")]
|
||
BufferAcquisitionFailed,
|
||
|
||
#[error("流状态错误: {0:?}")]
|
||
InvalidStreamState(StreamState),
|
||
|
||
#[error("超时")]
|
||
Timeout,
|
||
}
|
||
|
||
/// 编码器错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum EncoderError {
|
||
#[error("初始化失败: {0}")]
|
||
InitFailed(String),
|
||
|
||
#[error("编码失败: {0}")]
|
||
EncodeFailed(String),
|
||
|
||
#[error("重新配置失败: {0}")]
|
||
ReconfigureFailed(String),
|
||
|
||
#[error("不支持的格式: {0}")]
|
||
UnsupportedFormat(String),
|
||
|
||
#[error("硬件加速失败")]
|
||
HardwareAccelerationFailed,
|
||
}
|
||
|
||
/// WebRTC 错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum WebRtcError {
|
||
#[error("对等连接创建失败")]
|
||
PeerConnectionCreationFailed,
|
||
|
||
#[error("SDP 协商失败")]
|
||
SdpNegotiationFailed,
|
||
|
||
#[error("ICE 连接失败")]
|
||
IceConnectionFailed,
|
||
|
||
#[error("会话不存在: {0}")]
|
||
SessionNotFound(String),
|
||
|
||
#[error("数据通道错误")]
|
||
DataChannelError(String),
|
||
}
|
||
|
||
/// 会话管理错误
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum SessionManagerError {
|
||
#[error("会话已存在: {0}")]
|
||
SessionExists(String),
|
||
|
||
#[error("会话不存在: {0}")]
|
||
SessionNotFound(String),
|
||
|
||
#[error("无效的状态转换")]
|
||
InvalidStateTransition,
|
||
}
|
||
```
|
||
|
||
## 6. 性能优化
|
||
|
||
### 6.1 性能指标
|
||
|
||
#### 6.1.1 指标定义
|
||
|
||
```rust
|
||
/// 性能指标收集器
|
||
pub struct PerformanceMetrics {
|
||
/// 延迟指标
|
||
latency_metrics: LatencyMetrics,
|
||
/// 吞吐量指标
|
||
throughput_metrics: ThroughputMetrics,
|
||
/// 资源指标
|
||
resource_metrics: ResourceMetrics,
|
||
/// 质量指标
|
||
quality_metrics: QualityMetrics,
|
||
}
|
||
|
||
/// 延迟指标
|
||
#[derive(Debug, Clone)]
|
||
pub struct LatencyMetrics {
|
||
/// 端到端延迟 (P50)
|
||
pub end_to_end_p50_ms: f64,
|
||
/// 端到端延迟 (P95)
|
||
pub end_to_end_p95_ms: f64,
|
||
/// 端到端延迟 (P99)
|
||
pub end_to_end_p99_ms: f64,
|
||
/// 捕获延迟
|
||
pub capture_latency_ms: f64,
|
||
/// 编码延迟
|
||
pub encode_latency_ms: f64,
|
||
/// 传输延迟
|
||
pub transport_latency_ms: f64,
|
||
/// 渲染延迟
|
||
pub render_latency_ms: f64,
|
||
}
|
||
|
||
/// 吞吐量指标
|
||
#[derive(Debug, Clone)]
|
||
pub struct ThroughputMetrics {
|
||
/// 实际比特率 (bps)
|
||
pub actual_bitrate: u32,
|
||
/// 目标比特率 (bps)
|
||
pub target_bitrate: u32,
|
||
/// 帧率 (FPS)
|
||
pub frame_rate: f64,
|
||
/// 丢包率
|
||
pub packet_loss_rate: f64,
|
||
/// 丢帧率
|
||
pub frame_drop_rate: f64,
|
||
}
|
||
|
||
/// 资源指标
|
||
#[derive(Debug, Clone)]
|
||
pub struct ResourceMetrics {
|
||
/// CPU 使用率 (%)
|
||
pub cpu_usage: f64,
|
||
/// 内存使用量 (bytes)
|
||
pub memory_usage: u64,
|
||
/// GPU 使用率 (%)
|
||
pub gpu_usage: f64,
|
||
/// 网络带宽使用 (bps)
|
||
pub network_usage: u32,
|
||
}
|
||
|
||
/// 质量指标
|
||
#[derive(Debug, Clone)]
|
||
pub struct QualityMetrics {
|
||
/// PSNR
|
||
pub psnr: f64,
|
||
/// SSIM
|
||
pub ssim: f64,
|
||
/// 平均量化参数
|
||
pub avg_qp: f64,
|
||
/// 关键帧间隔
|
||
pub keyframe_interval: u32,
|
||
}
|
||
|
||
impl PerformanceMetrics {
|
||
pub fn new() -> Self {
|
||
Self {
|
||
latency_metrics: LatencyMetrics::default(),
|
||
throughput_metrics: ThroughputMetrics::default(),
|
||
resource_metrics: ResourceMetrics::default(),
|
||
quality_metrics: QualityMetrics::default(),
|
||
}
|
||
}
|
||
|
||
/// 更新延迟指标
|
||
pub fn update_latency(&mut self, category: LatencyCategory, value_ms: f64) {
|
||
match category {
|
||
LatencyCategory::Capture => {
|
||
self.latency_metrics.capture_latency_ms = value_ms;
|
||
}
|
||
LatencyCategory::Encode => {
|
||
self.latency_metrics.encode_latency_ms = value_ms;
|
||
}
|
||
LatencyCategory::Transport => {
|
||
self.latency_metrics.transport_latency_ms = value_ms;
|
||
}
|
||
LatencyCategory::Render => {
|
||
self.latency_metrics.render_latency_ms = value_ms;
|
||
}
|
||
LatencyCategory::EndToEnd => {
|
||
self.latency_metrics.end_to_end_p50_ms = value_ms;
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 获取总体评分
|
||
pub fn overall_score(&self) -> PerformanceScore {
|
||
let latency_score = self.calculate_latency_score();
|
||
let throughput_score = self.calculate_throughput_score();
|
||
let resource_score = self.calculate_resource_score();
|
||
let quality_score = self.calculate_quality_score();
|
||
|
||
PerformanceScore {
|
||
latency: latency_score,
|
||
throughput: throughput_score,
|
||
resource: resource_score,
|
||
quality: quality_score,
|
||
overall: (latency_score + throughput_score + resource_score + quality_score) / 4.0,
|
||
}
|
||
}
|
||
|
||
fn calculate_latency_score(&self) -> f64 {
|
||
// 目标延迟 20ms,计算得分
|
||
let target = 20.0;
|
||
let current = self.latency_metrics.end_to_end_p95_ms;
|
||
(target / current).min(1.0) * 100.0
|
||
}
|
||
|
||
fn calculate_throughput_score(&self) -> f64 {
|
||
let efficiency = self.throughput_metrics.actual_bitrate as f64
|
||
/ self.throughput_metrics.target_bitrate as f64;
|
||
efficiency.min(1.0) * 100.0
|
||
}
|
||
|
||
fn calculate_resource_score(&self) -> f64 {
|
||
// CPU 和内存使用率的倒数
|
||
let cpu_score = (1.0 - self.resource_metrics.cpu_usage / 100.0) * 100.0;
|
||
let memory_score = (1.0 - self.resource_metrics.memory_usage as f64 / 512_000_000.0) * 100.0;
|
||
(cpu_score + memory_score) / 2.0
|
||
}
|
||
|
||
fn calculate_quality_score(&self) -> f64 {
|
||
self.quality_metrics.ssim * 100.0
|
||
}
|
||
}
|
||
|
||
#[derive(Debug, Clone)]
|
||
pub struct PerformanceScore {
|
||
pub latency: f64,
|
||
pub throughput: f64,
|
||
pub resource: f64,
|
||
pub quality: f64,
|
||
pub overall: f64,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy)]
|
||
pub enum LatencyCategory {
|
||
Capture,
|
||
Encode,
|
||
Transport,
|
||
Render,
|
||
EndToEnd,
|
||
}
|
||
```
|
||
|
||
### 6.2 优化策略
|
||
|
||
#### 6.2.1 延迟优化策略
|
||
|
||
```rust
|
||
/// 延迟优化器
|
||
pub struct LatencyOptimizer {
|
||
/// 当前策略
|
||
strategy: LatencyOptimizationStrategy,
|
||
/// 性能监控
|
||
monitor: PerformanceMonitor,
|
||
/// 调整间隔
|
||
adjustment_interval: Duration,
|
||
}
|
||
|
||
/// 延迟优化策略
|
||
#[derive(Debug, Clone, Copy)]
|
||
pub enum LatencyOptimizationStrategy {
|
||
/// 极低延迟模式
|
||
UltraLow,
|
||
/// 低延迟模式
|
||
Low,
|
||
/// 平衡模式
|
||
Balanced,
|
||
/// 高质量模式
|
||
HighQuality,
|
||
}
|
||
|
||
impl LatencyOptimizer {
|
||
pub fn new(strategy: LatencyOptimizationStrategy) -> Self {
|
||
Self {
|
||
strategy,
|
||
monitor: PerformanceMonitor::new(),
|
||
adjustment_interval: Duration::from_millis(500),
|
||
}
|
||
}
|
||
|
||
/// 根据性能调整策略
|
||
pub async fn optimize(&mut self, metrics: &PerformanceMetrics) -> OptimizationAction {
|
||
let current_latency = metrics.latency_metrics.end_to_end_p95_ms;
|
||
|
||
match self.strategy {
|
||
LatencyOptimizationStrategy::UltraLow => {
|
||
if current_latency > 25.0 {
|
||
OptimizationAction::ReduceBuffering
|
||
} else if current_latency < 15.0 {
|
||
OptimizationAction::IncreaseQuality
|
||
} else {
|
||
OptimizationAction::None
|
||
}
|
||
}
|
||
LatencyOptimizationStrategy::Low => {
|
||
if current_latency > 40.0 {
|
||
OptimizationAction::ReduceFrameRate
|
||
} else if current_latency < 30.0 {
|
||
OptimizationAction::IncreaseFrameRate
|
||
} else {
|
||
OptimizationAction::None
|
||
}
|
||
}
|
||
LatencyOptimizationStrategy::Balanced => {
|
||
if current_latency > 60.0 {
|
||
OptimizationAction::ReduceBitrate
|
||
} else if current_latency < 40.0 {
|
||
OptimizationAction::IncreaseBitrate
|
||
} else {
|
||
OptimizationAction::None
|
||
}
|
||
}
|
||
LatencyOptimizationStrategy::HighQuality => {
|
||
if current_latency > 100.0 {
|
||
OptimizationAction::ReduceQuality
|
||
} else {
|
||
OptimizationAction::None
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 优化动作
|
||
#[derive(Debug, Clone, Copy)]
|
||
pub enum OptimizationAction {
|
||
None,
|
||
ReduceBuffering,
|
||
IncreaseQuality,
|
||
ReduceFrameRate,
|
||
IncreaseFrameRate,
|
||
ReduceBitrate,
|
||
IncreaseBitrate,
|
||
ReduceQuality,
|
||
SwitchCodec,
|
||
}
|
||
```
|
||
|
||
#### 6.2.2 缓冲优化
|
||
|
||
```rust
|
||
/// 自适应缓冲控制器
|
||
pub struct AdaptiveBufferController {
|
||
/// 当前缓冲区大小
|
||
current_buffer_size: usize,
|
||
/// 最小缓冲区大小
|
||
min_buffer_size: usize,
|
||
/// 最大缓冲区大小
|
||
max_buffer_size: usize,
|
||
/// 延迟历史
|
||
latency_history: VecDeque<f64>,
|
||
}
|
||
|
||
impl AdaptiveBufferController {
|
||
pub fn new(initial_size: usize, min_size: usize, max_size: usize) -> Self {
|
||
Self {
|
||
current_buffer_size: initial_size,
|
||
min_buffer_size: min_size,
|
||
max_buffer_size: max_size,
|
||
latency_history: VecDeque::with_capacity(100),
|
||
}
|
||
}
|
||
|
||
/// 根据延迟调整缓冲区大小
|
||
pub fn adjust_buffer_size(&mut self, current_latency_ms: f64) -> BufferAdjustment {
|
||
self.latency_history.push_back(current_latency_ms);
|
||
|
||
if self.latency_history.len() > 100 {
|
||
self.latency_history.pop_front();
|
||
}
|
||
|
||
let avg_latency: f64 = self.latency_history.iter().sum::<f64>()
|
||
/ self.latency_history.len() as f64;
|
||
|
||
let target_latency = 25.0;
|
||
let adjustment_factor = (avg_latency / target_latency).ln();
|
||
|
||
let new_size = if adjustment_factor > 0.5 {
|
||
// 延迟太高,减少缓冲
|
||
self.current_buffer_size.saturating_sub(1).max(self.min_buffer_size)
|
||
} else if adjustment_factor < -0.5 {
|
||
// 延迟较低,可以增加缓冲
|
||
self.current_buffer_size.saturating_add(1).min(self.max_buffer_size)
|
||
} else {
|
||
self.current_buffer_size
|
||
};
|
||
|
||
BufferAdjustment {
|
||
old_size: self.current_buffer_size,
|
||
new_size,
|
||
action: if new_size > self.current_buffer_size {
|
||
BufferAction::Increase
|
||
} else if new_size < self.current_buffer_size {
|
||
BufferAction::Decrease
|
||
} else {
|
||
BufferAction::Maintain
|
||
},
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 缓冲调整
|
||
#[derive(Debug, Clone)]
|
||
pub struct BufferAdjustment {
|
||
pub old_size: usize,
|
||
pub new_size: usize,
|
||
pub action: BufferAction,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy)]
|
||
pub enum BufferAction {
|
||
Increase,
|
||
Decrease,
|
||
Maintain,
|
||
}
|
||
```
|
||
|
||
### 6.3 性能监控
|
||
|
||
#### 6.3.1 实时监控
|
||
|
||
```rust
|
||
/// 性能监控器
|
||
pub struct PerformanceMonitor {
|
||
/// 延迟跟踪器
|
||
latency_tracker: LatencyTracker,
|
||
/// 吞吐量跟踪器
|
||
throughput_tracker: ThroughputTracker,
|
||
/// 资源跟踪器
|
||
resource_tracker: ResourceTracker,
|
||
}
|
||
|
||
/// 延迟跟踪器
|
||
pub struct LatencyTracker {
|
||
/// 延迟样本
|
||
samples: VecDeque<LatencySample>,
|
||
/// 最大样本数
|
||
max_samples: usize,
|
||
}
|
||
|
||
#[derive(Debug, Clone)]
|
||
struct LatencySample {
|
||
timestamp: Instant,
|
||
latency_ms: f64,
|
||
category: LatencyCategory,
|
||
}
|
||
|
||
impl LatencyTracker {
|
||
pub fn new(max_samples: usize) -> Self {
|
||
Self {
|
||
samples: VecDeque::with_capacity(max_samples),
|
||
max_samples,
|
||
}
|
||
}
|
||
|
||
pub fn record_latency(&mut self, latency_ms: f64, category: LatencyCategory) {
|
||
self.samples.push_back(LatencySample {
|
||
timestamp: Instant::now(),
|
||
latency_ms,
|
||
category,
|
||
});
|
||
|
||
if self.samples.len() > self.max_samples {
|
||
self.samples.pop_front();
|
||
}
|
||
}
|
||
|
||
pub fn get_percentile(&self, percentile: f64) -> Option<f64> {
|
||
if self.samples.is_empty() {
|
||
return None;
|
||
}
|
||
|
||
let mut latencies: Vec<f64> = self.samples.iter()
|
||
.map(|s| s.latency_ms)
|
||
.collect();
|
||
latencies.sort_by(|a, b| a.partial_cmp(b).unwrap());
|
||
|
||
let index = (latencies.len() as f64 * percentile / 100.0) as usize;
|
||
latencies.get(index).copied()
|
||
}
|
||
|
||
pub fn get_average(&self) -> Option<f64> {
|
||
if self.samples.is_empty() {
|
||
return None;
|
||
}
|
||
|
||
let sum: f64 = self.samples.iter()
|
||
.map(|s| s.latency_ms)
|
||
.sum();
|
||
Some(sum / self.samples.len() as f64)
|
||
}
|
||
}
|
||
```
|
||
|
||
### 6.4 调优指南
|
||
|
||
#### 6.4.1 调优参数
|
||
|
||
```rust
|
||
/// 调优参数集合
|
||
pub struct TuningParameters {
|
||
/// 编码器参数
|
||
pub encoder: EncoderTuningParams,
|
||
/// 网络参数
|
||
pub network: NetworkTuningParams,
|
||
/// 缓冲参数
|
||
pub buffer: BufferTuningParams,
|
||
}
|
||
|
||
/// 编码器调优参数
|
||
#[derive(Debug, Clone)]
|
||
pub struct EncoderTuningParams {
|
||
/// 比特率调整步长 (bps)
|
||
pub bitrate_step: u32,
|
||
/// 最小比特率
|
||
pub min_bitrate: u32,
|
||
/// 最大比特率
|
||
pub max_bitrate: u32,
|
||
/// GOP 大小
|
||
pub gop_size: u32,
|
||
/// B 帧数量
|
||
pub b_frames: u32,
|
||
/// 预设
|
||
pub preset: EncodePreset,
|
||
/// 调优
|
||
pub tune: EncodeTune,
|
||
}
|
||
|
||
/// 网络调优参数
|
||
#[derive(Debug, Clone)]
|
||
pub struct NetworkTuningParams {
|
||
/// NACK 窗口大小 (ms)
|
||
pub nack_window_ms: u32,
|
||
/// FEC 开关
|
||
pub fec_enabled: bool,
|
||
/// 最大重传次数
|
||
pub max_retransmissions: u32,
|
||
/// 拥塞控制算法
|
||
pub congestion_algorithm: CongestionControlAlgorithm,
|
||
}
|
||
|
||
/// 缓冲调优参数
|
||
#[derive(Debug, Clone)]
|
||
pub struct BufferTuningParams {
|
||
/// 最小缓冲区大小
|
||
pub min_buffer_size: usize,
|
||
/// 最大缓冲区大小
|
||
pub max_buffer_size: usize,
|
||
/// 初始缓冲区大小
|
||
pub initial_buffer_size: usize,
|
||
}
|
||
|
||
impl TuningParameters {
|
||
/// 根据场景创建调优参数
|
||
pub fn for_scenario(scenario: Scenario) -> Self {
|
||
match scenario {
|
||
Scenario::LowLatency => Self {
|
||
encoder: EncoderTuningParams {
|
||
bitrate_step: 500_000,
|
||
min_bitrate: 1_000_000,
|
||
max_bitrate: 8_000_000,
|
||
gop_size: 15,
|
||
b_frames: 0,
|
||
preset: EncodePreset::Ultrafast,
|
||
tune: EncodeTune::Zerolatency,
|
||
},
|
||
network: NetworkTuningParams {
|
||
nack_window_ms: 20,
|
||
fec_enabled: false,
|
||
max_retransmissions: 1,
|
||
congestion_algorithm: CongestionControlAlgorithm::GCC,
|
||
},
|
||
buffer: BufferTuningParams {
|
||
min_buffer_size: 2,
|
||
max_buffer_size: 5,
|
||
initial_buffer_size: 3,
|
||
},
|
||
},
|
||
Scenario::HighQuality => Self {
|
||
encoder: EncoderTuningParams {
|
||
bitrate_step: 1_000_000,
|
||
min_bitrate: 2_000_000,
|
||
max_bitrate: 15_000_000,
|
||
gop_size: 30,
|
||
b_frames: 2,
|
||
preset: EncodePreset::Faster,
|
||
tune: EncodeTune::Film,
|
||
},
|
||
network: NetworkTuningParams {
|
||
nack_window_ms: 100,
|
||
fec_enabled: true,
|
||
max_retransmissions: 3,
|
||
congestion_algorithm: CongestionControlAlgorithm::TWCC,
|
||
},
|
||
buffer: BufferTuningParams {
|
||
min_buffer_size: 5,
|
||
max_buffer_size: 10,
|
||
initial_buffer_size: 7,
|
||
},
|
||
},
|
||
}
|
||
}
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy)]
|
||
pub enum Scenario {
|
||
LowLatency,
|
||
HighQuality,
|
||
Balanced,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy)]
|
||
pub enum CongestionControlAlgorithm {
|
||
GCC,
|
||
TWCC,
|
||
}
|
||
```
|
||
|
||
## 7. 并发设计
|
||
|
||
### 7.1 线程模型
|
||
|
||
#### 7.1.1 线程架构
|
||
|
||
```
|
||
主线程 (Tokio Runtime)
|
||
├── 网络线程池 (处理 I/O)
|
||
│ ├── WebSocket 接收
|
||
│ ├── WebSocket 发送
|
||
│ ├── UDP 接收
|
||
│ └── UDP 发送
|
||
├── 编码线程池 (处理编码)
|
||
│ ├── 编码线程 1
|
||
│ ├── 编码线程 2
|
||
│ └── 编码线程 3
|
||
├── 处理线程 (处理帧)
|
||
│ ├── 帧处理任务
|
||
│ ┍ 统计收集
|
||
│ └── 配置更新
|
||
└── 监控线程 (后台任务)
|
||
├── 性能监控
|
||
├── 健康检查
|
||
└── 日志轮转
|
||
```
|
||
|
||
#### 7.1.2 线程分配
|
||
|
||
```rust
|
||
/// 线程池配置
|
||
pub struct ThreadPoolConfig {
|
||
/// 网络线程数
|
||
pub network_threads: usize,
|
||
/// 编码线程数
|
||
pub encoder_threads: usize,
|
||
/// 处理线程数
|
||
pub processing_threads: usize,
|
||
/// 监控线程数
|
||
pub monitoring_threads: usize,
|
||
}
|
||
|
||
impl ThreadPoolConfig {
|
||
/// 根据系统配置
|
||
pub fn auto() -> Self {
|
||
let num_cpus = num_cpus::get();
|
||
|
||
Self {
|
||
network_threads: 2,
|
||
encoder_threads: (num_cpus / 2).max(1),
|
||
processing_threads: (num_cpus / 4).max(1),
|
||
monitoring_threads: 1,
|
||
}
|
||
}
|
||
|
||
/// 创建 Tokio 运行时
|
||
pub fn create_runtime(&self) -> tokio::runtime::Runtime {
|
||
tokio::runtime::Builder::new_multi_thread()
|
||
.worker_threads(self.encoder_threads + self.processing_threads)
|
||
.thread_name("wl-webrtc-worker")
|
||
.enable_all()
|
||
.build()
|
||
.expect("Failed to create runtime")
|
||
}
|
||
}
|
||
```
|
||
|
||
### 7.2 同步机制
|
||
|
||
#### 7.2.1 锁的选择
|
||
|
||
| 场景 | 锁类型 | 原因 |
|
||
|------|--------|------|
|
||
| 配置读取 | `RwLock` | 读取频繁,写入很少 |
|
||
| 会话映射 | `RwLock` | 需要并发读取 |
|
||
| 编码器状态 | `Mutex` | 临界区短,简单 |
|
||
| 统计数据 | `RwLock` | 读取频繁 |
|
||
| 缓冲区池 | `Mutex` | 需要原子操作 |
|
||
| 无锁队列 | `crossbeam::queue` | 高性能通道 |
|
||
|
||
```rust
|
||
/// 使用 RwLock 的配置管理
|
||
pub struct ConfigManager {
|
||
config: Arc<RwLock<Config>>,
|
||
}
|
||
|
||
impl ConfigManager {
|
||
pub fn read_config(&self) -> Config {
|
||
self.config.read().unwrap().clone()
|
||
}
|
||
|
||
pub fn update_config(&self, new_config: Config) {
|
||
let mut config = self.config.write().unwrap();
|
||
*config = new_config;
|
||
}
|
||
}
|
||
|
||
/// 使用 Mutex 的编码器状态
|
||
pub struct EncoderState {
|
||
state: Arc<Mutex<EncoderStateInner>>,
|
||
}
|
||
|
||
struct EncoderStateInner {
|
||
is_encoding: bool,
|
||
current_bitrate: u32,
|
||
frame_count: u64,
|
||
}
|
||
|
||
impl EncoderState {
|
||
pub async fn update_state(&self, update: StateUpdate) {
|
||
let mut state = self.state.lock().await;
|
||
match update {
|
||
StateUpdate::SetEncoding(v) => state.is_encoding = v,
|
||
StateUpdate::SetBitrate(b) => state.current_bitrate = b,
|
||
StateUpdate::IncrementFrameCount => state.frame_count += 1,
|
||
}
|
||
}
|
||
}
|
||
|
||
#[derive(Debug)]
|
||
pub enum StateUpdate {
|
||
SetEncoding(bool),
|
||
SetBitrate(u32),
|
||
IncrementFrameCount,
|
||
}
|
||
```
|
||
|
||
#### 7.2.2 无锁数据结构
|
||
|
||
```rust
|
||
/// 使用无锁队列的帧通道
|
||
pub use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
|
||
|
||
/// 无锁帧通道
|
||
pub type FrameSender = Sender<CapturedFrame>;
|
||
pub type FrameReceiver = Receiver<CapturedFrame>;
|
||
|
||
/// 创建帧通道
|
||
pub fn create_frame_channel(capacity: usize) -> (FrameSender, FrameReceiver) {
|
||
bounded(capacity)
|
||
}
|
||
|
||
/// 使用无锁队列的统计收集
|
||
pub struct LockFreeMetrics {
|
||
/// 帧计数
|
||
frame_count: AtomicU64,
|
||
/// 字节计数
|
||
byte_count: AtomicU64,
|
||
/// 延迟总和
|
||
latency_sum: AtomicU64,
|
||
/// 样本计数
|
||
sample_count: AtomicU64,
|
||
}
|
||
|
||
impl LockFreeMetrics {
|
||
pub fn new() -> Self {
|
||
Self {
|
||
frame_count: AtomicU64::new(0),
|
||
byte_count: AtomicU64::new(0),
|
||
latency_sum: AtomicU64::new(0),
|
||
sample_count: AtomicU64::new(0),
|
||
}
|
||
}
|
||
|
||
pub fn record_frame(&self, frame_size: usize, latency_ms: u64) {
|
||
self.frame_count.fetch_add(1, Ordering::Relaxed);
|
||
self.byte_count.fetch_add(frame_size as u64, Ordering::Relaxed);
|
||
self.latency_sum.fetch_add(latency_ms, Ordering::Relaxed);
|
||
self.sample_count.fetch_add(1, Ordering::Relaxed);
|
||
}
|
||
|
||
pub fn get_metrics(&self) -> MetricsSnapshot {
|
||
MetricsSnapshot {
|
||
frame_count: self.frame_count.load(Ordering::Relaxed),
|
||
byte_count: self.byte_count.load(Ordering::Relaxed),
|
||
avg_latency_ms: {
|
||
let sum = self.latency_sum.load(Ordering::Relaxed);
|
||
let count = self.sample_count.load(Ordering::Relaxed);
|
||
if count > 0 {
|
||
sum / count
|
||
} else {
|
||
0
|
||
}
|
||
},
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 7.3 任务调度
|
||
|
||
#### 7.3.1 优先级调度
|
||
|
||
```rust
|
||
/// 优先级任务调度器
|
||
pub struct PriorityScheduler {
|
||
/// 高优先级队列
|
||
high_priority: tokio::sync::mpsc::Sender<Task>,
|
||
/// 中优先级队列
|
||
medium_priority: tokio::sync::mpsc::Sender<Task>,
|
||
/// 低优先级队列
|
||
low_priority: tokio::sync::mpsc::Sender<Task>,
|
||
}
|
||
|
||
#[derive(Debug)]
|
||
pub struct Task {
|
||
pub id: TaskId,
|
||
pub priority: TaskPriority,
|
||
pub operation: Operation,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||
pub enum TaskPriority {
|
||
High,
|
||
Medium,
|
||
Low,
|
||
}
|
||
|
||
impl PriorityScheduler {
|
||
pub fn new() -> Self {
|
||
let (high_tx, mut high_rx) = tokio::sync::mpsc::channel(100);
|
||
let (medium_tx, mut medium_rx) = tokio::sync::mpsc::channel(100);
|
||
let (low_tx, mut low_rx) = tokio::sync::mpsc::channel(100);
|
||
|
||
// 启动调度循环
|
||
tokio::spawn(async move {
|
||
loop {
|
||
tokio::select! {
|
||
biased;
|
||
|
||
// 高优先级
|
||
Some(task) = high_rx.recv() => {
|
||
Self::execute_task(task).await;
|
||
}
|
||
// 中优先级
|
||
Some(task) = medium_rx.recv() => {
|
||
Self::execute_task(task).await;
|
||
}
|
||
// 低优先级
|
||
Some(task) = low_rx.recv() => {
|
||
Self::execute_task(task).await;
|
||
}
|
||
}
|
||
}
|
||
});
|
||
|
||
Self {
|
||
high_priority: high_tx,
|
||
medium_priority: medium_tx,
|
||
low_priority: low_tx,
|
||
}
|
||
}
|
||
|
||
pub async fn schedule(&self, task: Task) -> Result<(), ScheduleError> {
|
||
match task.priority {
|
||
TaskPriority::High => {
|
||
self.high_priority.send(task).await?;
|
||
}
|
||
TaskPriority::Medium => {
|
||
self.medium_priority.send(task).await?;
|
||
}
|
||
TaskPriority::Low => {
|
||
self.low_priority.send(task).await?;
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
async fn execute_task(task: Task) {
|
||
debug!("执行任务 {:?}, 优先级: {:?}", task.id, task.priority);
|
||
// 执行任务...
|
||
}
|
||
}
|
||
```
|
||
|
||
### 7.4 锁策略
|
||
|
||
#### 7.4.1 死锁预防
|
||
|
||
```rust
|
||
/// 锁顺序管理器
|
||
/// 通过强制锁获取顺序来预防死锁
|
||
pub struct LockOrderManager;
|
||
|
||
impl LockOrderManager {
|
||
/// 定义锁顺序
|
||
/// 较小的数字应该先获取
|
||
pub const LOCK_ORDER: &[LockId] = &[
|
||
LockId::Config,
|
||
LockId::Session,
|
||
LockId::Encoder,
|
||
LockId::Buffer,
|
||
];
|
||
|
||
/// 检查锁顺序是否正确
|
||
pub fn validate_order(held_locks: &[LockId], requested_lock: LockId) -> bool {
|
||
if held_locks.is_empty() {
|
||
return true;
|
||
}
|
||
|
||
let held_max = held_locks.iter().max().unwrap();
|
||
*held_max < requested_lock
|
||
}
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||
pub enum LockId {
|
||
Config = 1,
|
||
Session = 2,
|
||
Encoder = 3,
|
||
Buffer = 4,
|
||
}
|
||
|
||
/// RAII 锁保护
|
||
pub struct LockGuard<'a, T> {
|
||
data: &'a T,
|
||
acquired: bool,
|
||
}
|
||
|
||
impl<'a, T> LockGuard<'a, T> {
|
||
pub fn new(data: &'a T) -> Self {
|
||
Self {
|
||
data,
|
||
acquired: true,
|
||
}
|
||
}
|
||
|
||
pub fn get(&self) -> &T {
|
||
self.data
|
||
}
|
||
}
|
||
|
||
impl<'a, T> Drop for LockGuard<'a, T> {
|
||
fn drop(&mut self) {
|
||
if self.acquired {
|
||
// 自动释放锁
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
## 8. 网络设计
|
||
|
||
### 8.1 协议栈
|
||
|
||
#### 8.1.1 WebRTC 协议层次
|
||
|
||
```
|
||
┌─────────────────────────────────────┐
|
||
│ 应用层 │
|
||
│ (视频/音频/数据通道) │
|
||
└─────────────────────────────────────┘
|
||
┌─────────────────────────────────────┐
|
||
│ RTP/RTCP 层 │
|
||
│ (数据包/控制包) │
|
||
└─────────────────────────────────────┘
|
||
┌─────────────────────────────────────┐
|
||
│ DTLS/SRTP 层 │
|
||
│ (加密/认证) │
|
||
└─────────────────────────────────────┘
|
||
┌─────────────────────────────────────┐
|
||
│ ICE 层 │
|
||
│ (连接建立/候选选择) │
|
||
└─────────────────────────────────────┘
|
||
┌─────────────────────────────────────┐
|
||
│ UDP 层 │
|
||
│ (传输层) │
|
||
└─────────────────────────────────────┘
|
||
┌─────────────────────────────────────┐
|
||
│ IP 层 │
|
||
│ (网络层) │
|
||
└─────────────────────────────────────┘
|
||
```
|
||
|
||
### 8.2 数据格式
|
||
|
||
#### 8.2.1 RTP 数据包格式
|
||
|
||
```rust
|
||
/// RTP 数据包
|
||
#[derive(Debug, Clone)]
|
||
pub struct RtpPacket {
|
||
/// RTP 头部
|
||
pub header: RtpHeader,
|
||
/// 扩展头 (可选)
|
||
pub extension: Option<RtpExtension>,
|
||
/// 载荷
|
||
pub payload: Bytes,
|
||
}
|
||
|
||
/// RTP 头部 (12 字节)
|
||
#[derive(Debug, Clone, Copy)]
|
||
#[repr(C, packed)]
|
||
pub struct RtpHeader {
|
||
/// V:2, P, X, CC:4
|
||
pub byte0: u8,
|
||
/// M, PT:7
|
||
pub byte1: u8,
|
||
pub sequence_number: u16,
|
||
pub timestamp: u32,
|
||
pub ssrc: u32,
|
||
}
|
||
|
||
impl RtpHeader {
|
||
pub fn new(sequence_number: u16, timestamp: u32, ssrc: u32) -> Self {
|
||
Self {
|
||
byte0: 0x80, // Version 2
|
||
byte1: 0x00, // No marker, default PT
|
||
sequence_number,
|
||
timestamp,
|
||
ssrc,
|
||
}
|
||
}
|
||
|
||
pub fn version(&self) -> u8 {
|
||
(self.byte0 & 0xC0) >> 6
|
||
}
|
||
|
||
pub fn padding(&self) -> bool {
|
||
(self.byte0 & 0x20) != 0
|
||
}
|
||
|
||
pub fn extension(&self) -> bool {
|
||
(self.byte0 & 0x10) != 0
|
||
}
|
||
|
||
pub fn csrc_count(&self) -> u8 {
|
||
self.byte0 & 0x0F
|
||
}
|
||
|
||
pub fn marker(&self) -> bool {
|
||
(self.byte1 & 0x80) != 0
|
||
}
|
||
|
||
pub fn payload_type(&self) -> u8 {
|
||
self.byte1 & 0x7F
|
||
}
|
||
|
||
pub fn set_marker(&mut self, marker: bool) {
|
||
if marker {
|
||
self.byte1 |= 0x80;
|
||
} else {
|
||
self.byte1 &= 0x7F;
|
||
}
|
||
}
|
||
|
||
pub fn set_payload_type(&mut self, pt: u8) {
|
||
self.byte1 = (self.byte1 & 0x80) | (pt & 0x7F);
|
||
}
|
||
|
||
pub fn to_bytes(&self) -> [u8; 12] {
|
||
[
|
||
self.byte0,
|
||
self.byte1,
|
||
(self.sequence_number >> 8) as u8,
|
||
self.sequence_number as u8,
|
||
(self.timestamp >> 24) as u8,
|
||
(self.timestamp >> 16) as u8,
|
||
(self.timestamp >> 8) as u8,
|
||
self.timestamp as u8,
|
||
(self.ssrc >> 24) as u8,
|
||
(self.ssrc >> 16) as u8,
|
||
(self.ssrc >> 8) as u8,
|
||
self.ssrc as u8,
|
||
]
|
||
}
|
||
}
|
||
|
||
/// RTP 扩展头
|
||
#[derive(Debug, Clone)]
|
||
pub struct RtpExtension {
|
||
/// 扩展类型
|
||
pub extension_type: u16,
|
||
/// 扩展数据
|
||
pub data: Bytes,
|
||
}
|
||
|
||
/// RTP 载荷类型
|
||
pub const RTP_PAYLOAD_TYPE_H264: u8 = 96;
|
||
pub const RTP_PAYLOAD_TYPE_H265: u8 = 97;
|
||
pub const RTP_PAYLOAD_TYPE_VP9: u8 = 98;
|
||
pub const RTP_PAYLOAD_TYPE_AV1: u8 = 99;
|
||
```
|
||
|
||
#### 8.2.2 H.264 RTP 打包
|
||
|
||
```rust
|
||
/// H.264 打包器
|
||
pub struct H264RtpPacketizer {
|
||
/// 最大载荷大小
|
||
max_payload_size: usize,
|
||
/// 序列号
|
||
sequence_number: u16,
|
||
}
|
||
|
||
impl H264RtpPacketizer {
|
||
pub fn new(max_payload_size: usize) -> Self {
|
||
Self {
|
||
max_payload_size: max_payload_size - 2, // 留出 FU 头
|
||
sequence_number: rand::random(),
|
||
}
|
||
}
|
||
|
||
/// 打包 NAL 单元
|
||
pub fn packetize(&mut self, nalu: &[u8], timestamp: u32, is_keyframe: bool)
|
||
-> Vec<RtpPacket>
|
||
{
|
||
let nalu_type = nalu[0] & 0x1F;
|
||
|
||
// NALU 太大,需要分片
|
||
if nalu.len() > self.max_payload_size {
|
||
self.packetize_fu_a(nalu, timestamp, is_keyframe)
|
||
} else {
|
||
// 单 NAL 单元包
|
||
vec![self.single_nalu_packet(nalu, timestamp, is_keyframe)]
|
||
}
|
||
}
|
||
|
||
/// 单 NAL 单元包
|
||
fn single_nalu_packet(&self, nalu: &[u8], timestamp: u32, is_keyframe: bool)
|
||
-> RtpPacket
|
||
{
|
||
let mut header = RtpHeader::new(self.sequence_number, timestamp, 0);
|
||
header.set_marker(true);
|
||
header.set_payload_type(RTP_PAYLOAD_TYPE_H264);
|
||
|
||
RtpPacket {
|
||
header,
|
||
extension: None,
|
||
payload: Bytes::copy_from_slice(nalu),
|
||
}
|
||
}
|
||
|
||
/// FU-A 分片打包
|
||
fn packetize_fu_a(&mut self, nalu: &[u8], timestamp: u32, is_keyframe: bool)
|
||
-> Vec<RtpPacket>
|
||
{
|
||
let nalu_type = nalu[0] & 0x1F;
|
||
let nalu_payload = &nalu[1..];
|
||
let payload_size = nalu_payload.len();
|
||
let num_packets = (payload_size + self.max_payload_size - 1) / self.max_payload_size;
|
||
|
||
let mut packets = Vec::new();
|
||
|
||
for i in 0..num_packets {
|
||
let offset = i * self.max_payload_size;
|
||
let size = self.max_payload_size.min(payload_size - offset);
|
||
let payload = &nalu_payload[offset..offset + size];
|
||
|
||
let mut header = RtpHeader::new(self.sequence_number, timestamp, 0);
|
||
|
||
// 只有最后一个包设置 marker
|
||
header.set_marker(i == num_packets - 1);
|
||
header.set_payload_type(RTP_PAYLOAD_TYPE_H264);
|
||
|
||
// 创建 FU-A 头
|
||
let fu_indicator = (28 << 5) | nalu_type;
|
||
let fu_header = {
|
||
let mut h = 0;
|
||
if i == 0 {
|
||
h |= 0x80; // Start bit
|
||
}
|
||
if i == num_packets - 1 {
|
||
h |= 0x40; // End bit
|
||
}
|
||
h
|
||
};
|
||
|
||
let mut packet_payload = Vec::with_capacity(2 + size);
|
||
packet_payload.push(fu_indicator);
|
||
packet_payload.push(fu_header);
|
||
packet_payload.extend_from_slice(payload);
|
||
|
||
packets.push(RtpPacket {
|
||
header,
|
||
extension: None,
|
||
payload: Bytes::from(packet_payload),
|
||
});
|
||
|
||
self.sequence_number = self.sequence_number.wrapping_add(1);
|
||
}
|
||
|
||
packets
|
||
}
|
||
}
|
||
```
|
||
|
||
### 8.3 网络优化
|
||
|
||
#### 8.3.1 NAT 穿透策略
|
||
|
||
```rust
|
||
/// NAT 穿透管理器
|
||
pub struct NatTraversalManager {
|
||
/// STUN 服务器
|
||
stun_servers: Vec<String>,
|
||
/// TURN 服务器
|
||
turn_servers: Vec<TurnServerConfig>,
|
||
/// ICE 候选收集状态
|
||
ice_state: IceState,
|
||
}
|
||
|
||
#[derive(Debug, Clone)]
|
||
pub struct TurnServerConfig {
|
||
pub url: String,
|
||
pub username: String,
|
||
pub credential: String,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||
pub enum IceState {
|
||
Gathering,
|
||
GComplete,
|
||
Checking,
|
||
Connected,
|
||
Failed,
|
||
}
|
||
|
||
impl NatTraversalManager {
|
||
pub fn new(
|
||
stun_servers: Vec<String>,
|
||
turn_servers: Vec<TurnServerConfig>,
|
||
) -> Self {
|
||
Self {
|
||
stun_servers,
|
||
turn_servers,
|
||
ice_state: IceState::Gathering,
|
||
}
|
||
}
|
||
|
||
/// 获取 ICE 配置
|
||
pub fn get_ice_servers(&self) -> Vec<RTCIceServer> {
|
||
let mut servers = Vec::new();
|
||
|
||
// STUN 服务器
|
||
for stun in &self.stun_servers {
|
||
servers.push(RTCIceServer {
|
||
urls: vec![stun.clone()],
|
||
..Default::default()
|
||
});
|
||
}
|
||
|
||
// TURN 服务器
|
||
for turn in &self.turn_servers {
|
||
servers.push(RTCIceServer {
|
||
urls: vec![turn.url.clone()],
|
||
username: turn.username.clone(),
|
||
credential: turn.credential.clone(),
|
||
..Default::default()
|
||
});
|
||
}
|
||
|
||
servers
|
||
}
|
||
|
||
/// 选择最佳候选
|
||
pub fn select_best_candidate(&self, candidates: &[RTCIceCandidate]) -> Option<RTCIceCandidate> {
|
||
// 优先选择中继候选
|
||
if let Some(relay) = candidates.iter()
|
||
.find(|c| c.candidate_type == Some("relay".to_string()))
|
||
{
|
||
return Some(relay.clone());
|
||
}
|
||
|
||
// 其次选择 srflx (server reflexive)
|
||
if let Some(srflx) = candidates.iter()
|
||
.find(|c| c.candidate_type == Some("srflx".to_string()))
|
||
{
|
||
return Some(srflx.clone());
|
||
}
|
||
|
||
// 最后选择 host 候选
|
||
candidates.iter().find(|c| c.candidate_type == Some("host".to_string())).cloned()
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 8.3.2 拥塞控制
|
||
|
||
```rust
|
||
/// 拥塞控制器
|
||
pub trait CongestionController: Send + Sync {
|
||
/// 更新丢包信息
|
||
fn on_packet_loss(&mut self, loss_rate: f64);
|
||
|
||
/// 更新往返时间
|
||
fn on_rtt_update(&mut self, rtt_ms: u32);
|
||
|
||
/// 获取推荐比特率
|
||
fn get_target_bitrate(&self) -> u32;
|
||
|
||
/// 是否应该减少比特率
|
||
fn should_reduce_bitrate(&self) -> bool;
|
||
}
|
||
|
||
/// Google 拥塞控制 (GCC)
|
||
pub struct GccCongestionController {
|
||
/// 当前比特率
|
||
current_bitrate: u32,
|
||
/// 最小比特率
|
||
min_bitrate: u32,
|
||
/// 最大比特率
|
||
max_bitrate: u32,
|
||
/// 最近丢包率
|
||
recent_loss_rate: f64,
|
||
/// 最近 RTT
|
||
recent_rtt_ms: u32,
|
||
}
|
||
|
||
impl CongestionController for GccCongestionController {
|
||
fn on_packet_loss(&mut self, loss_rate: f64) {
|
||
self.recent_loss_rate = loss_rate;
|
||
|
||
// 丢包率 > 10%,显著降低比特率
|
||
if loss_rate > 0.10 {
|
||
self.current_bitrate = (self.current_bitrate as f64 * 0.8) as u32;
|
||
} else if loss_rate > 0.02 {
|
||
// 丢包率 2-10%,适度降低
|
||
self.current_bitrate = (self.current_bitrate as f64 * 0.9) as u32;
|
||
}
|
||
}
|
||
|
||
fn on_rtt_update(&mut self, rtt_ms: u32) {
|
||
self.recent_rtt_ms = rtt_ms;
|
||
|
||
// RTT > 200ms,降低比特率
|
||
if rtt_ms > 200 {
|
||
self.current_bitrate = (self.current_bitrate as f64 * 0.9) as u32;
|
||
}
|
||
}
|
||
|
||
fn get_target_bitrate(&self) -> u32 {
|
||
self.current_bitrate.clamp(self.min_bitrate, self.max_bitrate)
|
||
}
|
||
|
||
fn should_reduce_bitrate(&self) -> bool {
|
||
self.recent_loss_rate > 0.05 || self.recent_rtt_ms > 150
|
||
}
|
||
}
|
||
```
|
||
|
||
### 8.4 错误处理
|
||
|
||
#### 8.4.1 网络错误恢复
|
||
|
||
```rust
|
||
/// 网络错误处理器
|
||
pub struct NetworkErrorHandler {
|
||
/// 最大重试次数
|
||
max_retries: u32,
|
||
/// 重试延迟
|
||
retry_delay: Duration,
|
||
/// 当前重试次数
|
||
retry_count: AtomicU32,
|
||
}
|
||
|
||
impl NetworkErrorHandler {
|
||
pub fn new(max_retries: u32, retry_delay: Duration) -> Self {
|
||
Self {
|
||
max_retries,
|
||
retry_delay,
|
||
retry_count: AtomicU32::new(0),
|
||
}
|
||
}
|
||
|
||
/// 处理网络错误
|
||
pub async fn handle_error<T, F, Fut>(&self, operation: F) -> Result<T, NetworkError>
|
||
where
|
||
F: Fn() -> Fut,
|
||
Fut: Future<Output = Result<T, NetworkError>>,
|
||
{
|
||
loop {
|
||
match operation().await {
|
||
Ok(result) => {
|
||
// 成功,重置重试计数
|
||
self.retry_count.store(0, Ordering::Relaxed);
|
||
return Ok(result);
|
||
}
|
||
Err(e) => {
|
||
let count = self.retry_count.fetch_add(1, Ordering::Relaxed);
|
||
|
||
if count >= self.max_retries {
|
||
error!("超过最大重试次数: {}", count);
|
||
return Err(e);
|
||
}
|
||
|
||
warn!("网络错误,重试 ({}/{}): {:?}", count + 1, self.max_retries, e);
|
||
tokio::time::sleep(self.retry_delay).await;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 判断错误是否可恢复
|
||
pub fn is_recoverable(&self, error: &NetworkError) -> bool {
|
||
match error {
|
||
NetworkError::ConnectionReset => true,
|
||
NetworkError::Timeout => true,
|
||
NetworkError::TemporaryFailure => true,
|
||
_ => false,
|
||
}
|
||
}
|
||
}
|
||
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum NetworkError {
|
||
#[error("连接重置")]
|
||
ConnectionReset,
|
||
|
||
#[error("连接超时")]
|
||
Timeout,
|
||
|
||
#[error("临时故障")]
|
||
TemporaryFailure,
|
||
|
||
#[error("永久故障")]
|
||
PermanentFailure,
|
||
|
||
#[error("解析失败: {0}")]
|
||
ParseError(String),
|
||
}
|
||
```
|
||
|
||
## 9. 安全设计
|
||
|
||
### 9.1 认证授权
|
||
|
||
#### 9.1.1 会话认证
|
||
|
||
```rust
|
||
/// 认证管理器
|
||
pub struct AuthenticationManager {
|
||
/// 密钥管理
|
||
key_manager: KeyManager,
|
||
/// 会话令牌
|
||
tokens: Arc<RwLock<HashMap<String, SessionToken>>>,
|
||
}
|
||
|
||
impl AuthenticationManager {
|
||
pub fn new(key_manager: KeyManager) -> Self {
|
||
Self {
|
||
key_manager,
|
||
tokens: Arc::new(RwLock::new(HashMap::new())),
|
||
}
|
||
}
|
||
|
||
/// 生成会话令牌
|
||
pub async fn generate_token(&self, session_id: &str, ttl: Duration)
|
||
-> Result<String, AuthError>
|
||
{
|
||
let secret = self.key_manager.generate_secret()?;
|
||
|
||
let token = SessionToken {
|
||
session_id: session_id.to_string(),
|
||
secret,
|
||
issued_at: Instant::now(),
|
||
expires_at: Instant::now() + ttl,
|
||
};
|
||
|
||
let token_id = Uuid::new_v4().to_string();
|
||
self.tokens.write().await.insert(token_id.clone(), token);
|
||
|
||
Ok(token_id)
|
||
}
|
||
|
||
/// 验证令牌
|
||
pub async fn verify_token(&self, token_id: &str) -> Result<SessionToken, AuthError> {
|
||
let tokens = self.tokens.read().await;
|
||
let token = tokens.get(token_id)
|
||
.ok_or(AuthError::InvalidToken)?;
|
||
|
||
if token.expires_at < Instant::now() {
|
||
return Err(AuthError::TokenExpired);
|
||
}
|
||
|
||
Ok(token.clone())
|
||
}
|
||
|
||
/// 撤销令牌
|
||
pub async fn revoke_token(&self, token_id: &str) {
|
||
self.tokens.write().await.remove(token_id);
|
||
}
|
||
}
|
||
|
||
/// 会话令牌
|
||
#[derive(Debug, Clone)]
|
||
pub struct SessionToken {
|
||
pub session_id: String,
|
||
pub secret: Vec<u8>,
|
||
pub issued_at: Instant,
|
||
pub expires_at: Instant,
|
||
}
|
||
|
||
/// 密钥管理器
|
||
pub struct KeyManager {
|
||
master_key: Vec<u8>,
|
||
}
|
||
|
||
impl KeyManager {
|
||
pub fn new() -> Result<Self, CryptoError> {
|
||
let master_key = Self::generate_key()?;
|
||
Ok(Self { master_key })
|
||
}
|
||
|
||
fn generate_key() -> Result<Vec<u8>, CryptoError> {
|
||
let mut key = vec![0u8; 32];
|
||
getrandom::getrandom(&mut key)
|
||
.map_err(|e| CryptoError::KeyGenerationFailed(e.to_string()))?;
|
||
Ok(key)
|
||
}
|
||
|
||
pub fn generate_secret(&self) -> Result<Vec<u8>, CryptoError> {
|
||
Self::generate_key()
|
||
}
|
||
}
|
||
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum AuthError {
|
||
#[error("无效的令牌")]
|
||
InvalidToken,
|
||
|
||
#[error("令牌已过期")]
|
||
TokenExpired,
|
||
|
||
#[error("加密错误: {0}")]
|
||
CryptoError(#[from] CryptoError),
|
||
}
|
||
|
||
#[derive(Debug, thiserror::Error)]
|
||
pub enum CryptoError {
|
||
#[error("密钥生成失败: {0}")]
|
||
KeyGenerationFailed(String),
|
||
|
||
#[error("加密失败: {0}")]
|
||
EncryptionFailed(String),
|
||
|
||
#[error("解密失败: {0}")]
|
||
DecryptionFailed(String),
|
||
}
|
||
```
|
||
|
||
### 9.2 数据加密
|
||
|
||
#### 9.2.1 SRTP 加密
|
||
|
||
```rust
|
||
/// SRTP 加密器
|
||
pub struct SrtpEncryptor {
|
||
/// 加密密钥
|
||
key: Vec<u8>,
|
||
/// 盐
|
||
salt: Vec<u8>,
|
||
/// 序列号
|
||
sequence_number: u32,
|
||
}
|
||
|
||
impl SrtpEncryptor {
|
||
pub fn new(key: Vec<u8>, salt: Vec<u8>) -> Self {
|
||
Self {
|
||
key,
|
||
salt,
|
||
sequence_number: 0,
|
||
}
|
||
}
|
||
|
||
/// 加密 RTP 数据包
|
||
pub fn encrypt(&mut self, packet: &mut RtpPacket) -> Result<(), CryptoError> {
|
||
// 使用 AES-GCM 加密
|
||
let nonce = self.generate_nonce(packet.header.sequence_number);
|
||
|
||
let cipher = aes_gcm::AesGcm::<aes::Aes256>::new_from_slice(&self.key)?;
|
||
|
||
let ciphertext = cipher.encrypt(&nonce, packet.payload.as_ref(), &[])
|
||
.map_err(|e| CryptoError::EncryptionFailed(e.to_string()))?;
|
||
|
||
packet.payload = Bytes::from(ciphertext);
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 解密 RTP 数据包
|
||
pub fn decrypt(&mut self, packet: &mut RtpPacket) -> Result<(), CryptoError> {
|
||
let nonce = self.generate_nonce(packet.header.sequence_number);
|
||
|
||
let cipher = aes_gcm::AesGcm::<aes::Aes256>::new_from_slice(&self.key)?;
|
||
|
||
let plaintext = cipher.decrypt(&nonce, packet.payload.as_ref())
|
||
.map_err(|e| CryptoError::DecryptionFailed(e.to_string()))?;
|
||
|
||
packet.payload = Bytes::from(plaintext);
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn generate_nonce(&self, sequence_number: u16) -> Vec<u8> {
|
||
// 结合盐和序列号生成 nonce
|
||
let mut nonce = self.salt.clone();
|
||
let seq_bytes = sequence_number.to_be_bytes();
|
||
nonce.extend_from_slice(&seq_bytes);
|
||
nonce.truncate(12); // AES-GCM 需要 12 字节 nonce
|
||
nonce
|
||
}
|
||
}
|
||
```
|
||
|
||
### 9.3 安全审计
|
||
|
||
#### 9.3.1 审计日志
|
||
|
||
```rust
|
||
/// 审计日志记录器
|
||
pub struct AuditLogger {
|
||
/// 日志写入器
|
||
writer: AuditLogWriter,
|
||
/// 配置
|
||
config: AuditConfig,
|
||
}
|
||
|
||
/// 审计日志条目
|
||
#[derive(Debug, Serialize)]
|
||
pub struct AuditEntry {
|
||
/// 时间戳
|
||
pub timestamp: DateTime<Utc>,
|
||
/// 会话 ID
|
||
pub session_id: String,
|
||
/// 用户 ID
|
||
pub user_id: String,
|
||
/// 事件类型
|
||
pub event_type: AuditEventType,
|
||
/// 事件详情
|
||
pub details: serde_json::Value,
|
||
/// IP 地址
|
||
pub ip_address: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
#[serde(rename_all = "snake_case")]
|
||
pub enum AuditEventType {
|
||
/// 会话创建
|
||
SessionCreated,
|
||
/// 会话结束
|
||
SessionEnded,
|
||
/// 认证成功
|
||
AuthenticationSuccess,
|
||
/// 认证失败
|
||
AuthenticationFailure,
|
||
/// 权限拒绝
|
||
AccessDenied,
|
||
/// 配置更改
|
||
ConfigChanged,
|
||
/// 错误
|
||
Error,
|
||
}
|
||
|
||
impl AuditLogger {
|
||
pub fn new(config: AuditConfig) -> Result<Self, AuditError> {
|
||
let writer = AuditLogWriter::new(&config.log_path)?;
|
||
Ok(Self { writer, config })
|
||
}
|
||
|
||
/// 记录审计事件
|
||
pub fn log(&self, entry: AuditEntry) -> Result<(), AuditError> {
|
||
let json = serde_json::to_string(&entry)?;
|
||
self.writer.write(&json)?;
|
||
|
||
// 同时输出到标准错误
|
||
eprintln!("[AUDIT] {}", json);
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 记录会话创建
|
||
pub fn log_session_created(
|
||
&self,
|
||
session_id: String,
|
||
user_id: String,
|
||
ip_address: Option<String>,
|
||
) {
|
||
let entry = AuditEntry {
|
||
timestamp: Utc::now(),
|
||
session_id,
|
||
user_id,
|
||
event_type: AuditEventType::SessionCreated,
|
||
details: serde_json::json!({}),
|
||
ip_address,
|
||
};
|
||
|
||
if let Err(e) = self.log(entry) {
|
||
error!("审计日志写入失败: {:?}", e);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 9.4 防护措施
|
||
|
||
#### 9.4.1 速率限制
|
||
|
||
```rust
|
||
/// 速率限制器
|
||
pub struct RateLimiter {
|
||
/// 令牌桶
|
||
bucket: TokenBucket,
|
||
/// 最大突发
|
||
max_burst: u32,
|
||
}
|
||
|
||
/// 令牌桶
|
||
struct TokenBucket {
|
||
/// 容量
|
||
capacity: u32,
|
||
/// 令牌数
|
||
tokens: u32,
|
||
/// 速率 (tokens/秒)
|
||
rate: u32,
|
||
/// 最后更新
|
||
last_update: Instant,
|
||
}
|
||
|
||
impl RateLimiter {
|
||
pub fn new(rate: u32, max_burst: u32) -> Self {
|
||
Self {
|
||
bucket: TokenBucket {
|
||
capacity: max_burst,
|
||
tokens: max_burst,
|
||
rate,
|
||
last_update: Instant::now(),
|
||
},
|
||
max_burst,
|
||
}
|
||
}
|
||
|
||
/// 检查是否允许
|
||
pub fn check(&mut self) -> bool {
|
||
self.bucket.try_consume(1)
|
||
}
|
||
|
||
/// 检查是否允许消耗多个令牌
|
||
pub fn check_n(&mut self, n: u32) -> bool {
|
||
self.bucket.try_consume(n)
|
||
}
|
||
}
|
||
|
||
impl TokenBucket {
|
||
fn try_consume(&mut self, n: u32) -> bool {
|
||
// 更新令牌数
|
||
self.refill();
|
||
|
||
if self.tokens >= n {
|
||
self.tokens -= n;
|
||
true
|
||
} else {
|
||
false
|
||
}
|
||
}
|
||
|
||
fn refill(&mut self) {
|
||
let now = Instant::now();
|
||
let elapsed = now.duration_since(self.last_update).as_secs_f64();
|
||
self.last_update = now;
|
||
|
||
let new_tokens = (elapsed * self.rate as f64) as u32;
|
||
self.tokens = (self.tokens + new_tokens).min(self.capacity);
|
||
}
|
||
}
|
||
```
|
||
|
||
## 10. 测试策略
|
||
|
||
### 10.1 测试层次
|
||
|
||
#### 10.1.1 测试金字塔
|
||
|
||
```
|
||
/\
|
||
/ \
|
||
/ E2E \ (端到端测试, 10%)
|
||
/--------\
|
||
/ 集成 \ (集成测试, 20%)
|
||
/------------\
|
||
/ 单元 \ (单元测试, 70%)
|
||
/----------------\
|
||
```
|
||
|
||
### 10.2 测试用例
|
||
|
||
#### 10.2.1 单元测试示例
|
||
|
||
```rust
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
|
||
#[test]
|
||
fn test_dma_buf_handle_creation() {
|
||
let handle = DmaBufHandle {
|
||
fd: 42,
|
||
size: 1920 * 1080 * 4,
|
||
stride: 1920 * 4,
|
||
offset: 0,
|
||
id: None,
|
||
};
|
||
|
||
assert_eq!(handle.fd, 42);
|
||
assert_eq!(handle.size, 1920 * 1080 * 4);
|
||
}
|
||
|
||
#[test]
|
||
fn test_damage_tracker_full_frame() {
|
||
let mut tracker = DamageTracker::new(100, 10);
|
||
let frame = create_test_frame(1920, 1080);
|
||
|
||
let regions = tracker.update(&frame);
|
||
|
||
assert_eq!(regions.len(), 1);
|
||
assert_eq!(regions[0].width, 1920);
|
||
assert_eq!(regions[0].height, 1080);
|
||
}
|
||
|
||
#[test]
|
||
fn test_frame_pool_allocation() {
|
||
let mut pool = ObjectPool::new(
|
||
|| Vec::with_capacity(1024),
|
||
|v: &mut Vec<u8>| v.clear(),
|
||
10,
|
||
20,
|
||
);
|
||
|
||
let vec1 = pool.acquire();
|
||
let vec2 = pool.acquire();
|
||
|
||
pool.release(vec1);
|
||
pool.release(vec2);
|
||
|
||
assert_eq!(pool.idle.len(), 2);
|
||
}
|
||
|
||
fn create_test_frame(width: u32, height: u32) -> CapturedFrame {
|
||
CapturedFrame {
|
||
dma_buf: DmaBufHandle {
|
||
fd: 0,
|
||
size: (width * height * 4) as usize,
|
||
stride: width * 4,
|
||
offset: 0,
|
||
id: None,
|
||
},
|
||
width,
|
||
height,
|
||
format: PixelFormat::RGBA,
|
||
timestamp: 0,
|
||
frame_number: 0,
|
||
damaged_regions: vec![],
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 10.3 性能测试
|
||
|
||
#### 10.3.1 基准测试
|
||
|
||
```rust
|
||
#[cfg(test)]
|
||
mod benchmarks {
|
||
use super::*;
|
||
use std::time::Instant;
|
||
|
||
#[bench]
|
||
fn bench_encode_frame(b: &mut Bencher) {
|
||
let mut encoder = VaapiEncoder::new(create_test_config()).unwrap();
|
||
let frame = create_test_frame(1920, 1080);
|
||
|
||
b.iter(|| {
|
||
encoder.encode(frame.clone()).unwrap()
|
||
});
|
||
}
|
||
|
||
#[bench]
|
||
fn bench_rtp_packetize(b: &mut Bencher) {
|
||
let mut packetizer = H264RtpPacketizer::new(1200);
|
||
let frame = create_encoded_frame(50000);
|
||
|
||
b.iter(|| {
|
||
packetizer.packetize(&frame.data, 0, false)
|
||
});
|
||
}
|
||
|
||
fn create_test_config() -> EncoderConfig {
|
||
EncoderConfig {
|
||
encoder_type: EncoderType::H264_VAAPI,
|
||
width: 1920,
|
||
height: 1080,
|
||
frame_rate: 60,
|
||
bitrate: 4000000,
|
||
max_bitrate: 8000000,
|
||
min_bitrate: 1000000,
|
||
keyframe_interval: 30,
|
||
preset: EncodePreset::Ultrafast,
|
||
tune: EncodeTune::Zerolatency,
|
||
}
|
||
}
|
||
|
||
fn create_encoded_frame(size: usize) -> EncodedFrame {
|
||
EncodedFrame {
|
||
data: Bytes::from(vec![0u8; size]),
|
||
is_keyframe: false,
|
||
timestamp: 0,
|
||
sequence_number: 0,
|
||
rtp_timestamp: 0,
|
||
frame_type: FrameType::P,
|
||
encoding_params: EncodingParams {
|
||
bitrate: 4000000,
|
||
qp: 26,
|
||
encode_latency_ms: 5.0,
|
||
},
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 10.4 持续集成
|
||
|
||
#### 10.4.1 CI 配置示例
|
||
|
||
```yaml
|
||
# .github/workflows/ci.yml
|
||
name: CI
|
||
|
||
on:
|
||
push:
|
||
branches: [main, develop]
|
||
pull_request:
|
||
branches: [main]
|
||
|
||
jobs:
|
||
test:
|
||
runs-on: ubuntu-latest
|
||
|
||
steps:
|
||
- uses: actions/checkout@v3
|
||
|
||
- name: Install dependencies
|
||
run: |
|
||
sudo apt-get update
|
||
sudo apt-get install -y libva-dev vainfo
|
||
|
||
- name: Cache Rust
|
||
uses: actions/cache@v3
|
||
with:
|
||
path: ~/.cargo/registry
|
||
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
|
||
|
||
- name: Run tests
|
||
run: cargo test --all --verbose
|
||
|
||
- name: Run benchmarks
|
||
run: cargo bench --no-run
|
||
|
||
lint:
|
||
runs-on: ubuntu-latest
|
||
|
||
steps:
|
||
- uses: actions/checkout@v3
|
||
|
||
- name: Run Clippy
|
||
run: cargo clippy -- -D warnings
|
||
|
||
- name: Format check
|
||
run: cargo fmt -- --check
|
||
|
||
build:
|
||
runs-on: ubuntu-latest
|
||
|
||
steps:
|
||
- uses: actions/checkout@v3
|
||
|
||
- name: Build release
|
||
run: cargo build --release --all
|
||
```
|
||
|
||
## 11. 部署运维
|
||
|
||
### 11.1 部署方案
|
||
|
||
#### 11.1.1 生产环境架构
|
||
|
||
```
|
||
┌────────────────┐
|
||
│ 负载均衡器 │
|
||
│ (Nginx/HAProxy)│
|
||
└────────┬───────┘
|
||
│
|
||
┌────────────────────┼────────────────────┐
|
||
▼ ▼ ▼
|
||
┌───────────┐ ┌───────────┐ ┌───────────┐
|
||
│ 实例 1 │ │ 实例 2 │ │ 实例 3 │
|
||
│ (Rust后端) │ │ (Rust后端) │ │ (Rust后端) │
|
||
└───────────┘ └───────────┘ └───────────┘
|
||
│ │ │
|
||
└────────────────────┼────────────────────┘
|
||
│
|
||
┌────────▼───────┐
|
||
│ Redis 缓存 │
|
||
│ (会话状态) │
|
||
└────────────────┘
|
||
│
|
||
┌────────▼───────┐
|
||
│ PostgreSQL │
|
||
│ (持久化数据) │
|
||
└────────────────┘
|
||
```
|
||
|
||
### 11.2 配置管理
|
||
|
||
#### 11.2.1 配置文件格式
|
||
|
||
```toml
|
||
# config.toml
|
||
|
||
[server]
|
||
bind_addr = "0.0.0.0:8080"
|
||
signaling_addr = "0.0.0.0:8443"
|
||
max_sessions = 20
|
||
|
||
[encoder]
|
||
default_encoder_type = "h264_vaapi"
|
||
default_bitrate = 4000000
|
||
max_bitrate = 8000000
|
||
min_bitrate = 1000000
|
||
keyframe_interval = 30
|
||
|
||
[webrtc]
|
||
stun_servers = ["stun:stun.l.google.com:19302"]
|
||
# turn_servers = []
|
||
max_bitrate = 8000000
|
||
min_bitrate = 500000
|
||
start_bitrate = 4000000
|
||
|
||
[performance]
|
||
target_latency_ms = 25
|
||
max_buffer_size = 10
|
||
min_buffer_size = 2
|
||
|
||
[logging]
|
||
level = "info"
|
||
file = "/var/log/wl-webrtc/app.log"
|
||
max_size = "100MB"
|
||
max_files = 10
|
||
|
||
[monitoring]
|
||
enabled = true
|
||
metrics_port = 9090
|
||
health_check_port = 8081
|
||
|
||
[security]
|
||
enable_authentication = true
|
||
token_ttl_hours = 24
|
||
enable_audit_log = true
|
||
audit_log_path = "/var/log/wl-webrtc/audit.log"
|
||
```
|
||
|
||
### 11.3 监控告警
|
||
|
||
#### 11.3.1 Prometheus 指标
|
||
|
||
```rust
|
||
/// 指标收集器
|
||
pub struct MetricsCollector {
|
||
/// HTTP 请求计数器
|
||
http_requests_total: IntCounterVec,
|
||
/// 请求延迟直方图
|
||
request_duration_seconds: HistogramVec,
|
||
/// 活跃会话数
|
||
active_sessions: IntGauge,
|
||
/// 编码帧计数器
|
||
frames_encoded_total: IntCounterVec,
|
||
/// 延迟指标
|
||
latency_seconds: HistogramVec,
|
||
}
|
||
|
||
impl MetricsCollector {
|
||
pub fn new() -> Self {
|
||
let opts = Opts::new("wl_webrtc", "Wayland WebRTC Remote Desktop");
|
||
|
||
let http_requests_total = register_int_counter_vec!(
|
||
opts.clone(),
|
||
&["method", "endpoint", "status"],
|
||
"Total HTTP requests"
|
||
).unwrap();
|
||
|
||
let request_duration_seconds = register_histogram_vec!(
|
||
histogram_opts!("request_duration_seconds", "Request duration in seconds"),
|
||
&["method", "endpoint"]
|
||
).unwrap();
|
||
|
||
let active_sessions = register_int_gauge!(
|
||
"active_sessions",
|
||
"Number of active sessions"
|
||
).unwrap();
|
||
|
||
let frames_encoded_total = register_int_counter_vec!(
|
||
opts.clone(),
|
||
&["encoder_type", "frame_type"],
|
||
"Total frames encoded"
|
||
).unwrap();
|
||
|
||
let latency_seconds = register_histogram_vec!(
|
||
histogram_opts!("latency_seconds", "Latency in seconds"),
|
||
&["category"]
|
||
).unwrap();
|
||
|
||
Self {
|
||
http_requests_total,
|
||
request_duration_seconds,
|
||
active_sessions,
|
||
frames_encoded_total,
|
||
latency_seconds,
|
||
}
|
||
}
|
||
|
||
pub fn increment_active_sessions(&self) {
|
||
self.active_sessions.inc();
|
||
}
|
||
|
||
pub fn decrement_active_sessions(&self) {
|
||
self.active_sessions.dec();
|
||
}
|
||
|
||
pub fn record_frame_encoded(&self, encoder_type: &str, frame_type: &str) {
|
||
self.frames_encoded_total
|
||
.with_label_values(&[encoder_type, frame_type])
|
||
.inc();
|
||
}
|
||
|
||
pub fn record_latency(&self, category: &str, seconds: f64) {
|
||
self.latency_seconds
|
||
.with_label_values(&[category])
|
||
.observe(seconds);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 11.4 日志管理
|
||
|
||
#### 11.4.1 日志配置
|
||
|
||
```rust
|
||
/// 初始化日志系统
|
||
pub fn init_logging(config: &LoggingConfig) -> Result<(), LogError> {
|
||
let level = config.level.parse::<LevelFilter>()?;
|
||
|
||
// 控制台输出
|
||
let console_layer = fmt::layer()
|
||
.with_target(false)
|
||
.with_level(true)
|
||
.with_filter(level);
|
||
|
||
// 文件输出
|
||
let file_appender = RollingFileAppender::new(
|
||
Rotation::DAILY,
|
||
config.log_path.as_path(),
|
||
)?;
|
||
let (non_blocking, _guard) = non_blocking(file_appender);
|
||
let file_layer = fmt::layer()
|
||
.with_writer(non_blocking)
|
||
.with_filter(level);
|
||
|
||
// 性能指标
|
||
let perf_layer = tracing_subscriber::filter::Targets::new()
|
||
.with_target("wl_webrtc::performance", Level::DEBUG);
|
||
|
||
// 初始化
|
||
Registry::default()
|
||
.with(console_layer)
|
||
.with(file_layer)
|
||
.with(perf_layer)
|
||
.init();
|
||
|
||
Ok(())
|
||
}
|
||
```
|
||
|
||
### 11.5 故障处理
|
||
|
||
#### 11.5.1 常见问题排查
|
||
|
||
```markdown
|
||
## 常见问题排查手册
|
||
|
||
### 1. 高延迟问题
|
||
|
||
**症状:** 端到端延迟超过 100ms
|
||
|
||
**可能原因:**
|
||
- 网络带宽不足
|
||
- 编码器负载过高
|
||
- 缓冲区过大
|
||
|
||
**排查步骤:**
|
||
1. 检查网络带宽使用情况
|
||
```bash
|
||
iftop -i eth0
|
||
```
|
||
2. 查看编码器统计信息
|
||
```bash
|
||
curl http://localhost:9090/metrics | grep encode_latency
|
||
```
|
||
3. 调整缓冲区大小
|
||
```toml
|
||
[performance]
|
||
max_buffer_size = 5 # 从 10 减少到 5
|
||
```
|
||
|
||
### 2. 画面卡顿
|
||
|
||
**症状:** 帧率不稳定,画面卡顿
|
||
|
||
**可能原因:**
|
||
- 丢包率过高
|
||
- CPU/GPU 负载过高
|
||
- 编码器配置不当
|
||
|
||
**排查步骤:**
|
||
1. 检查丢包率
|
||
```bash
|
||
curl http://localhost:9090/metrics | grep packet_loss_rate
|
||
```
|
||
2. 检查系统资源
|
||
```bash
|
||
top -p $(pgrep wl-webrtc)
|
||
```
|
||
3. 检查编码器状态
|
||
```bash
|
||
journalctl -u wl-webrtc | grep encoder
|
||
```
|
||
|
||
### 3. 无法建立连接
|
||
|
||
**症状:** WebSocket 连接失败
|
||
|
||
**可能原因:**
|
||
- 端口被占用
|
||
- 防火墙阻止
|
||
- 证书问题
|
||
|
||
**排查步骤:**
|
||
1. 检查端口监听
|
||
```bash
|
||
netstat -tlnp | grep 8443
|
||
```
|
||
2. 检查防火墙
|
||
```bash
|
||
sudo iptables -L -n | grep 8443
|
||
```
|
||
3. 查看应用日志
|
||
```bash
|
||
journalctl -u wl-webrtc -f
|
||
```
|
||
```
|
||
|
||
## 12. 扩展设计
|
||
|
||
### 12.1 插件机制
|
||
|
||
#### 12.1.1 插件接口
|
||
|
||
```rust
|
||
/// 插件 trait
|
||
pub trait Plugin: Send + Sync {
|
||
/// 插件名称
|
||
fn name(&self) -> &str;
|
||
|
||
/// 插件版本
|
||
fn version(&self) -> &str;
|
||
|
||
/// 初始化插件
|
||
fn init(&mut self, context: &PluginContext) -> Result<(), PluginError>;
|
||
|
||
/// 关闭插件
|
||
fn shutdown(&mut self) -> Result<(), PluginError>;
|
||
|
||
/// 处理帧前回调
|
||
fn on_frame_before_encode(&self, frame: &mut CapturedFrame) -> Result<(), PluginError>;
|
||
|
||
/// 处理帧后回调
|
||
fn on_frame_after_encode(&self, frame: &EncodedFrame) -> Result<(), PluginError>;
|
||
}
|
||
|
||
/// 插件上下文
|
||
pub struct PluginContext {
|
||
/// 配置
|
||
pub config: Arc<RwLock<Config>>,
|
||
/// 会话管理器
|
||
pub session_manager: Arc<SessionManager>,
|
||
}
|
||
|
||
/// 插件管理器
|
||
pub struct PluginManager {
|
||
/// 加载的插件
|
||
plugins: Vec<Box<dyn Plugin>>,
|
||
/// 上下文
|
||
context: PluginContext,
|
||
}
|
||
|
||
impl PluginManager {
|
||
pub fn new(context: PluginContext) -> Self {
|
||
Self {
|
||
plugins: Vec::new(),
|
||
context,
|
||
}
|
||
}
|
||
|
||
/// 加载插件
|
||
pub fn load_plugin(&mut self, plugin: Box<dyn Plugin>) -> Result<(), PluginError> {
|
||
plugin.init(&self.context)?;
|
||
self.plugins.push(plugin);
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理帧前回调
|
||
pub fn on_frame_before_encode(&self, frame: &mut CapturedFrame) -> Result<(), PluginError> {
|
||
for plugin in &self.plugins {
|
||
plugin.on_frame_before_encode(frame)?;
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
/// 处理帧后回调
|
||
pub fn on_frame_after_encode(&self, frame: &EncodedFrame) -> Result<(), PluginError> {
|
||
for plugin in &self.plugins {
|
||
plugin.on_frame_after_encode(frame)?;
|
||
}
|
||
Ok(())
|
||
}
|
||
}
|
||
```
|
||
|
||
### 12.2 版本管理
|
||
|
||
#### 12.2.1 版本兼容性
|
||
|
||
```rust
|
||
/// 版本信息
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct VersionInfo {
|
||
/// 主版本
|
||
pub major: u8,
|
||
/// 次版本
|
||
pub minor: u8,
|
||
/// 补丁版本
|
||
pub patch: u8,
|
||
/// 预发布标识
|
||
pub prerelease: Option<String>,
|
||
/// 构建元数据
|
||
pub build_metadata: Option<String>,
|
||
}
|
||
|
||
impl VersionInfo {
|
||
pub fn current() -> Self {
|
||
Self {
|
||
major: env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
|
||
minor: env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
|
||
patch: env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(),
|
||
prerelease: None,
|
||
build_metadata: None,
|
||
}
|
||
}
|
||
|
||
/// 检查兼容性
|
||
pub fn is_compatible_with(&self, other: &VersionInfo) -> bool {
|
||
// 主版本相同,次版本 >=
|
||
self.major == other.major && self.minor >= other.minor
|
||
}
|
||
}
|
||
|
||
/// 版本协商
|
||
pub struct VersionNegotiator;
|
||
|
||
impl VersionNegotiator {
|
||
/// 选择最佳版本
|
||
pub fn select_version(
|
||
client_versions: &[VersionInfo],
|
||
server_version: &VersionInfo,
|
||
) -> Option<VersionInfo> {
|
||
client_versions
|
||
.iter()
|
||
.filter(|v| server_version.is_compatible_with(v))
|
||
.max_by_key(|v| (v.major, v.minor, v.patch))
|
||
.cloned()
|
||
}
|
||
}
|
||
```
|
||
|
||
### 12.3 扩展点
|
||
|
||
#### 12.3.1 自定义编码器
|
||
|
||
```rust
|
||
/// 自定义编码器工厂
|
||
pub trait EncoderFactory: Send + Sync {
|
||
/// 创建编码器实例
|
||
fn create_encoder(&self, config: EncoderConfig) -> Result<Box<dyn VideoEncoder>, EncoderError>;
|
||
|
||
/// 检查是否支持
|
||
fn is_supported(&self) -> bool;
|
||
}
|
||
|
||
/// 自定义编码器示例
|
||
pub struct CustomEncoder {
|
||
// ...
|
||
}
|
||
|
||
#[async_trait]
|
||
impl VideoEncoder for CustomEncoder {
|
||
async fn encode(&mut self, frame: CapturedFrame) -> Result<EncodedFrame, EncoderError> {
|
||
// 自定义编码逻辑
|
||
Ok(EncodedFrame {
|
||
data: Bytes::new(),
|
||
is_keyframe: false,
|
||
timestamp: frame.timestamp,
|
||
sequence_number: 0,
|
||
rtp_timestamp: 0,
|
||
frame_type: FrameType::P,
|
||
encoding_params: EncodingParams {
|
||
bitrate: 0,
|
||
qp: 0,
|
||
encode_latency_ms: 0.0,
|
||
},
|
||
})
|
||
}
|
||
|
||
async fn reconfigure(&mut self, config: EncoderConfig) -> Result<(), EncoderError> {
|
||
Ok(())
|
||
}
|
||
|
||
fn stats(&self) -> EncoderStats {
|
||
EncoderStats::default()
|
||
}
|
||
|
||
fn capabilities(&self) -> EncoderCapabilities {
|
||
EncoderCapabilities::default()
|
||
}
|
||
}
|
||
```
|
||
|
||
## 附录
|
||
|
||
### A. 术语表
|
||
|
||
| 术语 | 全称 | 说明 |
|
||
|------|------|------|
|
||
| DMA-BUF | Direct Memory Access Buffer | Linux 内核提供的零拷贝缓冲区机制 |
|
||
| VA-API | Video Acceleration API | 视频加速 API,用于硬件加速 |
|
||
| NVENC | NVIDIA Encoder | NVIDIA GPU 硬件编码器 |
|
||
| WebRTC | Web Real-Time Communication | Web 实时通信标准 |
|
||
| SDP | Session Description Protocol | 会话描述协议 |
|
||
| ICE | Interactive Connectivity Establishment | 交互式连接建立 |
|
||
| STUN | Session Traversal Utilities for NAT | NAT 穿透工具 |
|
||
| TURN | Traversal Using Relays around NAT | 使用中继的 NAT 穿透 |
|
||
| RTP | Real-time Transport Protocol | 实时传输协议 |
|
||
| RTCP | Real-time Control Protocol | 实时控制协议 |
|
||
| DTLS | Datagram Transport Layer Security | 数据报传输层安全 |
|
||
| SRTP | Secure Real-time Transport Protocol | 安全实时传输协议 |
|
||
| NACK | Negative Acknowledgment | 否定确认 |
|
||
| FEC | Forward Error Correction | 前向纠错 |
|
||
| GOP | Group of Pictures | 图像组 |
|
||
| PSNR | Peak Signal-to-Noise Ratio | 峰值信噪比 |
|
||
| SSIM | Structural Similarity Index | 结构相似性指数 |
|
||
|
||
### B. 参考资料
|
||
|
||
#### 技术规范
|
||
- [WebRTC 规范](https://www.w3.org/TR/webrtc/)
|
||
- [RTP/RTCP 规范](https://tools.ietf.org/html/rfc3550)
|
||
- [H.264 规范](https://www.itu.int/rec/T-REC-H.264)
|
||
- [Wayland 协议](https://wayland.freedesktop.org/)
|
||
- [PipeWire 文档](https://pipewire.org/)
|
||
|
||
#### 库和框架
|
||
- [webrtc-rs](https://github.com/webrtc-rs/webrtc)
|
||
- [tokio](https://tokio.rs/)
|
||
- [tracing](https://docs.rs/tracing)
|
||
- [bytes](https://docs.rs/bytes)
|
||
|
||
#### 相关项目
|
||
- [Deskreen](https://deskreen.com/)
|
||
- [RustDesk](https://rustdesk.com/)
|
||
- [Sunshine](https://github.com/LizardByte/Sunshine)
|
||
|
||
### C. 配置示例
|
||
|
||
#### C.1 完整配置文件
|
||
|
||
```toml
|
||
# wl-webrtc.toml
|
||
|
||
[server]
|
||
# 服务器绑定地址
|
||
bind_addr = "0.0.0.0:8080"
|
||
# 信令服务器地址
|
||
signaling_addr = "0.0.0.0:8443"
|
||
# 最大并发会话数
|
||
max_sessions = 20
|
||
# 会话超时时间 (秒)
|
||
session_timeout = 300
|
||
|
||
[capture]
|
||
# 目标帧率
|
||
target_frame_rate = 60
|
||
# 损坏跟踪
|
||
enable_damage_tracking = true
|
||
# 损坏阈值 (像素)
|
||
damage_threshold = 100
|
||
# 最大损坏区域数
|
||
max_damaged_regions = 4
|
||
|
||
[encoder]
|
||
# 默认编码器: h264_vaapi, h264_nvenc, h264_x264
|
||
default_encoder_type = "h264_vaapi"
|
||
# 默认比特率 (bps)
|
||
default_bitrate = 4000000
|
||
# 最大比特率 (bps)
|
||
max_bitrate = 8000000
|
||
# 最小比特率 (bps)
|
||
min_bitrate = 1000000
|
||
# 关键帧间隔
|
||
keyframe_interval = 30
|
||
# 编码预设: ultrafast, superfast, veryfast
|
||
preset = "veryfast"
|
||
# 编码调优: zerolatency, film, animation
|
||
tune = "zerolatency"
|
||
|
||
[webrtc]
|
||
# STUN 服务器
|
||
stun_servers = [
|
||
"stun:stun.l.google.com:19302",
|
||
]
|
||
# TURN 服务器 (可选)
|
||
turn_servers = []
|
||
# 最大比特率
|
||
max_bitrate = 8000000
|
||
# 最小比特率
|
||
min_bitrate = 500000
|
||
# 起始比特率
|
||
start_bitrate = 4000000
|
||
# 播放延迟最小值 (ms)
|
||
playout_delay_min_ms = 0
|
||
# 播放延迟最大值 (ms)
|
||
playout_delay_max_ms = 20
|
||
# 启用 NACK
|
||
nack_enabled = true
|
||
# 启用 FEC
|
||
fec_enabled = false
|
||
# 拥塞控制: gcc, twcc
|
||
congestion_control = "gcc"
|
||
|
||
[performance]
|
||
# 目标延迟 (ms)
|
||
target_latency_ms = 25
|
||
# 最大缓冲区大小
|
||
max_buffer_size = 10
|
||
# 最小缓冲区大小
|
||
min_buffer_size = 2
|
||
# 初始缓冲区大小
|
||
initial_buffer_size = 3
|
||
# 性能监控间隔 (ms)
|
||
monitoring_interval_ms = 500
|
||
|
||
[logging]
|
||
# 日志级别: trace, debug, info, warn, error
|
||
level = "info"
|
||
# 日志文件路径
|
||
file = "/var/log/wl-webrtc/app.log"
|
||
# 最大日志文件大小
|
||
max_size = "100MB"
|
||
# 最大日志文件数量
|
||
max_files = 10
|
||
# 控制台输出
|
||
console = true
|
||
|
||
[monitoring]
|
||
# 启用监控
|
||
enabled = true
|
||
# Prometheus 指标端口
|
||
metrics_port = 9090
|
||
# 健康检查端口
|
||
health_check_port = 8081
|
||
# 性能分析端口
|
||
profiling_port = 0 # 0 表示禁用
|
||
|
||
[security]
|
||
# 启用认证
|
||
enable_authentication = true
|
||
# 令牌 TTL (小时)
|
||
token_ttl_hours = 24
|
||
# 启用审计日志
|
||
enable_audit_log = true
|
||
# 审计日志路径
|
||
audit_log_path = "/var/log/wl-webrtc/audit.log"
|
||
# 启用速率限制
|
||
enable_rate_limit = true
|
||
# 每秒最大请求数
|
||
max_requests_per_second = 100
|
||
|
||
[hardware]
|
||
# 启用硬件加速
|
||
hardware_acceleration = true
|
||
# VA-API 设备
|
||
va_device = "/dev/dri/renderD128"
|
||
# 首选编码器优先级
|
||
encoder_priority = ["h264_nvenc", "h264_vaapi", "h264_x264"]
|
||
|
||
[experimental]
|
||
# 启用实验性功能
|
||
enable_experimental = false
|
||
# 使用自定义 WebRTC 实现
|
||
use_custom_webrtc = false
|
||
# 启用性能分析
|
||
enable_profiling = false
|
||
```
|
||
|
||
---
|
||
|
||
**文档版本**: 1.0.0
|
||
**最后更新**: 2026-02-02
|
||
**维护者**: wl-webrtc 团队
|