Compare commits

..

2 Commits

Author SHA1 Message Date
dailz
d5679be3a4 fix(state_portal): replace expect() with bail-style error propagation (closes #9) 2026-06-06 20:19:51 +08:00
dailz
36f07c92e9 fix(state_portal): prevent shutdown deadlock on full bounded channel (closes #8)
shutdown() calls enc.flush() → drain_encoder() → tx.send() on a
crossbeam bounded(32) channel.  If the channel is full and the
receiver (webrtc_rx) is alive but not being drained, send() blocks
forever — a self-deadlock since both ends belong to the same struct.

Two-layer fix:
- avhw.rs: replace tx.send() with tx.try_send(); handle Full (drop
  frame) and Disconnected (set flag) separately.
- state_portal.rs: drop webrtc_rx before flushing in shutdown() so
  try_send returns Disconnected immediately.

Regression tests added for the channel semantics.
2026-06-06 20:02:09 +08:00
2 changed files with 66 additions and 9 deletions

View File

@@ -935,13 +935,22 @@ impl SwEncState {
let data: &[u8] = unsafe {
std::slice::from_raw_parts(raw.data, raw.size as usize)
};
if let Err(e) = tx.send(data.to_vec()) {
tracing::warn!(
"WebRTC channel send failed (receiver dropped): {} bytes lost",
e.0.len()
);
self.webrtc_disconnected = true;
break;
match tx.try_send(data.to_vec()) {
Ok(()) => {}
Err(crossbeam_channel::TrySendError::Full(frame)) => {
tracing::warn!(
"WebRTC channel full, dropping frame: {} bytes lost",
frame.len()
);
}
Err(crossbeam_channel::TrySendError::Disconnected(frame)) => {
tracing::warn!(
"WebRTC channel disconnected: {} bytes lost",
frame.len()
);
self.webrtc_disconnected = true;
break;
}
}
}
}

View File

@@ -164,7 +164,8 @@ impl StatePortal {
// 根据是否启用 WebRTC 选择不同的编码器构造方式
let enc = if let Some(ref tx) = self.webrtc_tx {
let paused = self.webrtc_paused.as_ref().expect("webrtc_paused must exist when webrtc_tx exists");
let paused = self.webrtc_paused.as_ref()
.ok_or_else(|| anyhow::anyhow!("internal invariant broken: webrtc_paused missing while WebRTC mode is active"))?;
avhw::SwEncState::new_webrtc(
&drm_path,
frame.width,
@@ -179,9 +180,11 @@ impl StatePortal {
)?
} else {
// MP4 模式:编码输出写入文件
let output_path = self.args.output.as_deref()
.ok_or_else(|| anyhow::anyhow!("--output is required in MP4 file output mode; use --port > 0 for WebRTC mode"))?;
avhw::SwEncState::new(
&drm_path,
std::path::Path::new(self.args.output.as_deref().expect("output required for MP4 mode")),
std::path::Path::new(output_path),
frame.width,
frame.height,
enc_width,
@@ -320,6 +323,9 @@ impl StatePortal {
///
/// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。
pub fn shutdown(&mut self) {
// 先 drop receiver使 flush() 中的 try_send() 立即返回 Disconnected
// 而非在满通道上阻塞(修复 issue #8 死锁)
self.webrtc_rx = None;
if let Some(mut enc) = self.enc.take() {
if let Err(e) = enc.flush() {
tracing::error!("Flush error during shutdown: {e}");
@@ -553,4 +559,46 @@ mod tests {
assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4);
}
// ── issue #8 regression ──
#[test]
fn try_send_full_channel_returns_full_not_block() {
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
tx.send(vec![1]).unwrap();
tx.send(vec![2]).unwrap();
assert!(matches!(
tx.try_send(vec![3]),
Err(crossbeam_channel::TrySendError::Full(_))
));
assert_eq!(rx.len(), 2);
}
#[test]
fn try_send_after_rx_dropped_returns_disconnected() {
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
drop(rx);
assert!(matches!(
tx.try_send(vec![1]),
Err(crossbeam_channel::TrySendError::Disconnected(_))
));
}
// given: full bounded channel
// when: rx is dropped, then try_send
// expect: Disconnected, not blocking
#[test]
fn shutdown_rx_drop_prevents_deadlock_on_full_channel() {
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
tx.send(vec![1]).unwrap();
tx.send(vec![2]).unwrap();
drop(rx);
assert!(matches!(
tx.try_send(vec![3]),
Err(crossbeam_channel::TrySendError::Disconnected(_))
));
}
}