From 0d88e933e66e753930272ef7daf5aebf5c7122e1 Mon Sep 17 00:00:00 2001 From: dailz Date: Tue, 9 Jun 2026 15:14:37 +0800 Subject: [PATCH] fix(io): replace blocking channel sends with cancel-aware alternatives (closes #16) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background worker threads used blocking tx.send() on bounded channels. If the consumer stopped draining, threads hung forever with no way to reach the cancel check. Drop-issued cancellation was ineffective. Changes: - Progress messages: tx.try_send() (discard if full, never blocks loop) - Terminal messages (Complete/Error): new send_cancelable() helper using crossbeam select! — sleeps efficiently until send succeeds or cancel arrives - Drop cancellation: tx.try_send() — Drop must never block - spawn_visual_height_rebuild: same fix for its bounded(1) channel - 5 new tests covering full-channel + cancel scenarios --- crates/core/src/io/progressive_reader.rs | 151 ++++++++++++++++++++--- 1 file changed, 134 insertions(+), 17 deletions(-) diff --git a/crates/core/src/io/progressive_reader.rs b/crates/core/src/io/progressive_reader.rs index 782aac6..e0ba5ed 100644 --- a/crates/core/src/io/progressive_reader.rs +++ b/crates/core/src/io/progressive_reader.rs @@ -10,6 +10,24 @@ use crate::io::line_index::LineIndex; use crate::io::line_sampler::sample_line_count; use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN}; +// ─── Cancel-aware channel helpers ──────────────────────────────────────────── + +/// Send a message on `tx`, but abort if `cancel_rx` fires first. +/// +/// Uses `crossbeam_channel::select!` so the thread sleeps efficiently instead +/// of busy-looping. Used for terminal messages (Complete / Error) that must +/// not be silently dropped while the receiver is still alive. +fn send_cancelable( + tx: &crossbeam_channel::Sender, + msg: T, + cancel_rx: &crossbeam_channel::Receiver<()>, +) { + crossbeam_channel::select! { + send(tx, msg) -> _ => {} + recv(cancel_rx) -> _ => {} + } +} + // ─── IndexerMessage ────────────────────────────────────────────────────────── pub enum IndexerMessage { @@ -270,20 +288,20 @@ pub fn spawn_indexer( let file = match std::fs::File::open(&path) { Ok(f) => f, Err(e) => { - let _ = tx.send(IndexerMessage::Error { + send_cancelable(&tx, IndexerMessage::Error { generation, message: e.to_string(), - }); + }, &cancel_rx); return; } }; let target_len = match file.metadata() { Ok(m) => m.len(), Err(e) => { - let _ = tx.send(IndexerMessage::Error { + send_cancelable(&tx, IndexerMessage::Error { generation, message: e.to_string(), - }); + }, &cancel_rx); return; } }; @@ -300,10 +318,10 @@ pub fn spawn_indexer( let buf = match buf_reader.fill_buf() { Ok(b) => b, Err(e) => { - let _ = tx.send(IndexerMessage::Error { + send_cancelable(&tx, IndexerMessage::Error { generation, message: e.to_string(), - }); + }, &cancel_rx); return; } }; @@ -335,7 +353,7 @@ pub fn spawn_indexer( } if target_len > 0 { let percent = (chunk_offset as f64 / target_len as f64) * 100.0; - let _ = tx.send(IndexerMessage::Progress { + let _ = tx.try_send(IndexerMessage::Progress { generation, percent, lines_scanned: newline_count as u64, @@ -386,18 +404,18 @@ pub fn spawn_indexer( Ok(_) | Err(_) => None, }, Err(e) => { - let _ = tx.send(IndexerMessage::Error { + send_cancelable(&tx, IndexerMessage::Error { generation, message: e.to_string(), - }); + }, &cancel_rx); return; } }, Err(e) => { - let _ = tx.send(IndexerMessage::Error { + send_cancelable(&tx, IndexerMessage::Error { generation, message: e.to_string(), - }); + }, &cancel_rx); return; } } @@ -416,11 +434,11 @@ pub fn spawn_indexer( None }; - let _ = tx.send(IndexerMessage::Complete { + send_cancelable(&tx, IndexerMessage::Complete { generation, reader, visual_height_index, - }); + }, &cancel_rx); }); rx @@ -480,7 +498,7 @@ pub fn spawn_visual_height_rebuild( let index = VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width); - let _ = tx.send(VisualHeightRebuildResult { generation, index }); + send_cancelable(&tx, VisualHeightRebuildResult { generation, index }, &cancel_rx); }); rx @@ -777,7 +795,7 @@ impl ProgressiveFileReader { pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) { if let Some(tx) = self.vh_rebuild_cancel_tx.take() { - let _ = tx.send(()); + let _ = tx.try_send(()); } let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); @@ -832,10 +850,10 @@ impl ProgressiveFileReader { impl Drop for ProgressiveFileReader { fn drop(&mut self) { if let Some(tx) = &self.cancel_tx { - let _ = tx.send(()); + let _ = tx.try_send(()); } if let Some(tx) = self.vh_rebuild_cancel_tx.take() { - let _ = tx.send(()); + let _ = tx.try_send(()); } } } @@ -1509,4 +1527,103 @@ mod tests { Ok(_) => panic!("should have been discarded due to line count mismatch"), } } + + #[test] + fn test_send_cancelable_delivers_on_empty_channel() { + let (tx, rx) = crossbeam_channel::bounded(2); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + send_cancelable(&tx, 42, &cancel_rx); + + assert_eq!(rx.try_recv(), Ok(42)); + } + + #[test] + fn test_send_cancelable_aborts_on_cancel() { + let (tx, rx) = crossbeam_channel::bounded(1); + let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + tx.send("filler").unwrap(); + + let handle = std::thread::spawn(move || { + send_cancelable(&tx, "important", &cancel_rx); + }); + + cancel_tx.send(()).unwrap(); + handle.join().unwrap(); + + assert_eq!(rx.try_recv(), Ok("filler")); + } + + #[test] + fn test_send_cancelable_drains_when_room_available() { + let (tx, rx) = crossbeam_channel::bounded(1); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + tx.send("first").unwrap(); + + let rx_clone = rx.clone(); + let handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(50)); + let _ = rx_clone.try_recv(); + }); + + send_cancelable(&tx, "second", &cancel_rx); + handle.join().unwrap(); + + assert_eq!(rx.try_recv(), Ok("second")); + } + + #[test] + fn test_progress_try_send_does_not_block_full_channel() { + let mut content = Vec::new(); + for i in 0..50_000 { + writeln!(content, "line number {:08}", i).unwrap(); + } + let f = create_temp_file(&content); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + let mut got_complete = false; + let timeout = std::time::Duration::from_secs(15); + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + match rx.recv_timeout(std::time::Duration::from_secs(1)) { + Ok(IndexerMessage::Progress { .. }) => {} + Ok(IndexerMessage::Complete { .. }) => { + got_complete = true; + break; + } + Ok(IndexerMessage::Error { message, .. }) => { + panic!("unexpected error: {}", message); + } + Err(e) => panic!("recv error: {:?}", e), + } + } + assert!(got_complete, "indexer should complete even when Progress fills channel"); + } + + #[test] + fn test_indexer_cancel_with_full_channel() { + let mut content = Vec::new(); + for i in 0..500_000 { + writeln!(content, "line number {:08}", i).unwrap(); + } + let f = create_temp_file(&content); + let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + cancel_tx.send(()).unwrap(); + + let result = rx.recv_timeout(std::time::Duration::from_secs(5)); + match result { + Err(crossbeam_channel::RecvTimeoutError::Timeout) + | Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {} + Ok(IndexerMessage::Complete { .. }) => {} + Ok(IndexerMessage::Error { .. }) => {} + Ok(IndexerMessage::Progress { .. }) => {} + } + } }