753 lines
25 KiB
Rust
753 lines
25 KiB
Rust
// ─── mmap_reader.rs ──────────────────────────────────────────────────────────
|
|
// mmap-based FileReaderBackend implementations with 5 variants for benchmarking.
|
|
// Includes SIGBUS handler for file-truncation resilience and remap support
|
|
// for growing-file scenarios.
|
|
// ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
use std::fs::File;
|
|
use std::io::BufReader;
|
|
use std::path::Path;
|
|
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU8, Ordering};
|
|
use std::sync::Once;
|
|
|
|
use memmap2::{Advice, Mmap, MmapOptions, RemapOptions};
|
|
use nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal};
|
|
|
|
use crate::line_index::LineIndex;
|
|
use crate::FileReaderBackend;
|
|
|
|
// ─── SIGBUS Handler ──────────────────────────────────────────────────────────
|
|
//
|
|
// Signal-safety architecture:
|
|
// - Old handler state is stored in raw atomics (AtomicU8 + AtomicPtr) that are
|
|
// async-signal-safe to read — no OnceLock, Mutex, or other non-trivial abstractions
|
|
// in the signal handler path.
|
|
// - Installation uses `Once` for idempotency and follows a strict publish-then-install
|
|
// sequence to close the "handler-active-but-state-unpublished" race window.
|
|
|
|
/// Process-global flag raised by the SIGBUS handler whenever it intercepts a bus error.
///
/// Because the flag is shared by the whole process, concurrent users that reset and
/// poll it can observe each other's events. This is currently acceptable because only
/// the sequential `rotation` benchmark suite relies on it.
static SIGBUS_OCCURRED: AtomicBool = AtomicBool::new(false);

// Discriminants describing the SIGBUS disposition that was in place before ours was
// installed. One of these is stored in `OLD_HANDLER_KIND` so the signal handler can
// decide how to chain.

/// No previous disposition has been recorded (installation skipped or not yet run).
const HANDLER_NONE: u8 = 0;
/// The previous disposition was `SIG_DFL`.
const HANDLER_DEFAULT: u8 = 1;
/// The previous disposition was `SIG_IGN`.
const HANDLER_IGNORE: u8 = 2;
/// The previous handler is a plain `extern "C" fn(c_int)`.
const HANDLER_PLAIN: u8 = 3;
/// The previous handler is an `SA_SIGINFO` handler:
/// `extern "C" fn(c_int, *mut siginfo_t, *mut c_void)`.
const HANDLER_SIGACTION: u8 = 4;

/// Kind of the previous SIGBUS disposition (one of the `HANDLER_*` values).
/// A raw atomic, so it is async-signal-safe to read from the handler.
static OLD_HANDLER_KIND: AtomicU8 = AtomicU8::new(HANDLER_NONE);

/// Function pointer of the previous SIGBUS handler. It is only meaningful when
/// `OLD_HANDLER_KIND` is `HANDLER_PLAIN` or `HANDLER_SIGACTION`.
/// A raw atomic, so it is async-signal-safe to read from the handler.
static OLD_HANDLER_PTR: AtomicPtr<std::ffi::c_void> = AtomicPtr::new(std::ptr::null_mut());

/// Ensures `install_sigbus_handler` runs its installation logic exactly once across all threads.
static INSTALL_ONCE: Once = Once::new();
|
|
|
|
/// Returns `true` if a SIGBUS was intercepted since the last reset.
|
|
pub fn sigbus_flag() -> bool {
|
|
SIGBUS_OCCURRED.load(Ordering::SeqCst)
|
|
}
|
|
|
|
/// Resets the SIGBUS flag. Call before operations where you want to detect
|
|
/// a fresh SIGBUS.
|
|
pub fn reset_sigbus_flag() {
|
|
SIGBUS_OCCURRED.store(false, Ordering::SeqCst)
|
|
}
|
|
|
|
/// SIGBUS signal handler.
|
|
///
|
|
/// # Safety Constraints (signal handler context)
|
|
/// - NO TLS access
|
|
/// - NO memory allocation
|
|
/// - NO lock acquisition
|
|
/// - Only: AtomicBool store → mmap → raw atomic loads → chain
|
|
extern "C" fn sigbus_handler(
|
|
sig: libc::c_int,
|
|
info: *mut libc::siginfo_t,
|
|
ctx: *mut std::ffi::c_void,
|
|
) {
|
|
SIGBUS_OCCURRED.store(true, Ordering::SeqCst);
|
|
|
|
// Align fault address down to page boundary.
|
|
// si_addr is NOT guaranteed to be page-aligned; without this, mmap(MAP_FIXED)
|
|
// could replace the wrong page.
|
|
let addr = unsafe { (*info).si_addr() };
|
|
let aligned = (addr as usize & !0xFFF) as *mut libc::c_void;
|
|
|
|
// Map anonymous zero page at fault address to prevent crash on file truncation.
|
|
let result = unsafe {
|
|
libc::mmap(
|
|
aligned,
|
|
4096,
|
|
libc::PROT_READ,
|
|
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_FIXED,
|
|
-1,
|
|
0,
|
|
)
|
|
};
|
|
if result == libc::MAP_FAILED {
|
|
// Chain to old handler via raw atomics (async-signal-safe).
|
|
let kind = OLD_HANDLER_KIND.load(Ordering::Acquire);
|
|
match kind {
|
|
HANDLER_PLAIN => {
|
|
let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire);
|
|
if !ptr.is_null() {
|
|
// SAFETY: ptr was derived from a valid function pointer
|
|
// published by install_sigbus_handler before this handler was installed.
|
|
let f: extern "C" fn(libc::c_int) = unsafe { std::mem::transmute(ptr) };
|
|
f(sig);
|
|
} else {
|
|
unsafe { libc::_exit(128 + sig) };
|
|
}
|
|
}
|
|
HANDLER_SIGACTION => {
|
|
let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire);
|
|
if !ptr.is_null() {
|
|
let f: extern "C" fn(
|
|
libc::c_int,
|
|
*mut libc::siginfo_t,
|
|
*mut std::ffi::c_void,
|
|
) = unsafe { std::mem::transmute(ptr) };
|
|
f(sig, info, ctx);
|
|
} else {
|
|
unsafe { libc::_exit(128 + sig) };
|
|
}
|
|
}
|
|
_ => {
|
|
// HANDLER_NONE / HANDLER_DEFAULT / HANDLER_IGNORE — no safe chaining.
|
|
unsafe { libc::_exit(128 + sig) };
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Install the SIGBUS handler via sigaction(). Chains to any previous handler.
|
|
/// Uses `Once` to guarantee exactly-once installation — safe to call from
|
|
/// multiple threads concurrently.
|
|
///
|
|
/// The install sequence closes the "install-then-publish" race by:
|
|
/// 1. Querying the current handler via raw `libc::sigaction` (no modification).
|
|
/// 2. Publishing old handler state to raw atomics (`Release` stores).
|
|
/// 3. Installing our handler via `nix::sigaction`.
|
|
///
|
|
/// This ensures the atomics are readable *before* our handler can fire.
|
|
fn install_sigbus_handler() {
|
|
INSTALL_ONCE.call_once(|| {
|
|
// Step 1: Query current SIGBUS disposition without changing it.
|
|
let mut old_act: libc::sigaction = unsafe { std::mem::zeroed() };
|
|
let ret = unsafe { libc::sigaction(libc::SIGBUS, std::ptr::null(), &mut old_act) };
|
|
if ret != 0 {
|
|
// Failed to query — skip installation entirely.
|
|
return;
|
|
}
|
|
|
|
// Step 2: Publish old handler to atomics BEFORE installing ours.
|
|
let is_siginfo = (old_act.sa_flags & libc::SA_SIGINFO) != 0;
|
|
|
|
let raw_usize = old_act.sa_sigaction as usize;
|
|
|
|
if raw_usize == 0 {
|
|
// SIG_DFL (null function pointer → default disposition)
|
|
OLD_HANDLER_KIND.store(HANDLER_DEFAULT, Ordering::Release);
|
|
} else if raw_usize == 1 {
|
|
// SIG_IGN (sentinel value 1 → ignore disposition)
|
|
OLD_HANDLER_KIND.store(HANDLER_IGNORE, Ordering::Release);
|
|
} else {
|
|
// Real handler function pointer
|
|
OLD_HANDLER_PTR.store(raw_usize as *mut std::ffi::c_void, Ordering::Release);
|
|
if is_siginfo {
|
|
OLD_HANDLER_KIND.store(HANDLER_SIGACTION, Ordering::Release);
|
|
} else {
|
|
OLD_HANDLER_KIND.store(HANDLER_PLAIN, Ordering::Release);
|
|
}
|
|
}
|
|
|
|
// Step 3: Install our handler. Atomics are already published,
|
|
// so the handler can safely read them from the moment it's installed.
|
|
let new_action = SigAction::new(
|
|
SigHandler::SigAction(sigbus_handler),
|
|
SaFlags::SA_SIGINFO,
|
|
SigSet::empty(),
|
|
);
|
|
let _ = unsafe { sigaction(Signal::SIGBUS, &new_action) };
|
|
});
|
|
}
|
|
|
|
// ─── Core MmapReader ─────────────────────────────────────────────────────────
|
|
|
|
/// Core mmap-based reader. Holds the memory mapping, file handle, and
|
|
/// line index. Used as the engine inside each variant wrapper.
|
|
pub struct MmapReader {
|
|
mmap: Mmap,
|
|
#[allow(dead_code)]
|
|
file: File,
|
|
line_index: LineIndex,
|
|
file_size: u64,
|
|
}
|
|
|
|
impl MmapReader {
|
|
/// Open a file and create an mmap. Does NOT apply madvise.
|
|
/// The caller is responsible for applying the desired advice.
|
|
fn open_raw(path: &Path) -> std::io::Result<Self> {
|
|
let file = File::open(path)?;
|
|
let file_size = file.metadata()?.len();
|
|
|
|
let mmap = unsafe { Mmap::map(&file)? };
|
|
|
|
let line_index = {
|
|
let mut reader = BufReader::new(&file);
|
|
LineIndex::from_reader(&mut reader)?
|
|
};
|
|
|
|
Ok(Self {
|
|
mmap,
|
|
file,
|
|
line_index,
|
|
file_size,
|
|
})
|
|
}
|
|
|
|
/// Open with MmapOptions (for populate, etc.).
|
|
fn open_with_options(path: &Path, opts: &MmapOptions) -> std::io::Result<Self> {
|
|
let file = File::open(path)?;
|
|
let file_size = file.metadata()?.len();
|
|
|
|
let mmap = if file_size == 0 {
|
|
unsafe { Mmap::map(&file)? }
|
|
} else {
|
|
unsafe { opts.map(&file)? }
|
|
};
|
|
|
|
let line_index = {
|
|
let mut reader = BufReader::new(&file);
|
|
LineIndex::from_reader(&mut reader)?
|
|
};
|
|
|
|
Ok(Self {
|
|
mmap,
|
|
file,
|
|
line_index,
|
|
file_size,
|
|
})
|
|
}
|
|
|
|
/// Apply madvise to the entire mapping.
|
|
fn advise(&self, advice: Advice) {
|
|
let _ = self.mmap.advise(advice);
|
|
}
|
|
|
|
/// Grow the mmap to `new_size` bytes.
|
|
///
|
|
/// # Safety
|
|
/// Caller must ensure no `&[u8]` references to the old mmap data exist.
|
|
/// After remap, any pointers derived from the old mapping may be invalid
|
|
/// (if the kernel moved the mapping).
|
|
pub unsafe fn remap(&mut self, new_size: usize) -> std::io::Result<()> {
|
|
unsafe {
|
|
self.mmap
|
|
.remap(new_size, RemapOptions::new().may_move(true))?;
|
|
}
|
|
self.file_size = new_size as u64;
|
|
Ok(())
|
|
}
|
|
|
|
#[inline]
|
|
pub fn file_size(&self) -> u64 {
|
|
self.file_size
|
|
}
|
|
|
|
#[inline]
|
|
pub fn total_lines(&self) -> usize {
|
|
self.line_index.line_count()
|
|
}
|
|
|
|
#[inline]
|
|
pub fn get_line(&self, idx: usize) -> Option<String> {
|
|
self.line_index
|
|
.get_line(&self.mmap, idx)
|
|
.map(|s| s.to_owned())
|
|
}
|
|
|
|
#[inline]
|
|
pub fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
|
|
let start = offset as usize;
|
|
let end = start.checked_add(len)?;
|
|
if end > self.mmap.len() {
|
|
return None;
|
|
}
|
|
Some(self.mmap[start..end].to_vec())
|
|
}
|
|
}
|
|
|
|
// ─── 5 Variants ──────────────────────────────────────────────────────────────
|
|
|
|
/// Variant 1: Plain mmap, no madvise.
|
|
pub struct MmapReaderPlain {
|
|
inner: MmapReader,
|
|
}
|
|
|
|
impl MmapReaderPlain {
|
|
pub fn remap(&mut self, new_size: usize) -> std::io::Result<()> {
|
|
unsafe { self.inner.remap(new_size) }
|
|
}
|
|
}
|
|
|
|
impl FileReaderBackend for MmapReaderPlain {
|
|
fn name(&self) -> &str {
|
|
"mmap_plain"
|
|
}
|
|
|
|
fn open(path: &Path) -> std::io::Result<Self>
|
|
where
|
|
Self: Sized,
|
|
{
|
|
install_sigbus_handler();
|
|
let inner = MmapReader::open_raw(path)?;
|
|
Ok(Self { inner })
|
|
}
|
|
|
|
fn file_size(&self) -> u64 {
|
|
self.inner.file_size()
|
|
}
|
|
|
|
fn total_lines(&self) -> usize {
|
|
self.inner.total_lines()
|
|
}
|
|
|
|
fn get_line(&self, idx: usize) -> Option<String> {
|
|
self.inner.get_line(idx)
|
|
}
|
|
|
|
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
|
|
self.inner.read_range(offset, len)
|
|
}
|
|
|
|
fn close(self) {}
|
|
}
|
|
|
|
/// Variant 2: mmap with MADV_SEQUENTIAL — optimal for sequential scan (index build).
|
|
pub struct MmapReaderSequential {
|
|
inner: MmapReader,
|
|
}
|
|
|
|
impl FileReaderBackend for MmapReaderSequential {
|
|
fn name(&self) -> &str {
|
|
"mmap_sequential"
|
|
}
|
|
|
|
fn open(path: &Path) -> std::io::Result<Self>
|
|
where
|
|
Self: Sized,
|
|
{
|
|
install_sigbus_handler();
|
|
let inner = MmapReader::open_raw(path)?;
|
|
inner.advise(Advice::Sequential);
|
|
Ok(Self { inner })
|
|
}
|
|
|
|
fn file_size(&self) -> u64 {
|
|
self.inner.file_size()
|
|
}
|
|
|
|
fn total_lines(&self) -> usize {
|
|
self.inner.total_lines()
|
|
}
|
|
|
|
fn get_line(&self, idx: usize) -> Option<String> {
|
|
self.inner.get_line(idx)
|
|
}
|
|
|
|
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
|
|
self.inner.read_range(offset, len)
|
|
}
|
|
|
|
fn close(self) {}
|
|
}
|
|
|
|
/// Variant 3: mmap with MADV_RANDOM — optimal for random line access after index is built.
|
|
pub struct MmapReaderRandom {
|
|
inner: MmapReader,
|
|
}
|
|
|
|
impl FileReaderBackend for MmapReaderRandom {
|
|
fn name(&self) -> &str {
|
|
"mmap_random"
|
|
}
|
|
|
|
fn open(path: &Path) -> std::io::Result<Self>
|
|
where
|
|
Self: Sized,
|
|
{
|
|
install_sigbus_handler();
|
|
let inner = MmapReader::open_raw(path)?;
|
|
inner.advise(Advice::Random);
|
|
Ok(Self { inner })
|
|
}
|
|
|
|
fn file_size(&self) -> u64 {
|
|
self.inner.file_size()
|
|
}
|
|
|
|
fn total_lines(&self) -> usize {
|
|
self.inner.total_lines()
|
|
}
|
|
|
|
fn get_line(&self, idx: usize) -> Option<String> {
|
|
self.inner.get_line(idx)
|
|
}
|
|
|
|
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
|
|
self.inner.read_range(offset, len)
|
|
}
|
|
|
|
fn close(self) {}
|
|
}
|
|
|
|
/// Variant 4: mmap with MAP_POPULATE — pre-fault all pages at mmap time.
|
|
/// Trades higher upfront cost for smoother subsequent access.
|
|
pub struct MmapReaderPopulate {
|
|
inner: MmapReader,
|
|
}
|
|
|
|
impl FileReaderBackend for MmapReaderPopulate {
|
|
fn name(&self) -> &str {
|
|
"mmap_populate"
|
|
}
|
|
|
|
fn open(path: &Path) -> std::io::Result<Self>
|
|
where
|
|
Self: Sized,
|
|
{
|
|
install_sigbus_handler();
|
|
let mut opts = MmapOptions::new();
|
|
opts.populate();
|
|
let inner = MmapReader::open_with_options(path, &opts)?;
|
|
Ok(Self { inner })
|
|
}
|
|
|
|
fn file_size(&self) -> u64 {
|
|
self.inner.file_size()
|
|
}
|
|
|
|
fn total_lines(&self) -> usize {
|
|
self.inner.total_lines()
|
|
}
|
|
|
|
fn get_line(&self, idx: usize) -> Option<String> {
|
|
self.inner.get_line(idx)
|
|
}
|
|
|
|
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
|
|
self.inner.read_range(offset, len)
|
|
}
|
|
|
|
fn close(self) {}
|
|
}
|
|
|
|
/// Variant 5: Phase-aware — MADV_SEQUENTIAL during index build, then MADV_RANDOM
|
|
/// for line access. Best of both worlds for the read-index-then-query pattern.
|
|
pub struct MmapReaderPhaseAware {
|
|
inner: MmapReader,
|
|
}
|
|
|
|
impl FileReaderBackend for MmapReaderPhaseAware {
|
|
fn name(&self) -> &str {
|
|
"mmap_phase_aware"
|
|
}
|
|
|
|
fn open(path: &Path) -> std::io::Result<Self>
|
|
where
|
|
Self: Sized,
|
|
{
|
|
install_sigbus_handler();
|
|
let inner = MmapReader::open_raw(path)?;
|
|
// Index build used sequential streaming via BufReader. Now switch to random.
|
|
inner.advise(Advice::Random);
|
|
Ok(Self { inner })
|
|
}
|
|
|
|
fn file_size(&self) -> u64 {
|
|
self.inner.file_size()
|
|
}
|
|
|
|
fn total_lines(&self) -> usize {
|
|
self.inner.total_lines()
|
|
}
|
|
|
|
fn get_line(&self, idx: usize) -> Option<String> {
|
|
self.inner.get_line(idx)
|
|
}
|
|
|
|
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
|
|
self.inner.read_range(offset, len)
|
|
}
|
|
|
|
fn close(self) {}
|
|
}
|
|
|
|
// ─── Tests ───────────────────────────────────────────────────────────────────
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write as _;
    use std::time::{Duration, Instant};
    use tempfile::NamedTempFile;

    /// Writes `content` to a fresh temporary file and returns its handle.
    fn create_temp_file(content: &[u8]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        f.write_all(content).unwrap();
        f.flush().unwrap();
        f
    }

    #[test]
    fn test_plain_open_and_read_lines() {
        let f = create_temp_file(b"hello\nworld\nfoo\n");
        let reader = MmapReaderPlain::open(f.path()).unwrap();
        assert_eq!(reader.name(), "mmap_plain");
        assert_eq!(reader.total_lines(), 3);
        assert_eq!(reader.get_line(0), Some("hello".to_owned()));
        assert_eq!(reader.get_line(1), Some("world".to_owned()));
        assert_eq!(reader.get_line(2), Some("foo".to_owned()));
        assert_eq!(reader.get_line(3), None);
        reader.close();
    }

    #[test]
    fn test_plain_out_of_bounds_returns_none() {
        let f = create_temp_file(b"line1\nline2\n");
        let reader = MmapReaderPlain::open(f.path()).unwrap();
        assert_eq!(reader.total_lines(), 2);
        assert_eq!(reader.get_line(100), None);
        assert_eq!(reader.get_line(usize::MAX), None);
        reader.close();
    }

    #[test]
    fn test_plain_read_range() {
        let content = b"hello world\nfoo bar\n";
        let f = create_temp_file(content);
        let reader = MmapReaderPlain::open(f.path()).unwrap();
        let range = reader.read_range(0, 5).unwrap();
        assert_eq!(&range, b"hello");
        let range = reader.read_range(6, 5).unwrap();
        assert_eq!(&range, b"world");
        assert_eq!(reader.read_range(0, 1000), None);
        // Offsets far beyond the mapping must be rejected, not wrapped or truncated.
        assert_eq!(reader.read_range(u64::MAX, 1), None);
        reader.close();
    }

    #[test]
    fn test_plain_empty_file() {
        let f = create_temp_file(b"");
        let reader = MmapReaderPlain::open(f.path()).unwrap();
        assert_eq!(reader.total_lines(), 0);
        assert_eq!(reader.file_size(), 0);
        assert_eq!(reader.get_line(0), None);
        reader.close();
    }

    #[test]
    fn test_sequential_variant() {
        let f = create_temp_file(b"alpha\nbeta\ngamma\n");
        let reader = MmapReaderSequential::open(f.path()).unwrap();
        assert_eq!(reader.name(), "mmap_sequential");
        assert_eq!(reader.total_lines(), 3);
        assert_eq!(reader.get_line(1), Some("beta".to_owned()));
        reader.close();
    }

    #[test]
    fn test_random_variant() {
        let f = create_temp_file(b"one\ntwo\nthree\n");
        let reader = MmapReaderRandom::open(f.path()).unwrap();
        assert_eq!(reader.name(), "mmap_random");
        assert_eq!(reader.total_lines(), 3);
        assert_eq!(reader.get_line(2), Some("three".to_owned()));
        reader.close();
    }

    #[test]
    fn test_populate_variant() {
        let f = create_temp_file(b"x\ny\nz\n");
        let reader = MmapReaderPopulate::open(f.path()).unwrap();
        assert_eq!(reader.name(), "mmap_populate");
        assert_eq!(reader.total_lines(), 3);
        assert_eq!(reader.get_line(0), Some("x".to_owned()));
        reader.close();
    }

    #[test]
    fn test_phase_aware_variant() {
        let f = create_temp_file(b"a\nb\nc\n");
        let reader = MmapReaderPhaseAware::open(f.path()).unwrap();
        assert_eq!(reader.name(), "mmap_phase_aware");
        assert_eq!(reader.total_lines(), 3);
        assert_eq!(reader.get_line(1), Some("b".to_owned()));
        reader.close();
    }

    #[test]
    fn test_file_size() {
        let content = b"hello world";
        let f = create_temp_file(content);
        let reader = MmapReaderPlain::open(f.path()).unwrap();
        assert_eq!(reader.file_size(), content.len() as u64);
        reader.close();
    }

    #[test]
    fn test_sigbus_flag_api() {
        reset_sigbus_flag();
        assert!(!sigbus_flag());
    }

    /// Opens every variant from several threads at once. This exercises the `Once`-guarded
    /// handler installation under contention.
    #[test]
    fn test_concurrent_open_installs_handler_once() {
        let f = create_temp_file(b"line1\nline2\nline3\n");
        let path = f.path().to_owned();
        let num_threads = 8;

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let path = path.clone();
                std::thread::spawn(move || {
                    // Each thread opens one variant and reports which one it exercised.
                    let variant = match thread_id % 5 {
                        0 => {
                            let r = MmapReaderPlain::open(&path).unwrap();
                            assert_eq!(r.total_lines(), 3);
                            r.close();
                            "plain"
                        }
                        1 => {
                            let r = MmapReaderSequential::open(&path).unwrap();
                            assert_eq!(r.total_lines(), 3);
                            r.close();
                            "sequential"
                        }
                        2 => {
                            let r = MmapReaderRandom::open(&path).unwrap();
                            assert_eq!(r.total_lines(), 3);
                            r.close();
                            "random"
                        }
                        3 => {
                            let r = MmapReaderPopulate::open(&path).unwrap();
                            assert_eq!(r.total_lines(), 3);
                            r.close();
                            "populate"
                        }
                        _ => {
                            let r = MmapReaderPhaseAware::open(&path).unwrap();
                            assert_eq!(r.total_lines(), 3);
                            r.close();
                            "phase_aware"
                        }
                    };
                    variant.to_owned()
                })
            })
            .collect();

        let results: Vec<String> = handles
            .into_iter()
            .map(|h| h.join().expect("thread panicked"))
            .collect();
        assert_eq!(results.len(), num_threads);
        // With 8 threads and 5 variants, every variant is exercised at least once.
        for expected in ["plain", "sequential", "random", "populate", "phase_aware"] {
            assert!(results.iter().any(|r| r == expected), "variant {expected} never ran");
        }
    }

    #[test]
    fn test_no_trailing_newline() {
        let f = create_temp_file(b"line1\nline2");
        let reader = MmapReaderPlain::open(f.path()).unwrap();
        assert_eq!(reader.total_lines(), 2);
        assert_eq!(reader.get_line(0), Some("line1".to_owned()));
        assert_eq!(reader.get_line(1), Some("line2".to_owned()));
        reader.close();
    }

    /// Diagnostic test: measures how long each stage of `open_raw()` takes
    /// on a large file (e.g. the 5GB extreme.log). Skips if the file is absent.
    /// Run with: cargo test -p log-viewer-bench --release -- --nocapture diag_open_stages
    #[test]
    fn diag_open_stages() {
        /// Converts a duration to fractional milliseconds.
        fn ms(d: Duration) -> f64 {
            d.as_secs_f64() * 1000.0
        }

        let path = std::path::Path::new("/tmp/test-logviewer/extreme.log");
        if !path.exists() {
            eprintln!("SKIP: test file not found");
            return;
        }

        // Stage 1: File::open + metadata
        let t0 = Instant::now();
        let file = File::open(path).unwrap();
        let file_size = file.metadata().unwrap().len();
        let stage1 = t0.elapsed();
        eprintln!(
            "Stage 1 - File::open + metadata: {:.2}ms ({:.1}GB)",
            ms(stage1),
            file_size as f64 / 1073741824.0
        );

        // Stage 2: mmap::map
        let t1 = Instant::now();
        let mmap = unsafe { Mmap::map(&file) }.unwrap();
        let stage2 = t1.elapsed();
        eprintln!("Stage 2 - mmap::map: {:.2}ms", ms(stage2));
        drop(mmap);

        // Stage 3: LineIndex::from_reader via BufReader (default 8KB buffer)
        let t2 = Instant::now();
        let mut reader = BufReader::new(&file);
        let line_index = crate::line_index::LineIndex::from_reader(&mut reader).unwrap();
        let stage3 = t2.elapsed();
        eprintln!(
            "Stage 3 - LineIndex::from_reader: {:.2}ms ({} lines, {} sampled_offsets)",
            ms(stage3),
            line_index.line_count(),
            line_index.sampled_offsets.len()
        );

        // Stage 3b: the same index build with a 1MB buffer, on a fresh handle.
        let file2 = File::open(path).unwrap();
        let t2b = Instant::now();
        let mut reader2 = BufReader::with_capacity(1024 * 1024, &file2);
        let line_index2 = crate::line_index::LineIndex::from_reader(&mut reader2).unwrap();
        let stage3b = t2b.elapsed();
        eprintln!("Stage 3b - LineIndex (1MB buffer): {:.2}ms", ms(stage3b));
        // The buffer size must not change the result, only the speed.
        assert_eq!(line_index2.line_count(), line_index.line_count());

        // Bottleneck breakdown over stages 1-3 (3b is an alternative to 3, not additive).
        eprintln!("\n=== 瓶颈分析 ===");
        let total_dur = stage1 + stage2 + stage3;
        let total_secs = total_dur.as_secs_f64();
        let pct = |d: Duration| d.as_secs_f64() / total_secs * 100.0;
        eprintln!("Total ~{:.0}ms", ms(total_dur));
        eprintln!(" File::open: {:.1}% ({:.0}ms)", pct(stage1), ms(stage1));
        eprintln!(" mmap::map: {:.1}% ({:.0}ms)", pct(stage2), ms(stage2));
        eprintln!(" from_reader: {:.1}% ({:.0}ms)", pct(stage3), ms(stage3));
    }
}
|