Compare commits
3 Commits
d37ed6df68
...
0d88e933e6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d88e933e6 | ||
|
|
420b853cb9 | ||
|
|
7852e92ecc |
@@ -10,6 +10,24 @@ use crate::io::line_index::LineIndex;
|
||||
use crate::io::line_sampler::sample_line_count;
|
||||
use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN};
|
||||
|
||||
// ─── Cancel-aware channel helpers ────────────────────────────────────────────
|
||||
|
||||
/// Send a message on `tx`, but abort if `cancel_rx` fires first.
|
||||
///
|
||||
/// Uses `crossbeam_channel::select!` so the thread sleeps efficiently instead
|
||||
/// of busy-looping. Used for terminal messages (Complete / Error) that must
|
||||
/// not be silently dropped while the receiver is still alive.
|
||||
fn send_cancelable<T>(
|
||||
tx: &crossbeam_channel::Sender<T>,
|
||||
msg: T,
|
||||
cancel_rx: &crossbeam_channel::Receiver<()>,
|
||||
) {
|
||||
crossbeam_channel::select! {
|
||||
send(tx, msg) -> _ => {}
|
||||
recv(cancel_rx) -> _ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── IndexerMessage ──────────────────────────────────────────────────────────
|
||||
|
||||
pub enum IndexerMessage {
|
||||
@@ -270,20 +288,20 @@ pub fn spawn_indexer(
|
||||
let file = match std::fs::File::open(&path) {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let target_len = match file.metadata() {
|
||||
Ok(m) => m.len(),
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -300,10 +318,10 @@ pub fn spawn_indexer(
|
||||
let buf = match buf_reader.fill_buf() {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -335,7 +353,7 @@ pub fn spawn_indexer(
|
||||
}
|
||||
if target_len > 0 {
|
||||
let percent = (chunk_offset as f64 / target_len as f64) * 100.0;
|
||||
let _ = tx.send(IndexerMessage::Progress {
|
||||
let _ = tx.try_send(IndexerMessage::Progress {
|
||||
generation,
|
||||
percent,
|
||||
lines_scanned: newline_count as u64,
|
||||
@@ -386,18 +404,18 @@ pub fn spawn_indexer(
|
||||
Ok(_) | Err(_) => None,
|
||||
},
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -416,11 +434,11 @@ pub fn spawn_indexer(
|
||||
None
|
||||
};
|
||||
|
||||
let _ = tx.send(IndexerMessage::Complete {
|
||||
send_cancelable(&tx, IndexerMessage::Complete {
|
||||
generation,
|
||||
reader,
|
||||
visual_height_index,
|
||||
});
|
||||
}, &cancel_rx);
|
||||
});
|
||||
|
||||
rx
|
||||
@@ -480,7 +498,7 @@ pub fn spawn_visual_height_rebuild(
|
||||
let index =
|
||||
VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width);
|
||||
|
||||
let _ = tx.send(VisualHeightRebuildResult { generation, index });
|
||||
send_cancelable(&tx, VisualHeightRebuildResult { generation, index }, &cancel_rx);
|
||||
});
|
||||
|
||||
rx
|
||||
@@ -777,7 +795,7 @@ impl ProgressiveFileReader {
|
||||
|
||||
pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) {
|
||||
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
||||
let _ = tx.send(());
|
||||
let _ = tx.try_send(());
|
||||
}
|
||||
|
||||
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
@@ -832,10 +850,10 @@ impl ProgressiveFileReader {
|
||||
impl Drop for ProgressiveFileReader {
|
||||
fn drop(&mut self) {
|
||||
if let Some(tx) = &self.cancel_tx {
|
||||
let _ = tx.send(());
|
||||
let _ = tx.try_send(());
|
||||
}
|
||||
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
||||
let _ = tx.send(());
|
||||
let _ = tx.try_send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1509,4 +1527,103 @@ mod tests {
|
||||
Ok(_) => panic!("should have been discarded due to line count mismatch"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_cancelable_delivers_on_empty_channel() {
|
||||
let (tx, rx) = crossbeam_channel::bounded(2);
|
||||
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
send_cancelable(&tx, 42, &cancel_rx);
|
||||
|
||||
assert_eq!(rx.try_recv(), Ok(42));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_cancelable_aborts_on_cancel() {
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
tx.send("filler").unwrap();
|
||||
|
||||
let handle = std::thread::spawn(move || {
|
||||
send_cancelable(&tx, "important", &cancel_rx);
|
||||
});
|
||||
|
||||
cancel_tx.send(()).unwrap();
|
||||
handle.join().unwrap();
|
||||
|
||||
assert_eq!(rx.try_recv(), Ok("filler"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_cancelable_drains_when_room_available() {
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
tx.send("first").unwrap();
|
||||
|
||||
let rx_clone = rx.clone();
|
||||
let handle = std::thread::spawn(move || {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
let _ = rx_clone.try_recv();
|
||||
});
|
||||
|
||||
send_cancelable(&tx, "second", &cancel_rx);
|
||||
handle.join().unwrap();
|
||||
|
||||
assert_eq!(rx.try_recv(), Ok("second"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_progress_try_send_does_not_block_full_channel() {
|
||||
let mut content = Vec::new();
|
||||
for i in 0..50_000 {
|
||||
writeln!(content, "line number {:08}", i).unwrap();
|
||||
}
|
||||
let f = create_temp_file(&content);
|
||||
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
|
||||
|
||||
let mut got_complete = false;
|
||||
let timeout = std::time::Duration::from_secs(15);
|
||||
let start = std::time::Instant::now();
|
||||
while start.elapsed() < timeout {
|
||||
match rx.recv_timeout(std::time::Duration::from_secs(1)) {
|
||||
Ok(IndexerMessage::Progress { .. }) => {}
|
||||
Ok(IndexerMessage::Complete { .. }) => {
|
||||
got_complete = true;
|
||||
break;
|
||||
}
|
||||
Ok(IndexerMessage::Error { message, .. }) => {
|
||||
panic!("unexpected error: {}", message);
|
||||
}
|
||||
Err(e) => panic!("recv error: {:?}", e),
|
||||
}
|
||||
}
|
||||
assert!(got_complete, "indexer should complete even when Progress fills channel");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_indexer_cancel_with_full_channel() {
|
||||
let mut content = Vec::new();
|
||||
for i in 0..500_000 {
|
||||
writeln!(content, "line number {:08}", i).unwrap();
|
||||
}
|
||||
let f = create_temp_file(&content);
|
||||
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
|
||||
|
||||
cancel_tx.send(()).unwrap();
|
||||
|
||||
let result = rx.recv_timeout(std::time::Duration::from_secs(5));
|
||||
match result {
|
||||
Err(crossbeam_channel::RecvTimeoutError::Timeout)
|
||||
| Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {}
|
||||
Ok(IndexerMessage::Complete { .. }) => {}
|
||||
Ok(IndexerMessage::Error { .. }) => {}
|
||||
Ok(IndexerMessage::Progress { .. }) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ pub enum FileEvent {
|
||||
Truncated { new_size: u64 },
|
||||
Rotated { new_inode: u64 },
|
||||
Removed,
|
||||
WatcherError { message: String },
|
||||
}
|
||||
|
||||
// ─── get_inode ──────────────────────────────────────────────────────────────
|
||||
@@ -34,6 +35,43 @@ struct WatchState {
|
||||
last_inode: u64,
|
||||
}
|
||||
|
||||
fn process_event(event: Event, watch_path: &Path, state: &mut WatchState) -> Option<FileEvent> {
|
||||
if !event.paths.iter().any(|p| p == watch_path) {
|
||||
return None;
|
||||
}
|
||||
|
||||
match event.kind {
|
||||
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
||||
EventKind::Remove(_) => return Some(FileEvent::Removed),
|
||||
_ => return None,
|
||||
}
|
||||
|
||||
let current_inode = get_inode(watch_path).unwrap_or(0);
|
||||
let current_size = std::fs::metadata(watch_path).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
if current_inode != 0 && state.last_inode != 0 && current_inode != state.last_inode {
|
||||
state.last_inode = current_inode;
|
||||
state.last_size = current_size;
|
||||
Some(FileEvent::Rotated {
|
||||
new_inode: current_inode,
|
||||
})
|
||||
} else if current_size > state.last_size {
|
||||
state.last_size = current_size;
|
||||
state.last_inode = current_inode;
|
||||
Some(FileEvent::Appended {
|
||||
new_size: current_size,
|
||||
})
|
||||
} else if current_size < state.last_size {
|
||||
state.last_size = current_size;
|
||||
state.last_inode = current_inode;
|
||||
Some(FileEvent::Truncated {
|
||||
new_size: current_size,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
// ─── FileWatcher ────────────────────────────────────────────────────────────
|
||||
pub struct FileWatcher {
|
||||
rx: Receiver<FileEvent>,
|
||||
@@ -56,50 +94,22 @@ impl FileWatcher {
|
||||
notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
|
||||
let event = match res {
|
||||
Ok(e) => e,
|
||||
Err(_) => return,
|
||||
Err(error) => {
|
||||
let _ = tx.try_send(FileEvent::WatcherError {
|
||||
message: error.to_string(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match event.kind {
|
||||
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
||||
EventKind::Remove(_) => {
|
||||
let _ = tx.try_send(FileEvent::Removed);
|
||||
return;
|
||||
}
|
||||
_ => return,
|
||||
}
|
||||
|
||||
if !event.paths.iter().any(|p| p == &watch_path) {
|
||||
return;
|
||||
}
|
||||
|
||||
let current_inode = get_inode(&watch_path).unwrap_or(0);
|
||||
let current_size = std::fs::metadata(&watch_path).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
let mut st = state.lock().unwrap_or_else(|poison| {
|
||||
// Recover from poisoned mutex — state only tracks last_size
|
||||
// and last_inode for event dedup. Stale values at worst
|
||||
// cause a duplicate event, which is harmless.
|
||||
poison.into_inner()
|
||||
});
|
||||
|
||||
if current_inode != 0 && st.last_inode != 0 && current_inode != st.last_inode {
|
||||
let _ = tx.try_send(FileEvent::Rotated {
|
||||
new_inode: current_inode,
|
||||
});
|
||||
st.last_inode = current_inode;
|
||||
st.last_size = current_size;
|
||||
} else if current_size > st.last_size {
|
||||
let _ = tx.try_send(FileEvent::Appended {
|
||||
new_size: current_size,
|
||||
});
|
||||
st.last_size = current_size;
|
||||
st.last_inode = current_inode;
|
||||
} else if current_size < st.last_size {
|
||||
let _ = tx.try_send(FileEvent::Truncated {
|
||||
new_size: current_size,
|
||||
});
|
||||
st.last_size = current_size;
|
||||
st.last_inode = current_inode;
|
||||
if let Some(fe) = process_event(event, &watch_path, &mut st) {
|
||||
let _ = tx.try_send(fe);
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -144,6 +154,56 @@ mod tests {
|
||||
events
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_wrong_path_ignored() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let watched = dir.path().join("watched.log");
|
||||
let other = dir.path().join("other.log");
|
||||
std::fs::write(&watched, b"hello\n").expect("write watched");
|
||||
std::fs::write(&other, b"other\n").expect("write other");
|
||||
|
||||
let mut state = WatchState {
|
||||
last_size: 6,
|
||||
last_inode: get_inode(&watched).unwrap_or(0),
|
||||
};
|
||||
|
||||
let event = Event {
|
||||
kind: EventKind::Remove(notify::event::RemoveKind::File),
|
||||
paths: vec![other.clone()],
|
||||
attrs: Default::default(),
|
||||
};
|
||||
|
||||
let result = process_event(event, &watched, &mut state);
|
||||
assert_eq!(
|
||||
result, None,
|
||||
"Remove for non-watched path should be ignored"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_correct_path_emits_removed() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let watched = dir.path().join("watched.log");
|
||||
std::fs::write(&watched, b"hello\n").expect("write watched");
|
||||
|
||||
let mut state = WatchState {
|
||||
last_size: 6,
|
||||
last_inode: get_inode(&watched).unwrap_or(0),
|
||||
};
|
||||
|
||||
let event = Event {
|
||||
kind: EventKind::Remove(notify::event::RemoveKind::File),
|
||||
paths: vec![watched.clone()],
|
||||
attrs: Default::default(),
|
||||
};
|
||||
|
||||
let result = process_event(event, &watched, &mut state);
|
||||
assert!(
|
||||
matches!(result, Some(FileEvent::Removed)),
|
||||
"Remove for watched path should emit Removed"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watcher_append() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
@@ -232,5 +292,19 @@ mod tests {
|
||||
|
||||
let d = FileEvent::Rotated { new_inode: 42 };
|
||||
assert_ne!(a, d);
|
||||
|
||||
let e1 = FileEvent::WatcherError {
|
||||
message: "io error".into(),
|
||||
};
|
||||
let e2 = FileEvent::WatcherError {
|
||||
message: "io error".into(),
|
||||
};
|
||||
assert_eq!(e1, e2);
|
||||
assert_ne!(
|
||||
e1,
|
||||
FileEvent::WatcherError {
|
||||
message: "other".into()
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -857,6 +857,7 @@ impl App {
|
||||
FileEvent::Removed => {
|
||||
self.loading_state = AppLoadingState::Error("File has been deleted".into());
|
||||
}
|
||||
FileEvent::WatcherError { message: _ } => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user