// ─── mmap_reader.rs ────────────────────────────────────────────────────────── // mmap-based FileReaderBackend implementations with 5 variants for benchmarking. // Includes SIGBUS handler for file-truncation resilience and remap support // for growing-file scenarios. // ────────────────────────────────────────────────────────────────────────────── use std::fs::File; use std::io::BufReader; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU8, Ordering}; use std::sync::Once; use memmap2::{Advice, Mmap, MmapOptions, RemapOptions}; use nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal}; use crate::line_index::LineIndex; use crate::FileReaderBackend; // ─── SIGBUS Handler ────────────────────────────────────────────────────────── // // Signal-safety architecture: // - Old handler state is stored in raw atomics (AtomicU8 + AtomicPtr) that are // async-signal-safe to read — no OnceLock, Mutex, or other non-trivial abstractions // in the signal handler path. // - Installation uses `Once` for idempotency and follows a strict publish-then-install // sequence to close the "handler-active-but-state-unpublished" race window. /// Global flag set by the SIGBUS handler when a bus error is intercepted. /// Process-global: concurrent benchmarks that reset/check this flag may interfere. /// Currently acceptable because only `rotation` suite (sequential) uses it. static SIGBUS_OCCURRED: AtomicBool = AtomicBool::new(false); /// Discriminant values for old SIGBUS handler type. const HANDLER_NONE: u8 = 0; const HANDLER_DEFAULT: u8 = 1; const HANDLER_IGNORE: u8 = 2; const HANDLER_PLAIN: u8 = 3; // extern "C" fn(c_int) #[allow(clippy::unseparated_literal_suffix, reason = "clarity: this is the SA_SIGACTION variant")] const HANDLER_SIGACTION: u8 = 4; // extern "C" fn(c_int, *mut siginfo_t, *mut c_void) /// Old SIGBUS handler type — raw atomic, async-signal-safe to read. static OLD_HANDLER_KIND: AtomicU8 = AtomicU8::new(HANDLER_NONE); /// Old SIGBUS handler function pointer — raw atomic, async-signal-safe to read. static OLD_HANDLER_PTR: AtomicPtr = AtomicPtr::new(std::ptr::null_mut()); /// Ensures `install_sigbus_handler` runs exactly once across all threads. static INSTALL_ONCE: Once = Once::new(); /// Returns `true` if a SIGBUS was intercepted since the last reset. pub fn sigbus_flag() -> bool { SIGBUS_OCCURRED.load(Ordering::SeqCst) } /// Resets the SIGBUS flag. Call before operations where you want to detect /// a fresh SIGBUS. pub fn reset_sigbus_flag() { SIGBUS_OCCURRED.store(false, Ordering::SeqCst) } /// SIGBUS signal handler. /// /// # Safety Constraints (signal handler context) /// - NO TLS access /// - NO memory allocation /// - NO lock acquisition /// - Only: AtomicBool store → mmap → raw atomic loads → chain extern "C" fn sigbus_handler( sig: libc::c_int, info: *mut libc::siginfo_t, ctx: *mut std::ffi::c_void, ) { SIGBUS_OCCURRED.store(true, Ordering::SeqCst); // Align fault address down to page boundary. // si_addr is NOT guaranteed to be page-aligned; without this, mmap(MAP_FIXED) // could replace the wrong page. let addr = unsafe { (*info).si_addr() }; let aligned = (addr as usize & !0xFFF) as *mut libc::c_void; // Map anonymous zero page at fault address to prevent crash on file truncation. let result = unsafe { libc::mmap( aligned, 4096, libc::PROT_READ, libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_FIXED, -1, 0, ) }; if result == libc::MAP_FAILED { // Chain to old handler via raw atomics (async-signal-safe). let kind = OLD_HANDLER_KIND.load(Ordering::Acquire); match kind { HANDLER_PLAIN => { let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire); if !ptr.is_null() { // SAFETY: ptr was derived from a valid function pointer // published by install_sigbus_handler before this handler was installed. let f: extern "C" fn(libc::c_int) = unsafe { std::mem::transmute(ptr) }; f(sig); } else { unsafe { libc::_exit(128 + sig) }; } } HANDLER_SIGACTION => { let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire); if !ptr.is_null() { let f: extern "C" fn( libc::c_int, *mut libc::siginfo_t, *mut std::ffi::c_void, ) = unsafe { std::mem::transmute(ptr) }; f(sig, info, ctx); } else { unsafe { libc::_exit(128 + sig) }; } } _ => { // HANDLER_NONE / HANDLER_DEFAULT / HANDLER_IGNORE — no safe chaining. unsafe { libc::_exit(128 + sig) }; } } } } /// Install the SIGBUS handler via sigaction(). Chains to any previous handler. /// Uses `Once` to guarantee exactly-once installation — safe to call from /// multiple threads concurrently. /// /// The install sequence closes the "install-then-publish" race by: /// 1. Querying the current handler via raw `libc::sigaction` (no modification). /// 2. Publishing old handler state to raw atomics (`Release` stores). /// 3. Installing our handler via `nix::sigaction`. /// /// This ensures the atomics are readable *before* our handler can fire. fn install_sigbus_handler() { INSTALL_ONCE.call_once(|| { // Step 1: Query current SIGBUS disposition without changing it. let mut old_act: libc::sigaction = unsafe { std::mem::zeroed() }; let ret = unsafe { libc::sigaction(libc::SIGBUS, std::ptr::null(), &mut old_act) }; if ret != 0 { // Failed to query — skip installation entirely. return; } // Step 2: Publish old handler to atomics BEFORE installing ours. let is_siginfo = (old_act.sa_flags & libc::SA_SIGINFO) != 0; let raw_usize = old_act.sa_sigaction as usize; if raw_usize == 0 { // SIG_DFL (null function pointer → default disposition) OLD_HANDLER_KIND.store(HANDLER_DEFAULT, Ordering::Release); } else if raw_usize == 1 { // SIG_IGN (sentinel value 1 → ignore disposition) OLD_HANDLER_KIND.store(HANDLER_IGNORE, Ordering::Release); } else { // Real handler function pointer OLD_HANDLER_PTR.store(raw_usize as *mut std::ffi::c_void, Ordering::Release); if is_siginfo { OLD_HANDLER_KIND.store(HANDLER_SIGACTION, Ordering::Release); } else { OLD_HANDLER_KIND.store(HANDLER_PLAIN, Ordering::Release); } } // Step 3: Install our handler. Atomics are already published, // so the handler can safely read them from the moment it's installed. let new_action = SigAction::new( SigHandler::SigAction(sigbus_handler), SaFlags::SA_SIGINFO, SigSet::empty(), ); let _ = unsafe { sigaction(Signal::SIGBUS, &new_action) }; }); } // ─── Core MmapReader ───────────────────────────────────────────────────────── /// Core mmap-based reader. Holds the memory mapping, file handle, and /// line index. Used as the engine inside each variant wrapper. pub struct MmapReader { mmap: Mmap, #[allow(dead_code)] file: File, line_index: LineIndex, file_size: u64, } impl MmapReader { /// Open a file and create an mmap. Does NOT apply madvise. /// The caller is responsible for applying the desired advice. fn open_raw(path: &Path) -> std::io::Result { let file = File::open(path)?; let file_size = file.metadata()?.len(); let mmap = unsafe { Mmap::map(&file)? }; let line_index = { let mut reader = BufReader::new(&file); LineIndex::from_reader(&mut reader)? }; Ok(Self { mmap, file, line_index, file_size, }) } /// Open with MmapOptions (for populate, etc.). fn open_with_options(path: &Path, opts: &MmapOptions) -> std::io::Result { let file = File::open(path)?; let file_size = file.metadata()?.len(); let mmap = if file_size == 0 { unsafe { Mmap::map(&file)? } } else { unsafe { opts.map(&file)? } }; let line_index = { let mut reader = BufReader::new(&file); LineIndex::from_reader(&mut reader)? }; Ok(Self { mmap, file, line_index, file_size, }) } /// Apply madvise to the entire mapping. fn advise(&self, advice: Advice) { let _ = self.mmap.advise(advice); } /// Grow the mmap to `new_size` bytes. /// /// # Safety /// Caller must ensure no `&[u8]` references to the old mmap data exist. /// After remap, any pointers derived from the old mapping may be invalid /// (if the kernel moved the mapping). pub unsafe fn remap(&mut self, new_size: usize) -> std::io::Result<()> { unsafe { self.mmap .remap(new_size, RemapOptions::new().may_move(true))?; } self.file_size = new_size as u64; Ok(()) } #[inline] pub fn file_size(&self) -> u64 { self.file_size } #[inline] pub fn total_lines(&self) -> usize { self.line_index.line_count() } #[inline] pub fn get_line(&self, idx: usize) -> Option { self.line_index .get_line(&self.mmap, idx) .map(|s| s.to_owned()) } #[inline] pub fn read_range(&self, offset: u64, len: usize) -> Option> { let start = offset as usize; let end = start.checked_add(len)?; if end > self.mmap.len() { return None; } Some(self.mmap[start..end].to_vec()) } } // ─── 5 Variants ────────────────────────────────────────────────────────────── /// Variant 1: Plain mmap, no madvise. pub struct MmapReaderPlain { inner: MmapReader, } impl MmapReaderPlain { pub fn remap(&mut self, new_size: usize) -> std::io::Result<()> { unsafe { self.inner.remap(new_size) } } } impl FileReaderBackend for MmapReaderPlain { fn name(&self) -> &str { "mmap_plain" } fn open(path: &Path) -> std::io::Result where Self: Sized, { install_sigbus_handler(); let inner = MmapReader::open_raw(path)?; Ok(Self { inner }) } fn file_size(&self) -> u64 { self.inner.file_size() } fn total_lines(&self) -> usize { self.inner.total_lines() } fn get_line(&self, idx: usize) -> Option { self.inner.get_line(idx) } fn read_range(&self, offset: u64, len: usize) -> Option> { self.inner.read_range(offset, len) } fn close(self) {} } /// Variant 2: mmap with MADV_SEQUENTIAL — optimal for sequential scan (index build). pub struct MmapReaderSequential { inner: MmapReader, } impl FileReaderBackend for MmapReaderSequential { fn name(&self) -> &str { "mmap_sequential" } fn open(path: &Path) -> std::io::Result where Self: Sized, { install_sigbus_handler(); let inner = MmapReader::open_raw(path)?; inner.advise(Advice::Sequential); Ok(Self { inner }) } fn file_size(&self) -> u64 { self.inner.file_size() } fn total_lines(&self) -> usize { self.inner.total_lines() } fn get_line(&self, idx: usize) -> Option { self.inner.get_line(idx) } fn read_range(&self, offset: u64, len: usize) -> Option> { self.inner.read_range(offset, len) } fn close(self) {} } /// Variant 3: mmap with MADV_RANDOM — optimal for random line access after index is built. pub struct MmapReaderRandom { inner: MmapReader, } impl FileReaderBackend for MmapReaderRandom { fn name(&self) -> &str { "mmap_random" } fn open(path: &Path) -> std::io::Result where Self: Sized, { install_sigbus_handler(); let inner = MmapReader::open_raw(path)?; inner.advise(Advice::Random); Ok(Self { inner }) } fn file_size(&self) -> u64 { self.inner.file_size() } fn total_lines(&self) -> usize { self.inner.total_lines() } fn get_line(&self, idx: usize) -> Option { self.inner.get_line(idx) } fn read_range(&self, offset: u64, len: usize) -> Option> { self.inner.read_range(offset, len) } fn close(self) {} } /// Variant 4: mmap with MAP_POPULATE — pre-fault all pages at mmap time. /// Trades higher upfront cost for smoother subsequent access. pub struct MmapReaderPopulate { inner: MmapReader, } impl FileReaderBackend for MmapReaderPopulate { fn name(&self) -> &str { "mmap_populate" } fn open(path: &Path) -> std::io::Result where Self: Sized, { install_sigbus_handler(); let mut opts = MmapOptions::new(); opts.populate(); let inner = MmapReader::open_with_options(path, &opts)?; Ok(Self { inner }) } fn file_size(&self) -> u64 { self.inner.file_size() } fn total_lines(&self) -> usize { self.inner.total_lines() } fn get_line(&self, idx: usize) -> Option { self.inner.get_line(idx) } fn read_range(&self, offset: u64, len: usize) -> Option> { self.inner.read_range(offset, len) } fn close(self) {} } /// Variant 5: Phase-aware — MADV_SEQUENTIAL during index build, then MADV_RANDOM /// for line access. Best of both worlds for the read-index-then-query pattern. pub struct MmapReaderPhaseAware { inner: MmapReader, } impl FileReaderBackend for MmapReaderPhaseAware { fn name(&self) -> &str { "mmap_phase_aware" } fn open(path: &Path) -> std::io::Result where Self: Sized, { install_sigbus_handler(); let inner = MmapReader::open_raw(path)?; // Index build used sequential streaming via BufReader. Now switch to random. inner.advise(Advice::Random); Ok(Self { inner }) } fn file_size(&self) -> u64 { self.inner.file_size() } fn total_lines(&self) -> usize { self.inner.total_lines() } fn get_line(&self, idx: usize) -> Option { self.inner.get_line(idx) } fn read_range(&self, offset: u64, len: usize) -> Option> { self.inner.read_range(offset, len) } fn close(self) {} } // ─── Tests ─────────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; use std::io::Write as _; use tempfile::NamedTempFile; fn create_temp_file(content: &[u8]) -> NamedTempFile { let mut f = NamedTempFile::new().unwrap(); f.write_all(content).unwrap(); f.flush().unwrap(); f } #[test] fn test_plain_open_and_read_lines() { let f = create_temp_file(b"hello\nworld\nfoo\n"); let reader = MmapReaderPlain::open(f.path()).unwrap(); assert_eq!(reader.name(), "mmap_plain"); assert_eq!(reader.total_lines(), 3); assert_eq!(reader.get_line(0), Some("hello".to_owned())); assert_eq!(reader.get_line(1), Some("world".to_owned())); assert_eq!(reader.get_line(2), Some("foo".to_owned())); assert_eq!(reader.get_line(3), None); reader.close(); } #[test] fn test_plain_out_of_bounds_returns_none() { let f = create_temp_file(b"line1\nline2\n"); let reader = MmapReaderPlain::open(f.path()).unwrap(); assert_eq!(reader.total_lines(), 2); assert_eq!(reader.get_line(100), None); assert_eq!(reader.get_line(usize::MAX), None); reader.close(); } #[test] fn test_plain_read_range() { let content = b"hello world\nfoo bar\n"; let f = create_temp_file(content); let reader = MmapReaderPlain::open(f.path()).unwrap(); let range = reader.read_range(0, 5).unwrap(); assert_eq!(&range, b"hello"); let range = reader.read_range(6, 5).unwrap(); assert_eq!(&range, b"world"); assert_eq!(reader.read_range(0, 1000), None); reader.close(); } #[test] fn test_plain_empty_file() { let f = create_temp_file(b""); let reader = MmapReaderPlain::open(f.path()).unwrap(); assert_eq!(reader.total_lines(), 0); assert_eq!(reader.file_size(), 0); assert_eq!(reader.get_line(0), None); reader.close(); } #[test] fn test_sequential_variant() { let f = create_temp_file(b"alpha\nbeta\ngamma\n"); let reader = MmapReaderSequential::open(f.path()).unwrap(); assert_eq!(reader.name(), "mmap_sequential"); assert_eq!(reader.total_lines(), 3); assert_eq!(reader.get_line(1), Some("beta".to_owned())); reader.close(); } #[test] fn test_random_variant() { let f = create_temp_file(b"one\ntwo\nthree\n"); let reader = MmapReaderRandom::open(f.path()).unwrap(); assert_eq!(reader.name(), "mmap_random"); assert_eq!(reader.total_lines(), 3); assert_eq!(reader.get_line(2), Some("three".to_owned())); reader.close(); } #[test] fn test_populate_variant() { let f = create_temp_file(b"x\ny\nz\n"); let reader = MmapReaderPopulate::open(f.path()).unwrap(); assert_eq!(reader.name(), "mmap_populate"); assert_eq!(reader.total_lines(), 3); assert_eq!(reader.get_line(0), Some("x".to_owned())); reader.close(); } #[test] fn test_phase_aware_variant() { let f = create_temp_file(b"a\nb\nc\n"); let reader = MmapReaderPhaseAware::open(f.path()).unwrap(); assert_eq!(reader.name(), "mmap_phase_aware"); assert_eq!(reader.total_lines(), 3); assert_eq!(reader.get_line(1), Some("b".to_owned())); reader.close(); } #[test] fn test_file_size() { let content = b"hello world"; let f = create_temp_file(content); let reader = MmapReaderPlain::open(f.path()).unwrap(); assert_eq!(reader.file_size(), content.len() as u64); reader.close(); } #[test] fn test_sigbus_flag_api() { reset_sigbus_flag(); assert!(!sigbus_flag()); } #[test] fn test_concurrent_open_installs_handler_once() { let f = create_temp_file(b"line1\nline2\nline3\n"); let path = f.path().to_owned(); let num_threads = 8; let handles: Vec<_> = (0..num_threads) .map(|thread_id| { let path = path.clone(); std::thread::spawn(move || { let reader = match thread_id % 5 { 0 => { let r = MmapReaderPlain::open(&path).unwrap(); assert_eq!(r.total_lines(), 3); r.close(); "plain" } 1 => { let r = MmapReaderSequential::open(&path).unwrap(); assert_eq!(r.total_lines(), 3); r.close(); "sequential" } 2 => { let r = MmapReaderRandom::open(&path).unwrap(); assert_eq!(r.total_lines(), 3); r.close(); "random" } 3 => { let r = MmapReaderPopulate::open(&path).unwrap(); assert_eq!(r.total_lines(), 3); r.close(); "populate" } _ => { let r = MmapReaderPhaseAware::open(&path).unwrap(); assert_eq!(r.total_lines(), 3); r.close(); "phase_aware" } }; reader.to_owned() }) }) .collect(); let results: Vec = handles .into_iter() .map(|h| h.join().expect("thread panicked")) .collect(); assert_eq!(results.len(), num_threads); } #[test] fn test_no_trailing_newline() { let f = create_temp_file(b"line1\nline2"); let reader = MmapReaderPlain::open(f.path()).unwrap(); assert_eq!(reader.total_lines(), 2); assert_eq!(reader.get_line(0), Some("line1".to_owned())); assert_eq!(reader.get_line(1), Some("line2".to_owned())); reader.close(); } /// Diagnostic test: measure how long each stage of `open_raw()` takes /// on a large file (e.g. the 5GB extreme.log). /// Run with: cargo test -p log-viewer-bench --release -- --nocapture diag_open_stages #[test] fn diag_open_stages() { let path = std::path::Path::new("/tmp/test-logviewer/extreme.log"); if !path.exists() { eprintln!("SKIP: test file not found"); return; } // Stage 1: File::open + metadata let t0 = std::time::Instant::now(); let file = File::open(path).unwrap(); let file_size = file.metadata().unwrap().len(); let stage1 = t0.elapsed(); eprintln!( "Stage 1 - File::open + metadata: {:.2}ms ({:.1}GB)", stage1.as_secs_f64() * 1000.0, file_size as f64 / 1073741824.0 ); // Stage 2: mmap::map let t1 = std::time::Instant::now(); let _mmap = unsafe { Mmap::map(&file) }.unwrap(); let stage2 = t1.elapsed(); eprintln!( "Stage 2 - mmap::map: {:.2}ms", stage2.as_secs_f64() * 1000.0 ); drop(_mmap); // Stage 3: LineIndex::from_reader via BufReader (default 8KB buffer) let t2 = std::time::Instant::now(); let mut reader = BufReader::new(&file); let line_index = crate::line_index::LineIndex::from_reader(&mut reader).unwrap(); let stage3 = t2.elapsed(); eprintln!( "Stage 3 - LineIndex::from_reader: {:.2}ms ({} lines, {} sampled_offsets)", stage3.as_secs_f64() * 1000.0, line_index.line_count(), line_index.sampled_offsets.len() ); // Stage 3b: Try with 1MB buffer let file2 = File::open(path).unwrap(); let t2b = std::time::Instant::now(); let mut reader2 = BufReader::with_capacity(1024 * 1024, &file2); let line_index2 = crate::line_index::LineIndex::from_reader(&mut reader2).unwrap(); let stage3b = t2b.elapsed(); eprintln!( "Stage 3b - LineIndex (1MB buffer): {:.2}ms", stage3b.as_secs_f64() * 1000.0 ); eprintln!("\n=== 瓶颈分析 ==="); let total_ms = stage1.as_secs_f64() * 1000.0 + stage2.as_secs_f64() * 1000.0 + stage3.as_secs_f64() * 1000.0; let total_dur = stage1 + stage2 + stage3; eprintln!("Total ~{:.0}ms", total_ms); eprintln!( " File::open: {:.1}% ({:.0}ms)", stage1.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0, stage1.as_secs_f64() * 1000.0 ); eprintln!( " mmap::map: {:.1}% ({:.0}ms)", stage2.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0, stage2.as_secs_f64() * 1000.0 ); eprintln!( " from_reader: {:.1}% ({:.0}ms)", stage3.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0, stage3.as_secs_f64() * 1000.0 ); // Suppress unused warnings let _ = line_index2; } }