From 36f07c92e96a2da62579317e8106701d0784638c Mon Sep 17 00:00:00 2001 From: dailz Date: Sat, 6 Jun 2026 20:02:09 +0800 Subject: [PATCH] fix(state_portal): prevent shutdown deadlock on full bounded channel (closes #8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit shutdown() calls enc.flush() → drain_encoder() → tx.send() on a crossbeam bounded(32) channel. If the channel is full and the receiver (webrtc_rx) is alive but not being drained, send() blocks forever — a self-deadlock since both ends belong to the same struct. Two-layer fix: - avhw.rs: replace tx.send() with tx.try_send(); handle Full (drop frame) and Disconnected (set flag) separately. - state_portal.rs: drop webrtc_rx before flushing in shutdown() so try_send returns Disconnected immediately. Regression tests added for the channel semantics. --- src/avhw.rs | 23 ++++++++++++++++------- src/state_portal.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/avhw.rs b/src/avhw.rs index 57643aa..aebc5b8 100644 --- a/src/avhw.rs +++ b/src/avhw.rs @@ -935,13 +935,22 @@ impl SwEncState { let data: &[u8] = unsafe { std::slice::from_raw_parts(raw.data, raw.size as usize) }; - if let Err(e) = tx.send(data.to_vec()) { - tracing::warn!( - "WebRTC channel send failed (receiver dropped): {} bytes lost", - e.0.len() - ); - self.webrtc_disconnected = true; - break; + match tx.try_send(data.to_vec()) { + Ok(()) => {} + Err(crossbeam_channel::TrySendError::Full(frame)) => { + tracing::warn!( + "WebRTC channel full, dropping frame: {} bytes lost", + frame.len() + ); + } + Err(crossbeam_channel::TrySendError::Disconnected(frame)) => { + tracing::warn!( + "WebRTC channel disconnected: {} bytes lost", + frame.len() + ); + self.webrtc_disconnected = true; + break; + } } } } diff --git a/src/state_portal.rs b/src/state_portal.rs index 3fba65a..9fc3148 100644 --- a/src/state_portal.rs +++ b/src/state_portal.rs @@ -320,6 +320,9 @@ impl StatePortal { /// /// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。 pub fn shutdown(&mut self) { + // 先 drop receiver,使 flush() 中的 try_send() 立即返回 Disconnected + // 而非在满通道上阻塞(修复 issue #8 死锁) + self.webrtc_rx = None; if let Some(mut enc) = self.enc.take() { if let Err(e) = enc.flush() { tracing::error!("Flush error during shutdown: {e}"); @@ -553,4 +556,46 @@ mod tests { assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4); } + // ── issue #8 regression ── + + #[test] + fn try_send_full_channel_returns_full_not_block() { + let (tx, rx) = crossbeam_channel::bounded::>(2); + tx.send(vec![1]).unwrap(); + tx.send(vec![2]).unwrap(); + + assert!(matches!( + tx.try_send(vec![3]), + Err(crossbeam_channel::TrySendError::Full(_)) + )); + assert_eq!(rx.len(), 2); + } + + #[test] + fn try_send_after_rx_dropped_returns_disconnected() { + let (tx, rx) = crossbeam_channel::bounded::>(2); + drop(rx); + + assert!(matches!( + tx.try_send(vec![1]), + Err(crossbeam_channel::TrySendError::Disconnected(_)) + )); + } + + // given: full bounded channel + // when: rx is dropped, then try_send + // expect: Disconnected, not blocking + #[test] + fn shutdown_rx_drop_prevents_deadlock_on_full_channel() { + let (tx, rx) = crossbeam_channel::bounded::>(2); + tx.send(vec![1]).unwrap(); + tx.send(vec![2]).unwrap(); + drop(rx); + + assert!(matches!( + tx.try_send(vec![3]), + Err(crossbeam_channel::TrySendError::Disconnected(_)) + )); + } + }