Compare commits
3 Commits
d37ed6df68
...
0d88e933e6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d88e933e6 | ||
|
|
420b853cb9 | ||
|
|
7852e92ecc |
@@ -10,6 +10,24 @@ use crate::io::line_index::LineIndex;
|
|||||||
use crate::io::line_sampler::sample_line_count;
|
use crate::io::line_sampler::sample_line_count;
|
||||||
use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN};
|
use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN};
|
||||||
|
|
||||||
|
// ─── Cancel-aware channel helpers ────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Send a message on `tx`, but abort if `cancel_rx` fires first.
|
||||||
|
///
|
||||||
|
/// Uses `crossbeam_channel::select!` so the thread sleeps efficiently instead
|
||||||
|
/// of busy-looping. Used for terminal messages (Complete / Error) that must
|
||||||
|
/// not be silently dropped while the receiver is still alive.
|
||||||
|
fn send_cancelable<T>(
|
||||||
|
tx: &crossbeam_channel::Sender<T>,
|
||||||
|
msg: T,
|
||||||
|
cancel_rx: &crossbeam_channel::Receiver<()>,
|
||||||
|
) {
|
||||||
|
crossbeam_channel::select! {
|
||||||
|
send(tx, msg) -> _ => {}
|
||||||
|
recv(cancel_rx) -> _ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ─── IndexerMessage ──────────────────────────────────────────────────────────
|
// ─── IndexerMessage ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
pub enum IndexerMessage {
|
pub enum IndexerMessage {
|
||||||
@@ -270,20 +288,20 @@ pub fn spawn_indexer(
|
|||||||
let file = match std::fs::File::open(&path) {
|
let file = match std::fs::File::open(&path) {
|
||||||
Ok(f) => f,
|
Ok(f) => f,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = tx.send(IndexerMessage::Error {
|
send_cancelable(&tx, IndexerMessage::Error {
|
||||||
generation,
|
generation,
|
||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
});
|
}, &cancel_rx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let target_len = match file.metadata() {
|
let target_len = match file.metadata() {
|
||||||
Ok(m) => m.len(),
|
Ok(m) => m.len(),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = tx.send(IndexerMessage::Error {
|
send_cancelable(&tx, IndexerMessage::Error {
|
||||||
generation,
|
generation,
|
||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
});
|
}, &cancel_rx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -300,10 +318,10 @@ pub fn spawn_indexer(
|
|||||||
let buf = match buf_reader.fill_buf() {
|
let buf = match buf_reader.fill_buf() {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = tx.send(IndexerMessage::Error {
|
send_cancelable(&tx, IndexerMessage::Error {
|
||||||
generation,
|
generation,
|
||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
});
|
}, &cancel_rx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -335,7 +353,7 @@ pub fn spawn_indexer(
|
|||||||
}
|
}
|
||||||
if target_len > 0 {
|
if target_len > 0 {
|
||||||
let percent = (chunk_offset as f64 / target_len as f64) * 100.0;
|
let percent = (chunk_offset as f64 / target_len as f64) * 100.0;
|
||||||
let _ = tx.send(IndexerMessage::Progress {
|
let _ = tx.try_send(IndexerMessage::Progress {
|
||||||
generation,
|
generation,
|
||||||
percent,
|
percent,
|
||||||
lines_scanned: newline_count as u64,
|
lines_scanned: newline_count as u64,
|
||||||
@@ -386,18 +404,18 @@ pub fn spawn_indexer(
|
|||||||
Ok(_) | Err(_) => None,
|
Ok(_) | Err(_) => None,
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = tx.send(IndexerMessage::Error {
|
send_cancelable(&tx, IndexerMessage::Error {
|
||||||
generation,
|
generation,
|
||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
});
|
}, &cancel_rx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = tx.send(IndexerMessage::Error {
|
send_cancelable(&tx, IndexerMessage::Error {
|
||||||
generation,
|
generation,
|
||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
});
|
}, &cancel_rx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -416,11 +434,11 @@ pub fn spawn_indexer(
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = tx.send(IndexerMessage::Complete {
|
send_cancelable(&tx, IndexerMessage::Complete {
|
||||||
generation,
|
generation,
|
||||||
reader,
|
reader,
|
||||||
visual_height_index,
|
visual_height_index,
|
||||||
});
|
}, &cancel_rx);
|
||||||
});
|
});
|
||||||
|
|
||||||
rx
|
rx
|
||||||
@@ -480,7 +498,7 @@ pub fn spawn_visual_height_rebuild(
|
|||||||
let index =
|
let index =
|
||||||
VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width);
|
VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width);
|
||||||
|
|
||||||
let _ = tx.send(VisualHeightRebuildResult { generation, index });
|
send_cancelable(&tx, VisualHeightRebuildResult { generation, index }, &cancel_rx);
|
||||||
});
|
});
|
||||||
|
|
||||||
rx
|
rx
|
||||||
@@ -777,7 +795,7 @@ impl ProgressiveFileReader {
|
|||||||
|
|
||||||
pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) {
|
pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) {
|
||||||
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
||||||
let _ = tx.send(());
|
let _ = tx.try_send(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||||
@@ -832,10 +850,10 @@ impl ProgressiveFileReader {
|
|||||||
impl Drop for ProgressiveFileReader {
|
impl Drop for ProgressiveFileReader {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(tx) = &self.cancel_tx {
|
if let Some(tx) = &self.cancel_tx {
|
||||||
let _ = tx.send(());
|
let _ = tx.try_send(());
|
||||||
}
|
}
|
||||||
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
||||||
let _ = tx.send(());
|
let _ = tx.try_send(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1509,4 +1527,103 @@ mod tests {
|
|||||||
Ok(_) => panic!("should have been discarded due to line count mismatch"),
|
Ok(_) => panic!("should have been discarded due to line count mismatch"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_send_cancelable_delivers_on_empty_channel() {
|
||||||
|
let (tx, rx) = crossbeam_channel::bounded(2);
|
||||||
|
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||||
|
|
||||||
|
send_cancelable(&tx, 42, &cancel_rx);
|
||||||
|
|
||||||
|
assert_eq!(rx.try_recv(), Ok(42));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_send_cancelable_aborts_on_cancel() {
|
||||||
|
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||||
|
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||||
|
|
||||||
|
tx.send("filler").unwrap();
|
||||||
|
|
||||||
|
let handle = std::thread::spawn(move || {
|
||||||
|
send_cancelable(&tx, "important", &cancel_rx);
|
||||||
|
});
|
||||||
|
|
||||||
|
cancel_tx.send(()).unwrap();
|
||||||
|
handle.join().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(rx.try_recv(), Ok("filler"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_send_cancelable_drains_when_room_available() {
|
||||||
|
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||||
|
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||||
|
|
||||||
|
tx.send("first").unwrap();
|
||||||
|
|
||||||
|
let rx_clone = rx.clone();
|
||||||
|
let handle = std::thread::spawn(move || {
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||||
|
let _ = rx_clone.try_recv();
|
||||||
|
});
|
||||||
|
|
||||||
|
send_cancelable(&tx, "second", &cancel_rx);
|
||||||
|
handle.join().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(rx.try_recv(), Ok("second"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_progress_try_send_does_not_block_full_channel() {
|
||||||
|
let mut content = Vec::new();
|
||||||
|
for i in 0..50_000 {
|
||||||
|
writeln!(content, "line number {:08}", i).unwrap();
|
||||||
|
}
|
||||||
|
let f = create_temp_file(&content);
|
||||||
|
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||||
|
|
||||||
|
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
|
||||||
|
|
||||||
|
let mut got_complete = false;
|
||||||
|
let timeout = std::time::Duration::from_secs(15);
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
while start.elapsed() < timeout {
|
||||||
|
match rx.recv_timeout(std::time::Duration::from_secs(1)) {
|
||||||
|
Ok(IndexerMessage::Progress { .. }) => {}
|
||||||
|
Ok(IndexerMessage::Complete { .. }) => {
|
||||||
|
got_complete = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(IndexerMessage::Error { message, .. }) => {
|
||||||
|
panic!("unexpected error: {}", message);
|
||||||
|
}
|
||||||
|
Err(e) => panic!("recv error: {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(got_complete, "indexer should complete even when Progress fills channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_indexer_cancel_with_full_channel() {
|
||||||
|
let mut content = Vec::new();
|
||||||
|
for i in 0..500_000 {
|
||||||
|
writeln!(content, "line number {:08}", i).unwrap();
|
||||||
|
}
|
||||||
|
let f = create_temp_file(&content);
|
||||||
|
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||||
|
|
||||||
|
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
|
||||||
|
|
||||||
|
cancel_tx.send(()).unwrap();
|
||||||
|
|
||||||
|
let result = rx.recv_timeout(std::time::Duration::from_secs(5));
|
||||||
|
match result {
|
||||||
|
Err(crossbeam_channel::RecvTimeoutError::Timeout)
|
||||||
|
| Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {}
|
||||||
|
Ok(IndexerMessage::Complete { .. }) => {}
|
||||||
|
Ok(IndexerMessage::Error { .. }) => {}
|
||||||
|
Ok(IndexerMessage::Progress { .. }) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ pub enum FileEvent {
|
|||||||
Truncated { new_size: u64 },
|
Truncated { new_size: u64 },
|
||||||
Rotated { new_inode: u64 },
|
Rotated { new_inode: u64 },
|
||||||
Removed,
|
Removed,
|
||||||
|
WatcherError { message: String },
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── get_inode ──────────────────────────────────────────────────────────────
|
// ─── get_inode ──────────────────────────────────────────────────────────────
|
||||||
@@ -34,6 +35,43 @@ struct WatchState {
|
|||||||
last_inode: u64,
|
last_inode: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_event(event: Event, watch_path: &Path, state: &mut WatchState) -> Option<FileEvent> {
|
||||||
|
if !event.paths.iter().any(|p| p == watch_path) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
match event.kind {
|
||||||
|
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
||||||
|
EventKind::Remove(_) => return Some(FileEvent::Removed),
|
||||||
|
_ => return None,
|
||||||
|
}
|
||||||
|
|
||||||
|
let current_inode = get_inode(watch_path).unwrap_or(0);
|
||||||
|
let current_size = std::fs::metadata(watch_path).map(|m| m.len()).unwrap_or(0);
|
||||||
|
|
||||||
|
if current_inode != 0 && state.last_inode != 0 && current_inode != state.last_inode {
|
||||||
|
state.last_inode = current_inode;
|
||||||
|
state.last_size = current_size;
|
||||||
|
Some(FileEvent::Rotated {
|
||||||
|
new_inode: current_inode,
|
||||||
|
})
|
||||||
|
} else if current_size > state.last_size {
|
||||||
|
state.last_size = current_size;
|
||||||
|
state.last_inode = current_inode;
|
||||||
|
Some(FileEvent::Appended {
|
||||||
|
new_size: current_size,
|
||||||
|
})
|
||||||
|
} else if current_size < state.last_size {
|
||||||
|
state.last_size = current_size;
|
||||||
|
state.last_inode = current_inode;
|
||||||
|
Some(FileEvent::Truncated {
|
||||||
|
new_size: current_size,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ─── FileWatcher ────────────────────────────────────────────────────────────
|
// ─── FileWatcher ────────────────────────────────────────────────────────────
|
||||||
pub struct FileWatcher {
|
pub struct FileWatcher {
|
||||||
rx: Receiver<FileEvent>,
|
rx: Receiver<FileEvent>,
|
||||||
@@ -56,50 +94,22 @@ impl FileWatcher {
|
|||||||
notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
|
notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
|
||||||
let event = match res {
|
let event = match res {
|
||||||
Ok(e) => e,
|
Ok(e) => e,
|
||||||
Err(_) => return,
|
Err(error) => {
|
||||||
|
let _ = tx.try_send(FileEvent::WatcherError {
|
||||||
|
message: error.to_string(),
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match event.kind {
|
|
||||||
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
|
||||||
EventKind::Remove(_) => {
|
|
||||||
let _ = tx.try_send(FileEvent::Removed);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
_ => return,
|
|
||||||
}
|
|
||||||
|
|
||||||
if !event.paths.iter().any(|p| p == &watch_path) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let current_inode = get_inode(&watch_path).unwrap_or(0);
|
|
||||||
let current_size = std::fs::metadata(&watch_path).map(|m| m.len()).unwrap_or(0);
|
|
||||||
|
|
||||||
let mut st = state.lock().unwrap_or_else(|poison| {
|
let mut st = state.lock().unwrap_or_else(|poison| {
|
||||||
// Recover from poisoned mutex — state only tracks last_size
|
// Recover from poisoned mutex — state only tracks last_size
|
||||||
// and last_inode for event dedup. Stale values at worst
|
// and last_inode for event dedup. Stale values at worst
|
||||||
// cause a duplicate event, which is harmless.
|
// cause a duplicate event, which is harmless.
|
||||||
poison.into_inner()
|
poison.into_inner()
|
||||||
});
|
});
|
||||||
|
if let Some(fe) = process_event(event, &watch_path, &mut st) {
|
||||||
if current_inode != 0 && st.last_inode != 0 && current_inode != st.last_inode {
|
let _ = tx.try_send(fe);
|
||||||
let _ = tx.try_send(FileEvent::Rotated {
|
|
||||||
new_inode: current_inode,
|
|
||||||
});
|
|
||||||
st.last_inode = current_inode;
|
|
||||||
st.last_size = current_size;
|
|
||||||
} else if current_size > st.last_size {
|
|
||||||
let _ = tx.try_send(FileEvent::Appended {
|
|
||||||
new_size: current_size,
|
|
||||||
});
|
|
||||||
st.last_size = current_size;
|
|
||||||
st.last_inode = current_inode;
|
|
||||||
} else if current_size < st.last_size {
|
|
||||||
let _ = tx.try_send(FileEvent::Truncated {
|
|
||||||
new_size: current_size,
|
|
||||||
});
|
|
||||||
st.last_size = current_size;
|
|
||||||
st.last_inode = current_inode;
|
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
@@ -144,6 +154,56 @@ mod tests {
|
|||||||
events
|
events
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_remove_wrong_path_ignored() {
|
||||||
|
let dir = tempfile::tempdir().expect("create temp dir");
|
||||||
|
let watched = dir.path().join("watched.log");
|
||||||
|
let other = dir.path().join("other.log");
|
||||||
|
std::fs::write(&watched, b"hello\n").expect("write watched");
|
||||||
|
std::fs::write(&other, b"other\n").expect("write other");
|
||||||
|
|
||||||
|
let mut state = WatchState {
|
||||||
|
last_size: 6,
|
||||||
|
last_inode: get_inode(&watched).unwrap_or(0),
|
||||||
|
};
|
||||||
|
|
||||||
|
let event = Event {
|
||||||
|
kind: EventKind::Remove(notify::event::RemoveKind::File),
|
||||||
|
paths: vec![other.clone()],
|
||||||
|
attrs: Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = process_event(event, &watched, &mut state);
|
||||||
|
assert_eq!(
|
||||||
|
result, None,
|
||||||
|
"Remove for non-watched path should be ignored"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_remove_correct_path_emits_removed() {
|
||||||
|
let dir = tempfile::tempdir().expect("create temp dir");
|
||||||
|
let watched = dir.path().join("watched.log");
|
||||||
|
std::fs::write(&watched, b"hello\n").expect("write watched");
|
||||||
|
|
||||||
|
let mut state = WatchState {
|
||||||
|
last_size: 6,
|
||||||
|
last_inode: get_inode(&watched).unwrap_or(0),
|
||||||
|
};
|
||||||
|
|
||||||
|
let event = Event {
|
||||||
|
kind: EventKind::Remove(notify::event::RemoveKind::File),
|
||||||
|
paths: vec![watched.clone()],
|
||||||
|
attrs: Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = process_event(event, &watched, &mut state);
|
||||||
|
assert!(
|
||||||
|
matches!(result, Some(FileEvent::Removed)),
|
||||||
|
"Remove for watched path should emit Removed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_watcher_append() {
|
fn test_watcher_append() {
|
||||||
let dir = tempfile::tempdir().expect("create temp dir");
|
let dir = tempfile::tempdir().expect("create temp dir");
|
||||||
@@ -232,5 +292,19 @@ mod tests {
|
|||||||
|
|
||||||
let d = FileEvent::Rotated { new_inode: 42 };
|
let d = FileEvent::Rotated { new_inode: 42 };
|
||||||
assert_ne!(a, d);
|
assert_ne!(a, d);
|
||||||
|
|
||||||
|
let e1 = FileEvent::WatcherError {
|
||||||
|
message: "io error".into(),
|
||||||
|
};
|
||||||
|
let e2 = FileEvent::WatcherError {
|
||||||
|
message: "io error".into(),
|
||||||
|
};
|
||||||
|
assert_eq!(e1, e2);
|
||||||
|
assert_ne!(
|
||||||
|
e1,
|
||||||
|
FileEvent::WatcherError {
|
||||||
|
message: "other".into()
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -857,6 +857,7 @@ impl App {
|
|||||||
FileEvent::Removed => {
|
FileEvent::Removed => {
|
||||||
self.loading_state = AppLoadingState::Error("File has been deleted".into());
|
self.loading_state = AppLoadingState::Error("File has been deleted".into());
|
||||||
}
|
}
|
||||||
|
FileEvent::WatcherError { message: _ } => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user