fix(io): replace blocking channel sends with cancel-aware alternatives (closes #16)

Background worker threads used blocking tx.send() on bounded channels.
If the consumer stopped draining, threads hung forever with no way to
reach the cancel check. Drop-issued cancellation was ineffective.

Changes:
- Progress messages: tx.try_send() (discard if full, never blocks loop)
- Terminal messages (Complete/Error): new send_cancelable<T>() helper
  using crossbeam select! — sleeps efficiently until send succeeds or
  cancel arrives
- Drop cancellation: tx.try_send() — Drop must never block
- spawn_visual_height_rebuild: same fix for its bounded(1) channel
- 5 new tests covering full-channel + cancel scenarios
This commit is contained in:
dailz
2026-06-09 15:14:37 +08:00
parent 420b853cb9
commit 0d88e933e6

View File

@@ -10,6 +10,24 @@ use crate::io::line_index::LineIndex;
use crate::io::line_sampler::sample_line_count;
use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN};
// ─── Cancel-aware channel helpers ────────────────────────────────────────────
/// Send a message on `tx`, but abort if `cancel_rx` fires first.
///
/// Uses `crossbeam_channel::select!` so the thread sleeps efficiently instead
/// of busy-looping. Used for terminal messages (Complete / Error) that must
/// not be silently dropped while the receiver is still alive.
fn send_cancelable<T>(
tx: &crossbeam_channel::Sender<T>,
msg: T,
cancel_rx: &crossbeam_channel::Receiver<()>,
) {
crossbeam_channel::select! {
send(tx, msg) -> _ => {}
recv(cancel_rx) -> _ => {}
}
}
// ─── IndexerMessage ──────────────────────────────────────────────────────────
pub enum IndexerMessage {
@@ -270,20 +288,20 @@ pub fn spawn_indexer(
let file = match std::fs::File::open(&path) {
Ok(f) => f,
Err(e) => {
let _ = tx.send(IndexerMessage::Error {
send_cancelable(&tx, IndexerMessage::Error {
generation,
message: e.to_string(),
});
}, &cancel_rx);
return;
}
};
let target_len = match file.metadata() {
Ok(m) => m.len(),
Err(e) => {
let _ = tx.send(IndexerMessage::Error {
send_cancelable(&tx, IndexerMessage::Error {
generation,
message: e.to_string(),
});
}, &cancel_rx);
return;
}
};
@@ -300,10 +318,10 @@ pub fn spawn_indexer(
let buf = match buf_reader.fill_buf() {
Ok(b) => b,
Err(e) => {
let _ = tx.send(IndexerMessage::Error {
send_cancelable(&tx, IndexerMessage::Error {
generation,
message: e.to_string(),
});
}, &cancel_rx);
return;
}
};
@@ -335,7 +353,7 @@ pub fn spawn_indexer(
}
if target_len > 0 {
let percent = (chunk_offset as f64 / target_len as f64) * 100.0;
let _ = tx.send(IndexerMessage::Progress {
let _ = tx.try_send(IndexerMessage::Progress {
generation,
percent,
lines_scanned: newline_count as u64,
@@ -386,18 +404,18 @@ pub fn spawn_indexer(
Ok(_) | Err(_) => None,
},
Err(e) => {
let _ = tx.send(IndexerMessage::Error {
send_cancelable(&tx, IndexerMessage::Error {
generation,
message: e.to_string(),
});
}, &cancel_rx);
return;
}
},
Err(e) => {
let _ = tx.send(IndexerMessage::Error {
send_cancelable(&tx, IndexerMessage::Error {
generation,
message: e.to_string(),
});
}, &cancel_rx);
return;
}
}
@@ -416,11 +434,11 @@ pub fn spawn_indexer(
None
};
let _ = tx.send(IndexerMessage::Complete {
send_cancelable(&tx, IndexerMessage::Complete {
generation,
reader,
visual_height_index,
});
}, &cancel_rx);
});
rx
@@ -480,7 +498,7 @@ pub fn spawn_visual_height_rebuild(
let index =
VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width);
let _ = tx.send(VisualHeightRebuildResult { generation, index });
send_cancelable(&tx, VisualHeightRebuildResult { generation, index }, &cancel_rx);
});
rx
@@ -777,7 +795,7 @@ impl ProgressiveFileReader {
pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) {
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
let _ = tx.send(());
let _ = tx.try_send(());
}
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
@@ -832,10 +850,10 @@ impl ProgressiveFileReader {
impl Drop for ProgressiveFileReader {
fn drop(&mut self) {
if let Some(tx) = &self.cancel_tx {
let _ = tx.send(());
let _ = tx.try_send(());
}
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
let _ = tx.send(());
let _ = tx.try_send(());
}
}
}
@@ -1509,4 +1527,103 @@ mod tests {
Ok(_) => panic!("should have been discarded due to line count mismatch"),
}
}
#[test]
fn test_send_cancelable_delivers_on_empty_channel() {
let (tx, rx) = crossbeam_channel::bounded(2);
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
send_cancelable(&tx, 42, &cancel_rx);
assert_eq!(rx.try_recv(), Ok(42));
}
#[test]
fn test_send_cancelable_aborts_on_cancel() {
let (tx, rx) = crossbeam_channel::bounded(1);
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
tx.send("filler").unwrap();
let handle = std::thread::spawn(move || {
send_cancelable(&tx, "important", &cancel_rx);
});
cancel_tx.send(()).unwrap();
handle.join().unwrap();
assert_eq!(rx.try_recv(), Ok("filler"));
}
#[test]
fn test_send_cancelable_drains_when_room_available() {
let (tx, rx) = crossbeam_channel::bounded(1);
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
tx.send("first").unwrap();
let rx_clone = rx.clone();
let handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(50));
let _ = rx_clone.try_recv();
});
send_cancelable(&tx, "second", &cancel_rx);
handle.join().unwrap();
assert_eq!(rx.try_recv(), Ok("second"));
}
#[test]
fn test_progress_try_send_does_not_block_full_channel() {
let mut content = Vec::new();
for i in 0..50_000 {
writeln!(content, "line number {:08}", i).unwrap();
}
let f = create_temp_file(&content);
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
let mut got_complete = false;
let timeout = std::time::Duration::from_secs(15);
let start = std::time::Instant::now();
while start.elapsed() < timeout {
match rx.recv_timeout(std::time::Duration::from_secs(1)) {
Ok(IndexerMessage::Progress { .. }) => {}
Ok(IndexerMessage::Complete { .. }) => {
got_complete = true;
break;
}
Ok(IndexerMessage::Error { message, .. }) => {
panic!("unexpected error: {}", message);
}
Err(e) => panic!("recv error: {:?}", e),
}
}
assert!(got_complete, "indexer should complete even when Progress fills channel");
}
#[test]
fn test_indexer_cancel_with_full_channel() {
let mut content = Vec::new();
for i in 0..500_000 {
writeln!(content, "line number {:08}", i).unwrap();
}
let f = create_temp_file(&content);
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
cancel_tx.send(()).unwrap();
let result = rx.recv_timeout(std::time::Duration::from_secs(5));
match result {
Err(crossbeam_channel::RecvTimeoutError::Timeout)
| Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {}
Ok(IndexerMessage::Complete { .. }) => {}
Ok(IndexerMessage::Error { .. }) => {}
Ok(IndexerMessage::Progress { .. }) => {}
}
}
}