fix(state_portal): prevent shutdown deadlock on full bounded channel (closes #8)
shutdown() calls enc.flush() → drain_encoder() → tx.send() on a crossbeam bounded(32) channel. If the channel is full and the receiver (webrtc_rx) is alive but not being drained, send() blocks forever — a self-deadlock since both ends belong to the same struct. Two-layer fix: - avhw.rs: replace tx.send() with tx.try_send(); handle Full (drop frame) and Disconnected (set flag) separately. - state_portal.rs: drop webrtc_rx before flushing in shutdown() so try_send returns Disconnected immediately. Regression tests added for the channel semantics.
This commit is contained in:
23
src/avhw.rs
23
src/avhw.rs
@@ -935,13 +935,22 @@ impl SwEncState {
|
||||
let data: &[u8] = unsafe {
|
||||
std::slice::from_raw_parts(raw.data, raw.size as usize)
|
||||
};
|
||||
if let Err(e) = tx.send(data.to_vec()) {
|
||||
tracing::warn!(
|
||||
"WebRTC channel send failed (receiver dropped): {} bytes lost",
|
||||
e.0.len()
|
||||
);
|
||||
self.webrtc_disconnected = true;
|
||||
break;
|
||||
match tx.try_send(data.to_vec()) {
|
||||
Ok(()) => {}
|
||||
Err(crossbeam_channel::TrySendError::Full(frame)) => {
|
||||
tracing::warn!(
|
||||
"WebRTC channel full, dropping frame: {} bytes lost",
|
||||
frame.len()
|
||||
);
|
||||
}
|
||||
Err(crossbeam_channel::TrySendError::Disconnected(frame)) => {
|
||||
tracing::warn!(
|
||||
"WebRTC channel disconnected: {} bytes lost",
|
||||
frame.len()
|
||||
);
|
||||
self.webrtc_disconnected = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,6 +320,9 @@ impl StatePortal {
|
||||
///
|
||||
/// 使用 `enc.take()` 确保编码器只被 flush 一次,即使多次调用也安全(幂等)。
|
||||
pub fn shutdown(&mut self) {
|
||||
// 先 drop receiver,使 flush() 中的 try_send() 立即返回 Disconnected
|
||||
// 而非在满通道上阻塞(修复 issue #8 死锁)
|
||||
self.webrtc_rx = None;
|
||||
if let Some(mut enc) = self.enc.take() {
|
||||
if let Err(e) = enc.flush() {
|
||||
tracing::error!("Flush error during shutdown: {e}");
|
||||
@@ -553,4 +556,46 @@ mod tests {
|
||||
assert_eq!(desc.layers[0].planes[0].pitch, 3840 * 4);
|
||||
}
|
||||
|
||||
// ── issue #8 regression ──
|
||||
|
||||
#[test]
|
||||
fn try_send_full_channel_returns_full_not_block() {
|
||||
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
|
||||
tx.send(vec![1]).unwrap();
|
||||
tx.send(vec![2]).unwrap();
|
||||
|
||||
assert!(matches!(
|
||||
tx.try_send(vec![3]),
|
||||
Err(crossbeam_channel::TrySendError::Full(_))
|
||||
));
|
||||
assert_eq!(rx.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_send_after_rx_dropped_returns_disconnected() {
|
||||
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
|
||||
drop(rx);
|
||||
|
||||
assert!(matches!(
|
||||
tx.try_send(vec![1]),
|
||||
Err(crossbeam_channel::TrySendError::Disconnected(_))
|
||||
));
|
||||
}
|
||||
|
||||
// given: full bounded channel
|
||||
// when: rx is dropped, then try_send
|
||||
// expect: Disconnected, not blocking
|
||||
#[test]
|
||||
fn shutdown_rx_drop_prevents_deadlock_on_full_channel() {
|
||||
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(2);
|
||||
tx.send(vec![1]).unwrap();
|
||||
tx.send(vec![2]).unwrap();
|
||||
drop(rx);
|
||||
|
||||
assert!(matches!(
|
||||
tx.try_send(vec![3]),
|
||||
Err(crossbeam_channel::TrySendError::Disconnected(_))
|
||||
));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user