3 Commits

Author SHA1 Message Date
dailz
0d88e933e6 fix(io): replace blocking channel sends with cancel-aware alternatives (closes #16)
Background worker threads used blocking tx.send() on bounded channels.
If the consumer stopped draining, threads hung forever with no way to
reach the cancel check. Drop-issued cancellation was ineffective.

Changes:
- Progress messages: tx.try_send() (discard if full, never blocks loop)
- Terminal messages (Complete/Error): new send_cancelable<T>() helper
  using crossbeam select! — sleeps efficiently until send succeeds or
  cancel arrives
- Drop cancellation: tx.try_send() — Drop must never block
- spawn_visual_height_rebuild: same fix for its bounded(1) channel
- 5 new tests covering full-channel + cancel scenarios
2026-06-09 15:14:37 +08:00
dailz
420b853cb9 fix(watcher): filter Remove events by path to prevent false removed reports (closes #15) 2026-06-09 13:18:23 +08:00
dailz
7852e92ecc fix(watcher): forward notify backend errors instead of silently discarding
Previously Err(_) => return in the notify callback silently dropped all
backend errors (inotify exhaustion, fs unmount, permission loss), leaving
the application unaware that file monitoring had stopped working.

Add FileEvent::WatcherError { message: String } variant to propagate
backend errors through the existing bounded channel. The TUI consumer
receives the event without disrupting the UI for transient errors.

Closes #14
2026-06-09 11:26:54 +08:00
3 changed files with 245 additions and 53 deletions

View File

@@ -10,6 +10,24 @@ use crate::io::line_index::LineIndex;
use crate::io::line_sampler::sample_line_count; use crate::io::line_sampler::sample_line_count;
use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN}; use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN};
// ─── Cancel-aware channel helpers ────────────────────────────────────────────
/// Send a message on `tx`, but abort if `cancel_rx` fires first.
///
/// Uses `crossbeam_channel::select!` so the thread sleeps efficiently instead
/// of busy-looping. Used for terminal messages (Complete / Error) that must
/// not be silently dropped while the receiver is still alive.
fn send_cancelable<T>(
tx: &crossbeam_channel::Sender<T>,
msg: T,
cancel_rx: &crossbeam_channel::Receiver<()>,
) {
crossbeam_channel::select! {
send(tx, msg) -> _ => {}
recv(cancel_rx) -> _ => {}
}
}
// ─── IndexerMessage ────────────────────────────────────────────────────────── // ─── IndexerMessage ──────────────────────────────────────────────────────────
pub enum IndexerMessage { pub enum IndexerMessage {
@@ -270,20 +288,20 @@ pub fn spawn_indexer(
let file = match std::fs::File::open(&path) { let file = match std::fs::File::open(&path) {
Ok(f) => f, Ok(f) => f,
Err(e) => { Err(e) => {
let _ = tx.send(IndexerMessage::Error { send_cancelable(&tx, IndexerMessage::Error {
generation, generation,
message: e.to_string(), message: e.to_string(),
}); }, &cancel_rx);
return; return;
} }
}; };
let target_len = match file.metadata() { let target_len = match file.metadata() {
Ok(m) => m.len(), Ok(m) => m.len(),
Err(e) => { Err(e) => {
let _ = tx.send(IndexerMessage::Error { send_cancelable(&tx, IndexerMessage::Error {
generation, generation,
message: e.to_string(), message: e.to_string(),
}); }, &cancel_rx);
return; return;
} }
}; };
@@ -300,10 +318,10 @@ pub fn spawn_indexer(
let buf = match buf_reader.fill_buf() { let buf = match buf_reader.fill_buf() {
Ok(b) => b, Ok(b) => b,
Err(e) => { Err(e) => {
let _ = tx.send(IndexerMessage::Error { send_cancelable(&tx, IndexerMessage::Error {
generation, generation,
message: e.to_string(), message: e.to_string(),
}); }, &cancel_rx);
return; return;
} }
}; };
@@ -335,7 +353,7 @@ pub fn spawn_indexer(
} }
if target_len > 0 { if target_len > 0 {
let percent = (chunk_offset as f64 / target_len as f64) * 100.0; let percent = (chunk_offset as f64 / target_len as f64) * 100.0;
let _ = tx.send(IndexerMessage::Progress { let _ = tx.try_send(IndexerMessage::Progress {
generation, generation,
percent, percent,
lines_scanned: newline_count as u64, lines_scanned: newline_count as u64,
@@ -386,18 +404,18 @@ pub fn spawn_indexer(
Ok(_) | Err(_) => None, Ok(_) | Err(_) => None,
}, },
Err(e) => { Err(e) => {
let _ = tx.send(IndexerMessage::Error { send_cancelable(&tx, IndexerMessage::Error {
generation, generation,
message: e.to_string(), message: e.to_string(),
}); }, &cancel_rx);
return; return;
} }
}, },
Err(e) => { Err(e) => {
let _ = tx.send(IndexerMessage::Error { send_cancelable(&tx, IndexerMessage::Error {
generation, generation,
message: e.to_string(), message: e.to_string(),
}); }, &cancel_rx);
return; return;
} }
} }
@@ -416,11 +434,11 @@ pub fn spawn_indexer(
None None
}; };
let _ = tx.send(IndexerMessage::Complete { send_cancelable(&tx, IndexerMessage::Complete {
generation, generation,
reader, reader,
visual_height_index, visual_height_index,
}); }, &cancel_rx);
}); });
rx rx
@@ -480,7 +498,7 @@ pub fn spawn_visual_height_rebuild(
let index = let index =
VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width); VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width);
let _ = tx.send(VisualHeightRebuildResult { generation, index }); send_cancelable(&tx, VisualHeightRebuildResult { generation, index }, &cancel_rx);
}); });
rx rx
@@ -777,7 +795,7 @@ impl ProgressiveFileReader {
pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) { pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) {
if let Some(tx) = self.vh_rebuild_cancel_tx.take() { if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
let _ = tx.send(()); let _ = tx.try_send(());
} }
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
@@ -832,10 +850,10 @@ impl ProgressiveFileReader {
impl Drop for ProgressiveFileReader { impl Drop for ProgressiveFileReader {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(tx) = &self.cancel_tx { if let Some(tx) = &self.cancel_tx {
let _ = tx.send(()); let _ = tx.try_send(());
} }
if let Some(tx) = self.vh_rebuild_cancel_tx.take() { if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
let _ = tx.send(()); let _ = tx.try_send(());
} }
} }
} }
@@ -1509,4 +1527,103 @@ mod tests {
Ok(_) => panic!("should have been discarded due to line count mismatch"), Ok(_) => panic!("should have been discarded due to line count mismatch"),
} }
} }
#[test]
fn test_send_cancelable_delivers_on_empty_channel() {
let (tx, rx) = crossbeam_channel::bounded(2);
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
send_cancelable(&tx, 42, &cancel_rx);
assert_eq!(rx.try_recv(), Ok(42));
}
#[test]
fn test_send_cancelable_aborts_on_cancel() {
let (tx, rx) = crossbeam_channel::bounded(1);
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
tx.send("filler").unwrap();
let handle = std::thread::spawn(move || {
send_cancelable(&tx, "important", &cancel_rx);
});
cancel_tx.send(()).unwrap();
handle.join().unwrap();
assert_eq!(rx.try_recv(), Ok("filler"));
}
#[test]
fn test_send_cancelable_drains_when_room_available() {
let (tx, rx) = crossbeam_channel::bounded(1);
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
tx.send("first").unwrap();
let rx_clone = rx.clone();
let handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(50));
let _ = rx_clone.try_recv();
});
send_cancelable(&tx, "second", &cancel_rx);
handle.join().unwrap();
assert_eq!(rx.try_recv(), Ok("second"));
}
#[test]
fn test_progress_try_send_does_not_block_full_channel() {
let mut content = Vec::new();
for i in 0..50_000 {
writeln!(content, "line number {:08}", i).unwrap();
}
let f = create_temp_file(&content);
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
let mut got_complete = false;
let timeout = std::time::Duration::from_secs(15);
let start = std::time::Instant::now();
while start.elapsed() < timeout {
match rx.recv_timeout(std::time::Duration::from_secs(1)) {
Ok(IndexerMessage::Progress { .. }) => {}
Ok(IndexerMessage::Complete { .. }) => {
got_complete = true;
break;
}
Ok(IndexerMessage::Error { message, .. }) => {
panic!("unexpected error: {}", message);
}
Err(e) => panic!("recv error: {:?}", e),
}
}
assert!(got_complete, "indexer should complete even when Progress fills channel");
}
#[test]
fn test_indexer_cancel_with_full_channel() {
let mut content = Vec::new();
for i in 0..500_000 {
writeln!(content, "line number {:08}", i).unwrap();
}
let f = create_temp_file(&content);
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
cancel_tx.send(()).unwrap();
let result = rx.recv_timeout(std::time::Duration::from_secs(5));
match result {
Err(crossbeam_channel::RecvTimeoutError::Timeout)
| Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {}
Ok(IndexerMessage::Complete { .. }) => {}
Ok(IndexerMessage::Error { .. }) => {}
Ok(IndexerMessage::Progress { .. }) => {}
}
}
} }

View File

@@ -13,6 +13,7 @@ pub enum FileEvent {
Truncated { new_size: u64 }, Truncated { new_size: u64 },
Rotated { new_inode: u64 }, Rotated { new_inode: u64 },
Removed, Removed,
WatcherError { message: String },
} }
// ─── get_inode ────────────────────────────────────────────────────────────── // ─── get_inode ──────────────────────────────────────────────────────────────
@@ -34,6 +35,43 @@ struct WatchState {
last_inode: u64, last_inode: u64,
} }
fn process_event(event: Event, watch_path: &Path, state: &mut WatchState) -> Option<FileEvent> {
if !event.paths.iter().any(|p| p == watch_path) {
return None;
}
match event.kind {
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
EventKind::Remove(_) => return Some(FileEvent::Removed),
_ => return None,
}
let current_inode = get_inode(watch_path).unwrap_or(0);
let current_size = std::fs::metadata(watch_path).map(|m| m.len()).unwrap_or(0);
if current_inode != 0 && state.last_inode != 0 && current_inode != state.last_inode {
state.last_inode = current_inode;
state.last_size = current_size;
Some(FileEvent::Rotated {
new_inode: current_inode,
})
} else if current_size > state.last_size {
state.last_size = current_size;
state.last_inode = current_inode;
Some(FileEvent::Appended {
new_size: current_size,
})
} else if current_size < state.last_size {
state.last_size = current_size;
state.last_inode = current_inode;
Some(FileEvent::Truncated {
new_size: current_size,
})
} else {
None
}
}
// ─── FileWatcher ──────────────────────────────────────────────────────────── // ─── FileWatcher ────────────────────────────────────────────────────────────
pub struct FileWatcher { pub struct FileWatcher {
rx: Receiver<FileEvent>, rx: Receiver<FileEvent>,
@@ -56,24 +94,13 @@ impl FileWatcher {
notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| { notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
let event = match res { let event = match res {
Ok(e) => e, Ok(e) => e,
Err(_) => return, Err(error) => {
}; let _ = tx.try_send(FileEvent::WatcherError {
message: error.to_string(),
match event.kind { });
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
EventKind::Remove(_) => {
let _ = tx.try_send(FileEvent::Removed);
return; return;
} }
_ => return, };
}
if !event.paths.iter().any(|p| p == &watch_path) {
return;
}
let current_inode = get_inode(&watch_path).unwrap_or(0);
let current_size = std::fs::metadata(&watch_path).map(|m| m.len()).unwrap_or(0);
let mut st = state.lock().unwrap_or_else(|poison| { let mut st = state.lock().unwrap_or_else(|poison| {
// Recover from poisoned mutex — state only tracks last_size // Recover from poisoned mutex — state only tracks last_size
@@ -81,25 +108,8 @@ impl FileWatcher {
// cause a duplicate event, which is harmless. // cause a duplicate event, which is harmless.
poison.into_inner() poison.into_inner()
}); });
if let Some(fe) = process_event(event, &watch_path, &mut st) {
if current_inode != 0 && st.last_inode != 0 && current_inode != st.last_inode { let _ = tx.try_send(fe);
let _ = tx.try_send(FileEvent::Rotated {
new_inode: current_inode,
});
st.last_inode = current_inode;
st.last_size = current_size;
} else if current_size > st.last_size {
let _ = tx.try_send(FileEvent::Appended {
new_size: current_size,
});
st.last_size = current_size;
st.last_inode = current_inode;
} else if current_size < st.last_size {
let _ = tx.try_send(FileEvent::Truncated {
new_size: current_size,
});
st.last_size = current_size;
st.last_inode = current_inode;
} }
})?; })?;
@@ -144,6 +154,56 @@ mod tests {
events events
} }
#[test]
fn test_remove_wrong_path_ignored() {
let dir = tempfile::tempdir().expect("create temp dir");
let watched = dir.path().join("watched.log");
let other = dir.path().join("other.log");
std::fs::write(&watched, b"hello\n").expect("write watched");
std::fs::write(&other, b"other\n").expect("write other");
let mut state = WatchState {
last_size: 6,
last_inode: get_inode(&watched).unwrap_or(0),
};
let event = Event {
kind: EventKind::Remove(notify::event::RemoveKind::File),
paths: vec![other.clone()],
attrs: Default::default(),
};
let result = process_event(event, &watched, &mut state);
assert_eq!(
result, None,
"Remove for non-watched path should be ignored"
);
}
#[test]
fn test_remove_correct_path_emits_removed() {
let dir = tempfile::tempdir().expect("create temp dir");
let watched = dir.path().join("watched.log");
std::fs::write(&watched, b"hello\n").expect("write watched");
let mut state = WatchState {
last_size: 6,
last_inode: get_inode(&watched).unwrap_or(0),
};
let event = Event {
kind: EventKind::Remove(notify::event::RemoveKind::File),
paths: vec![watched.clone()],
attrs: Default::default(),
};
let result = process_event(event, &watched, &mut state);
assert!(
matches!(result, Some(FileEvent::Removed)),
"Remove for watched path should emit Removed"
);
}
#[test] #[test]
fn test_watcher_append() { fn test_watcher_append() {
let dir = tempfile::tempdir().expect("create temp dir"); let dir = tempfile::tempdir().expect("create temp dir");
@@ -232,5 +292,19 @@ mod tests {
let d = FileEvent::Rotated { new_inode: 42 }; let d = FileEvent::Rotated { new_inode: 42 };
assert_ne!(a, d); assert_ne!(a, d);
let e1 = FileEvent::WatcherError {
message: "io error".into(),
};
let e2 = FileEvent::WatcherError {
message: "io error".into(),
};
assert_eq!(e1, e2);
assert_ne!(
e1,
FileEvent::WatcherError {
message: "other".into()
}
);
} }
} }

View File

@@ -857,6 +857,7 @@ impl App {
FileEvent::Removed => { FileEvent::Removed => {
self.loading_state = AppLoadingState::Error("File has been deleted".into()); self.loading_state = AppLoadingState::Error("File has been deleted".into());
} }
FileEvent::WatcherError { message: _ } => {}
} }
} }
} }