Compare commits
12 Commits
fix/m23-si
...
0d88e933e6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d88e933e6 | ||
|
|
420b853cb9 | ||
|
|
7852e92ecc | ||
|
|
d37ed6df68 | ||
|
|
b58d66f2aa | ||
|
|
d4679a7543 | ||
|
|
8844e58cb4 | ||
|
|
6a2f8ecb66 | ||
|
|
f6081b9fe9 | ||
|
|
97a2c6a925 | ||
|
|
e6e0e2cc90 | ||
|
|
ffaf462bae |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2333,6 +2333,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"thiserror 2.0.18",
|
||||
"toml",
|
||||
"unicode-width",
|
||||
"xxhash-rust",
|
||||
]
|
||||
|
||||
|
||||
@@ -27,3 +27,4 @@ textwrap = "0.16"
|
||||
tempfile = "3"
|
||||
xxhash-rust = { version = "0.8", features = ["xxh3"] }
|
||||
bincode = "1"
|
||||
unicode-width = "0.2"
|
||||
|
||||
@@ -83,11 +83,11 @@ pub fn generate_growable_file(dir: &Path) -> std::io::Result<PathBuf> {
|
||||
|
||||
/// Append `count` lines to the file
|
||||
pub fn append_lines(path: &Path, count: usize) -> std::io::Result<()> {
|
||||
let existing_lines = count_existing_lines(path)?;
|
||||
let mut file = BufWriter::with_capacity(
|
||||
64 * 1024,
|
||||
fs::OpenOptions::new().append(true).open(path)?,
|
||||
);
|
||||
let existing_lines = count_existing_lines(path).unwrap_or(0);
|
||||
for i in 0..count {
|
||||
writeln!(
|
||||
file,
|
||||
@@ -117,7 +117,12 @@ pub fn rotate_file(path: &Path) -> std::io::Result<PathBuf> {
|
||||
fn count_existing_lines(path: &Path) -> std::io::Result<u64> {
|
||||
let file = fs::File::open(path)?;
|
||||
let reader = BufReader::new(file);
|
||||
Ok(reader.lines().count() as u64)
|
||||
let mut count = 0u64;
|
||||
for line in reader.lines() {
|
||||
line?;
|
||||
count += 1;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -35,7 +35,7 @@ const HANDLER_NONE: u8 = 0;
|
||||
const HANDLER_DEFAULT: u8 = 1;
|
||||
const HANDLER_IGNORE: u8 = 2;
|
||||
const HANDLER_PLAIN: u8 = 3; // extern "C" fn(c_int)
|
||||
#[expect(clippy::unseparated_literal_suffix, reason = "clarity: this is the SA_SIGACTION variant")]
|
||||
#[allow(clippy::unseparated_literal_suffix, reason = "clarity: this is the SA_SIGACTION variant")]
|
||||
const HANDLER_SIGACTION: u8 = 4; // extern "C" fn(c_int, *mut siginfo_t, *mut c_void)
|
||||
|
||||
/// Old SIGBUS handler type — raw atomic, async-signal-safe to read.
|
||||
|
||||
@@ -143,8 +143,7 @@ pub fn format_report(results: &[BenchmarkResult]) -> String {
|
||||
report.push_str("| Test | Variant | RSS | Peak RSS | Page Faults |\n");
|
||||
report.push_str("|------|---------|-----|----------|-------------|\n");
|
||||
|
||||
let mut mem_rows: Vec<&BenchmarkResult> =
|
||||
category_results.iter().copied().collect();
|
||||
let mut mem_rows: Vec<&BenchmarkResult> = category_results.to_vec();
|
||||
mem_rows.sort_by(|a, b| {
|
||||
(&a.test_name, &a.backend, &a.variant)
|
||||
.cmp(&(&b.test_name, &b.backend, &b.variant))
|
||||
@@ -163,7 +162,10 @@ pub fn format_report(results: &[BenchmarkResult]) -> String {
|
||||
report.push('\n');
|
||||
}
|
||||
|
||||
let mut extras: Vec<(String, String, Vec<(String, f64)>)> = category_results
|
||||
type ExtraEntry = (String, f64);
|
||||
type ExtraGroup = (String, String, Vec<ExtraEntry>);
|
||||
|
||||
let mut extras: Vec<ExtraGroup> = category_results
|
||||
.iter()
|
||||
.filter(|r| !r.extra.is_empty())
|
||||
.map(|r| {
|
||||
|
||||
@@ -101,11 +101,15 @@ fn bench_scroll_rss<B: FileReaderBackend>(
|
||||
let sample_interval = 100_000;
|
||||
let max_lines = if config.quick_mode { 100_000 } else { total };
|
||||
|
||||
let upper = max_lines.min(total);
|
||||
let mut rss_samples = Vec::new();
|
||||
let mut hwm_samples = Vec::new();
|
||||
let mut lines_read = 0usize;
|
||||
|
||||
for i in (0..max_lines).step_by(sample_interval) {
|
||||
let _ = reader.get_line(i);
|
||||
for i in (0..upper).step_by(sample_interval) {
|
||||
if reader.get_line(i).is_some() {
|
||||
lines_read += 1;
|
||||
}
|
||||
let rss = MetricsCollector::read_rss();
|
||||
rss_samples.push(rss.vm_rss_kb);
|
||||
hwm_samples.push(rss.vm_hwm_kb);
|
||||
@@ -124,7 +128,7 @@ fn bench_scroll_rss<B: FileReaderBackend>(
|
||||
"max_hwm_kb".into(),
|
||||
hwm_samples.iter().copied().fold(0u64, u64::max) as f64,
|
||||
);
|
||||
extra.insert("lines_read".into(), max_lines.min(total) as f64);
|
||||
extra.insert("lines_read".into(), lines_read as f64);
|
||||
|
||||
reader.close();
|
||||
|
||||
|
||||
@@ -24,13 +24,14 @@ fn bench_truncate_safety_mmap(
|
||||
dir: &std::path::Path,
|
||||
) -> Vec<BenchmarkResult> {
|
||||
let sub_dir = dir.join("trunc_mmap");
|
||||
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file");
|
||||
let iterations: usize = if _config.quick_mode { 3 } else { 10 };
|
||||
|
||||
let mut latencies = Vec::with_capacity(iterations);
|
||||
let mut sigbus_detected = 0usize;
|
||||
|
||||
for _ in 0..iterations {
|
||||
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file");
|
||||
|
||||
mmap_reader::reset_sigbus_flag();
|
||||
|
||||
let reader = MmapReaderPlain::open(&path).expect("Failed to open file");
|
||||
@@ -48,12 +49,6 @@ fn bench_truncate_safety_mmap(
|
||||
sigbus_detected += 1;
|
||||
}
|
||||
reader.close();
|
||||
|
||||
let mut f = std::fs::File::create(&path).expect("Failed to recreate file");
|
||||
use std::io::Write;
|
||||
for i in 0..1000u64 {
|
||||
writeln!(f, "restored line {i}").unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let rss = MetricsCollector::read_rss();
|
||||
@@ -82,13 +77,14 @@ fn bench_truncate_safety_pread(
|
||||
dir: &std::path::Path,
|
||||
) -> Vec<BenchmarkResult> {
|
||||
let sub_dir = dir.join("trunc_pread");
|
||||
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file");
|
||||
let iterations: usize = if _config.quick_mode { 3 } else { 10 };
|
||||
|
||||
let mut latencies = Vec::with_capacity(iterations);
|
||||
let mut error_count = 0usize;
|
||||
|
||||
for _ in 0..iterations {
|
||||
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file");
|
||||
|
||||
let reader = PreadReaderPlain::open(&path).expect("Failed to open file");
|
||||
let original_size = reader.file_size();
|
||||
|
||||
@@ -104,12 +100,6 @@ fn bench_truncate_safety_pread(
|
||||
error_count += 1;
|
||||
}
|
||||
reader.close();
|
||||
|
||||
let mut f = std::fs::File::create(&path).expect("Failed to recreate file");
|
||||
use std::io::Write;
|
||||
for i in 0..1000u64 {
|
||||
writeln!(f, "restored line {i}").unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let rss = MetricsCollector::read_rss();
|
||||
|
||||
@@ -17,6 +17,7 @@ memmap2.workspace = true
|
||||
directories.workspace = true
|
||||
xxhash-rust.workspace = true
|
||||
bincode.workspace = true
|
||||
unicode-width.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
insta.workspace = true
|
||||
|
||||
@@ -18,4 +18,5 @@ pub mod cache_util;
|
||||
pub mod index_cache;
|
||||
pub mod line_sampler;
|
||||
pub mod progressive_reader;
|
||||
pub mod read_cache;
|
||||
pub mod wrap;
|
||||
|
||||
@@ -10,6 +10,24 @@ use crate::io::line_index::LineIndex;
|
||||
use crate::io::line_sampler::sample_line_count;
|
||||
use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN};
|
||||
|
||||
// ─── Cancel-aware channel helpers ────────────────────────────────────────────
|
||||
|
||||
/// Send a message on `tx`, but abort if `cancel_rx` fires first.
|
||||
///
|
||||
/// Uses `crossbeam_channel::select!` so the thread sleeps efficiently instead
|
||||
/// of busy-looping. Used for terminal messages (Complete / Error) that must
|
||||
/// not be silently dropped while the receiver is still alive.
|
||||
fn send_cancelable<T>(
|
||||
tx: &crossbeam_channel::Sender<T>,
|
||||
msg: T,
|
||||
cancel_rx: &crossbeam_channel::Receiver<()>,
|
||||
) {
|
||||
crossbeam_channel::select! {
|
||||
send(tx, msg) -> _ => {}
|
||||
recv(cancel_rx) -> _ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── IndexerMessage ──────────────────────────────────────────────────────────
|
||||
|
||||
pub enum IndexerMessage {
|
||||
@@ -102,7 +120,7 @@ impl VisualHeightIndex {
|
||||
}
|
||||
}
|
||||
|
||||
fn with_params(mut self, json_format: bool, terminal_width: usize) -> Self {
|
||||
pub fn with_params(mut self, json_format: bool, terminal_width: usize) -> Self {
|
||||
self.json_format = json_format;
|
||||
self.terminal_width = terminal_width;
|
||||
self
|
||||
@@ -159,6 +177,31 @@ impl VisualHeightIndex {
|
||||
self.total_visual_rows += h as u64;
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the visual height of the last logical line. O(1).
|
||||
///
|
||||
/// Must be called **before** `extend_from_heights` so that the last line
|
||||
/// index still refers to the pre-extension line.
|
||||
pub fn replace_last_line_height(&mut self, new_height: usize) {
|
||||
let n = self.prefix_sums.len();
|
||||
if n < 2 {
|
||||
return;
|
||||
}
|
||||
let last_line = n - 2;
|
||||
let old_height = self.prefix_sums[last_line + 1] - self.prefix_sums[last_line];
|
||||
let new_height = new_height as u64;
|
||||
if new_height == old_height {
|
||||
return;
|
||||
}
|
||||
let delta = new_height.abs_diff(old_height);
|
||||
if new_height > old_height {
|
||||
self.prefix_sums[last_line + 1] += delta;
|
||||
self.total_visual_rows += delta;
|
||||
} else {
|
||||
self.prefix_sums[last_line + 1] -= delta;
|
||||
self.total_visual_rows -= delta;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── VisualHeightRebuildResult ────────────────────────────────────────────────
|
||||
@@ -245,20 +288,20 @@ pub fn spawn_indexer(
|
||||
let file = match std::fs::File::open(&path) {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let target_len = match file.metadata() {
|
||||
Ok(m) => m.len(),
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -275,10 +318,10 @@ pub fn spawn_indexer(
|
||||
let buf = match buf_reader.fill_buf() {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -310,7 +353,7 @@ pub fn spawn_indexer(
|
||||
}
|
||||
if target_len > 0 {
|
||||
let percent = (chunk_offset as f64 / target_len as f64) * 100.0;
|
||||
let _ = tx.send(IndexerMessage::Progress {
|
||||
let _ = tx.try_send(IndexerMessage::Progress {
|
||||
generation,
|
||||
percent,
|
||||
lines_scanned: newline_count as u64,
|
||||
@@ -361,18 +404,18 @@ pub fn spawn_indexer(
|
||||
Ok(_) | Err(_) => None,
|
||||
},
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let _ = tx.send(IndexerMessage::Error {
|
||||
send_cancelable(&tx, IndexerMessage::Error {
|
||||
generation,
|
||||
message: e.to_string(),
|
||||
});
|
||||
}, &cancel_rx);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -391,11 +434,11 @@ pub fn spawn_indexer(
|
||||
None
|
||||
};
|
||||
|
||||
let _ = tx.send(IndexerMessage::Complete {
|
||||
send_cancelable(&tx, IndexerMessage::Complete {
|
||||
generation,
|
||||
reader,
|
||||
visual_height_index,
|
||||
});
|
||||
}, &cancel_rx);
|
||||
});
|
||||
|
||||
rx
|
||||
@@ -455,7 +498,7 @@ pub fn spawn_visual_height_rebuild(
|
||||
let index =
|
||||
VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width);
|
||||
|
||||
let _ = tx.send(VisualHeightRebuildResult { generation, index });
|
||||
send_cancelable(&tx, VisualHeightRebuildResult { generation, index }, &cancel_rx);
|
||||
});
|
||||
|
||||
rx
|
||||
@@ -752,7 +795,7 @@ impl ProgressiveFileReader {
|
||||
|
||||
pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) {
|
||||
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
||||
let _ = tx.send(());
|
||||
let _ = tx.try_send(());
|
||||
}
|
||||
|
||||
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
@@ -807,10 +850,10 @@ impl ProgressiveFileReader {
|
||||
impl Drop for ProgressiveFileReader {
|
||||
fn drop(&mut self) {
|
||||
if let Some(tx) = &self.cancel_tx {
|
||||
let _ = tx.send(());
|
||||
let _ = tx.try_send(());
|
||||
}
|
||||
if let Some(tx) = self.vh_rebuild_cancel_tx.take() {
|
||||
let _ = tx.send(());
|
||||
let _ = tx.try_send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1339,6 +1382,85 @@ mod tests {
|
||||
assert_eq!(idx.total_visual_rows(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replace_last_line_height_increase() {
|
||||
let heights = [2, 3];
|
||||
let mut idx = VisualHeightIndex::build(&heights);
|
||||
assert_eq!(idx.total_visual_rows(), 5);
|
||||
assert_eq!(idx.visual_height_of_line(1), 3);
|
||||
|
||||
idx.replace_last_line_height(7);
|
||||
|
||||
assert_eq!(idx.visual_height_of_line(0), 2);
|
||||
assert_eq!(idx.visual_height_of_line(1), 7);
|
||||
assert_eq!(idx.total_visual_rows(), 9);
|
||||
assert_eq!(idx.cursor_to_first_visual_row(0), 0);
|
||||
assert_eq!(idx.cursor_to_first_visual_row(1), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replace_last_line_height_decrease() {
|
||||
let heights = [2, 5];
|
||||
let mut idx = VisualHeightIndex::build(&heights);
|
||||
|
||||
idx.replace_last_line_height(1);
|
||||
|
||||
assert_eq!(idx.visual_height_of_line(0), 2);
|
||||
assert_eq!(idx.visual_height_of_line(1), 1);
|
||||
assert_eq!(idx.total_visual_rows(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replace_last_line_height_same_is_noop() {
|
||||
let heights = [2, 3];
|
||||
let mut idx = VisualHeightIndex::build(&heights);
|
||||
let total_before = idx.total_visual_rows();
|
||||
|
||||
idx.replace_last_line_height(3);
|
||||
|
||||
assert_eq!(idx.total_visual_rows(), total_before);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replace_last_line_height_then_extend() {
|
||||
let heights = [2, 1];
|
||||
let mut idx = VisualHeightIndex::build(&heights);
|
||||
assert_eq!(idx.total_visual_rows(), 3);
|
||||
|
||||
idx.replace_last_line_height(4);
|
||||
idx.extend_from_heights(&[3]);
|
||||
|
||||
assert_eq!(idx.line_count(), 3);
|
||||
assert_eq!(idx.visual_height_of_line(0), 2);
|
||||
assert_eq!(idx.visual_height_of_line(1), 4);
|
||||
assert_eq!(idx.visual_height_of_line(2), 3);
|
||||
assert_eq!(idx.total_visual_rows(), 9);
|
||||
assert_eq!(idx.cursor_to_first_visual_row(0), 0);
|
||||
assert_eq!(idx.cursor_to_first_visual_row(1), 2);
|
||||
assert_eq!(idx.cursor_to_first_visual_row(2), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replace_last_line_height_single_line() {
|
||||
let heights = [5];
|
||||
let mut idx = VisualHeightIndex::build(&heights);
|
||||
|
||||
idx.replace_last_line_height(2);
|
||||
|
||||
assert_eq!(idx.visual_height_of_line(0), 2);
|
||||
assert_eq!(idx.total_visual_rows(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replace_last_line_height_empty_index() {
|
||||
let heights: [usize; 0] = [];
|
||||
let mut idx = VisualHeightIndex::build(&heights);
|
||||
|
||||
idx.replace_last_line_height(5);
|
||||
|
||||
assert_eq!(idx.total_visual_rows(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_spawn_indexer_file_truncated_during_scan() {
|
||||
let mut content = Vec::new();
|
||||
@@ -1405,4 +1527,103 @@ mod tests {
|
||||
Ok(_) => panic!("should have been discarded due to line count mismatch"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_cancelable_delivers_on_empty_channel() {
|
||||
let (tx, rx) = crossbeam_channel::bounded(2);
|
||||
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
send_cancelable(&tx, 42, &cancel_rx);
|
||||
|
||||
assert_eq!(rx.try_recv(), Ok(42));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_cancelable_aborts_on_cancel() {
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
tx.send("filler").unwrap();
|
||||
|
||||
let handle = std::thread::spawn(move || {
|
||||
send_cancelable(&tx, "important", &cancel_rx);
|
||||
});
|
||||
|
||||
cancel_tx.send(()).unwrap();
|
||||
handle.join().unwrap();
|
||||
|
||||
assert_eq!(rx.try_recv(), Ok("filler"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_cancelable_drains_when_room_available() {
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
tx.send("first").unwrap();
|
||||
|
||||
let rx_clone = rx.clone();
|
||||
let handle = std::thread::spawn(move || {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
let _ = rx_clone.try_recv();
|
||||
});
|
||||
|
||||
send_cancelable(&tx, "second", &cancel_rx);
|
||||
handle.join().unwrap();
|
||||
|
||||
assert_eq!(rx.try_recv(), Ok("second"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_progress_try_send_does_not_block_full_channel() {
|
||||
let mut content = Vec::new();
|
||||
for i in 0..50_000 {
|
||||
writeln!(content, "line number {:08}", i).unwrap();
|
||||
}
|
||||
let f = create_temp_file(&content);
|
||||
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
|
||||
|
||||
let mut got_complete = false;
|
||||
let timeout = std::time::Duration::from_secs(15);
|
||||
let start = std::time::Instant::now();
|
||||
while start.elapsed() < timeout {
|
||||
match rx.recv_timeout(std::time::Duration::from_secs(1)) {
|
||||
Ok(IndexerMessage::Progress { .. }) => {}
|
||||
Ok(IndexerMessage::Complete { .. }) => {
|
||||
got_complete = true;
|
||||
break;
|
||||
}
|
||||
Ok(IndexerMessage::Error { message, .. }) => {
|
||||
panic!("unexpected error: {}", message);
|
||||
}
|
||||
Err(e) => panic!("recv error: {:?}", e),
|
||||
}
|
||||
}
|
||||
assert!(got_complete, "indexer should complete even when Progress fills channel");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_indexer_cancel_with_full_channel() {
|
||||
let mut content = Vec::new();
|
||||
for i in 0..500_000 {
|
||||
writeln!(content, "line number {:08}", i).unwrap();
|
||||
}
|
||||
let f = create_temp_file(&content);
|
||||
let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);
|
||||
|
||||
cancel_tx.send(()).unwrap();
|
||||
|
||||
let result = rx.recv_timeout(std::time::Duration::from_secs(5));
|
||||
match result {
|
||||
Err(crossbeam_channel::RecvTimeoutError::Timeout)
|
||||
| Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {}
|
||||
Ok(IndexerMessage::Complete { .. }) => {}
|
||||
Ok(IndexerMessage::Error { .. }) => {}
|
||||
Ok(IndexerMessage::Progress { .. }) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,9 +56,15 @@ impl LruReadCache {
|
||||
/// on a hit, or fills a cache slot on a miss. Cross-block reads go through
|
||||
/// the spill buffer and are not cached.
|
||||
pub fn get(&mut self, file: &File, offset: u64, len: usize) -> io::Result<&[u8]> {
|
||||
if len == 0 {
|
||||
return Ok(&[]);
|
||||
}
|
||||
|
||||
let aligned_key = offset & !(BLOCK_ALIGN as u64 - 1);
|
||||
let request_end = offset.saturating_add(len as u64);
|
||||
let block_end = aligned_key + BLOCK_ALIGN as u64;
|
||||
let request_end = offset.checked_add(len as u64).ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, "read range overflows u64")
|
||||
})?;
|
||||
let block_end = aligned_key.saturating_add(BLOCK_ALIGN as u64);
|
||||
|
||||
if request_end > block_end {
|
||||
self.spill_buf.resize(len, 0);
|
||||
@@ -74,7 +80,8 @@ impl LruReadCache {
|
||||
}
|
||||
|
||||
let hit_idx = self.slots.iter().position(|slot| {
|
||||
slot.block_offset == aligned_key && request_end <= slot.block_offset + slot.len as u64
|
||||
let slot_end = slot.block_offset.saturating_add(slot.len as u64);
|
||||
slot.len > 0 && slot.block_offset == aligned_key && request_end <= slot_end
|
||||
});
|
||||
|
||||
if let Some(idx) = hit_idx {
|
||||
@@ -96,8 +103,8 @@ impl LruReadCache {
|
||||
let slot = &mut self.slots[evict_idx];
|
||||
let bytes_read = file.read_at(&mut slot.buf, aligned_key)?;
|
||||
|
||||
// Note: get(file, 0, 0) on an empty file now returns Err (old code returned Ok(&[])).
|
||||
// No callers pass len == 0, so this is a safe semantic change.
|
||||
// Non-empty reads that return 0 are EOF. Zero-length reads are handled above
|
||||
// as a successful no-op.
|
||||
if bytes_read == 0 {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "read 0 bytes"));
|
||||
}
|
||||
@@ -107,7 +114,8 @@ impl LruReadCache {
|
||||
slot.last_access = self.tick;
|
||||
self.tick += 1;
|
||||
|
||||
if request_end > aligned_key + bytes_read as u64 {
|
||||
let bytes_end = aligned_key.saturating_add(bytes_read as u64);
|
||||
if request_end > bytes_end {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
|
||||
}
|
||||
|
||||
@@ -118,7 +126,9 @@ impl LruReadCache {
|
||||
/// Invalidate all cache slots and the spill buffer.
|
||||
pub fn clear(&mut self) {
|
||||
for slot in &mut self.slots {
|
||||
slot.block_offset = 0;
|
||||
slot.len = 0;
|
||||
slot.last_access = 0;
|
||||
}
|
||||
self.spill_len = 0;
|
||||
}
|
||||
@@ -314,9 +324,11 @@ mod tests {
|
||||
|
||||
cache.clear();
|
||||
|
||||
// All slots should have len == 0.
|
||||
// All slots should be fully reset.
|
||||
for slot in &cache.slots {
|
||||
assert_eq!(slot.block_offset, 0);
|
||||
assert_eq!(slot.len, 0);
|
||||
assert_eq!(slot.last_access, 0);
|
||||
}
|
||||
assert_eq!(cache.spill_len, 0);
|
||||
|
||||
@@ -432,4 +444,52 @@ mod tests {
|
||||
assert_eq!(&line2[..4090], &[b'B'; 4090]);
|
||||
assert_eq!(line2[4090], b'\n');
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn zero_len_read_is_noop_on_fresh_cache() {
|
||||
let f = make_file(b"");
|
||||
let file = File::open(f.path()).unwrap();
|
||||
let mut cache = ReadCache::new();
|
||||
|
||||
let result = cache.get(&file, 0, 0).unwrap();
|
||||
assert!(result.is_empty());
|
||||
assert_eq!(cache.tick, 0);
|
||||
assert!(cache.slots.iter().all(|s| s.len == 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn zero_len_read_is_noop_on_populated_cache() {
|
||||
let f = make_file(b"abc");
|
||||
let file = File::open(f.path()).unwrap();
|
||||
let mut cache = ReadCache::new();
|
||||
|
||||
cache.get(&file, 0, 1).unwrap();
|
||||
let tick_before = cache.tick;
|
||||
|
||||
let result = cache.get(&file, 0, 0).unwrap();
|
||||
assert!(result.is_empty());
|
||||
assert_eq!(cache.tick, tick_before);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn zero_len_read_at_max_offset_is_ok() {
|
||||
let f = make_file(b"");
|
||||
let file = File::open(f.path()).unwrap();
|
||||
let mut cache = ReadCache::new();
|
||||
|
||||
let result = cache.get(&file, u64::MAX, 0).unwrap();
|
||||
assert!(result.is_empty());
|
||||
assert_eq!(cache.tick, 0);
|
||||
assert!(cache.slots.iter().all(|s| s.len == 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nonzero_read_range_overflow_returns_invalid_input() {
|
||||
let f = make_file(b"abc");
|
||||
let file = File::open(f.path()).unwrap();
|
||||
let mut cache = ReadCache::new();
|
||||
|
||||
let err = cache.get(&file, u64::MAX, 1).unwrap_err();
|
||||
assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
|
||||
}
|
||||
}
|
||||
@@ -2,9 +2,12 @@
|
||||
/// Lines exceeding this are returned as-is to avoid pathological cases.
|
||||
pub const MAX_WRAP_INPUT_LEN: usize = 10 * 1024 * 1024;
|
||||
|
||||
/// Split a line into chunks of exactly `width` characters (display columns).
|
||||
/// Split a line into chunks of exactly `width` display columns.
|
||||
/// For a log viewer, we want character-level wrapping, not word-level.
|
||||
/// Uses `unicode-width` for correct CJK/emoji/zero-width handling.
|
||||
pub fn wrap_line_chars(line: &str, width: usize) -> Vec<String> {
|
||||
use unicode_width::UnicodeWidthChar;
|
||||
|
||||
if width == 0 {
|
||||
return vec![String::new()];
|
||||
}
|
||||
@@ -15,7 +18,15 @@ pub fn wrap_line_chars(line: &str, width: usize) -> Vec<String> {
|
||||
let mut row = String::new();
|
||||
let mut col = 0;
|
||||
for ch in line.chars() {
|
||||
let w = if ch == '\t' { 4 } else { 1 };
|
||||
let w = if ch == '\t' {
|
||||
4
|
||||
} else if ch.is_control() {
|
||||
// Control characters (except tab): width 0, still pushed to preserve content.
|
||||
// Visible rendering is the caller's responsibility.
|
||||
0
|
||||
} else {
|
||||
ch.width().unwrap_or(0)
|
||||
};
|
||||
if col + w > width && !row.is_empty() {
|
||||
result.push(std::mem::take(&mut row));
|
||||
col = 0;
|
||||
@@ -132,4 +143,48 @@ mod tests {
|
||||
fn test_max_wrap_input_len_constant() {
|
||||
assert_eq!(MAX_WRAP_INPUT_LEN, 10 * 1024 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrap_cjk_chars() {
|
||||
let result = wrap_line_chars("你好", 3);
|
||||
assert_eq!(result, vec!["你", "好"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrap_cjk_ascii_mixed() {
|
||||
let result = wrap_line_chars("a你好", 4);
|
||||
assert_eq!(result, vec!["a你", "好"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrap_zero_width_char() {
|
||||
let result = wrap_line_chars("a\u{200B}b", 2);
|
||||
assert_eq!(result, vec!["a\u{200B}b"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrap_emoji() {
|
||||
let result = wrap_line_chars("😀a", 3);
|
||||
assert_eq!(result, vec!["😀a"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrap_emoji_exact_wrap() {
|
||||
let result = wrap_line_chars("😀a", 2);
|
||||
assert_eq!(result, vec!["😀", "a"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrap_combining_mark() {
|
||||
// Scalar-width wrapping: combining mark (width 0) stays with next base char,
|
||||
// not the preceding one, because the base char already triggered a flush.
|
||||
let result = wrap_line_chars("a\u{0301}b", 1);
|
||||
assert_eq!(result, vec!["a", "\u{0301}b"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrap_cjk_width_one() {
|
||||
let result = wrap_line_chars("你好", 1);
|
||||
assert_eq!(result, vec!["你", "好"]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ pub enum FileEvent {
|
||||
Truncated { new_size: u64 },
|
||||
Rotated { new_inode: u64 },
|
||||
Removed,
|
||||
WatcherError { message: String },
|
||||
}
|
||||
|
||||
// ─── get_inode ──────────────────────────────────────────────────────────────
|
||||
@@ -34,6 +35,43 @@ struct WatchState {
|
||||
last_inode: u64,
|
||||
}
|
||||
|
||||
fn process_event(event: Event, watch_path: &Path, state: &mut WatchState) -> Option<FileEvent> {
|
||||
if !event.paths.iter().any(|p| p == watch_path) {
|
||||
return None;
|
||||
}
|
||||
|
||||
match event.kind {
|
||||
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
||||
EventKind::Remove(_) => return Some(FileEvent::Removed),
|
||||
_ => return None,
|
||||
}
|
||||
|
||||
let current_inode = get_inode(watch_path).unwrap_or(0);
|
||||
let current_size = std::fs::metadata(watch_path).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
if current_inode != 0 && state.last_inode != 0 && current_inode != state.last_inode {
|
||||
state.last_inode = current_inode;
|
||||
state.last_size = current_size;
|
||||
Some(FileEvent::Rotated {
|
||||
new_inode: current_inode,
|
||||
})
|
||||
} else if current_size > state.last_size {
|
||||
state.last_size = current_size;
|
||||
state.last_inode = current_inode;
|
||||
Some(FileEvent::Appended {
|
||||
new_size: current_size,
|
||||
})
|
||||
} else if current_size < state.last_size {
|
||||
state.last_size = current_size;
|
||||
state.last_inode = current_inode;
|
||||
Some(FileEvent::Truncated {
|
||||
new_size: current_size,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
// ─── FileWatcher ────────────────────────────────────────────────────────────
|
||||
pub struct FileWatcher {
|
||||
rx: Receiver<FileEvent>,
|
||||
@@ -56,24 +94,13 @@ impl FileWatcher {
|
||||
notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
|
||||
let event = match res {
|
||||
Ok(e) => e,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
match event.kind {
|
||||
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
||||
EventKind::Remove(_) => {
|
||||
let _ = tx.try_send(FileEvent::Removed);
|
||||
Err(error) => {
|
||||
let _ = tx.try_send(FileEvent::WatcherError {
|
||||
message: error.to_string(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
_ => return,
|
||||
}
|
||||
|
||||
if !event.paths.iter().any(|p| p == &watch_path) {
|
||||
return;
|
||||
}
|
||||
|
||||
let current_inode = get_inode(&watch_path).unwrap_or(0);
|
||||
let current_size = std::fs::metadata(&watch_path).map(|m| m.len()).unwrap_or(0);
|
||||
};
|
||||
|
||||
let mut st = state.lock().unwrap_or_else(|poison| {
|
||||
// Recover from poisoned mutex — state only tracks last_size
|
||||
@@ -81,25 +108,8 @@ impl FileWatcher {
|
||||
// cause a duplicate event, which is harmless.
|
||||
poison.into_inner()
|
||||
});
|
||||
|
||||
if current_inode != 0 && st.last_inode != 0 && current_inode != st.last_inode {
|
||||
let _ = tx.try_send(FileEvent::Rotated {
|
||||
new_inode: current_inode,
|
||||
});
|
||||
st.last_inode = current_inode;
|
||||
st.last_size = current_size;
|
||||
} else if current_size > st.last_size {
|
||||
let _ = tx.try_send(FileEvent::Appended {
|
||||
new_size: current_size,
|
||||
});
|
||||
st.last_size = current_size;
|
||||
st.last_inode = current_inode;
|
||||
} else if current_size < st.last_size {
|
||||
let _ = tx.try_send(FileEvent::Truncated {
|
||||
new_size: current_size,
|
||||
});
|
||||
st.last_size = current_size;
|
||||
st.last_inode = current_inode;
|
||||
if let Some(fe) = process_event(event, &watch_path, &mut st) {
|
||||
let _ = tx.try_send(fe);
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -144,6 +154,56 @@ mod tests {
|
||||
events
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_wrong_path_ignored() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let watched = dir.path().join("watched.log");
|
||||
let other = dir.path().join("other.log");
|
||||
std::fs::write(&watched, b"hello\n").expect("write watched");
|
||||
std::fs::write(&other, b"other\n").expect("write other");
|
||||
|
||||
let mut state = WatchState {
|
||||
last_size: 6,
|
||||
last_inode: get_inode(&watched).unwrap_or(0),
|
||||
};
|
||||
|
||||
let event = Event {
|
||||
kind: EventKind::Remove(notify::event::RemoveKind::File),
|
||||
paths: vec![other.clone()],
|
||||
attrs: Default::default(),
|
||||
};
|
||||
|
||||
let result = process_event(event, &watched, &mut state);
|
||||
assert_eq!(
|
||||
result, None,
|
||||
"Remove for non-watched path should be ignored"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_correct_path_emits_removed() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let watched = dir.path().join("watched.log");
|
||||
std::fs::write(&watched, b"hello\n").expect("write watched");
|
||||
|
||||
let mut state = WatchState {
|
||||
last_size: 6,
|
||||
last_inode: get_inode(&watched).unwrap_or(0),
|
||||
};
|
||||
|
||||
let event = Event {
|
||||
kind: EventKind::Remove(notify::event::RemoveKind::File),
|
||||
paths: vec![watched.clone()],
|
||||
attrs: Default::default(),
|
||||
};
|
||||
|
||||
let result = process_event(event, &watched, &mut state);
|
||||
assert!(
|
||||
matches!(result, Some(FileEvent::Removed)),
|
||||
"Remove for watched path should emit Removed"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watcher_append() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
@@ -232,5 +292,19 @@ mod tests {
|
||||
|
||||
let d = FileEvent::Rotated { new_inode: 42 };
|
||||
assert_ne!(a, d);
|
||||
|
||||
let e1 = FileEvent::WatcherError {
|
||||
message: "io error".into(),
|
||||
};
|
||||
let e2 = FileEvent::WatcherError {
|
||||
message: "io error".into(),
|
||||
};
|
||||
assert_eq!(e1, e2);
|
||||
assert_ne!(
|
||||
e1,
|
||||
FileEvent::WatcherError {
|
||||
message: "other".into()
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -857,6 +857,7 @@ impl App {
|
||||
FileEvent::Removed => {
|
||||
self.loading_state = AppLoadingState::Error("File has been deleted".into());
|
||||
}
|
||||
FileEvent::WatcherError { message: _ } => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -865,6 +866,7 @@ impl App {
|
||||
let width = self.get_content_width();
|
||||
match &mut self.loading_state {
|
||||
AppLoadingState::Ready { reader } => {
|
||||
let old_reader_line_count = reader.line_count();
|
||||
let status = reader.update_for_append();
|
||||
match status {
|
||||
Ok(
|
||||
@@ -883,13 +885,24 @@ impl App {
|
||||
};
|
||||
let new_line_count = reader.line_count();
|
||||
|
||||
if can_extend && new_line_count > old_line_count {
|
||||
if can_extend && old_line_count == old_reader_line_count {
|
||||
if let log_viewer_core::io::progressive_reader::ReaderState::Ready {
|
||||
visual_height_index: Some(index),
|
||||
reader: fr,
|
||||
} = &mut reader.state
|
||||
{
|
||||
let mut new_heights = Vec::with_capacity(new_line_count - old_line_count);
|
||||
if old_line_count > 0 {
|
||||
let last_old_line_text =
|
||||
fr.get_line(old_line_count - 1).unwrap_or("");
|
||||
let new_h = compute_line_visual_height(
|
||||
last_old_line_text,
|
||||
width,
|
||||
self.json_format,
|
||||
);
|
||||
index.replace_last_line_height(new_h);
|
||||
}
|
||||
let mut new_heights =
|
||||
Vec::with_capacity(new_line_count.saturating_sub(old_line_count));
|
||||
for i in old_line_count..new_line_count {
|
||||
let line_text = fr.get_line(i).unwrap_or("");
|
||||
new_heights.push(compute_line_visual_height(
|
||||
@@ -2473,7 +2486,9 @@ plain text line
|
||||
}
|
||||
|
||||
fn install_vhi(app: &mut App, heights: &[usize]) {
|
||||
let vhi = VisualHeightIndex::build(heights);
|
||||
let width = app.get_content_width();
|
||||
let json_format = app.json_format;
|
||||
let vhi = VisualHeightIndex::build(heights).with_params(json_format, width);
|
||||
if let AppLoadingState::Ready { reader } = &mut app.loading_state {
|
||||
if let log_viewer_core::io::progressive_reader::ReaderState::Ready {
|
||||
visual_height_index,
|
||||
@@ -2800,4 +2815,88 @@ plain text line
|
||||
cleanup(&path);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_no_trailing_newline_updates_last_line_height() {
|
||||
let path = make_temp_file("abc");
|
||||
let result = std::panic::catch_unwind(|| {
|
||||
let mut app = App::new();
|
||||
load_file_ready(&mut app, &path);
|
||||
assert_eq!(app.total_lines(), 1);
|
||||
|
||||
app.content_width = 5;
|
||||
install_vhi(&mut app, &[1usize]);
|
||||
|
||||
{
|
||||
let vhi = app.get_visual_height_index().unwrap();
|
||||
assert_eq!(vhi.visual_height_of_line(0), 1);
|
||||
assert_eq!(vhi.total_visual_rows(), 1);
|
||||
}
|
||||
|
||||
// Append content that extends line 0 (no trailing newline before)
|
||||
// "abc" + "defgh\n" = "abcdefgh\n" → 8 chars in width 5 → wraps to 2 visual rows
|
||||
// old_total=1, old_had_trailing=false → starts_new_line=false
|
||||
// new_has_trailing=true, new_newlines=1 → added = 1-1 = 0
|
||||
// Still 1 logical line, but line 0 text changed from "abc" to "abcdefgh"
|
||||
{
|
||||
use std::io::Write;
|
||||
let mut f = std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.unwrap();
|
||||
f.write_all(b"defgh\n").unwrap();
|
||||
}
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||
app.poll_file_watcher();
|
||||
|
||||
assert_eq!(app.total_lines(), 1,
|
||||
"\"abcdefgh\\n\" has trailing newline → 1 logical line");
|
||||
|
||||
let vhi = app.get_visual_height_index().expect("VHI should still exist after append");
|
||||
assert_eq!(vhi.visual_height_of_line(0), 2,
|
||||
"line 0 height should be updated from 1 to 2 after extending 'abcdefgh' in width 5");
|
||||
assert_eq!(vhi.total_visual_rows(), 2);
|
||||
assert_eq!(vhi.cursor_to_first_visual_row(0), 0);
|
||||
cleanup(&path);
|
||||
});
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_no_trailing_newline_no_new_lines_only_height_change() {
|
||||
let path = make_temp_file("abc");
|
||||
let result = std::panic::catch_unwind(|| {
|
||||
let mut app = App::new();
|
||||
load_file_ready(&mut app, &path);
|
||||
assert_eq!(app.total_lines(), 1);
|
||||
|
||||
app.content_width = 5;
|
||||
install_vhi(&mut app, &[1usize]);
|
||||
|
||||
// Append without adding any new line — just extends line 0
|
||||
// "abc" + "def" = "abcdef" → 6 chars in width 5 → wraps to 2 visual rows, still 1 logical line
|
||||
{
|
||||
use std::io::Write;
|
||||
let mut f = std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.unwrap();
|
||||
f.write_all(b"def").unwrap();
|
||||
}
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||
app.poll_file_watcher();
|
||||
|
||||
assert_eq!(app.total_lines(), 1,
|
||||
"no new logical line should be added");
|
||||
|
||||
let vhi = app.get_visual_height_index().expect("VHI should still exist");
|
||||
assert_eq!(vhi.visual_height_of_line(0), 2,
|
||||
"line 0 height should update even when no new lines added");
|
||||
assert_eq!(vhi.total_visual_rows(), 2);
|
||||
cleanup(&path);
|
||||
});
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user