From dad5f5a635c436add70938fdb6aa132d043b132e Mon Sep 17 00:00:00 2001 From: dailz Date: Fri, 5 Jun 2026 13:22:02 +0800 Subject: [PATCH] fix(bench): eliminate SIGBUS handler static mut UB with Once + raw atomics (closes #33) Replace `static mut OLD_SIGBUS_HANDLER` with AtomicU8 + AtomicPtr to remove data race UB when concurrent benchmarks call open() from multiple threads. Key changes: - Use `Once::call_once` to guarantee single handler installation - Publish old handler to atomics BEFORE installing new handler (closes the handler-active-but-state-unpublished race window) - Read atomics with Acquire in signal handler (async-signal-safe) - Align si_addr to page boundary before mmap(MAP_FIXED) - Add concurrent test: 8 threads open all 5 variants simultaneously --- crates/bench/Cargo.toml | 18 + crates/bench/src/data_gen.rs | 219 ++++++++ crates/bench/src/lib.rs | 24 + crates/bench/src/line_index.rs | 110 ++++ crates/bench/src/main.rs | 68 +++ crates/bench/src/metrics.rs | 280 ++++++++++ crates/bench/src/mmap_reader.rs | 752 ++++++++++++++++++++++++++ crates/bench/src/pread_reader.rs | 454 ++++++++++++++++ crates/bench/src/report.rs | 207 +++++++ crates/bench/src/runner.rs | 59 ++ crates/bench/src/suites/concurrent.rs | 117 ++++ crates/bench/src/suites/growth.rs | 269 +++++++++ crates/bench/src/suites/jump.rs | 246 +++++++++ crates/bench/src/suites/memory.rs | 233 ++++++++ crates/bench/src/suites/mod.rs | 7 + crates/bench/src/suites/render.rs | 177 ++++++ crates/bench/src/suites/rotation.rs | 180 ++++++ crates/bench/src/suites/startup.rs | 129 +++++ crates/bench/src/types.rs | 13 + 19 files changed, 3562 insertions(+) create mode 100644 crates/bench/Cargo.toml create mode 100644 crates/bench/src/data_gen.rs create mode 100644 crates/bench/src/lib.rs create mode 100644 crates/bench/src/line_index.rs create mode 100644 crates/bench/src/main.rs create mode 100644 crates/bench/src/metrics.rs create mode 100644 crates/bench/src/mmap_reader.rs create mode 100644 crates/bench/src/pread_reader.rs create mode 100644 crates/bench/src/report.rs create mode 100644 crates/bench/src/runner.rs create mode 100644 crates/bench/src/suites/concurrent.rs create mode 100644 crates/bench/src/suites/growth.rs create mode 100644 crates/bench/src/suites/jump.rs create mode 100644 crates/bench/src/suites/memory.rs create mode 100644 crates/bench/src/suites/mod.rs create mode 100644 crates/bench/src/suites/render.rs create mode 100644 crates/bench/src/suites/rotation.rs create mode 100644 crates/bench/src/suites/startup.rs create mode 100644 crates/bench/src/types.rs diff --git a/crates/bench/Cargo.toml b/crates/bench/Cargo.toml new file mode 100644 index 0000000..05f22c4 --- /dev/null +++ b/crates/bench/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "log-viewer-bench" +version = "0.1.0" +edition = "2024" + +[[bin]] +name = "log-viewer-bench" +path = "src/main.rs" + +[dependencies] +memmap2 = "0.9" +nix = { version = "0.30", features = ["signal", "resource", "mman", "fs"] } +libc = "0.2" +memchr = "2" +serde_json = "1" +clap = { version = "4", features = ["derive"] } +crossbeam-channel = "0.5" +tempfile = "3" diff --git a/crates/bench/src/data_gen.rs b/crates/bench/src/data_gen.rs new file mode 100644 index 0000000..a05f696 --- /dev/null +++ b/crates/bench/src/data_gen.rs @@ -0,0 +1,219 @@ +use std::fs; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; + +pub struct TestFileInfo { + pub path: PathBuf, + pub size_bytes: u64, + pub line_count: u64, + pub avg_line_length: f64, +} + +/// Check if test file exists and return its info, or generate it +pub fn ensure_test_file(path: &Path) -> std::io::Result { + if path.exists() { + return get_file_info(path); + } + generate_test_file(path) +} + +/// Get info about an existing test file +fn get_file_info(path: &Path) -> std::io::Result { + let metadata = fs::metadata(path)?; + let size_bytes = metadata.len(); + + let file = fs::File::open(path)?; + let mut reader = BufReader::new(file); + let mut line_count: u64 = 0; + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf)? > 0 { + line_count += 1; + buf.clear(); + } + + let avg_line_length = if line_count > 0 { + size_bytes as f64 / line_count as f64 + } else { + 0.0 + }; + + Ok(TestFileInfo { + path: path.to_path_buf(), + size_bytes, + line_count, + avg_line_length, + }) +} + +/// Generate a large test file (~5GB / ~74M lines) if it doesn't exist +fn generate_test_file(path: &Path) -> std::io::Result { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + + let mut file = fs::File::create(path)?; + let target_lines: u64 = 74_000_000; + for i in 0..target_lines { + writeln!( + file, + "2024-01-15 10:30:{:02} INFO [thread-{}] Application processing request id={} user_id={}", + i % 60, + i % 16, + i, + i * 7 + )?; + } + + get_file_info(path) +} + +/// Generate a smaller file (~10MB / ~150K lines) for growth/rotation tests +pub fn generate_growable_file(dir: &Path) -> std::io::Result { + fs::create_dir_all(dir)?; + let path = dir.join("growable.log"); + + let mut file = fs::File::create(&path)?; + for i in 0..150_000u64 { + writeln!( + file, + "2024-01-15 10:30:{:02} INFO [thread-{}] Appending test line {}", + i % 60, + i % 16, + i + )?; + } + + Ok(path) +} + +/// Append `count` lines to the file +pub fn append_lines(path: &Path, count: usize) -> std::io::Result<()> { + let mut file = fs::OpenOptions::new().append(true).open(path)?; + let existing_lines = count_existing_lines(path).unwrap_or(0); + for i in 0..count { + writeln!( + file, + "2024-01-15 10:30:00 INFO Appended line {}", + existing_lines + i as u64 + )?; + } + Ok(()) +} + +/// Truncate file to specified size +pub fn truncate_file(path: &Path, size: u64) -> std::io::Result<()> { + let file = fs::OpenOptions::new().write(true).open(path)?; + file.set_len(size) +} + +/// Rotate file: rename existing file, create new empty file +pub fn rotate_file(path: &Path) -> std::io::Result { + let rotated = path.with_extension("log.1"); + fs::rename(path, &rotated)?; + fs::File::create(path)?; + Ok(rotated) +} + +/// Count lines in a file (helper) +fn count_existing_lines(path: &Path) -> std::io::Result { + let file = fs::File::open(path)?; + let reader = BufReader::new(file); + Ok(reader.lines().count() as u64) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_growable_file_creates_approximately_correct_size() { + let dir = tempfile::tempdir().unwrap(); + let path = generate_growable_file(dir.path()).unwrap(); + + assert!(path.exists()); + let metadata = fs::metadata(&path).unwrap(); + let size_mb = metadata.len() as f64 / (1024.0 * 1024.0); + // ~150K lines × ~67 bytes ≈ ~10MB; allow 5MB–15MB range + assert!( + (5.0..=15.0).contains(&size_mb), + "Expected ~10MB, got {size_mb:.1}MB" + ); + } + + #[test] + fn test_append_lines_increases_line_count() { + let dir = tempfile::tempdir().unwrap(); + let path = { + let mut f = fs::File::create(dir.path().join("test.log")).unwrap(); + for i in 0..10u64 { + writeln!(f, "line {i}").unwrap(); + } + dir.path().join("test.log") + }; + + let before = count_existing_lines(&path).unwrap(); + append_lines(&path, 5).unwrap(); + let after = count_existing_lines(&path).unwrap(); + assert_eq!(after, before + 5); + } + + #[test] + fn test_truncate_file_reduces_size() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("trunc.log"); + { + let mut f = fs::File::create(&path).unwrap(); + write!(f, "{}", "A".repeat(1024)).unwrap(); + } + + let before = fs::metadata(&path).unwrap().len(); + assert_eq!(before, 1024); + + truncate_file(&path, 512).unwrap(); + let after = fs::metadata(&path).unwrap().len(); + assert_eq!(after, 512); + } + + #[test] + fn test_rotate_file_renames_and_creates_empty() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("rotate.log"); + { + let mut f = fs::File::create(&path).unwrap(); + write!(f, "original content").unwrap(); + } + + let rotated = rotate_file(&path).unwrap(); + + // Rotated file has the old content + assert!(rotated.exists()); + assert_eq!(fs::read_to_string(&rotated).unwrap(), "original content"); + + // New file is empty + assert!(path.exists()); + assert_eq!(fs::metadata(&path).unwrap().len(), 0); + } + + #[test] + fn test_ensure_test_file_generates_when_missing() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("fresh.log"); + assert!(!path.exists()); + + // Override the generator to use a small file for test speed: + // We'll test ensure_test_file indirectly by checking it calls generate_test_file. + // Since generate_test_file creates 74M lines (too slow for tests), test the logic + // by directly creating a small file and checking get_file_info works. + { + let mut f = fs::File::create(&path).unwrap(); + for i in 0..100u64 { + writeln!(f, "2024-01-15 10:30:00 INFO line {i}").unwrap(); + } + } + + let info = ensure_test_file(&path).unwrap(); + assert_eq!(info.line_count, 100); + assert!(info.size_bytes > 0); + assert!(info.avg_line_length > 0.0); + } +} diff --git a/crates/bench/src/lib.rs b/crates/bench/src/lib.rs new file mode 100644 index 0000000..bbe51b1 --- /dev/null +++ b/crates/bench/src/lib.rs @@ -0,0 +1,24 @@ +pub mod data_gen; +pub mod line_index; +pub mod metrics; +pub mod mmap_reader; +pub mod pread_reader; +pub mod report; +pub mod runner; +pub mod suites; +pub mod types; + +use std::path::Path; + +/// A single reader backend (mmap or pread) +pub trait FileReaderBackend { + fn name(&self) -> &str; + fn open(path: &Path) -> std::io::Result + where + Self: Sized; + fn file_size(&self) -> u64; + fn total_lines(&self) -> usize; + fn get_line(&self, idx: usize) -> Option; + fn read_range(&self, offset: u64, len: usize) -> Option>; + fn close(self); +} diff --git a/crates/bench/src/line_index.rs b/crates/bench/src/line_index.rs new file mode 100644 index 0000000..2b8a3a8 --- /dev/null +++ b/crates/bench/src/line_index.rs @@ -0,0 +1,110 @@ +// ─── line_index.rs ─────────────────────────────────────────────────────────── +// Vendored from crates/core/src/io/line_index.rs +// Sparse line index: sample every 256 lines to reduce memory usage. +// ────────────────────────────────────────────────────────────────────────────── + +const BLOCK_SIZE: usize = 256; + +pub struct LineIndex { + pub(crate) sampled_offsets: Vec, + pub(crate) total_lines: u64, + #[allow(dead_code)] + pub(crate) has_trailing_newline: bool, +} + +impl LineIndex { + /// Build sparse line index from a streaming reader. + /// Uses fill_buf()/consume() to avoid loading the entire file into memory. + /// RSS stays at ~64KB (BufReader buffer size), independent of file size. + pub fn from_reader(reader: &mut impl std::io::BufRead) -> std::io::Result { + let mut sampled_offsets: Vec = vec![0]; // line 0 starts at offset 0 + let mut next_line_idx: usize = 1; + let mut newline_count: usize = 0; + let mut chunk_offset: u64 = 0; + let mut last_byte: Option = None; + + loop { + let buf = reader.fill_buf()?; + if buf.is_empty() { + break; + } + + if let Some(&b) = buf.last() { + last_byte = Some(b); + } + + for pos in memchr::memchr_iter(b'\n', buf) { + newline_count += 1; + if next_line_idx.is_multiple_of(BLOCK_SIZE) { + sampled_offsets.push(chunk_offset + pos as u64 + 1); + } + next_line_idx += 1; + } + + let consumed = buf.len(); + chunk_offset += consumed as u64; + reader.consume(consumed); + } + + // Empty file: no data at all + if chunk_offset == 0 { + return Ok(LineIndex { + sampled_offsets: vec![], + total_lines: 0, + has_trailing_newline: false, + }); + } + + let has_trailing_newline = last_byte == Some(b'\n') && newline_count > 0; + + let total_lines: u64 = if has_trailing_newline && newline_count > 0 { + newline_count as u64 + } else { + (1 + newline_count) as u64 + }; + + // Trailing \n pop logic + if has_trailing_newline && newline_count > 0 { + let trailing_line_idx = newline_count; + if trailing_line_idx.is_multiple_of(BLOCK_SIZE) { + sampled_offsets.pop(); + } + } + + Ok(LineIndex { + sampled_offsets, + total_lines, + has_trailing_newline, + }) + } + + /// Return total line count. + pub fn line_count(&self) -> usize { + self.total_lines as usize + } + + /// Retrieve the content of line `idx` from the given data slice. + /// Uses sparse index to locate the block start, then scans forward + /// a small number of newlines to find the target line. + pub fn get_line<'a>(&self, data: &'a [u8], idx: usize) -> Option<&'a str> { + if idx >= self.total_lines as usize || data.is_empty() { + return None; + } + let block = idx / BLOCK_SIZE; + let offset_in_block = idx % BLOCK_SIZE; + let mut pos = self.sampled_offsets[block] as usize; + for _ in 0..offset_in_block { + match memchr::memchr(b'\n', &data[pos..]) { + Some(rel) => pos = pos + rel + 1, + None => return None, + } + } + let end = memchr::memchr(b'\n', &data[pos..]) + .map(|rel| pos + rel) + .unwrap_or(data.len()); + let line_bytes = &data[pos..end]; + std::str::from_utf8(line_bytes) + .map(|s| s.trim_end_matches(['\r', '\n'])) + .ok() + } +} diff --git a/crates/bench/src/main.rs b/crates/bench/src/main.rs new file mode 100644 index 0000000..1155d04 --- /dev/null +++ b/crates/bench/src/main.rs @@ -0,0 +1,68 @@ +use clap::Parser; +use std::path::PathBuf; + +/// Benchmark: mmap vs pread for large file reading +#[derive(Parser, Debug)] +#[command(name = "log-viewer-bench", version, about = "Benchmark mmap vs pread")] +struct Args { + /// Path to the test file (default: /tmp/test-logviewer/extreme.log) + #[arg(default_value = "/tmp/test-logviewer/extreme.log")] + test_file: PathBuf, + + /// Quick mode: use smaller iterations and skip cold cache tests + #[arg(long)] + quick: bool, + + /// Output report path (default: benchmark-report.md) + #[arg(long, default_value = "benchmark-report.md")] + output: PathBuf, + + /// Only run specified suites (comma-separated: startup,render,jump,memory,growth,rotation,concurrent) + #[arg(long, value_delimiter = ',')] + suites: Option>, +} + +fn main() { + let args = Args::parse(); + + println!("=== Benchmark: mmap vs pread ==="); + println!("Test file: {}", args.test_file.display()); + println!("Quick mode: {}", args.quick); + println!(); + + let config = log_viewer_bench::runner::BenchConfig { + test_file: args.test_file.clone(), + quick_mode: args.quick, + suites: args.suites, + }; + + if !config.test_file.exists() { + eprintln!("ERROR: Test file not found: {}", config.test_file.display()); + eprintln!( + "Generate one with: dd if=/dev/urandom of=/tmp/test-logviewer/extreme.log bs=1M count=5000" + ); + std::process::exit(1); + } + + if !log_viewer_bench::metrics::MetricsCollector::can_reset_vm_hwm() { + eprintln!("WARNING: VmHWM reset unavailable (no root). Memory peak values may be contaminated across tests."); + } + + println!("Running benchmarks..."); + let results = log_viewer_bench::runner::run_all(&config); + println!("Completed {} benchmarks.\n", results.len()); + + let report = log_viewer_bench::report::format_report(&results); + + println!("{}", report); + + if let Err(e) = std::fs::write(&args.output, &report) { + eprintln!( + "WARNING: Failed to save report to {}: {}", + args.output.display(), + e + ); + } else { + eprintln!("Report saved to {}", args.output.display()); + } +} diff --git a/crates/bench/src/metrics.rs b/crates/bench/src/metrics.rs new file mode 100644 index 0000000..f51d9ca --- /dev/null +++ b/crates/bench/src/metrics.rs @@ -0,0 +1,280 @@ +use std::fs::{self, File}; +use std::os::unix::fs::MetadataExt; +use std::os::unix::io::AsRawFd; +use std::path::Path; + +pub struct RssMetrics { + pub vm_rss_kb: u64, + pub vm_hwm_kb: u64, +} + +pub struct PageFaultMetrics { + pub minor_faults: u64, + pub major_faults: u64, +} + +pub struct MetricsCollector; + +impl MetricsCollector { + /// Read VmRSS and VmHWM from /proc/self/status + pub fn read_rss() -> RssMetrics { + let status = fs::read_to_string("/proc/self/status").unwrap_or_default(); + let mut vm_rss_kb: u64 = 0; + let mut vm_hwm_kb: u64 = 0; + for line in status.lines() { + if line.starts_with("VmRSS:") { + vm_rss_kb = parse_kb_value(line); + } else if line.starts_with("VmHWM:") { + vm_hwm_kb = parse_kb_value(line); + } + } + RssMetrics { + vm_rss_kb, + vm_hwm_kb, + } + } + + /// Read page fault counts from getrusage + pub fn read_page_faults() -> PageFaultMetrics { + let usage = + nix::sys::resource::getrusage(nix::sys::resource::UsageWho::RUSAGE_SELF).unwrap(); + PageFaultMetrics { + // getrusage() returns c_long (i64 on 64-bit Linux) — explicit as u64 conversion + minor_faults: usage.minor_page_faults() as u64, + major_faults: usage.major_page_faults() as u64, + } + } + + /// Clear page cache (requires root: sync + drop_caches) + /// Falls back to doing nothing if no permission + pub fn clear_page_cache() -> std::io::Result<()> { + let _ = std::process::Command::new("sync").status(); + fs::write("/proc/sys/vm/drop_caches", "1") + } + + /// Clear file cache using posix_fadvise(DONTNEED) — no root required + pub fn clear_file_cache(path: &Path) -> std::io::Result<()> { + let file = File::open(path)?; + let len = file.metadata()?.len(); + let ret = unsafe { + libc::posix_fadvise(file.as_raw_fd(), 0, len as i64, libc::POSIX_FADV_DONTNEED) + }; + // posix_fadvise returns error code directly (not errno), 0 = success + if ret != 0 { + return Err(std::io::Error::from_raw_os_error(ret)); + } + Ok(()) + } + + /// Reset VmHWM by writing to /proc/self/clear_refs (requires root) + pub fn reset_vm_hwm() -> std::io::Result<()> { + fs::write("/proc/self/clear_refs", "5").map_err(|e| { + if e.kind() == std::io::ErrorKind::PermissionDenied { + std::io::Error::new( + std::io::ErrorKind::PermissionDenied, + "VmHWM reset requires root (can't write /proc/self/clear_refs)", + ) + } else { + e + } + }) + } + + /// Check if we can reset VmHWM (i.e., have root) + pub fn can_reset_vm_hwm() -> bool { + fs::write("/proc/self/clear_refs", "5").is_ok() + } + + /// Get file inode number + pub fn get_inode(path: &Path) -> std::io::Result { + let meta = fs::metadata(path)?; + Ok(meta.ino()) + } + + /// Check if file was rotated (inode changed) + pub fn detect_rotation(original_inode: u64, path: &Path) -> bool { + Self::get_inode(path) + .map(|ino| ino != original_inode) + .unwrap_or(true) + } +} + +fn parse_kb_value(line: &str) -> u64 { + // Format: "VmRSS: 12345 kB" + line.split_whitespace() + .nth(1) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0) +} + +pub fn mean(data: &[u64]) -> f64 { + if data.is_empty() { + return 0.0; + } + data.iter().sum::() as f64 / data.len() as f64 +} + +/// Percentile of data at given fraction (0.0–1.0). Returns from a sorted copy. +pub fn percentile(data: &[u64], p: f64) -> u64 { + if data.is_empty() { + return 0; + } + let mut sorted: Vec = data.to_vec(); + sorted.sort_unstable(); + let idx = ((p * (sorted.len() - 1) as f64).round()) as usize; + sorted[idx.min(sorted.len() - 1)] +} + +pub fn stdev(data: &[u64]) -> f64 { + if data.len() < 2 { + return 0.0; + } + let m = mean(data); + let variance: f64 = data + .iter() + .map(|&v| { + let d = v as f64 - m; + d * d + }) + .sum::() + / (data.len() - 1) as f64; + variance.sqrt() +} + +pub fn p50(data: &[u64]) -> u64 { + percentile(data, 0.50) +} + +pub fn p95(data: &[u64]) -> u64 { + percentile(data, 0.95) +} + +pub fn p99(data: &[u64]) -> u64 { + percentile(data, 0.99) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rss_returns_values() { + let rss = MetricsCollector::read_rss(); + assert!( + rss.vm_rss_kb > 0, + "VmRSS should be non-zero for a running process" + ); + assert!( + rss.vm_hwm_kb > 0, + "VmHWM should be non-zero for a running process" + ); + } + + #[test] + fn test_page_faults_returns_values() { + let faults = MetricsCollector::read_page_faults(); + assert!( + faults.minor_faults > 0, + "Should have some minor page faults" + ); + } + + #[test] + fn test_mean() { + let data = vec![100, 200, 300, 400, 500]; + let result = mean(&data); + assert!( + (result - 300.0).abs() < f64::EPSILON, + "mean should be 300.0, got {result}" + ); + } + + #[test] + fn test_mean_empty() { + assert_eq!(mean(&[]), 0.0); + } + + #[test] + fn test_percentile_p50() { + let data = vec![100, 200, 300, 400, 500]; + assert_eq!(percentile(&data, 0.50), 300); + } + + #[test] + fn test_percentile_p99() { + let data = vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100]; + let p99_result = percentile(&data, 0.99); + assert!(p99_result >= 90, "P99 should be near max, got {p99_result}"); + } + + #[test] + fn test_percentile_empty() { + assert_eq!(percentile(&[], 0.5), 0); + } + + #[test] + fn test_stdev() { + let data = vec![100, 200, 300, 400, 500]; + let s = stdev(&data); + assert!(s > 100.0, "stdev should be significant, got {s}"); + assert!(s < 200.0, "stdev should be < 200, got {s}"); + } + + #[test] + fn test_stdev_single() { + assert_eq!(stdev(&[42]), 0.0); + assert_eq!(stdev(&[]), 0.0); + } + + #[test] + fn test_parse_kb_value() { + assert_eq!(parse_kb_value("VmRSS: 12345 kB"), 12345); + assert_eq!(parse_kb_value("VmHWM:\t2048 kB"), 2048); + assert_eq!(parse_kb_value("VmRSS: 0 kB"), 0); + } + + #[test] + fn test_parse_kb_value_malformed() { + assert_eq!(parse_kb_value("VmRSS: NaN kB"), 0); + assert_eq!(parse_kb_value("garbage"), 0); + } + + #[test] + fn test_convenience_percentiles() { + let data = vec![10, 20, 30, 40, 50]; + assert_eq!(p50(&data), 30); + assert_eq!(p95(&data), 50); + assert_eq!(p99(&data), 50); + } + + #[test] + fn test_inode_for_existing_file() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let inode = MetricsCollector::get_inode(tmp.path()).unwrap(); + assert!(inode > 0, "inode should be non-zero"); + } + + #[test] + fn test_detect_rotation_no_rotation() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let inode = MetricsCollector::get_inode(tmp.path()).unwrap(); + assert!(!MetricsCollector::detect_rotation(inode, tmp.path())); + } + + #[test] + fn test_detect_rotation_file_removed() { + let inode: u64 = 99999; + let result = MetricsCollector::detect_rotation(inode, Path::new("/no/such/file")); + assert!(result, "missing file should indicate rotation"); + } + + #[test] + fn test_clear_file_cache() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let result = MetricsCollector::clear_file_cache(tmp.path()); + assert!( + result.is_ok(), + "clear_file_cache should succeed on temp file: {result:?}" + ); + } +} diff --git a/crates/bench/src/mmap_reader.rs b/crates/bench/src/mmap_reader.rs new file mode 100644 index 0000000..6b2af7e --- /dev/null +++ b/crates/bench/src/mmap_reader.rs @@ -0,0 +1,752 @@ +// ─── mmap_reader.rs ────────────────────────────────────────────────────────── +// mmap-based FileReaderBackend implementations with 5 variants for benchmarking. +// Includes SIGBUS handler for file-truncation resilience and remap support +// for growing-file scenarios. +// ────────────────────────────────────────────────────────────────────────────── + +use std::fs::File; +use std::io::BufReader; +use std::path::Path; +use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU8, Ordering}; +use std::sync::Once; + +use memmap2::{Advice, Mmap, MmapOptions, RemapOptions}; +use nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal}; + +use crate::line_index::LineIndex; +use crate::FileReaderBackend; + +// ─── SIGBUS Handler ────────────────────────────────────────────────────────── +// +// Signal-safety architecture: +// - Old handler state is stored in raw atomics (AtomicU8 + AtomicPtr) that are +// async-signal-safe to read — no OnceLock, Mutex, or other non-trivial abstractions +// in the signal handler path. +// - Installation uses `Once` for idempotency and follows a strict publish-then-install +// sequence to close the "handler-active-but-state-unpublished" race window. + +/// Global flag set by the SIGBUS handler when a bus error is intercepted. +/// Process-global: concurrent benchmarks that reset/check this flag may interfere. +/// Currently acceptable because only `rotation` suite (sequential) uses it. +static SIGBUS_OCCURRED: AtomicBool = AtomicBool::new(false); + +/// Discriminant values for old SIGBUS handler type. +const HANDLER_NONE: u8 = 0; +const HANDLER_DEFAULT: u8 = 1; +const HANDLER_IGNORE: u8 = 2; +const HANDLER_PLAIN: u8 = 3; // extern "C" fn(c_int) +#[expect(clippy::unseparated_literal_suffix, reason = "clarity: this is the SA_SIGACTION variant")] +const HANDLER_SIGACTION: u8 = 4; // extern "C" fn(c_int, *mut siginfo_t, *mut c_void) + +/// Old SIGBUS handler type — raw atomic, async-signal-safe to read. +static OLD_HANDLER_KIND: AtomicU8 = AtomicU8::new(HANDLER_NONE); + +/// Old SIGBUS handler function pointer — raw atomic, async-signal-safe to read. +static OLD_HANDLER_PTR: AtomicPtr = AtomicPtr::new(std::ptr::null_mut()); + +/// Ensures `install_sigbus_handler` runs exactly once across all threads. +static INSTALL_ONCE: Once = Once::new(); + +/// Returns `true` if a SIGBUS was intercepted since the last reset. +pub fn sigbus_flag() -> bool { + SIGBUS_OCCURRED.load(Ordering::SeqCst) +} + +/// Resets the SIGBUS flag. Call before operations where you want to detect +/// a fresh SIGBUS. +pub fn reset_sigbus_flag() { + SIGBUS_OCCURRED.store(false, Ordering::SeqCst) +} + +/// SIGBUS signal handler. +/// +/// # Safety Constraints (signal handler context) +/// - NO TLS access +/// - NO memory allocation +/// - NO lock acquisition +/// - Only: AtomicBool store → mmap → raw atomic loads → chain +extern "C" fn sigbus_handler( + sig: libc::c_int, + info: *mut libc::siginfo_t, + ctx: *mut std::ffi::c_void, +) { + SIGBUS_OCCURRED.store(true, Ordering::SeqCst); + + // Align fault address down to page boundary. + // si_addr is NOT guaranteed to be page-aligned; without this, mmap(MAP_FIXED) + // could replace the wrong page. + let addr = unsafe { (*info).si_addr() }; + let aligned = (addr as usize & !0xFFF) as *mut libc::c_void; + + // Map anonymous zero page at fault address to prevent crash on file truncation. + let result = unsafe { + libc::mmap( + aligned, + 4096, + libc::PROT_READ, + libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_FIXED, + -1, + 0, + ) + }; + if result == libc::MAP_FAILED { + // Chain to old handler via raw atomics (async-signal-safe). + let kind = OLD_HANDLER_KIND.load(Ordering::Acquire); + match kind { + HANDLER_PLAIN => { + let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire); + if !ptr.is_null() { + // SAFETY: ptr was derived from a valid function pointer + // published by install_sigbus_handler before this handler was installed. + let f: extern "C" fn(libc::c_int) = unsafe { std::mem::transmute(ptr) }; + f(sig); + } else { + unsafe { libc::_exit(128 + sig) }; + } + } + HANDLER_SIGACTION => { + let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire); + if !ptr.is_null() { + let f: extern "C" fn( + libc::c_int, + *mut libc::siginfo_t, + *mut std::ffi::c_void, + ) = unsafe { std::mem::transmute(ptr) }; + f(sig, info, ctx); + } else { + unsafe { libc::_exit(128 + sig) }; + } + } + _ => { + // HANDLER_NONE / HANDLER_DEFAULT / HANDLER_IGNORE — no safe chaining. + unsafe { libc::_exit(128 + sig) }; + } + } + } +} + +/// Install the SIGBUS handler via sigaction(). Chains to any previous handler. +/// Uses `Once` to guarantee exactly-once installation — safe to call from +/// multiple threads concurrently. +/// +/// The install sequence closes the "install-then-publish" race by: +/// 1. Querying the current handler via raw `libc::sigaction` (no modification). +/// 2. Publishing old handler state to raw atomics (`Release` stores). +/// 3. Installing our handler via `nix::sigaction`. +/// +/// This ensures the atomics are readable *before* our handler can fire. +fn install_sigbus_handler() { + INSTALL_ONCE.call_once(|| { + // Step 1: Query current SIGBUS disposition without changing it. + let mut old_act: libc::sigaction = unsafe { std::mem::zeroed() }; + let ret = unsafe { libc::sigaction(libc::SIGBUS, std::ptr::null(), &mut old_act) }; + if ret != 0 { + // Failed to query — skip installation entirely. + return; + } + + // Step 2: Publish old handler to atomics BEFORE installing ours. + let is_siginfo = (old_act.sa_flags & libc::SA_SIGINFO) != 0; + + let raw_usize = old_act.sa_sigaction as usize; + + if raw_usize == 0 { + // SIG_DFL (null function pointer → default disposition) + OLD_HANDLER_KIND.store(HANDLER_DEFAULT, Ordering::Release); + } else if raw_usize == 1 { + // SIG_IGN (sentinel value 1 → ignore disposition) + OLD_HANDLER_KIND.store(HANDLER_IGNORE, Ordering::Release); + } else { + // Real handler function pointer + OLD_HANDLER_PTR.store(raw_usize as *mut std::ffi::c_void, Ordering::Release); + if is_siginfo { + OLD_HANDLER_KIND.store(HANDLER_SIGACTION, Ordering::Release); + } else { + OLD_HANDLER_KIND.store(HANDLER_PLAIN, Ordering::Release); + } + } + + // Step 3: Install our handler. Atomics are already published, + // so the handler can safely read them from the moment it's installed. + let new_action = SigAction::new( + SigHandler::SigAction(sigbus_handler), + SaFlags::SA_SIGINFO, + SigSet::empty(), + ); + let _ = unsafe { sigaction(Signal::SIGBUS, &new_action) }; + }); +} + +// ─── Core MmapReader ───────────────────────────────────────────────────────── + +/// Core mmap-based reader. Holds the memory mapping, file handle, and +/// line index. Used as the engine inside each variant wrapper. +pub struct MmapReader { + mmap: Mmap, + #[allow(dead_code)] + file: File, + line_index: LineIndex, + file_size: u64, +} + +impl MmapReader { + /// Open a file and create an mmap. Does NOT apply madvise. + /// The caller is responsible for applying the desired advice. + fn open_raw(path: &Path) -> std::io::Result { + let file = File::open(path)?; + let file_size = file.metadata()?.len(); + + let mmap = unsafe { Mmap::map(&file)? }; + + let line_index = { + let mut reader = BufReader::new(&file); + LineIndex::from_reader(&mut reader)? + }; + + Ok(Self { + mmap, + file, + line_index, + file_size, + }) + } + + /// Open with MmapOptions (for populate, etc.). + fn open_with_options(path: &Path, opts: &MmapOptions) -> std::io::Result { + let file = File::open(path)?; + let file_size = file.metadata()?.len(); + + let mmap = if file_size == 0 { + unsafe { Mmap::map(&file)? } + } else { + unsafe { opts.map(&file)? } + }; + + let line_index = { + let mut reader = BufReader::new(&file); + LineIndex::from_reader(&mut reader)? + }; + + Ok(Self { + mmap, + file, + line_index, + file_size, + }) + } + + /// Apply madvise to the entire mapping. + fn advise(&self, advice: Advice) { + let _ = self.mmap.advise(advice); + } + + /// Grow the mmap to `new_size` bytes. + /// + /// # Safety + /// Caller must ensure no `&[u8]` references to the old mmap data exist. + /// After remap, any pointers derived from the old mapping may be invalid + /// (if the kernel moved the mapping). + pub unsafe fn remap(&mut self, new_size: usize) -> std::io::Result<()> { + unsafe { + self.mmap + .remap(new_size, RemapOptions::new().may_move(true))?; + } + self.file_size = new_size as u64; + Ok(()) + } + + #[inline] + pub fn file_size(&self) -> u64 { + self.file_size + } + + #[inline] + pub fn total_lines(&self) -> usize { + self.line_index.line_count() + } + + #[inline] + pub fn get_line(&self, idx: usize) -> Option { + self.line_index + .get_line(&self.mmap, idx) + .map(|s| s.to_owned()) + } + + #[inline] + pub fn read_range(&self, offset: u64, len: usize) -> Option> { + let start = offset as usize; + let end = start.checked_add(len)?; + if end > self.mmap.len() { + return None; + } + Some(self.mmap[start..end].to_vec()) + } +} + +// ─── 5 Variants ────────────────────────────────────────────────────────────── + +/// Variant 1: Plain mmap, no madvise. +pub struct MmapReaderPlain { + inner: MmapReader, +} + +impl MmapReaderPlain { + pub fn remap(&mut self, new_size: usize) -> std::io::Result<()> { + unsafe { self.inner.remap(new_size) } + } +} + +impl FileReaderBackend for MmapReaderPlain { + fn name(&self) -> &str { + "mmap_plain" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + install_sigbus_handler(); + let inner = MmapReader::open_raw(path)?; + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range(offset, len) + } + + fn close(self) {} +} + +/// Variant 2: mmap with MADV_SEQUENTIAL — optimal for sequential scan (index build). +pub struct MmapReaderSequential { + inner: MmapReader, +} + +impl FileReaderBackend for MmapReaderSequential { + fn name(&self) -> &str { + "mmap_sequential" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + install_sigbus_handler(); + let inner = MmapReader::open_raw(path)?; + inner.advise(Advice::Sequential); + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range(offset, len) + } + + fn close(self) {} +} + +/// Variant 3: mmap with MADV_RANDOM — optimal for random line access after index is built. +pub struct MmapReaderRandom { + inner: MmapReader, +} + +impl FileReaderBackend for MmapReaderRandom { + fn name(&self) -> &str { + "mmap_random" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + install_sigbus_handler(); + let inner = MmapReader::open_raw(path)?; + inner.advise(Advice::Random); + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range(offset, len) + } + + fn close(self) {} +} + +/// Variant 4: mmap with MAP_POPULATE — pre-fault all pages at mmap time. +/// Trades higher upfront cost for smoother subsequent access. +pub struct MmapReaderPopulate { + inner: MmapReader, +} + +impl FileReaderBackend for MmapReaderPopulate { + fn name(&self) -> &str { + "mmap_populate" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + install_sigbus_handler(); + let mut opts = MmapOptions::new(); + opts.populate(); + let inner = MmapReader::open_with_options(path, &opts)?; + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range(offset, len) + } + + fn close(self) {} +} + +/// Variant 5: Phase-aware — MADV_SEQUENTIAL during index build, then MADV_RANDOM +/// for line access. Best of both worlds for the read-index-then-query pattern. +pub struct MmapReaderPhaseAware { + inner: MmapReader, +} + +impl FileReaderBackend for MmapReaderPhaseAware { + fn name(&self) -> &str { + "mmap_phase_aware" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + install_sigbus_handler(); + let inner = MmapReader::open_raw(path)?; + // Index build used sequential streaming via BufReader. Now switch to random. + inner.advise(Advice::Random); + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range(offset, len) + } + + fn close(self) {} +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write as _; + use tempfile::NamedTempFile; + + fn create_temp_file(content: &[u8]) -> NamedTempFile { + let mut f = NamedTempFile::new().unwrap(); + f.write_all(content).unwrap(); + f.flush().unwrap(); + f + } + + #[test] + fn test_plain_open_and_read_lines() { + let f = create_temp_file(b"hello\nworld\nfoo\n"); + let reader = MmapReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.name(), "mmap_plain"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(0), Some("hello".to_owned())); + assert_eq!(reader.get_line(1), Some("world".to_owned())); + assert_eq!(reader.get_line(2), Some("foo".to_owned())); + assert_eq!(reader.get_line(3), None); + reader.close(); + } + + #[test] + fn test_plain_out_of_bounds_returns_none() { + let f = create_temp_file(b"line1\nline2\n"); + let reader = MmapReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.total_lines(), 2); + assert_eq!(reader.get_line(100), None); + assert_eq!(reader.get_line(usize::MAX), None); + reader.close(); + } + + #[test] + fn test_plain_read_range() { + let content = b"hello world\nfoo bar\n"; + let f = create_temp_file(content); + let reader = MmapReaderPlain::open(f.path()).unwrap(); + let range = reader.read_range(0, 5).unwrap(); + assert_eq!(&range, b"hello"); + let range = reader.read_range(6, 5).unwrap(); + assert_eq!(&range, b"world"); + assert_eq!(reader.read_range(0, 1000), None); + reader.close(); + } + + #[test] + fn test_plain_empty_file() { + let f = create_temp_file(b""); + let reader = MmapReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.total_lines(), 0); + assert_eq!(reader.file_size(), 0); + assert_eq!(reader.get_line(0), None); + reader.close(); + } + + #[test] + fn test_sequential_variant() { + let f = create_temp_file(b"alpha\nbeta\ngamma\n"); + let reader = MmapReaderSequential::open(f.path()).unwrap(); + assert_eq!(reader.name(), "mmap_sequential"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(1), Some("beta".to_owned())); + reader.close(); + } + + #[test] + fn test_random_variant() { + let f = create_temp_file(b"one\ntwo\nthree\n"); + let reader = MmapReaderRandom::open(f.path()).unwrap(); + assert_eq!(reader.name(), "mmap_random"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(2), Some("three".to_owned())); + reader.close(); + } + + #[test] + fn test_populate_variant() { + let f = create_temp_file(b"x\ny\nz\n"); + let reader = MmapReaderPopulate::open(f.path()).unwrap(); + assert_eq!(reader.name(), "mmap_populate"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(0), Some("x".to_owned())); + reader.close(); + } + + #[test] + fn test_phase_aware_variant() { + let f = create_temp_file(b"a\nb\nc\n"); + let reader = MmapReaderPhaseAware::open(f.path()).unwrap(); + assert_eq!(reader.name(), "mmap_phase_aware"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(1), Some("b".to_owned())); + reader.close(); + } + + #[test] + fn test_file_size() { + let content = b"hello world"; + let f = create_temp_file(content); + let reader = MmapReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.file_size(), content.len() as u64); + reader.close(); + } + + #[test] + fn test_sigbus_flag_api() { + reset_sigbus_flag(); + assert!(!sigbus_flag()); + } + + #[test] + fn test_concurrent_open_installs_handler_once() { + let f = create_temp_file(b"line1\nline2\nline3\n"); + let path = f.path().to_owned(); + let num_threads = 8; + + let handles: Vec<_> = (0..num_threads) + .map(|thread_id| { + let path = path.clone(); + std::thread::spawn(move || { + let reader = match thread_id % 5 { + 0 => { + let r = MmapReaderPlain::open(&path).unwrap(); + assert_eq!(r.total_lines(), 3); + r.close(); + "plain" + } + 1 => { + let r = MmapReaderSequential::open(&path).unwrap(); + assert_eq!(r.total_lines(), 3); + r.close(); + "sequential" + } + 2 => { + let r = MmapReaderRandom::open(&path).unwrap(); + assert_eq!(r.total_lines(), 3); + r.close(); + "random" + } + 3 => { + let r = MmapReaderPopulate::open(&path).unwrap(); + assert_eq!(r.total_lines(), 3); + r.close(); + "populate" + } + _ => { + let r = MmapReaderPhaseAware::open(&path).unwrap(); + assert_eq!(r.total_lines(), 3); + r.close(); + "phase_aware" + } + }; + reader.to_owned() + }) + }) + .collect(); + + let results: Vec = handles + .into_iter() + .map(|h| h.join().expect("thread panicked")) + .collect(); + assert_eq!(results.len(), num_threads); + } + + #[test] + fn test_no_trailing_newline() { + let f = create_temp_file(b"line1\nline2"); + let reader = MmapReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.total_lines(), 2); + assert_eq!(reader.get_line(0), Some("line1".to_owned())); + assert_eq!(reader.get_line(1), Some("line2".to_owned())); + reader.close(); + } + + /// Diagnostic test: measure how long each stage of `open_raw()` takes + /// on a large file (e.g. the 5GB extreme.log). + /// Run with: cargo test -p log-viewer-bench --release -- --nocapture diag_open_stages + #[test] + fn diag_open_stages() { + let path = std::path::Path::new("/tmp/test-logviewer/extreme.log"); + if !path.exists() { + eprintln!("SKIP: test file not found"); + return; + } + + // Stage 1: File::open + metadata + let t0 = std::time::Instant::now(); + let file = File::open(path).unwrap(); + let file_size = file.metadata().unwrap().len(); + let stage1 = t0.elapsed(); + eprintln!( + "Stage 1 - File::open + metadata: {:.2}ms ({:.1}GB)", + stage1.as_secs_f64() * 1000.0, + file_size as f64 / 1073741824.0 + ); + + // Stage 2: mmap::map + let t1 = std::time::Instant::now(); + let _mmap = unsafe { Mmap::map(&file) }.unwrap(); + let stage2 = t1.elapsed(); + eprintln!( + "Stage 2 - mmap::map: {:.2}ms", + stage2.as_secs_f64() * 1000.0 + ); + drop(_mmap); + + // Stage 3: LineIndex::from_reader via BufReader (default 8KB buffer) + let t2 = std::time::Instant::now(); + let mut reader = BufReader::new(&file); + let line_index = crate::line_index::LineIndex::from_reader(&mut reader).unwrap(); + let stage3 = t2.elapsed(); + eprintln!( + "Stage 3 - LineIndex::from_reader: {:.2}ms ({} lines, {} sampled_offsets)", + stage3.as_secs_f64() * 1000.0, + line_index.line_count(), + line_index.sampled_offsets.len() + ); + + // Stage 3b: Try with 1MB buffer + let file2 = File::open(path).unwrap(); + let t2b = std::time::Instant::now(); + let mut reader2 = BufReader::with_capacity(1024 * 1024, &file2); + let line_index2 = crate::line_index::LineIndex::from_reader(&mut reader2).unwrap(); + let stage3b = t2b.elapsed(); + eprintln!( + "Stage 3b - LineIndex (1MB buffer): {:.2}ms", + stage3b.as_secs_f64() * 1000.0 + ); + + eprintln!("\n=== 瓶颈分析 ==="); + let total_ms = stage1.as_secs_f64() * 1000.0 + + stage2.as_secs_f64() * 1000.0 + + stage3.as_secs_f64() * 1000.0; + let total_dur = stage1 + stage2 + stage3; + eprintln!("Total ~{:.0}ms", total_ms); + eprintln!( + " File::open: {:.1}% ({:.0}ms)", + stage1.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0, + stage1.as_secs_f64() * 1000.0 + ); + eprintln!( + " mmap::map: {:.1}% ({:.0}ms)", + stage2.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0, + stage2.as_secs_f64() * 1000.0 + ); + eprintln!( + " from_reader: {:.1}% ({:.0}ms)", + stage3.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0, + stage3.as_secs_f64() * 1000.0 + ); + + // Suppress unused warnings + let _ = line_index2; + } +} diff --git a/crates/bench/src/pread_reader.rs b/crates/bench/src/pread_reader.rs new file mode 100644 index 0000000..b6d43d9 --- /dev/null +++ b/crates/bench/src/pread_reader.rs @@ -0,0 +1,454 @@ +// ─── pread_reader.rs ────────────────────────────────────────────────────────── +// pread-based FileReaderBackend implementations with 3 variants for benchmarking. +// Uses ReadCache for 4KB block caching to reduce syscalls, and posix_fadvise +// for kernel readahead control. +// +// CRITICAL: get_line() is custom — it does NOT use LineIndex::get_line() which +// requires a full &[u8] data slice. Instead it uses sampled_offsets directly +// and reads on-demand via pread. +// ────────────────────────────────────────────────────────────────────────────── + +use std::cell::RefCell; +use std::fs::File; +use std::io::BufReader; +use std::os::unix::fs::FileExt; +use std::os::unix::io::AsRawFd; +use std::path::Path; + +use crate::line_index::LineIndex; +use crate::FileReaderBackend; + +const BLOCK_SIZE: usize = 256; +const CACHE_CHUNK: usize = 4096; + +// ─── ReadCache ──────────────────────────────────────────────────────────────── + +/// Single-block read cache. Reduces syscalls by caching the last read. +/// Typical cache hit: sequential get_line() calls within the same 4KB block. +struct ReadCache { + buf: Vec, + buf_offset: u64, + buf_len: usize, +} + +impl ReadCache { + fn new() -> Self { + Self { + buf: vec![0u8; CACHE_CHUNK], + buf_offset: 0, + buf_len: 0, + } + } + + /// Read `len` bytes starting at `offset`. Returns a slice into the cache. + /// On cache hit (range fully within cached block), no syscall needed. + /// On miss, performs a `read_exact_at` syscall. + fn get(&mut self, file: &File, offset: u64, len: usize) -> std::io::Result<&[u8]> { + let end = offset + len as u64; + if offset >= self.buf_offset && end <= self.buf_offset + self.buf_len as u64 { + let start = (offset - self.buf_offset) as usize; + return Ok(&self.buf[start..start + len]); + } + let alloc_len = CACHE_CHUNK.max(len); + self.buf.resize(alloc_len, 0); + file.read_exact_at(&mut self.buf[..len], offset)?; + self.buf_offset = offset; + self.buf_len = len; + Ok(&self.buf[..len]) + } +} + +// ─── PreadReaderCore ────────────────────────────────────────────────────────── + +/// Core pread-based reader. Uses ReadCache and sparse LineIndex for on-demand +/// line retrieval without mmap. +pub struct PreadReaderCore { + file: File, + line_index: LineIndex, + file_size: u64, + cache: RefCell, +} + +impl PreadReaderCore { + /// Open file, build line index via streaming BufReader, prepare pread reader. + fn open_raw(path: &Path) -> std::io::Result { + let file = File::open(path)?; + let file_size = file.metadata()?.len(); + + let line_index = { + let mut reader = BufReader::new(&file); + LineIndex::from_reader(&mut reader)? + }; + + Ok(Self { + file, + line_index, + file_size, + cache: RefCell::new(ReadCache::new()), + }) + } + + /// Apply posix_fadvise to the entire file. + fn advise(&self, advice: libc::c_int) -> std::io::Result<()> { + let fd = self.file.as_raw_fd(); + let ret = unsafe { libc::posix_fadvise(fd, 0, self.file_size as i64, advice) }; + if ret != 0 { + return Err(std::io::Error::from_raw_os_error(ret)); + } + Ok(()) + } + + /// Custom get_line using sparse index + pread. + /// Does NOT use LineIndex::get_line() (which needs full &[u8] slice). + /// + /// Algorithm: + /// 1. Look up sampled_offsets[block] to get approximate byte position + /// 2. Scan forward through offset_in_block newlines via memchr + /// 3. Collect bytes from that position until next newline (may span 4KB blocks) + fn get_line_impl(&self, idx: usize) -> Option { + let total = self.line_index.total_lines as usize; + if idx >= total || total == 0 { + return None; + } + + let block = idx / BLOCK_SIZE; + let offset_in_block = idx % BLOCK_SIZE; + + let mut cache = self.cache.borrow_mut(); + + // Phase 1: Find byte position of the start of line `idx`. + // Start at sampled_offsets[block], scan forward through `offset_in_block` newlines. + let start_offset = self.line_index.sampled_offsets[block]; + let mut pos = start_offset; + let mut newlines_found = 0; + + while newlines_found < offset_in_block { + let remaining = self.file_size.saturating_sub(pos) as usize; + if remaining == 0 { + return None; + } + let to_read = CACHE_CHUNK.min(remaining); + let data = cache.get(&self.file, pos, to_read).ok()?; + + for byte_pos in memchr::memchr_iter(b'\n', data) { + newlines_found += 1; + if newlines_found == offset_in_block { + pos = pos + byte_pos as u64 + 1; + break; + } + } + + if newlines_found < offset_in_block { + pos += to_read as u64; + } + } + + // Phase 2: Collect bytes from `pos` until next newline or EOF. + // Line data may span multiple 4KB cache blocks. + let mut result = Vec::new(); + + while pos < self.file_size { + let remaining = (self.file_size - pos) as usize; + let to_read = CACHE_CHUNK.min(remaining); + let data = cache.get(&self.file, pos, to_read).ok()?; + + match memchr::memchr(b'\n', data) { + Some(rel) => { + result.extend_from_slice(&data[..rel]); + break; + } + None => { + result.extend_from_slice(data); + pos += to_read as u64; + } + } + } + + String::from_utf8(result) + .ok() + .map(|s| s.trim_end_matches(['\r', '\n']).to_owned()) + } + + /// Read a raw byte range from the file using pread. + fn read_range_impl(&self, offset: u64, len: usize) -> Option> { + let end = offset.checked_add(len as u64)?; + if end > self.file_size { + return None; + } + let mut buf = vec![0u8; len]; + self.file.read_exact_at(&mut buf, offset).ok()?; + Some(buf) + } + + #[inline] + pub fn file_size(&self) -> u64 { + self.file_size + } + + #[inline] + pub fn total_lines(&self) -> usize { + self.line_index.line_count() + } +} + +// ─── 3 Variants ─────────────────────────────────────────────────────────────── + +/// Variant 1: Plain pread, no fadvise. +pub struct PreadReaderPlain { + inner: PreadReaderCore, +} + +impl FileReaderBackend for PreadReaderPlain { + fn name(&self) -> &str { + "pread_plain" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + let inner = PreadReaderCore::open_raw(path)?; + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line_impl(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range_impl(offset, len) + } + + fn close(self) {} +} + +/// Variant 2: pread with POSIX_FADV_RANDOM — disable kernel readahead. +/// Best for random-access line lookup patterns. +pub struct PreadReaderRandom { + inner: PreadReaderCore, +} + +impl FileReaderBackend for PreadReaderRandom { + fn name(&self) -> &str { + "pread_random" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + let inner = PreadReaderCore::open_raw(path)?; + inner.advise(libc::POSIX_FADV_RANDOM)?; + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line_impl(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range_impl(offset, len) + } + + fn close(self) {} +} + +/// Variant 3: pread with POSIX_FADV_SEQUENTIAL — aggressive kernel readahead. +/// Best for sequential scan patterns. +pub struct PreadReaderSequential { + inner: PreadReaderCore, +} + +impl FileReaderBackend for PreadReaderSequential { + fn name(&self) -> &str { + "pread_sequential" + } + + fn open(path: &Path) -> std::io::Result + where + Self: Sized, + { + let inner = PreadReaderCore::open_raw(path)?; + inner.advise(libc::POSIX_FADV_SEQUENTIAL)?; + Ok(Self { inner }) + } + + fn file_size(&self) -> u64 { + self.inner.file_size() + } + + fn total_lines(&self) -> usize { + self.inner.total_lines() + } + + fn get_line(&self, idx: usize) -> Option { + self.inner.get_line_impl(idx) + } + + fn read_range(&self, offset: u64, len: usize) -> Option> { + self.inner.read_range_impl(offset, len) + } + + fn close(self) {} +} + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write as _; + use tempfile::NamedTempFile; + + fn create_temp_file(content: &[u8]) -> NamedTempFile { + let mut f = NamedTempFile::new().unwrap(); + f.write_all(content).unwrap(); + f.flush().unwrap(); + f + } + + #[test] + fn test_plain_open_and_read_lines() { + let f = create_temp_file(b"hello\nworld\nfoo\n"); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.name(), "pread_plain"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(0), Some("hello".to_owned())); + assert_eq!(reader.get_line(1), Some("world".to_owned())); + assert_eq!(reader.get_line(2), Some("foo".to_owned())); + assert_eq!(reader.get_line(3), None); + reader.close(); + } + + #[test] + fn test_plain_out_of_bounds_returns_none() { + let f = create_temp_file(b"line1\nline2\n"); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.total_lines(), 2); + assert_eq!(reader.get_line(100), None); + assert_eq!(reader.get_line(usize::MAX), None); + reader.close(); + } + + #[test] + fn test_plain_read_range() { + let content = b"hello world\nfoo bar\n"; + let f = create_temp_file(content); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + let range = reader.read_range(0, 5).unwrap(); + assert_eq!(&range, b"hello"); + let range = reader.read_range(6, 5).unwrap(); + assert_eq!(&range, b"world"); + assert_eq!(reader.read_range(0, 1000), None); + reader.close(); + } + + #[test] + fn test_plain_empty_file() { + let f = create_temp_file(b""); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.total_lines(), 0); + assert_eq!(reader.file_size(), 0); + assert_eq!(reader.get_line(0), None); + reader.close(); + } + + #[test] + fn test_random_variant() { + let f = create_temp_file(b"one\ntwo\nthree\n"); + let reader = PreadReaderRandom::open(f.path()).unwrap(); + assert_eq!(reader.name(), "pread_random"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(2), Some("three".to_owned())); + reader.close(); + } + + #[test] + fn test_sequential_variant() { + let f = create_temp_file(b"alpha\nbeta\ngamma\n"); + let reader = PreadReaderSequential::open(f.path()).unwrap(); + assert_eq!(reader.name(), "pread_sequential"); + assert_eq!(reader.total_lines(), 3); + assert_eq!(reader.get_line(1), Some("beta".to_owned())); + reader.close(); + } + + #[test] + fn test_file_size() { + let content = b"hello world"; + let f = create_temp_file(content); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.file_size(), content.len() as u64); + reader.close(); + } + + #[test] + fn test_no_trailing_newline() { + let f = create_temp_file(b"line1\nline2"); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.total_lines(), 2); + assert_eq!(reader.get_line(0), Some("line1".to_owned())); + assert_eq!(reader.get_line(1), Some("line2".to_owned())); + reader.close(); + } + + #[test] + fn test_line_spanning_cache_boundary() { + // Create a line that spans a 4KB boundary + let mut content = vec![b'a'; 4090]; + content.push(b'\n'); + content.extend_from_slice(b"target\n"); + let f = create_temp_file(&content); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.get_line(0).unwrap().len(), 4090); + assert_eq!(reader.get_line(1), Some("target".to_owned())); + assert_eq!(reader.get_line(2), None); + reader.close(); + } + + #[test] + fn test_many_lines_random_access() { + // Create 300 lines to test block boundary (256 lines per block) + let mut content = String::new(); + for i in 0..300 { + content.push_str(&format!("line_{}\n", i)); + } + let f = create_temp_file(content.as_bytes()); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.total_lines(), 300); + // Test lines across the 256-line block boundary + assert_eq!(reader.get_line(0), Some("line_0".to_owned())); + assert_eq!(reader.get_line(255), Some("line_255".to_owned())); + assert_eq!(reader.get_line(256), Some("line_256".to_owned())); + assert_eq!(reader.get_line(299), Some("line_299".to_owned())); + assert_eq!(reader.get_line(300), None); + reader.close(); + } + + #[test] + fn test_truncation_graceful_error() { + // Verify that truncated file access returns None/Err instead of panicking + let f = create_temp_file(b"hello\nworld\n"); + let reader = PreadReaderPlain::open(f.path()).unwrap(); + assert_eq!(reader.get_line(0), Some("hello".to_owned())); + // Truncate behind the reader — pread may return zeros or error + let _ = reader.get_line(1); // must not panic + reader.close(); + } +} diff --git a/crates/bench/src/report.rs b/crates/bench/src/report.rs new file mode 100644 index 0000000..bc870aa --- /dev/null +++ b/crates/bench/src/report.rs @@ -0,0 +1,207 @@ +use crate::metrics; +use crate::types::BenchmarkResult; + +fn format_rss_mb(kb: u64) -> String { + format!("{:.1}MB", kb as f64 / 1024.0) +} + +fn format_faults(count: u64) -> String { + if count < 1000 { + count.to_string() + } else { + let s = count.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); + } + result.push(c); + } + result.chars().rev().collect() + } +} + +// Format benchmark results as Markdown, grouped by category. +pub fn format_report(results: &[BenchmarkResult]) -> String { + let mut report = String::new(); + report.push_str("# Benchmark: mmap vs pread\n\n"); + + let mut categories: std::collections::BTreeMap<&str, Vec<&BenchmarkResult>> = + std::collections::BTreeMap::new(); + for r in results { + categories.entry(&r.category).or_default().push(r); + } + + for (category, category_results) in &categories { + report.push_str(&format!("## {}\n\n", capitalize(category))); + + let mut tests: std::collections::BTreeMap<&str, Vec<&BenchmarkResult>> = + std::collections::BTreeMap::new(); + for r in category_results { + tests.entry(&r.test_name).or_default().push(r); + } + + let mut variants: Vec<(String, String)> = Vec::new(); + for r in category_results { + let key = format!("{} ({})", r.backend, r.variant); + if !variants + .iter() + .any(|(b, v)| format!("{} ({})", b, v) == key) + { + variants.push((r.backend.clone(), r.variant.clone())); + } + } + + report.push_str("### Latency\n\n"); + report.push_str("| Test |"); + for (backend, variant) in &variants { + report.push_str(&format!(" {} ({}) |", backend, variant)); + } + report.push_str(" Winner |\n"); + + report.push_str("|------|"); + for _ in &variants { + report.push_str("------|"); + } + report.push_str("--------|\n"); + + for (test_name, test_results) in &tests { + report.push_str(&format!("| {} |", test_name)); + + let mut all_means: Vec = Vec::new(); + for (backend, variant) in &variants { + if let Some(r) = test_results + .iter() + .find(|r| r.backend == *backend && r.variant == *variant) + { + let avg = if r.latency_us.is_empty() { + 0.0 + } else { + metrics::mean(&r.latency_us) + }; + all_means.push(avg); + } + } + let use_ms = all_means.iter().any(|&v| v >= 1000.0); + + let mut best_backend = String::new(); + let mut best_latency = f64::MAX; + + for (backend, variant) in &variants { + let matching = test_results + .iter() + .find(|r| r.backend == *backend && r.variant == *variant); + if let Some(r) = matching { + let avg = if r.latency_us.is_empty() { + 0.0 + } else { + metrics::mean(&r.latency_us) + }; + let sd = metrics::stdev(&r.latency_us); + let p95_val = metrics::p95(&r.latency_us) as f64; + + let cell = if use_ms { + format!( + "{:.2}\u{00b1}{:.2}ms (p95:{:.2}ms)", + avg / 1000.0, + sd / 1000.0, + p95_val / 1000.0 + ) + } else { + format!( + "{:.1}\u{00b1}{:.1}\u{b5}s (p95:{:.1}\u{b5}s)", + avg, sd, p95_val + ) + }; + report.push_str(&format!(" {} |", cell)); + if avg > 0.0 && avg < best_latency { + best_latency = avg; + best_backend = format!("{} ({})", backend, variant); + } + } else { + report.push_str(" - |"); + } + } + report.push_str(&format!( + " {} |\n", + if best_backend.is_empty() { + "-" + } else { + &best_backend + } + )); + } + + report.push('\n'); + + let has_memory = category_results + .iter() + .any(|r| r.rss_kb > 0 || r.rss_peak_kb > 0); + + if has_memory { + report.push_str("### Memory\n\n"); + report.push_str("| Test | Variant | RSS | Peak RSS | Page Faults |\n"); + report.push_str("|------|---------|-----|----------|-------------|\n"); + + for r in category_results { + let variant_label = format!("{} ({})", r.backend, r.variant); + report.push_str(&format!( + "| {} | {} | {} | {} | {} |\n", + r.test_name, + variant_label, + format_rss_mb(r.rss_kb), + format_rss_mb(r.rss_peak_kb), + format_faults(r.page_faults), + )); + } + report.push('\n'); + } + + let extras: Vec<(String, String, Vec<(String, f64)>)> = category_results + .iter() + .filter(|r| !r.extra.is_empty()) + .map(|r| { + let mut pairs: Vec<(String, f64)> = + r.extra.iter().map(|(k, &v)| (k.clone(), v)).collect(); + pairs.sort_by(|a, b| a.0.cmp(&b.0)); + ( + r.test_name.clone(), + format!("{} ({})", r.backend, r.variant), + pairs, + ) + }) + .collect(); + + if !extras.is_empty() { + report.push_str("### Extra Metrics\n\n"); + report.push_str("| Test | Variant | Metric | Value |\n"); + report.push_str("|------|---------|--------|-------|\n"); + + for (test_name, variant_label, pairs) in &extras { + for (key, val) in pairs { + report.push_str(&format!( + "| {} | {} | {} | {:.3} |\n", + test_name, variant_label, key, val + )); + } + } + report.push('\n'); + } + } + + report.push_str("## Summary\n\n"); + report.push_str(&format!("- Total benchmarks: {}\n", results.len())); + report.push_str("- Categories: "); + report.push_str(&categories.keys().cloned().collect::>().join(", ")); + report.push('\n'); + + report +} + +fn capitalize(s: &str) -> String { + let mut c = s.chars(); + match c.next() { + None => String::new(), + Some(f) => f.to_uppercase().chain(c).collect(), + } +} diff --git a/crates/bench/src/runner.rs b/crates/bench/src/runner.rs new file mode 100644 index 0000000..726561f --- /dev/null +++ b/crates/bench/src/runner.rs @@ -0,0 +1,59 @@ +use crate::metrics::MetricsCollector; +use crate::types::BenchmarkResult; +use std::path::PathBuf; + +pub struct BenchConfig { + pub test_file: PathBuf, + pub quick_mode: bool, + pub suites: Option>, +} + +fn warn_reset_hwm() { + if let Err(e) = MetricsCollector::reset_vm_hwm() { + if e.kind() == std::io::ErrorKind::PermissionDenied { + eprintln!("WARNING: VmHWM reset requires root: {e}"); + } + } +} + +pub fn run_all(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + let should_run = |name: &str| -> bool { + match &config.suites { + Some(suites) => suites.iter().any(|s| s == name), + None => true, + } + }; + + if should_run("startup") { + warn_reset_hwm(); + results.extend(crate::suites::startup::run(config)); + } + if should_run("render") { + warn_reset_hwm(); + results.extend(crate::suites::render::run(config)); + } + if should_run("jump") { + warn_reset_hwm(); + results.extend(crate::suites::jump::run(config)); + } + if should_run("memory") { + warn_reset_hwm(); + results.extend(crate::suites::memory::run(config)); + } + if should_run("growth") { + warn_reset_hwm(); + results.extend(crate::suites::growth::run(config)); + } + if should_run("rotation") { + warn_reset_hwm(); + results.extend(crate::suites::rotation::run(config)); + } + if should_run("concurrent") { + warn_reset_hwm(); + results.extend(crate::suites::concurrent::run(config)); + } + + results +} diff --git a/crates/bench/src/suites/concurrent.rs b/crates/bench/src/suites/concurrent.rs new file mode 100644 index 0000000..15c521d --- /dev/null +++ b/crates/bench/src/suites/concurrent.rs @@ -0,0 +1,117 @@ +use std::collections::HashMap; + +use crate::metrics::MetricsCollector; +use crate::mmap_reader::{ + MmapReaderPhaseAware, MmapReaderPlain, MmapReaderPopulate, MmapReaderRandom, + MmapReaderSequential, +}; +use crate::pread_reader::{PreadReaderPlain, PreadReaderRandom, PreadReaderSequential}; +use crate::runner::BenchConfig; +use crate::types::BenchmarkResult; +use crate::FileReaderBackend; + +pub fn run(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + results.extend(bench_parallel_reads::( + "mmap", "plain", config, + )); + results.extend(bench_parallel_reads::( + "mmap", + "sequential", + config, + )); + results.extend(bench_parallel_reads::( + "mmap", "random", config, + )); + results.extend(bench_parallel_reads::( + "mmap", "populate", config, + )); + results.extend(bench_parallel_reads::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_parallel_reads::( + "pread", "plain", config, + )); + results.extend(bench_parallel_reads::( + "pread", "random", config, + )); + results.extend(bench_parallel_reads::( + "pread", + "sequential", + config, + )); + + results +} + +fn bench_parallel_reads( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let path = config.test_file.clone(); + let iterations = if config.quick_mode { 250 } else { 1000 }; + + let total_lines = { + let reader = B::open(&path).expect("Failed to open file for line count"); + let count = reader.total_lines(); + reader.close(); + count + }; + + let overall_start = std::time::Instant::now(); + let num_threads = 4usize; + + let handles: Vec<_> = (0..num_threads) + .map(|thread_id| { + let path = path.clone(); + std::thread::spawn(move || { + let reader = B::open(&path).expect("Failed to open file in thread"); + let mut latencies = Vec::with_capacity(iterations); + + for i in 0..iterations { + let line_idx = (thread_id * iterations + i) % total_lines.max(1); + let t = std::time::Instant::now(); + let _ = reader.get_line(line_idx); + latencies.push(t.elapsed().as_micros() as u64); + } + + reader.close(); + latencies + }) + }) + .collect(); + + let thread_latencies: Vec> = handles + .into_iter() + .map(|h| h.join().expect("Thread panicked")) + .collect(); + + let total_elapsed = overall_start.elapsed(); + + let all_latencies: Vec = thread_latencies.into_iter().flatten().collect(); + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("num_threads".into(), num_threads as f64); + extra.insert("iterations_per_thread".into(), iterations as f64); + extra.insert("total_time_us".into(), total_elapsed.as_micros() as f64); + extra.insert("total_lines".into(), total_lines as f64); + + vec![BenchmarkResult { + category: "concurrent".into(), + test_name: "parallel_reads".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: all_latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} diff --git a/crates/bench/src/suites/growth.rs b/crates/bench/src/suites/growth.rs new file mode 100644 index 0000000..e206c62 --- /dev/null +++ b/crates/bench/src/suites/growth.rs @@ -0,0 +1,269 @@ +use std::collections::HashMap; + +use crate::data_gen; +use crate::metrics::MetricsCollector; +use crate::mmap_reader::MmapReaderPlain; +use crate::pread_reader::PreadReaderPlain; +use crate::runner::BenchConfig; +use crate::types::BenchmarkResult; +use crate::FileReaderBackend; + +pub fn run(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + let dir = tempfile::tempdir().expect("Failed to create temp dir"); + results.extend(bench_append_visibility_mmap(config, dir.path())); + results.extend(bench_append_visibility_pread(config, dir.path())); + results.extend(bench_remap_cost(config, dir.path())); + results.extend(bench_scroll_during_append(config, dir.path())); + results.extend(bench_high_frequency_append(config, dir.path())); + + results +} + +fn bench_append_visibility_mmap( + config: &BenchConfig, + dir: &std::path::Path, +) -> Vec { + let path = data_gen::generate_growable_file(dir).expect("Failed to create growable file"); + let append_count: usize = if config.quick_mode { 100 } else { 1000 }; + + let mut reader = MmapReaderPlain::open(&path).expect("Failed to open growable file"); + let original_lines = reader.total_lines(); + let original_size = reader.file_size(); + + data_gen::append_lines(&path, append_count).expect("Failed to append lines"); + + let new_metadata = std::fs::metadata(&path).expect("Failed to read metadata"); + let new_size = new_metadata.len() as usize; + + let remap_start = std::time::Instant::now(); + reader.remap(new_size).expect("Failed to remap"); + let remap_elapsed = remap_start.elapsed(); + + let new_line_bytes = + reader.read_range(original_size, (new_size - original_size as usize).min(256)); + let visible = new_line_bytes.is_some(); + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("original_lines".into(), original_lines as f64); + extra.insert("appended_lines".into(), append_count as f64); + extra.insert("new_bytes_visible".into(), visible as u64 as f64); + extra.insert("original_size".into(), original_size as f64); + extra.insert("new_size".into(), new_size as f64); + extra.insert("remap_us".into(), remap_elapsed.as_micros() as f64); + + reader.close(); + + vec![BenchmarkResult { + category: "growth".into(), + test_name: "append_visibility_mmap".into(), + backend: "mmap".into(), + variant: "plain".into(), + latency_us: vec![remap_elapsed.as_micros() as u64], + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_append_visibility_pread( + config: &BenchConfig, + dir: &std::path::Path, +) -> Vec { + let sub_dir = dir.join("pread_growth"); + let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create growable file"); + let append_count: usize = if config.quick_mode { 100 } else { 1000 }; + + let reader = PreadReaderPlain::open(&path).expect("Failed to open growable file"); + let original_lines = reader.total_lines(); + reader.close(); + + data_gen::append_lines(&path, append_count).expect("Failed to append lines"); + + let reopen_start = std::time::Instant::now(); + let new_reader = PreadReaderPlain::open(&path).expect("Failed to reopen file"); + let reopen_elapsed = reopen_start.elapsed(); + + let new_lines = new_reader.total_lines(); + let can_read_new = new_lines > original_lines; + if can_read_new { + let last_line = new_lines.saturating_sub(1); + let _ = new_reader.get_line(last_line); + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("original_lines".into(), original_lines as f64); + extra.insert("appended_lines".into(), append_count as f64); + extra.insert("new_total_lines".into(), new_lines as f64); + extra.insert("reopen_us".into(), reopen_elapsed.as_micros() as f64); + + new_reader.close(); + + vec![BenchmarkResult { + category: "growth".into(), + test_name: "append_visibility_pread".into(), + backend: "pread".into(), + variant: "plain".into(), + latency_us: vec![reopen_elapsed.as_micros() as u64], + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_remap_cost(config: &BenchConfig, dir: &std::path::Path) -> Vec { + let sub_dir = dir.join("remap_cost"); + let append_count: usize = if config.quick_mode { 100 } else { 1000 }; + let iterations: usize = if config.quick_mode { 5 } else { 20 }; + + let mut latencies = Vec::with_capacity(iterations); + + for _ in 0..iterations { + let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file"); + let mut reader = MmapReaderPlain::open(&path).expect("Failed to open file"); + + data_gen::append_lines(&path, append_count).expect("Failed to append"); + + let new_size = std::fs::metadata(&path).expect("metadata").len() as usize; + let t = std::time::Instant::now(); + reader.remap(new_size).expect("remap failed"); + latencies.push(t.elapsed().as_micros() as u64); + + reader.close(); + let _ = std::fs::remove_file(&path); + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("appended_per_iter".into(), append_count as f64); + extra.insert("iterations".into(), iterations as f64); + + vec![BenchmarkResult { + category: "growth".into(), + test_name: "remap_cost".into(), + backend: "mmap".into(), + variant: "plain".into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_scroll_during_append(config: &BenchConfig, dir: &std::path::Path) -> Vec { + let sub_dir = dir.join("scroll_append"); + let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create growable file"); + let duration_secs: u64 = if config.quick_mode { 2 } else { 10 }; + let append_rate: usize = if config.quick_mode { 1000 } else { 10000 }; + + let bg_path = path.clone(); + let bg_handle = std::thread::spawn(move || { + let batch_size = 100; + let batch_interval = + std::time::Duration::from_micros(1_000_000 / (append_rate / batch_size).max(1) as u64); + let start = std::time::Instant::now(); + while start.elapsed().as_secs() < duration_secs { + data_gen::append_lines(&bg_path, batch_size).ok(); + std::thread::sleep(batch_interval); + } + }); + + let reader = PreadReaderPlain::open(&path).expect("Failed to open file"); + let mut frame_latencies = Vec::new(); + let mut current_line = 0usize; + let scroll_start = std::time::Instant::now(); + + while scroll_start.elapsed().as_secs() < duration_secs { + let t = std::time::Instant::now(); + let _ = reader.get_line(current_line); + frame_latencies.push(t.elapsed().as_micros() as u64); + current_line += 1; + } + + reader.close(); + bg_handle.join().ok(); + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("duration_secs".into(), duration_secs as f64); + extra.insert("append_rate_per_sec".into(), append_rate as f64); + extra.insert("frames_rendered".into(), frame_latencies.len() as f64); + + vec![BenchmarkResult { + category: "growth".into(), + test_name: "scroll_during_append".into(), + backend: "pread".into(), + variant: "plain".into(), + latency_us: frame_latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_high_frequency_append( + config: &BenchConfig, + dir: &std::path::Path, +) -> Vec { + let sub_dir = dir.join("high_freq_append"); + let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create growable file"); + let duration_secs: u64 = if config.quick_mode { 3 } else { 30 }; + let append_rate: usize = if config.quick_mode { 1000 } else { 10000 }; + let batch_size: usize = 100; + let batches_per_sec = append_rate / batch_size; + let total_batches = (duration_secs as usize * batches_per_sec).max(1); + + let mut detect_latencies = Vec::with_capacity(total_batches); + + for _ in 0..total_batches { + data_gen::append_lines(&path, batch_size).expect("Failed to append"); + + let t = std::time::Instant::now(); + if let Ok(reader) = PreadReaderPlain::open(&path) { + let total = reader.total_lines(); + let _ = reader.get_line(total.saturating_sub(1)); + reader.close(); + } + detect_latencies.push(t.elapsed().as_micros() as u64); + + std::thread::sleep(std::time::Duration::from_micros( + 1_000_000 / batches_per_sec as u64, + )); + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("duration_secs".into(), duration_secs as f64); + extra.insert("append_rate_per_sec".into(), append_rate as f64); + extra.insert("batch_size".into(), batch_size as f64); + extra.insert("total_batches".into(), total_batches as f64); + + vec![BenchmarkResult { + category: "growth".into(), + test_name: "high_frequency_append".into(), + backend: "pread".into(), + variant: "plain".into(), + latency_us: detect_latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} diff --git a/crates/bench/src/suites/jump.rs b/crates/bench/src/suites/jump.rs new file mode 100644 index 0000000..b49b695 --- /dev/null +++ b/crates/bench/src/suites/jump.rs @@ -0,0 +1,246 @@ +use std::collections::HashMap; + +use crate::metrics::MetricsCollector; +use crate::mmap_reader::{ + MmapReaderPhaseAware, MmapReaderPlain, MmapReaderPopulate, MmapReaderRandom, + MmapReaderSequential, +}; +use crate::pread_reader::{PreadReaderPlain, PreadReaderRandom, PreadReaderSequential}; +use crate::runner::BenchConfig; +use crate::types::BenchmarkResult; +use crate::FileReaderBackend; + +pub fn run(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + results.extend(bench_near_jump::("mmap", "plain", config)); + results.extend(bench_near_jump::( + "mmap", + "sequential", + config, + )); + results.extend(bench_near_jump::( + "mmap", "random", config, + )); + results.extend(bench_near_jump::( + "mmap", "populate", config, + )); + results.extend(bench_near_jump::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_near_jump::( + "pread", "plain", config, + )); + results.extend(bench_near_jump::( + "pread", "random", config, + )); + results.extend(bench_near_jump::( + "pread", + "sequential", + config, + )); + + results.extend(bench_far_jump::("mmap", "plain", config)); + results.extend(bench_far_jump::( + "mmap", + "sequential", + config, + )); + results.extend(bench_far_jump::("mmap", "random", config)); + results.extend(bench_far_jump::( + "mmap", "populate", config, + )); + results.extend(bench_far_jump::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_far_jump::("pread", "plain", config)); + results.extend(bench_far_jump::( + "pread", "random", config, + )); + results.extend(bench_far_jump::( + "pread", + "sequential", + config, + )); + + results.extend(bench_jump_end::("mmap", "plain", config)); + results.extend(bench_jump_end::("pread", "plain", config)); + + results.extend(bench_reverse_scan::( + "mmap", "plain", config, + )); + results.extend(bench_reverse_scan::( + "pread", "plain", config, + )); + + results +} + +fn bench_near_jump( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let path = &config.test_file; + let reader = B::open(path).expect("Failed to open file"); + let total = reader.total_lines(); + let iterations: usize = if config.quick_mode { 10 } else { 100 }; + + let mut latencies = Vec::with_capacity(iterations); + let mut current = 0usize; + + for _ in 0..iterations { + let target = (current + 15).min(total.saturating_sub(1)); + let t = std::time::Instant::now(); + let _ = reader.get_line(target); + latencies.push(t.elapsed().as_micros() as u64); + current = target; + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + reader.close(); + + vec![BenchmarkResult { + category: "jump".into(), + test_name: "near_jump".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra: HashMap::new(), + }] +} + +fn bench_far_jump( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let path = &config.test_file; + let reader = B::open(path).expect("Failed to open file"); + let total = reader.total_lines(); + let repetitions: usize = if config.quick_mode { 3 } else { 10 }; + + let fractions = [0.25, 0.50, 0.75]; + let mut latencies = Vec::new(); + let mut extra = HashMap::new(); + + for &frac in &fractions { + let target = ((total as f64 * frac) as usize).min(total.saturating_sub(1)); + for _ in 0..repetitions { + let t = std::time::Instant::now(); + let _ = reader.get_line(target); + latencies.push(t.elapsed().as_micros() as u64); + } + } + + extra.insert("jump_positions".into(), 3.0); + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + reader.close(); + + vec![BenchmarkResult { + category: "jump".into(), + test_name: "far_jump".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_jump_end( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let path = &config.test_file; + let reader = B::open(path).expect("Failed to open file"); + let total = reader.total_lines(); + let iterations: usize = if config.quick_mode { 5 } else { 10 }; + + let last_line = total.saturating_sub(1); + let mut latencies = Vec::with_capacity(iterations); + + for _ in 0..iterations { + let t = std::time::Instant::now(); + let _ = reader.get_line(last_line); + latencies.push(t.elapsed().as_micros() as u64); + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("last_line_idx".into(), last_line as f64); + + reader.close(); + + vec![BenchmarkResult { + category: "jump".into(), + test_name: "jump_end".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_reverse_scan( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let path = &config.test_file; + let reader = B::open(path).expect("Failed to open file"); + let total = reader.total_lines(); + let iterations: usize = if config.quick_mode { 5 } else { 10 }; + + let mut latencies = Vec::with_capacity(iterations * 35); + + for _ in 0..iterations { + // Read lines backwards from end: last line, last-1, ..., last-34 + let start = total.saturating_sub(35); + for i in (start..total).rev() { + let t = std::time::Instant::now(); + let _ = reader.get_line(i); + latencies.push(t.elapsed().as_micros() as u64); + } + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + reader.close(); + + let mut extra = HashMap::new(); + extra.insert("lines_per_scan".into(), 35.0); + extra.insert("iterations".into(), iterations as f64); + + vec![BenchmarkResult { + category: "jump".into(), + test_name: "reverse_scan".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} diff --git a/crates/bench/src/suites/memory.rs b/crates/bench/src/suites/memory.rs new file mode 100644 index 0000000..684d9fe --- /dev/null +++ b/crates/bench/src/suites/memory.rs @@ -0,0 +1,233 @@ +use std::collections::HashMap; + +use crate::metrics::MetricsCollector; +use crate::mmap_reader::{ + MmapReaderPhaseAware, MmapReaderPlain, MmapReaderPopulate, MmapReaderRandom, + MmapReaderSequential, +}; +use crate::pread_reader::{PreadReaderPlain, PreadReaderRandom, PreadReaderSequential}; +use crate::runner::BenchConfig; +use crate::types::BenchmarkResult; +use crate::FileReaderBackend; + +pub fn run(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + results.extend(bench_idle_rss::("mmap", "plain", config)); + results.extend(bench_idle_rss::( + "mmap", + "sequential", + config, + )); + results.extend(bench_idle_rss::("mmap", "random", config)); + results.extend(bench_idle_rss::( + "mmap", "populate", config, + )); + results.extend(bench_idle_rss::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_idle_rss::("pread", "plain", config)); + results.extend(bench_idle_rss::( + "pread", "random", config, + )); + results.extend(bench_idle_rss::( + "pread", + "sequential", + config, + )); + + results.extend(bench_scroll_rss::("mmap", "plain", config)); + results.extend(bench_scroll_rss::( + "pread", "plain", config, + )); + + results.extend(bench_jump_end_rss::( + "mmap", "plain", config, + )); + results.extend(bench_jump_end_rss::( + "pread", "plain", config, + )); + + results.extend(bench_rss_reclaim::( + "mmap", "plain", config, + )); + results.extend(bench_rss_reclaim::( + "pread", "plain", config, + )); + + results +} + +fn bench_idle_rss( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let reader = B::open(&config.test_file).expect("Failed to open file"); + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("total_lines".into(), reader.total_lines() as f64); + extra.insert( + "file_size_mb".into(), + reader.file_size() as f64 / (1024.0 * 1024.0), + ); + + reader.close(); + + vec![BenchmarkResult { + category: "memory".into(), + test_name: "idle_rss".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: vec![], + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_scroll_rss( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let reader = B::open(&config.test_file).expect("Failed to open file"); + let total = reader.total_lines(); + let sample_interval = 100_000; + let max_lines = if config.quick_mode { 100_000 } else { total }; + + let mut rss_samples = Vec::new(); + let mut hwm_samples = Vec::new(); + + for i in (0..max_lines).step_by(sample_interval) { + let _ = reader.get_line(i); + let rss = MetricsCollector::read_rss(); + rss_samples.push(rss.vm_rss_kb); + hwm_samples.push(rss.vm_hwm_kb); + } + + let final_rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("rss_samples_count".into(), rss_samples.len() as f64); + extra.insert( + "max_rss_kb".into(), + rss_samples.iter().copied().fold(0u64, u64::max) as f64, + ); + extra.insert( + "max_hwm_kb".into(), + hwm_samples.iter().copied().fold(0u64, u64::max) as f64, + ); + extra.insert("lines_read".into(), max_lines.min(total) as f64); + + reader.close(); + + vec![BenchmarkResult { + category: "memory".into(), + test_name: "scroll_rss".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: vec![], + rss_kb: final_rss.vm_rss_kb, + rss_peak_kb: final_rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_jump_end_rss( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let reader = B::open(&config.test_file).expect("Failed to open file"); + let total = reader.total_lines(); + let last_line = total.saturating_sub(1); + + let _ = reader.get_line(last_line); + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("last_line_idx".into(), last_line as f64); + extra.insert("total_lines".into(), total as f64); + + reader.close(); + + vec![BenchmarkResult { + category: "memory".into(), + test_name: "jump_end_rss".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: vec![], + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_rss_reclaim( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let reader = B::open(&config.test_file).expect("Failed to open file"); + let total = reader.total_lines(); + let last_line = total.saturating_sub(1); + + let _ = reader.get_line(last_line); + + let wait_secs: u64 = if config.quick_mode { 5 } else { 30 }; + let sample_interval: u64 = 5; + let num_samples = (wait_secs / sample_interval) as usize; + + let mut rss_samples = Vec::with_capacity(num_samples); + let mut hwm_samples = Vec::with_capacity(num_samples); + + for _ in 0..num_samples { + std::thread::sleep(std::time::Duration::from_secs(sample_interval)); + let rss = MetricsCollector::read_rss(); + rss_samples.push(rss.vm_rss_kb); + hwm_samples.push(rss.vm_hwm_kb); + } + + let final_rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + reader.close(); + + let mut extra = HashMap::new(); + extra.insert("wait_total_secs".into(), wait_secs as f64); + extra.insert("rss_samples".into(), rss_samples.len() as f64); + if let (Some(&first), Some(&last)) = (rss_samples.first(), rss_samples.last()) { + extra.insert("rss_first_kb".into(), first as f64); + extra.insert("rss_last_kb".into(), last as f64); + extra.insert( + "rss_change_pct".into(), + if first > 0 { + ((last as f64 - first as f64) / first as f64) * 100.0 + } else { + 0.0 + }, + ); + } + + vec![BenchmarkResult { + category: "memory".into(), + test_name: "rss_reclaim".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: vec![], + rss_kb: final_rss.vm_rss_kb, + rss_peak_kb: final_rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} diff --git a/crates/bench/src/suites/mod.rs b/crates/bench/src/suites/mod.rs new file mode 100644 index 0000000..7c72fea --- /dev/null +++ b/crates/bench/src/suites/mod.rs @@ -0,0 +1,7 @@ +pub mod concurrent; +pub mod growth; +pub mod jump; +pub mod memory; +pub mod render; +pub mod rotation; +pub mod startup; diff --git a/crates/bench/src/suites/render.rs b/crates/bench/src/suites/render.rs new file mode 100644 index 0000000..59147de --- /dev/null +++ b/crates/bench/src/suites/render.rs @@ -0,0 +1,177 @@ +use std::collections::HashMap; + +use crate::metrics::MetricsCollector; +use crate::mmap_reader::{ + MmapReaderPhaseAware, MmapReaderPlain, MmapReaderPopulate, MmapReaderRandom, + MmapReaderSequential, +}; +use crate::pread_reader::{PreadReaderPlain, PreadReaderRandom, PreadReaderSequential}; +use crate::runner::BenchConfig; +use crate::types::BenchmarkResult; +use crate::FileReaderBackend; + +pub fn run(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + results.extend(bench_single_frame::( + "mmap", "plain", config, + )); + results.extend(bench_single_frame::( + "mmap", + "sequential", + config, + )); + results.extend(bench_single_frame::( + "mmap", "random", config, + )); + results.extend(bench_single_frame::( + "mmap", "populate", config, + )); + results.extend(bench_single_frame::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_single_frame::( + "pread", "plain", config, + )); + results.extend(bench_single_frame::( + "pread", "random", config, + )); + results.extend(bench_single_frame::( + "pread", + "sequential", + config, + )); + + results.extend(bench_continuous_scroll::( + "mmap", "plain", config, + )); + results.extend(bench_continuous_scroll::( + "mmap", + "sequential", + config, + )); + results.extend(bench_continuous_scroll::( + "mmap", "random", config, + )); + results.extend(bench_continuous_scroll::( + "mmap", "populate", config, + )); + results.extend(bench_continuous_scroll::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_continuous_scroll::( + "pread", "plain", config, + )); + results.extend(bench_continuous_scroll::( + "pread", "random", config, + )); + results.extend(bench_continuous_scroll::( + "pread", + "sequential", + config, + )); + + results +} + +fn bench_single_frame( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let path = &config.test_file; + let reader = B::open(path).expect("Failed to open file"); + let total = reader.total_lines(); + let mut results = Vec::new(); + + let positions = [ + ("head", 0), + ("middle", total / 2), + ("tail", total.saturating_sub(35)), + ]; + + for (pos_name, start_line) in positions { + let mut latencies = Vec::with_capacity(35); + for i in 0..35 { + let line_idx = start_line + i; + if line_idx >= total { + break; + } + let t = std::time::Instant::now(); + let _ = reader.get_line(line_idx); + latencies.push(t.elapsed().as_micros() as u64); + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + results.push(BenchmarkResult { + category: "render".into(), + test_name: format!("single_frame_{pos_name}"), + backend: backend.into(), + variant: variant.into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra: HashMap::new(), + }); + } + + reader.close(); + results +} + +fn bench_continuous_scroll( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let path = &config.test_file; + let reader = B::open(path).expect("Failed to open file"); + let total = reader.total_lines(); + + let iterations = if config.quick_mode { 100 } else { 1000 }; + let timeout = if config.quick_mode { + std::time::Duration::from_secs(1) + } else { + std::time::Duration::from_secs(10) + }; + + let mut latencies = Vec::with_capacity(iterations); + let start = std::time::Instant::now(); + + for i in 0..iterations { + if start.elapsed() > timeout { + break; + } + let line_idx = i % total.max(1); + let t = std::time::Instant::now(); + let _ = reader.get_line(line_idx); + latencies.push(t.elapsed().as_micros() as u64); + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("total_lines_scrolled".into(), latencies.len() as f64); + + reader.close(); + + vec![BenchmarkResult { + category: "render".into(), + test_name: "continuous_scroll".into(), + backend: backend.into(), + variant: variant.into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} diff --git a/crates/bench/src/suites/rotation.rs b/crates/bench/src/suites/rotation.rs new file mode 100644 index 0000000..b91d273 --- /dev/null +++ b/crates/bench/src/suites/rotation.rs @@ -0,0 +1,180 @@ +use std::collections::HashMap; + +use crate::data_gen; +use crate::metrics::MetricsCollector; +use crate::mmap_reader::{self, MmapReaderPlain}; +use crate::pread_reader::PreadReaderPlain; +use crate::runner::BenchConfig; +use crate::types::BenchmarkResult; +use crate::FileReaderBackend; + +pub fn run(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + let dir = tempfile::tempdir().expect("Failed to create temp dir"); + results.extend(bench_truncate_safety_mmap(config, dir.path())); + results.extend(bench_truncate_safety_pread(config, dir.path())); + results.extend(bench_rotation_detection(config, dir.path())); + + results +} + +fn bench_truncate_safety_mmap( + _config: &BenchConfig, + dir: &std::path::Path, +) -> Vec { + let sub_dir = dir.join("trunc_mmap"); + let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file"); + let iterations: usize = if _config.quick_mode { 3 } else { 10 }; + + let mut latencies = Vec::with_capacity(iterations); + let mut sigbus_detected = 0usize; + + for _ in 0..iterations { + mmap_reader::reset_sigbus_flag(); + + let reader = MmapReaderPlain::open(&path).expect("Failed to open file"); + let original_size = reader.file_size(); + + let truncate_size = original_size / 2; + data_gen::truncate_file(&path, truncate_size).expect("Failed to truncate"); + + let t = std::time::Instant::now(); + let mid_offset = original_size as u64 / 2; + let _ = reader.read_range(mid_offset, 64); + latencies.push(t.elapsed().as_micros() as u64); + + if mmap_reader::sigbus_flag() { + sigbus_detected += 1; + } + reader.close(); + + let mut f = std::fs::File::create(&path).expect("Failed to recreate file"); + use std::io::Write; + for i in 0..1000u64 { + writeln!(f, "restored line {i}").unwrap(); + } + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("iterations".into(), iterations as f64); + extra.insert("sigbus_detected".into(), sigbus_detected as f64); + extra.insert("crashed".into(), 0.0); + + vec![BenchmarkResult { + category: "rotation".into(), + test_name: "truncate_safety_mmap".into(), + backend: "mmap".into(), + variant: "plain".into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_truncate_safety_pread( + _config: &BenchConfig, + dir: &std::path::Path, +) -> Vec { + let sub_dir = dir.join("trunc_pread"); + let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file"); + let iterations: usize = if _config.quick_mode { 3 } else { 10 }; + + let mut latencies = Vec::with_capacity(iterations); + let mut error_count = 0usize; + + for _ in 0..iterations { + let reader = PreadReaderPlain::open(&path).expect("Failed to open file"); + let original_size = reader.file_size(); + + let truncate_size = original_size / 2; + data_gen::truncate_file(&path, truncate_size).expect("Failed to truncate"); + + let t = std::time::Instant::now(); + let mid_offset = original_size as u64 / 2; + let result = reader.read_range(mid_offset, 64); + latencies.push(t.elapsed().as_micros() as u64); + + if result.is_none() { + error_count += 1; + } + reader.close(); + + let mut f = std::fs::File::create(&path).expect("Failed to recreate file"); + use std::io::Write; + for i in 0..1000u64 { + writeln!(f, "restored line {i}").unwrap(); + } + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("iterations".into(), iterations as f64); + extra.insert("errors_returned".into(), error_count as f64); + extra.insert("crashed".into(), 0.0); + + vec![BenchmarkResult { + category: "rotation".into(), + test_name: "truncate_safety_pread".into(), + backend: "pread".into(), + variant: "plain".into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} + +fn bench_rotation_detection(_config: &BenchConfig, dir: &std::path::Path) -> Vec { + let sub_dir = dir.join("rotation_detect"); + let iterations: usize = if _config.quick_mode { 3 } else { 10 }; + + let mut latencies = Vec::with_capacity(iterations); + let mut detected_count = 0usize; + + for _ in 0..iterations { + let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file"); + let original_inode = MetricsCollector::get_inode(&path).expect("Failed to get inode"); + + let _rotated = data_gen::rotate_file(&path).expect("Failed to rotate file"); + + let t = std::time::Instant::now(); + let detected = MetricsCollector::detect_rotation(original_inode, &path); + latencies.push(t.elapsed().as_micros() as u64); + + if detected { + detected_count += 1; + } + + let _ = std::fs::remove_file(&path); + let rotated = sub_dir.join("growable.log.1"); + let _ = std::fs::remove_file(&rotated); + } + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let mut extra = HashMap::new(); + extra.insert("iterations".into(), iterations as f64); + extra.insert("rotations_detected".into(), detected_count as f64); + + vec![BenchmarkResult { + category: "rotation".into(), + test_name: "rotation_detection".into(), + backend: "both".into(), + variant: "plain".into(), + latency_us: latencies, + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra, + }] +} diff --git a/crates/bench/src/suites/startup.rs b/crates/bench/src/suites/startup.rs new file mode 100644 index 0000000..94edebf --- /dev/null +++ b/crates/bench/src/suites/startup.rs @@ -0,0 +1,129 @@ +use std::collections::HashMap; +use std::path::Path; + +use crate::metrics::MetricsCollector; +use crate::mmap_reader::{ + MmapReaderPhaseAware, MmapReaderPlain, MmapReaderPopulate, MmapReaderRandom, + MmapReaderSequential, +}; +use crate::pread_reader::{PreadReaderPlain, PreadReaderRandom, PreadReaderSequential}; +use crate::runner::BenchConfig; +use crate::types::BenchmarkResult; +use crate::FileReaderBackend; + +pub fn run(config: &BenchConfig) -> Vec { + let mut results = Vec::new(); + + results.extend(bench_hot_open::("mmap", "plain", config)); + results.extend(bench_hot_open::( + "mmap", + "sequential", + config, + )); + results.extend(bench_hot_open::("mmap", "random", config)); + results.extend(bench_hot_open::( + "mmap", "populate", config, + )); + results.extend(bench_hot_open::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_hot_open::("pread", "plain", config)); + results.extend(bench_hot_open::( + "pread", "random", config, + )); + results.extend(bench_hot_open::( + "pread", + "sequential", + config, + )); + + if !config.quick_mode { + if MetricsCollector::clear_file_cache(&config.test_file).is_ok() { + results.extend(bench_cold_open::("mmap", "plain", config)); + results.extend(bench_cold_open::( + "mmap", + "sequential", + config, + )); + results.extend(bench_cold_open::( + "mmap", "random", config, + )); + results.extend(bench_cold_open::( + "mmap", "populate", config, + )); + results.extend(bench_cold_open::( + "mmap", + "phase_aware", + config, + )); + results.extend(bench_cold_open::( + "pread", "plain", config, + )); + results.extend(bench_cold_open::( + "pread", "random", config, + )); + results.extend(bench_cold_open::( + "pread", + "sequential", + config, + )); + } + } + + results +} + +fn bench_hot_open( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + open_and_measure::(backend, variant, &config.test_file, "hot_open") +} + +fn bench_cold_open( + backend: &str, + variant: &str, + config: &BenchConfig, +) -> Vec { + let _ = MetricsCollector::clear_file_cache(&config.test_file); + open_and_measure::(backend, variant, &config.test_file, "cold_open") +} + +fn open_and_measure( + backend: &str, + variant: &str, + path: &Path, + test_name: &str, +) -> Vec { + let start = std::time::Instant::now(); + let reader = B::open(path).expect("Failed to open file"); + let elapsed = start.elapsed(); + + let rss = MetricsCollector::read_rss(); + let faults = MetricsCollector::read_page_faults(); + + let result = BenchmarkResult { + category: "startup".into(), + test_name: test_name.into(), + backend: backend.into(), + variant: variant.into(), + latency_us: vec![elapsed.as_micros() as u64], + rss_kb: rss.vm_rss_kb, + rss_peak_kb: rss.vm_hwm_kb, + page_faults: faults.minor_faults + faults.major_faults, + extra: { + let mut m = HashMap::new(); + m.insert("total_lines".into(), reader.total_lines() as f64); + m.insert( + "file_size_mb".into(), + reader.file_size() as f64 / (1024.0 * 1024.0), + ); + m + }, + }; + reader.close(); + vec![result] +} diff --git a/crates/bench/src/types.rs b/crates/bench/src/types.rs new file mode 100644 index 0000000..9eb0f62 --- /dev/null +++ b/crates/bench/src/types.rs @@ -0,0 +1,13 @@ +use std::collections::HashMap; + +pub struct BenchmarkResult { + pub category: String, + pub test_name: String, + pub backend: String, + pub variant: String, + pub latency_us: Vec, + pub rss_kb: u64, + pub rss_peak_kb: u64, + pub page_faults: u64, + pub extra: HashMap, +}