From f9f451e0d6d2ebec6f7a59d20c833779acd558ca Mon Sep 17 00:00:00 2001 From: dailz Date: Tue, 14 Apr 2026 09:07:09 +0800 Subject: [PATCH] feat(core): implement ProgressiveFileReader with background indexer Add ProgressiveFileReader state machine (Sampling/Ready/Error) with scanned_newlines cache for O(1) line lookups. Implement spawn_indexer() for background mmap-based indexing with cancellation and progress reporting. Add VisualHeightIndex with prefix-sum + binary search for O(log N) visual height queries. Register all new io submodules and extend FileReader with reload(), save_cache(), and accessor methods. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- crates/core/src/io/file_reader.rs | 176 +++ crates/core/src/io/mod.rs | 6 + crates/core/src/io/progressive_reader.rs | 1267 ++++++++++++++++++++++ 3 files changed, 1449 insertions(+) create mode 100644 crates/core/src/io/progressive_reader.rs diff --git a/crates/core/src/io/file_reader.rs b/crates/core/src/io/file_reader.rs index f623172..79f7a1d 100644 --- a/crates/core/src/io/file_reader.rs +++ b/crates/core/src/io/file_reader.rs @@ -5,6 +5,7 @@ // ────────────────────────────────────────────────────────────────────────────── use crate::error::{CoreError, Result}; +use crate::io::index_cache::IndexCache; use crate::io::line_index::LineIndex; use std::path::{Path, PathBuf}; @@ -66,6 +67,68 @@ impl FileReader { pub fn path(&self) -> &Path { &self.path } + + pub fn from_parts(path: PathBuf, mmap: Option, line_index: LineIndex) -> Self { + Self { + path, + mmap, + line_index, + } + } + + /// Save the line index cache to disk. + pub fn save_cache(&self) -> std::io::Result<()> { + IndexCache::save(&self.path, &self.line_index)?; + Ok(()) + } + + /// Access the underlying line index. + pub fn line_index(&self) -> &LineIndex { + &self.line_index + } + + pub fn reload(&mut self) -> Result<()> { + let file = std::fs::File::open(&self.path)?; + let file_size = file.metadata()?.len(); + + self.mmap = None; + + if file_size > 0 { + self.mmap = Some( + unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?, + ); + } + + let mut reader = std::io::BufReader::new(&file); + self.line_index = LineIndex::from_reader(&mut reader).map_err(|e| CoreError::Io { + source: e, + context: "rebuilding line index on reload".into(), + })?; + + Ok(()) + } + + pub fn update_for_append(&mut self) -> Result { + let file = std::fs::File::open(&self.path)?; + let new_size = file.metadata()?.len(); + let old_size = self.mmap.as_ref().map_or(0u64, |m| m.len() as u64); + + if new_size <= old_size { + return Ok(0); + } + + let old_lines = self.line_index.line_count() as u64; + + self.mmap = None; + let mmap = + unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?; + + self.line_index + .extend_from_bytes(&mmap[old_size as usize..], old_size); + self.mmap = Some(mmap); + + Ok(self.line_index.line_count() as u64 - old_lines) + } } const _: () = { @@ -192,4 +255,117 @@ mod tests { assert!(reader.data().is_empty()); assert_eq!(reader.data(), b""); } + + #[test] + fn test_file_reader_reload() { + let f = create_temp_file(b"aaa\nbbb\n"); + let mut reader = FileReader::open(f.path()).unwrap(); + assert_eq!(reader.line_count(), 2); + assert_eq!(reader.get_line(0), Some("aaa")); + assert_eq!(reader.get_line(1), Some("bbb")); + + reader.reload().unwrap(); + assert_eq!(reader.line_count(), 2); + assert_eq!(reader.get_line(0), Some("aaa")); + assert_eq!(reader.get_line(1), Some("bbb")); + } + + #[test] + fn test_file_reader_reload_after_truncate() { + let f = create_temp_file(b"aaa\nbbb\nccc\n"); + let mut reader = FileReader::open(f.path()).unwrap(); + assert_eq!(reader.line_count(), 3); + + { + use std::io::Write; + let file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(f.path()) + .unwrap(); + drop(file); + } + + reader.reload().unwrap(); + assert_eq!(reader.line_count(), 0); + } + + #[test] + fn test_file_reader_update_for_append() { + let f = create_temp_file(b"aaa\nbbb\n"); + let mut reader = FileReader::open(f.path()).unwrap(); + assert_eq!(reader.line_count(), 2); + + { + use std::io::Write; + let mut file = std::fs::OpenOptions::new() + .append(true) + .open(f.path()) + .unwrap(); + file.write_all(b"ccc\nddd\n").unwrap(); + } + + let new_lines = reader.update_for_append().unwrap(); + assert_eq!(new_lines, 2); + assert_eq!(reader.line_count(), 4); + assert_eq!(reader.get_line(0), Some("aaa")); + assert_eq!(reader.get_line(1), Some("bbb")); + assert_eq!(reader.get_line(2), Some("ccc")); + assert_eq!(reader.get_line(3), Some("ddd")); + } + + #[test] + fn test_file_reader_update_for_append_no_change() { + let f = create_temp_file(b"aaa\n"); + let mut reader = FileReader::open(f.path()).unwrap(); + assert_eq!(reader.line_count(), 1); + + let new_lines = reader.update_for_append().unwrap(); + assert_eq!(new_lines, 0); + assert_eq!(reader.line_count(), 1); + } + + #[test] + fn test_file_reader_update_for_append_shrunk_file() { + let f = create_temp_file(b"aaa\nbbb\nccc\n"); + let mut reader = FileReader::open(f.path()).unwrap(); + assert_eq!(reader.line_count(), 3); + + { + use std::io::Write; + let mut file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(f.path()) + .unwrap(); + file.write_all(b"x\n").unwrap(); + } + + let new_lines = reader.update_for_append().unwrap(); + assert_eq!(new_lines, 0); + } + + #[test] + fn test_file_reader_save_cache() { + let f = create_temp_file(b"aaa\nbbb\n"); + // Clear any existing cache + if let Some(cp) = crate::io::cache_util::cache_path(f.path()) { + let _ = std::fs::remove_file(cp); + } + + let reader = FileReader::open(f.path()).unwrap(); + assert!(reader.save_cache().is_ok()); + + let cached = crate::io::index_cache::IndexCache::load(f.path()); + assert!(cached.is_some(), "cache should exist after save_cache"); + assert_eq!(cached.unwrap().line_count(), 2); + } + + #[test] + fn test_file_reader_line_index_accessor() { + let f = create_temp_file(b"x\ny\n"); + let reader = FileReader::open(f.path()).unwrap(); + let idx = reader.line_index(); + assert_eq!(idx.line_count(), 2); + } } diff --git a/crates/core/src/io/mod.rs b/crates/core/src/io/mod.rs index 6da2d4a..77c88bd 100644 --- a/crates/core/src/io/mod.rs +++ b/crates/core/src/io/mod.rs @@ -13,3 +13,9 @@ pub mod file_reader; // 同理,声明 line_index 子模块,定义在 line_index.rs 文件中。 // 这个模块负责维护文件中每一行的起始位置索引,用于快速定位某一行内容。 pub mod line_index; + +pub mod cache_util; +pub mod index_cache; +pub mod line_sampler; +pub mod progressive_reader; +pub mod wrap; diff --git a/crates/core/src/io/progressive_reader.rs b/crates/core/src/io/progressive_reader.rs new file mode 100644 index 0000000..5e62482 --- /dev/null +++ b/crates/core/src/io/progressive_reader.rs @@ -0,0 +1,1267 @@ +use std::cell::RefCell; +use std::fmt; +use std::path::{Path, PathBuf}; + +use crate::error::{CoreError, Result}; +use crate::io::file_reader::FileReader; +use crate::io::index_cache::IndexCache; +use crate::io::line_index::LineIndex; +use crate::io::line_sampler::sample_line_count; +use crate::io::wrap::{format_json_line, wrap_line_chars, MAX_WRAP_INPUT_LEN}; + +// ─── IndexerMessage ────────────────────────────────────────────────────────── + +pub enum IndexerMessage { + Progress { + generation: u64, + percent: f64, + lines_scanned: u64, + }, + Complete { + generation: u64, + reader: FileReader, + visual_height_index: Option, + }, + Error { + generation: u64, + message: String, + }, +} + +impl fmt::Debug for IndexerMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IndexerMessage::Progress { + generation, + percent, + lines_scanned, + } => f + .debug_struct("IndexerMessage::Progress") + .field("generation", generation) + .field("percent", percent) + .field("lines_scanned", lines_scanned) + .finish(), + IndexerMessage::Complete { + generation, + reader: _, + visual_height_index, + } => f + .debug_struct("IndexerMessage::Complete") + .field("generation", generation) + .field("reader", &"") + .field("visual_height_index", visual_height_index) + .finish(), + IndexerMessage::Error { + generation, + message, + } => f + .debug_struct("IndexerMessage::Error") + .field("generation", generation) + .field("message", message) + .finish(), + } + } +} + +impl IndexerMessage { + pub fn generation(&self) -> u64 { + match self { + IndexerMessage::Progress { generation, .. } => *generation, + IndexerMessage::Complete { generation, .. } => *generation, + IndexerMessage::Error { generation, .. } => *generation, + } + } +} + +// ─── VisualHeightIndex ─────────────────────────────────────────────────────── + +#[derive(Debug)] +pub struct VisualHeightIndex { + /// prefix_sums[i] = sum of visual_heights[0..i], prefix_sums[0] = 0. + /// Length = line_count + 1. + prefix_sums: Vec, + total_visual_rows: u64, + json_format: bool, + terminal_width: usize, +} + +impl VisualHeightIndex { + pub fn build(visual_heights: &[usize]) -> Self { + let mut prefix_sums = Vec::with_capacity(visual_heights.len() + 1); + prefix_sums.push(0u64); + for &h in visual_heights { + prefix_sums.push(prefix_sums.last().unwrap() + h as u64); + } + let total = *prefix_sums.last().unwrap(); + Self { + prefix_sums, + total_visual_rows: total, + json_format: false, + terminal_width: 0, + } + } + + fn with_params(mut self, json_format: bool, terminal_width: usize) -> Self { + self.json_format = json_format; + self.terminal_width = terminal_width; + self + } + + pub fn total_visual_rows(&self) -> u64 { + self.total_visual_rows + } + + /// O(log N) binary-search: which logical line does `visual_row` belong to? + pub fn visual_row_to_logical_row(&self, visual_row: u64) -> usize { + if visual_row >= self.total_visual_rows { + return self.prefix_sums.len().saturating_sub(2); + } + self.prefix_sums.partition_point(|&sum| sum <= visual_row) - 1 + } + + /// Returns (logical_line, sub_row_offset_within_that_line). + pub fn visual_row_to_logical_row_with_offset(&self, visual_row: u64) -> (usize, usize) { + let logical = self.visual_row_to_logical_row(visual_row); + let row_start = self.prefix_sums.get(logical).copied().unwrap_or(0); + let offset = (visual_row - row_start) as usize; + (logical, offset) + } + + /// O(1) prefix-sum lookup: first visual row for a given logical line. + pub fn cursor_to_first_visual_row(&self, logical_line: usize) -> u64 { + self.prefix_sums.get(logical_line).copied().unwrap_or(0) + } + + /// O(1): visual height of a single logical line. + pub fn visual_height_of_line(&self, logical_line: usize) -> usize { + let start = self.prefix_sums.get(logical_line).copied().unwrap_or(0); + let end = self + .prefix_sums + .get(logical_line + 1) + .copied() + .unwrap_or(start); + (end - start) as usize + } + + pub fn line_count(&self) -> usize { + self.prefix_sums.len().saturating_sub(1) + } + + pub fn is_valid_for(&self, json_format: bool, terminal_width: usize) -> bool { + self.json_format == json_format && self.terminal_width == terminal_width + } + + pub fn extend_from_heights(&mut self, new_heights: &[usize]) { + for &h in new_heights { + let last = *self.prefix_sums.last().unwrap_or(&0); + self.prefix_sums.push(last + h as u64); + self.total_visual_rows += h as u64; + } + } +} + +// ─── VisualHeightRebuildResult ──────────────────────────────────────────────── + +pub struct VisualHeightRebuildResult { + pub generation: u64, + pub index: VisualHeightIndex, +} + +// ─── Visual height computation helpers ──────────────────────────────────────── + +pub fn compute_line_visual_height( + line_text: &str, + terminal_width: usize, + json_format: bool, +) -> usize { + if line_text.len() > MAX_WRAP_INPUT_LEN { + return 1; + } + if json_format { + let formatted = format_json_line(line_text); + compute_text_visual_height(&formatted, terminal_width) + } else { + compute_text_visual_height(line_text, terminal_width) + } +} + +fn compute_text_visual_height(text: &str, width: usize) -> usize { + let mut height = 0; + for sub_line in text.split('\n') { + height += wrap_line_chars(sub_line, width).len(); + } + height.max(1) +} + +fn compute_visual_heights( + reader: &FileReader, + terminal_width: usize, + json_format: bool, +) -> Vec { + let line_count = reader.line_count(); + let mut visual_heights = Vec::with_capacity(line_count); + for i in 0..line_count { + let line_text = reader.get_line(i).unwrap_or(""); + visual_heights.push(compute_line_visual_height( + line_text, + terminal_width, + json_format, + )); + } + visual_heights +} + +// ─── ReaderState ───────────────────────────────────────────────────────────── + +pub enum ReaderState { + Sampling { + mmap: Option, + estimated_lines: u64, + avg_line_length: f64, + /// Byte positions of newlines found during initial scan. + /// These are mutable via interior mutability (RefCell). + scanned_newlines: RefCell>, + /// Byte position up to which we've scanned. + scanned_up_to: RefCell, + }, + Ready { + reader: FileReader, + visual_height_index: Option, + }, + Error(String), +} + +pub fn spawn_indexer( + path: PathBuf, + generation: u64, + terminal_width: usize, + json_format: bool, + cancel_rx: crossbeam_channel::Receiver<()>, +) -> crossbeam_channel::Receiver { + let (tx, rx) = crossbeam_channel::bounded(10); + + std::thread::spawn(move || { + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(e) => { + let _ = tx.send(IndexerMessage::Error { + generation, + message: e.to_string(), + }); + return; + } + }; + let file_size = match file.metadata() { + Ok(m) => m.len(), + Err(e) => { + let _ = tx.send(IndexerMessage::Error { + generation, + message: e.to_string(), + }); + return; + } + }; + + let mmap = if file_size == 0 { + None + } else { + match unsafe { memmap2::Mmap::map(&file) } { + Ok(m) => Some(m), + Err(e) => { + let _ = tx.send(IndexerMessage::Error { + generation, + message: e.to_string(), + }); + return; + } + } + }; + + let data = mmap.as_deref().unwrap_or(&[]); + + if !data.is_empty() { + let mut newline_count: usize = 0; + let mut chars_since_check: usize = 0; + let mut prev_pos: usize = 0; + + for pos in memchr::memchr_iter(b'\n', data) { + chars_since_check += pos - prev_pos; + prev_pos = pos; + + if chars_since_check >= 1_000_000 { + chars_since_check = 0; + if cancel_rx.try_recv().is_ok() { + return; + } + } + + newline_count += 1; + + if newline_count % 256_000 == 0 { + let percent = (pos as f64 / file_size as f64) * 100.0; + let _ = tx.send(IndexerMessage::Progress { + generation, + percent, + lines_scanned: newline_count as u64, + }); + } + } + } + + if cancel_rx.try_recv().is_ok() { + return; + } + + let line_index = LineIndex::from_bytes(data); + + let _ = IndexCache::save(&path, &line_index); + + let reader = FileReader::from_parts(path, mmap, line_index); + + let visual_height_index = if terminal_width > 0 { + let visual_heights = compute_visual_heights(&reader, terminal_width, json_format); + Some(VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width)) + } else { + None + }; + + let _ = tx.send(IndexerMessage::Complete { + generation, + reader, + visual_height_index, + }); + }); + + rx +} + +pub fn spawn_visual_height_rebuild( + path: PathBuf, + generation: u64, + terminal_width: usize, + json_format: bool, + cancel_rx: crossbeam_channel::Receiver<()>, +) -> crossbeam_channel::Receiver { + let (tx, rx) = crossbeam_channel::bounded(1); + + std::thread::spawn(move || { + let line_index = match IndexCache::load(&path) { + Some(idx) => idx, + None => return, + }; + + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(_) => return, + }; + + let file_size = match file.metadata() { + Ok(m) => m.len(), + Err(_) => return, + }; + + let mmap = if file_size == 0 { + None + } else { + match unsafe { memmap2::Mmap::map(&file) } { + Ok(m) => Some(m), + Err(_) => return, + } + }; + + if cancel_rx.try_recv().is_ok() { + return; + } + + let reader = FileReader::from_parts(path, mmap, line_index); + let visual_heights = compute_visual_heights(&reader, terminal_width, json_format); + let index = + VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width); + + let _ = tx.send(VisualHeightRebuildResult { generation, index }); + }); + + rx +} + +// ─── ProgressiveFileReader ─────────────────────────────────────────────────── + +/// Maximum bytes to scan during initial open for the Sampling state. +const INITIAL_SCAN_BYTES: usize = 64 * 1024; + +pub struct ProgressiveFileReader { + path: PathBuf, + pub state: ReaderState, + cancel_tx: Option>, + indexer_rx: Option>, + generation: u64, + vh_rebuild_cancel_tx: Option>, + vh_rebuild_rx: Option>, + vh_generation: u64, +} + +impl ProgressiveFileReader { + /// Open a file for progressive reading. + /// + /// - If a cached `LineIndex` exists, returns immediately in `Ready` state. + /// - Otherwise, mmaps the file, takes a quick line-count sample, scans the + /// first ~64KB for line positions, and returns in `Sampling` state. + /// - Empty files return `Sampling` with `mmap: None`. + pub fn open(path: &Path) -> Result { + // Try cache first + if let Some(line_index) = IndexCache::load(path) { + let file = std::fs::File::open(path)?; + let file_size = file.metadata()?.len(); + let mmap = if file_size == 0 { + None + } else { + Some( + unsafe { memmap2::Mmap::map(&file) } + .map_err(|e| CoreError::Mmap(e.to_string()))?, + ) + }; + let reader = FileReader::from_parts(path.to_path_buf(), mmap, line_index); + return Ok(ProgressiveFileReader { + path: path.to_path_buf(), + state: ReaderState::Ready { + reader, + visual_height_index: None, + }, + cancel_tx: None, + indexer_rx: None, + generation: 1, + vh_rebuild_cancel_tx: None, + vh_rebuild_rx: None, + vh_generation: 0, + }); + } + + // Cache miss: mmap and sample + let file = std::fs::File::open(path)?; + let file_size = file.metadata()?.len(); + + let mmap = if file_size == 0 { + None + } else { + Some(unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?) + }; + + let sample = sample_line_count(path).map_err(|e| CoreError::Io { + source: e, + context: "sampling line count".into(), + })?; + + // Scan first ~64KB for newline positions + let data = mmap.as_deref().unwrap_or(&[]); + let scan_limit = INITIAL_SCAN_BYTES.min(data.len()); + let scanned_newlines: Vec = + memchr::memchr_iter(b'\n', &data[..scan_limit]).collect(); + + Ok(ProgressiveFileReader { + path: path.to_path_buf(), + state: ReaderState::Sampling { + mmap, + estimated_lines: sample.estimated_lines, + avg_line_length: sample.avg_line_length, + scanned_newlines: RefCell::new(scanned_newlines), + scanned_up_to: RefCell::new(scan_limit), + }, + cancel_tx: None, + indexer_rx: None, + generation: 1, + vh_rebuild_cancel_tx: None, + vh_rebuild_rx: None, + vh_generation: 0, + }) + } + + /// Create a ProgressiveFileReader with channel endpoints for background indexing. + pub fn with_channels( + path: &Path, + cancel_tx: crossbeam_channel::Sender<()>, + indexer_rx: crossbeam_channel::Receiver, + generation: u64, + ) -> Result { + let mut reader = Self::open(path)?; + reader.cancel_tx = Some(cancel_tx); + reader.indexer_rx = Some(indexer_rx); + reader.generation = generation; + Ok(reader) + } + + /// Get a line by index. + /// + /// - In `Sampling` state: uses the cached newline positions to extract lines + /// from the mmap. If the requested line is beyond the scanned region, + /// extends the scan incrementally. + /// - In `Ready` state: delegates to `FileReader::get_line`. + /// - In `Error` state: returns `None`. + pub fn get_line(&self, idx: usize) -> Option { + match &self.state { + ReaderState::Sampling { + mmap, + scanned_newlines, + scanned_up_to, + .. + } => { + let mmap_data = mmap.as_deref()?; + if mmap_data.is_empty() { + return None; + } + + let mut newlines = scanned_newlines.borrow_mut(); + let mut up_to = scanned_up_to.borrow_mut(); + + // We need `idx + 1` newlines to have `idx` lines available. + // Line i starts after the i-th newline (or at byte 0 for line 0) + // and ends at the (i+1)-th newline (or end of file). + // So to return line `idx`, we need at least `idx + 1` newline positions + // (the idx-th newline marks end of line idx-1/start of line idx, + // and the (idx+1)-th newline marks end of line idx). + // Actually: line 0 starts at byte 0, ends at newline[0]. + // line 1 starts at newline[0]+1, ends at newline[1]. + // line i starts at newline[i-1]+1, ends at newline[i]. + // So to serve line idx, we need newline positions up to index idx. + + // Extend scan if needed + while newlines.len() <= idx && *up_to < mmap_data.len() { + let remaining = &mmap_data[*up_to..]; + if let Some(rel_pos) = memchr::memchr(b'\n', remaining) { + newlines.push(*up_to + rel_pos); + *up_to += rel_pos + 1; + } else { + // No more newlines; rest of file is the last line + *up_to = mmap_data.len(); + break; + } + } + + // Check if line idx is beyond what's available + // If idx == newlines.len(), it's the last line (after last newline, no trailing \n) + if idx > newlines.len() { + return None; + } + + let (start, end) = if idx == 0 { + (0, newlines.first().copied().unwrap_or(mmap_data.len())) + } else if idx <= newlines.len() { + let s = newlines[idx - 1] + 1; + let e = if idx < newlines.len() { + newlines[idx] + } else { + mmap_data.len() + }; + (s, e) + } else { + return None; + }; + + if start >= mmap_data.len() { + return None; + } + + let line_bytes = &mmap_data[start..end]; + std::str::from_utf8(line_bytes) + .map(|s| s.trim_end_matches(['\r', '\n']).to_owned()) + .ok() + } + ReaderState::Ready { reader, .. } => reader.get_line(idx).map(|s| s.to_owned()), + ReaderState::Error(_) => None, + } + } + + /// Transition to Ready state with a fully-indexed FileReader. + pub fn set_ready( + &mut self, + reader: FileReader, + visual_height_index: Option, + ) { + self.state = ReaderState::Ready { + reader, + visual_height_index, + }; + } + + /// Return the estimated or exact line count. + pub fn line_count(&self) -> usize { + match &self.state { + ReaderState::Sampling { + estimated_lines, .. + } => *estimated_lines as usize, + ReaderState::Ready { reader, .. } => reader.line_count(), + ReaderState::Error(_) => 0, + } + } + + /// Return the number of lines available from the scan (Sampling state), + /// or the exact count (Ready state). + pub fn sampled_line_count(&self) -> usize { + match &self.state { + ReaderState::Sampling { + scanned_newlines, .. + } => { + let newlines = scanned_newlines.borrow(); + // Number of newlines found = number of complete lines in scanned region. + // If we hit EOF without a trailing newline, there's one more line. + // But we return newlines.len() as specified — it matches "lines in sampled region" + // since each newline demarcates one line. + newlines.len() + } + ReaderState::Ready { reader, .. } => reader.line_count(), + ReaderState::Error(_) => 0, + } + } + + /// Whether the reader is in Sampling state. + pub fn is_sampling(&self) -> bool { + matches!(self.state, ReaderState::Sampling { .. }) + } + + /// Whether a full prefix sum index is available (only in Ready state). + pub fn has_prefix_sum(&self) -> bool { + matches!(self.state, ReaderState::Ready { .. }) + } + + /// The file path. + pub fn path(&self) -> &Path { + &self.path + } + + /// The generation counter (tracks file-open count). + pub fn generation(&self) -> u64 { + self.generation + } + + /// Non-blocking poll for indexer messages. + /// + /// Discards messages whose generation doesn't match `self.generation`. + /// Returns the first matching message, or `None` if no message is available. + pub fn poll_indexer(&mut self) -> Option { + let rx = self.indexer_rx.as_ref()?; + loop { + match rx.try_recv() { + Ok(msg) => { + if msg.generation() == self.generation { + return Some(msg); + } + // Stale message, discard and keep polling + } + Err(crossbeam_channel::TryRecvError::Empty) => return None, + Err(crossbeam_channel::TryRecvError::Disconnected) => return None, + } + } + } + + /// Get a reference to the internal FileReader (only in Ready state). + pub fn reader(&self) -> Option<&FileReader> { + match &self.state { + ReaderState::Ready { reader, .. } => Some(reader), + _ => None, + } + } + + pub fn update_for_append(&mut self) -> Result { + match &mut self.state { + ReaderState::Ready { reader, .. } => reader.update_for_append(), + _ => Ok(0), + } + } + + pub fn invalidate_visual_height_index(&mut self) { + if let ReaderState::Ready { + visual_height_index, + .. + } = &mut self.state + { + *visual_height_index = None; + } + } + + pub fn start_visual_height_rebuild(&mut self, terminal_width: usize, json_format: bool) { + if let Some(tx) = self.vh_rebuild_cancel_tx.take() { + let _ = tx.send(()); + } + + let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + self.vh_generation += 1; + let vh_gen = self.vh_generation; + + let rx = spawn_visual_height_rebuild( + self.path.clone(), + vh_gen, + terminal_width, + json_format, + cancel_rx, + ); + + self.vh_rebuild_cancel_tx = Some(cancel_tx); + self.vh_rebuild_rx = Some(rx); + } + + pub fn poll_visual_height_rebuild(&mut self) -> Option { + let rx = self.vh_rebuild_rx.as_ref()?; + match rx.try_recv() { + Ok(result) if result.generation == self.vh_generation => { + self.vh_rebuild_cancel_tx = None; + self.vh_rebuild_rx = None; + Some(result.index) + } + Ok(_) => None, + Err(crossbeam_channel::TryRecvError::Empty) => None, + Err(crossbeam_channel::TryRecvError::Disconnected) => { + self.vh_rebuild_cancel_tx = None; + self.vh_rebuild_rx = None; + None + } + } + } + + pub fn reload(&mut self) -> Result<()> { + match &mut self.state { + ReaderState::Ready { reader, .. } => reader.reload(), + _ => Ok(()), + } + } + + pub fn save_cache(&self) -> std::io::Result<()> { + match &self.state { + ReaderState::Ready { reader, .. } => reader.save_cache(), + _ => Ok(()), + } + } +} + +impl Drop for ProgressiveFileReader { + fn drop(&mut self) { + if let Some(tx) = &self.cancel_tx { + let _ = tx.send(()); + } + if let Some(tx) = self.vh_rebuild_cancel_tx.take() { + let _ = tx.send(()); + } + } +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::line_index::LineIndex; + use std::io::Write; + use tempfile::NamedTempFile; + + fn create_temp_file(content: &[u8]) -> NamedTempFile { + let mut f = NamedTempFile::new().unwrap(); + f.write_all(content).unwrap(); + f.flush().unwrap(); + f + } + + #[test] + fn test_progressive_cache_hit() { + let f = create_temp_file(b"line1\nline2\nline3\n"); + // Build and save a cache + let data = std::fs::read(f.path()).unwrap(); + let index = LineIndex::from_bytes(&data); + IndexCache::save(f.path(), &index).unwrap(); + + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert!(!reader.is_sampling(), "cache hit should return Ready state"); + assert_eq!(reader.line_count(), 3); + } + + #[test] + fn test_progressive_cache_miss() { + let f = create_temp_file(b"alpha\nbeta\ngamma\n"); + // No cache saved — should return Sampling + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert!( + reader.is_sampling(), + "cache miss should return Sampling state" + ); + } + + #[test] + fn test_progressive_transition_sampling_to_ready() { + let f = create_temp_file(b"aaa\nbbb\nccc\n"); + let mut pfr = ProgressiveFileReader::open(f.path()).unwrap(); + assert!(pfr.is_sampling()); + + // Build a FileReader for the transition + let fr = FileReader::open(f.path()).unwrap(); + pfr.set_ready(fr, None); + + assert!(!pfr.is_sampling()); + assert!(pfr.has_prefix_sum()); + assert_eq!(pfr.line_count(), 3); + } + + #[test] + fn test_progressive_empty_file() { + let f = create_temp_file(b""); + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + // Empty file should not panic, either Sampling (mmap=None) or Ready + assert_eq!(reader.line_count(), 0); + assert_eq!(reader.get_line(0), None); + } + + #[test] + fn test_progressive_get_line_sampling() { + let content = b"first line\nsecond line\nthird line\nfourth line\n"; + let f = create_temp_file(content); + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert!(reader.is_sampling()); + + // Get first few lines (within initial 64KB scan) + assert_eq!(reader.get_line(0), Some("first line".to_owned())); + assert_eq!(reader.get_line(1), Some("second line".to_owned())); + assert_eq!(reader.get_line(2), Some("third line".to_owned())); + assert_eq!(reader.get_line(3), Some("fourth line".to_owned())); + assert_eq!(reader.get_line(4), None); + } + + #[test] + fn test_progressive_get_line_ready() { + let f = create_temp_file(b"aaa\nbbb\nccc\n"); + // Save cache so we get Ready state + let data = std::fs::read(f.path()).unwrap(); + let index = LineIndex::from_bytes(&data); + IndexCache::save(f.path(), &index).unwrap(); + + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert!(!reader.is_sampling()); + assert_eq!(reader.get_line(0), Some("aaa".to_owned())); + assert_eq!(reader.get_line(1), Some("bbb".to_owned())); + assert_eq!(reader.get_line(2), Some("ccc".to_owned())); + assert_eq!(reader.get_line(3), None); + } + + #[test] + fn test_progressive_from_parts() { + let f = create_temp_file(b"hello\nworld\n"); + let data = std::fs::read(f.path()).unwrap(); + let index = LineIndex::from_bytes(&data); + let file = std::fs::File::open(f.path()).unwrap(); + let mmap = Some(unsafe { memmap2::Mmap::map(&file).unwrap() }); + + let reader = FileReader::from_parts(f.path().to_path_buf(), mmap, index); + assert_eq!(reader.line_count(), 2); + assert_eq!(reader.get_line(0), Some("hello")); + assert_eq!(reader.get_line(1), Some("world")); + } + + #[test] + fn test_progressive_drop_sends_cancel() { + let (tx, rx) = crossbeam_channel::bounded(1); + let f = create_temp_file(b"test\n"); + + let mut reader = ProgressiveFileReader::open(f.path()).unwrap(); + reader.cancel_tx = Some(tx); + reader.indexer_rx = None; + + drop(reader); + + // The cancel signal should have been sent + assert!(rx.try_recv().is_ok()); + } + + #[test] + fn test_progressive_sampled_line_count() { + let content = b"alpha\nbeta\ngamma\ndelta\nepsilon\n"; + let f = create_temp_file(content); + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert!(reader.is_sampling()); + + // sampled_line_count should return the number of newlines found in initial scan + let count = reader.sampled_line_count(); + assert_eq!(count, 5, "should find all 5 newlines in small file"); + } + + #[test] + fn test_progressive_poll_indexer_discards_stale() { + let (tx, rx) = crossbeam_channel::bounded(10); + let f = create_temp_file(b"data\n"); + + let mut reader = ProgressiveFileReader::open(f.path()).unwrap(); + reader.indexer_rx = Some(rx); + reader.generation = 2; // Our generation is 2 + + // Send a stale message (generation 1) + tx.send(IndexerMessage::Progress { + generation: 1, + percent: 50.0, + lines_scanned: 100, + }) + .unwrap(); + + // Send a current message (generation 2) + tx.send(IndexerMessage::Progress { + generation: 2, + percent: 75.0, + lines_scanned: 200, + }) + .unwrap(); + + // Should get the current message, discarding the stale one + let msg = reader.poll_indexer().unwrap(); + assert_eq!(msg.generation(), 2); + + // No more current messages + assert!(reader.poll_indexer().is_none()); + } + + #[test] + fn test_progressive_get_line_sampling_no_trailing_newline() { + let content = b"aaa\nbbb\nccc"; // No trailing newline + let f = create_temp_file(content); + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert!(reader.is_sampling()); + + assert_eq!(reader.get_line(0), Some("aaa".to_owned())); + assert_eq!(reader.get_line(1), Some("bbb".to_owned())); + assert_eq!(reader.get_line(2), Some("ccc".to_owned())); + assert_eq!(reader.get_line(3), None); + } + + #[test] + fn test_progressive_get_line_beyond_initial_scan() { + // Create content larger than 64KB to test incremental scan + let mut content = Vec::new(); + for i in 0..5000 { + writeln!(content, "line number {:05}", i).unwrap(); + } + // Make it > 64KB + assert!( + content.len() > INITIAL_SCAN_BYTES, + "test data should exceed initial scan size" + ); + + let f = create_temp_file(&content); + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert!(reader.is_sampling()); + + // Initial sampled lines should be less than 5000 + let initial = reader.sampled_line_count(); + assert!(initial < 5000, "initial scan should not find all lines"); + + // Request line beyond initial scan — should extend incrementally + let line_4000 = reader.get_line(4000); + assert!( + line_4000.is_some(), + "should find line 4000 after scan extension" + ); + assert_eq!(line_4000.unwrap(), "line number 04000"); + + // Line 0 should still work + assert_eq!(reader.get_line(0), Some("line number 00000".to_owned())); + } + + #[test] + fn test_progressive_path() { + let f = create_temp_file(b"data\n"); + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + assert_eq!(reader.path(), f.path()); + } + + #[test] + fn test_progressive_reader_accessor() { + let f = create_temp_file(b"x\ny\nz\n"); + let data = std::fs::read(f.path()).unwrap(); + let index = LineIndex::from_bytes(&data); + IndexCache::save(f.path(), &index).unwrap(); + + let reader = ProgressiveFileReader::open(f.path()).unwrap(); + let fr = reader.reader().unwrap(); + assert_eq!(fr.line_count(), 3); + } + + #[test] + fn test_spawn_indexer_complete() { + let f = create_temp_file(b"line1\nline2\nline3\n"); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + let msg = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap(); + match msg { + IndexerMessage::Complete { + generation, + reader, + visual_height_index, + } => { + assert_eq!(generation, 1); + assert_eq!(reader.line_count(), 3); + assert_eq!(reader.get_line(0), Some("line1")); + assert_eq!(reader.get_line(1), Some("line2")); + assert_eq!(reader.get_line(2), Some("line3")); + assert!(visual_height_index.is_some()); + let idx = visual_height_index.unwrap(); + assert_eq!(idx.total_visual_rows(), 3); + assert_eq!(idx.visual_row_to_logical_row(0), 0); + assert_eq!(idx.visual_row_to_logical_row(1), 1); + assert_eq!(idx.visual_row_to_logical_row(2), 2); + } + other => panic!("expected Complete, got {:?}", other), + } + } + + #[test] + fn test_spawn_indexer_cancel() { + let mut content = Vec::new(); + for i in 0..500_000 { + writeln!(content, "line number {:08}", i).unwrap(); + } + let f = create_temp_file(&content); + + let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + cancel_tx.send(()).unwrap(); + + let result = rx.recv_timeout(std::time::Duration::from_secs(5)); + match result { + Err(crossbeam_channel::RecvTimeoutError::Timeout) + | Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {} + Ok(IndexerMessage::Complete { .. }) => { + // Thread finished before cancel was processed — acceptable + } + other => panic!("unexpected message: {:?}", other), + } + } + + #[test] + fn test_spawn_indexer_progress() { + let mut content = Vec::new(); + for i in 0..600_000 { + writeln!(content, "line number {:08}", i).unwrap(); + } + let f = create_temp_file(&content); + + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + let mut got_progress = false; + let mut got_complete = false; + loop { + match rx.recv_timeout(std::time::Duration::from_secs(30)) { + Ok(IndexerMessage::Progress { generation, .. }) => { + assert_eq!(generation, 1); + got_progress = true; + } + Ok(IndexerMessage::Complete { + generation, reader, .. + }) => { + assert_eq!(generation, 1); + assert_eq!(reader.line_count(), 600_000); + got_complete = true; + break; + } + Ok(IndexerMessage::Error { message, .. }) => { + panic!("unexpected error: {}", message); + } + Err(e) => panic!("recv error: {:?}", e), + } + } + assert!(got_complete, "should receive Complete"); + // Progress may or may not be sent depending on timing, so we don't assert got_progress + let _ = got_progress; + } + + #[test] + fn test_spawn_indexer_cache_saved() { + let f = create_temp_file(b"cached line1\ncached line2\n"); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + let msg = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap(); + match &msg { + IndexerMessage::Complete { .. } => {} + other => panic!("expected Complete, got {:?}", other), + } + + let cached = IndexCache::load(f.path()); + assert!( + cached.is_some(), + "cache should be saved after indexer completes" + ); + let cached_index = cached.unwrap(); + assert_eq!(cached_index.line_count(), 2); + } + + #[test] + fn test_spawn_indexer_error() { + let bad_path = PathBuf::from("/nonexistent/file/that/does/not/exist.log"); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(bad_path.clone(), 1, 80, false, cancel_rx); + + let msg = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap(); + match msg { + IndexerMessage::Error { + generation, + message, + } => { + assert_eq!(generation, 1); + assert!(!message.is_empty()); + } + other => panic!("expected Error, got {:?}", other), + } + } + + #[test] + fn test_spawn_indexer_empty_file() { + let f = create_temp_file(b""); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + let msg = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap(); + match msg { + IndexerMessage::Complete { + generation, reader, .. + } => { + assert_eq!(generation, 1); + assert_eq!(reader.line_count(), 0); + } + other => panic!("expected Complete, got {:?}", other), + } + } + + #[test] + fn test_visual_height_index_binary_search() { + // heights: [3, 1, 4, 1, 5] + // prefix: [0, 3, 4, 8, 9, 14] + let heights = [3, 1, 4, 1, 5]; + let idx = VisualHeightIndex::build(&heights); + + assert_eq!(idx.total_visual_rows(), 14); + assert_eq!(idx.line_count(), 5); + + assert_eq!(idx.visual_row_to_logical_row(0), 0); + assert_eq!(idx.visual_row_to_logical_row(1), 0); + assert_eq!(idx.visual_row_to_logical_row(2), 0); + assert_eq!(idx.visual_row_to_logical_row(3), 1); + assert_eq!(idx.visual_row_to_logical_row(4), 2); + assert_eq!(idx.visual_row_to_logical_row(7), 2); + assert_eq!(idx.visual_row_to_logical_row(8), 3); + assert_eq!(idx.visual_row_to_logical_row(9), 4); + assert_eq!(idx.visual_row_to_logical_row(13), 4); + assert_eq!(idx.visual_row_to_logical_row(14), 4); + assert_eq!(idx.visual_row_to_logical_row(999), 4); + } + + #[test] + fn test_visual_height_index_with_offset() { + let heights = [3, 1, 4]; + let idx = VisualHeightIndex::build(&heights); + + assert_eq!(idx.visual_row_to_logical_row_with_offset(0), (0, 0)); + assert_eq!(idx.visual_row_to_logical_row_with_offset(1), (0, 1)); + assert_eq!(idx.visual_row_to_logical_row_with_offset(2), (0, 2)); + assert_eq!(idx.visual_row_to_logical_row_with_offset(3), (1, 0)); + assert_eq!(idx.visual_row_to_logical_row_with_offset(4), (2, 0)); + assert_eq!(idx.visual_row_to_logical_row_with_offset(5), (2, 1)); + assert_eq!(idx.visual_row_to_logical_row_with_offset(7), (2, 3)); + } + + #[test] + fn test_visual_height_index_cursor_to_first() { + let heights = [2, 3, 1]; + let idx = VisualHeightIndex::build(&heights); + + assert_eq!(idx.cursor_to_first_visual_row(0), 0); + assert_eq!(idx.cursor_to_first_visual_row(1), 2); + assert_eq!(idx.cursor_to_first_visual_row(2), 5); + assert_eq!(idx.cursor_to_first_visual_row(99), 0); + } + + #[test] + fn test_visual_height_index_empty() { + let heights: [usize; 0] = []; + let idx = VisualHeightIndex::build(&heights); + + assert_eq!(idx.total_visual_rows(), 0); + assert_eq!(idx.line_count(), 0); + assert_eq!(idx.visual_row_to_logical_row(0), 0); + assert_eq!(idx.cursor_to_first_visual_row(0), 0); + } + + #[test] + fn test_max_wrap_input_len_guard() { + let long_line = "x".repeat(MAX_WRAP_INPUT_LEN + 1); + let temp = create_temp_file(format!("short\n{}\n", long_line).as_bytes()); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(temp.path().to_path_buf(), 1, 80, false, cancel_rx); + let msg = rx.recv_timeout(std::time::Duration::from_secs(10)).unwrap(); + + match msg { + IndexerMessage::Complete { + visual_height_index, + .. + } => { + let idx = visual_height_index.expect("should have visual height index"); + assert_eq!(idx.visual_height_of_line(0), 1); + assert_eq!(idx.visual_height_of_line(1), 1); + } + other => panic!("expected Complete, got {:?}", other), + } + } + + #[test] + fn test_visual_height_index_is_valid_for() { + let heights = [1, 2, 3]; + let idx = VisualHeightIndex::build(&heights).with_params(true, 80); + + assert!(idx.is_valid_for(true, 80)); + assert!(!idx.is_valid_for(false, 80)); + assert!(!idx.is_valid_for(true, 40)); + } + + #[test] + fn test_visual_height_index_visual_height_of_line() { + let heights = [3, 1, 5, 2]; + let idx = VisualHeightIndex::build(&heights); + + assert_eq!(idx.visual_height_of_line(0), 3); + assert_eq!(idx.visual_height_of_line(1), 1); + assert_eq!(idx.visual_height_of_line(2), 5); + assert_eq!(idx.visual_height_of_line(3), 2); + assert_eq!(idx.visual_height_of_line(4), 0); + } + + #[test] + fn test_visual_height_index_extend_from_heights() { + let heights = [2, 3]; + let mut idx = VisualHeightIndex::build(&heights); + assert_eq!(idx.total_visual_rows(), 5); + assert_eq!(idx.line_count(), 2); + + idx.extend_from_heights(&[4, 1]); + + assert_eq!(idx.line_count(), 4); + assert_eq!(idx.total_visual_rows(), 10); + assert_eq!(idx.cursor_to_first_visual_row(0), 0); + assert_eq!(idx.cursor_to_first_visual_row(1), 2); + assert_eq!(idx.cursor_to_first_visual_row(2), 5); + assert_eq!(idx.cursor_to_first_visual_row(3), 9); + assert_eq!(idx.visual_row_to_logical_row(4), 1); + assert_eq!(idx.visual_row_to_logical_row(6), 2); + } + + #[test] + fn test_visual_height_index_extend_empty() { + let heights: [usize; 0] = []; + let mut idx = VisualHeightIndex::build(&heights); + assert_eq!(idx.total_visual_rows(), 0); + + idx.extend_from_heights(&[1, 2, 3]); + + assert_eq!(idx.total_visual_rows(), 6); + assert_eq!(idx.line_count(), 3); + } +}