fix(bench): eliminate SIGBUS handler static mut UB with Once + raw atomics (closes #33)

Replace `static mut OLD_SIGBUS_HANDLER` with AtomicU8 + AtomicPtr to
remove data race UB when concurrent benchmarks call open() from multiple
threads.

Key changes:
- Use `Once::call_once` to guarantee single handler installation
- Publish old handler to atomics BEFORE installing new handler (closes
  the handler-active-but-state-unpublished race window)
- Read atomics with Acquire in signal handler (async-signal-safe)
- Align si_addr to page boundary before mmap(MAP_FIXED)
- Add concurrent test: 8 threads open all 5 variants simultaneously
This commit is contained in:
dailz
2026-06-05 13:22:02 +08:00
parent 534a089b58
commit dad5f5a635
19 changed files with 3562 additions and 0 deletions

View File

@@ -0,0 +1,752 @@
// ─── mmap_reader.rs ──────────────────────────────────────────────────────────
// mmap-based FileReaderBackend implementations with 5 variants for benchmarking.
// Includes SIGBUS handler for file-truncation resilience and remap support
// for growing-file scenarios.
// ──────────────────────────────────────────────────────────────────────────────
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU8, Ordering};
use std::sync::Once;
use memmap2::{Advice, Mmap, MmapOptions, RemapOptions};
use nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal};
use crate::line_index::LineIndex;
use crate::FileReaderBackend;
// ─── SIGBUS Handler ──────────────────────────────────────────────────────────
//
// Signal-safety architecture:
// - Old handler state is stored in raw atomics (AtomicU8 + AtomicPtr) that are
// async-signal-safe to read — no OnceLock, Mutex, or other non-trivial abstractions
// in the signal handler path.
// - Installation uses `Once` for idempotency and follows a strict publish-then-install
// sequence to close the "handler-active-but-state-unpublished" race window.
/// Global flag set by the SIGBUS handler when a bus error is intercepted.
/// Process-global: concurrent benchmarks that reset/check this flag may interfere.
/// Currently acceptable because only `rotation` suite (sequential) uses it.
static SIGBUS_OCCURRED: AtomicBool = AtomicBool::new(false);
/// Discriminant values for old SIGBUS handler type.
const HANDLER_NONE: u8 = 0;
const HANDLER_DEFAULT: u8 = 1;
const HANDLER_IGNORE: u8 = 2;
const HANDLER_PLAIN: u8 = 3; // extern "C" fn(c_int)
#[expect(clippy::unseparated_literal_suffix, reason = "clarity: this is the SA_SIGACTION variant")]
const HANDLER_SIGACTION: u8 = 4; // extern "C" fn(c_int, *mut siginfo_t, *mut c_void)
/// Old SIGBUS handler type — raw atomic, async-signal-safe to read.
static OLD_HANDLER_KIND: AtomicU8 = AtomicU8::new(HANDLER_NONE);
/// Old SIGBUS handler function pointer — raw atomic, async-signal-safe to read.
static OLD_HANDLER_PTR: AtomicPtr<std::ffi::c_void> = AtomicPtr::new(std::ptr::null_mut());
/// Ensures `install_sigbus_handler` runs exactly once across all threads.
static INSTALL_ONCE: Once = Once::new();
/// Returns `true` if a SIGBUS was intercepted since the last reset.
pub fn sigbus_flag() -> bool {
SIGBUS_OCCURRED.load(Ordering::SeqCst)
}
/// Resets the SIGBUS flag. Call before operations where you want to detect
/// a fresh SIGBUS.
pub fn reset_sigbus_flag() {
SIGBUS_OCCURRED.store(false, Ordering::SeqCst)
}
/// SIGBUS signal handler.
///
/// # Safety Constraints (signal handler context)
/// - NO TLS access
/// - NO memory allocation
/// - NO lock acquisition
/// - Only: AtomicBool store → mmap → raw atomic loads → chain
extern "C" fn sigbus_handler(
sig: libc::c_int,
info: *mut libc::siginfo_t,
ctx: *mut std::ffi::c_void,
) {
SIGBUS_OCCURRED.store(true, Ordering::SeqCst);
// Align fault address down to page boundary.
// si_addr is NOT guaranteed to be page-aligned; without this, mmap(MAP_FIXED)
// could replace the wrong page.
let addr = unsafe { (*info).si_addr() };
let aligned = (addr as usize & !0xFFF) as *mut libc::c_void;
// Map anonymous zero page at fault address to prevent crash on file truncation.
let result = unsafe {
libc::mmap(
aligned,
4096,
libc::PROT_READ,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_FIXED,
-1,
0,
)
};
if result == libc::MAP_FAILED {
// Chain to old handler via raw atomics (async-signal-safe).
let kind = OLD_HANDLER_KIND.load(Ordering::Acquire);
match kind {
HANDLER_PLAIN => {
let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire);
if !ptr.is_null() {
// SAFETY: ptr was derived from a valid function pointer
// published by install_sigbus_handler before this handler was installed.
let f: extern "C" fn(libc::c_int) = unsafe { std::mem::transmute(ptr) };
f(sig);
} else {
unsafe { libc::_exit(128 + sig) };
}
}
HANDLER_SIGACTION => {
let ptr = OLD_HANDLER_PTR.load(Ordering::Acquire);
if !ptr.is_null() {
let f: extern "C" fn(
libc::c_int,
*mut libc::siginfo_t,
*mut std::ffi::c_void,
) = unsafe { std::mem::transmute(ptr) };
f(sig, info, ctx);
} else {
unsafe { libc::_exit(128 + sig) };
}
}
_ => {
// HANDLER_NONE / HANDLER_DEFAULT / HANDLER_IGNORE — no safe chaining.
unsafe { libc::_exit(128 + sig) };
}
}
}
}
/// Install the SIGBUS handler via sigaction(). Chains to any previous handler.
/// Uses `Once` to guarantee exactly-once installation — safe to call from
/// multiple threads concurrently.
///
/// The install sequence closes the "install-then-publish" race by:
/// 1. Querying the current handler via raw `libc::sigaction` (no modification).
/// 2. Publishing old handler state to raw atomics (`Release` stores).
/// 3. Installing our handler via `nix::sigaction`.
///
/// This ensures the atomics are readable *before* our handler can fire.
fn install_sigbus_handler() {
INSTALL_ONCE.call_once(|| {
// Step 1: Query current SIGBUS disposition without changing it.
let mut old_act: libc::sigaction = unsafe { std::mem::zeroed() };
let ret = unsafe { libc::sigaction(libc::SIGBUS, std::ptr::null(), &mut old_act) };
if ret != 0 {
// Failed to query — skip installation entirely.
return;
}
// Step 2: Publish old handler to atomics BEFORE installing ours.
let is_siginfo = (old_act.sa_flags & libc::SA_SIGINFO) != 0;
let raw_usize = old_act.sa_sigaction as usize;
if raw_usize == 0 {
// SIG_DFL (null function pointer → default disposition)
OLD_HANDLER_KIND.store(HANDLER_DEFAULT, Ordering::Release);
} else if raw_usize == 1 {
// SIG_IGN (sentinel value 1 → ignore disposition)
OLD_HANDLER_KIND.store(HANDLER_IGNORE, Ordering::Release);
} else {
// Real handler function pointer
OLD_HANDLER_PTR.store(raw_usize as *mut std::ffi::c_void, Ordering::Release);
if is_siginfo {
OLD_HANDLER_KIND.store(HANDLER_SIGACTION, Ordering::Release);
} else {
OLD_HANDLER_KIND.store(HANDLER_PLAIN, Ordering::Release);
}
}
// Step 3: Install our handler. Atomics are already published,
// so the handler can safely read them from the moment it's installed.
let new_action = SigAction::new(
SigHandler::SigAction(sigbus_handler),
SaFlags::SA_SIGINFO,
SigSet::empty(),
);
let _ = unsafe { sigaction(Signal::SIGBUS, &new_action) };
});
}
// ─── Core MmapReader ─────────────────────────────────────────────────────────
/// Core mmap-based reader. Holds the memory mapping, file handle, and
/// line index. Used as the engine inside each variant wrapper.
pub struct MmapReader {
mmap: Mmap,
#[allow(dead_code)]
file: File,
line_index: LineIndex,
file_size: u64,
}
impl MmapReader {
/// Open a file and create an mmap. Does NOT apply madvise.
/// The caller is responsible for applying the desired advice.
fn open_raw(path: &Path) -> std::io::Result<Self> {
let file = File::open(path)?;
let file_size = file.metadata()?.len();
let mmap = unsafe { Mmap::map(&file)? };
let line_index = {
let mut reader = BufReader::new(&file);
LineIndex::from_reader(&mut reader)?
};
Ok(Self {
mmap,
file,
line_index,
file_size,
})
}
/// Open with MmapOptions (for populate, etc.).
fn open_with_options(path: &Path, opts: &MmapOptions) -> std::io::Result<Self> {
let file = File::open(path)?;
let file_size = file.metadata()?.len();
let mmap = if file_size == 0 {
unsafe { Mmap::map(&file)? }
} else {
unsafe { opts.map(&file)? }
};
let line_index = {
let mut reader = BufReader::new(&file);
LineIndex::from_reader(&mut reader)?
};
Ok(Self {
mmap,
file,
line_index,
file_size,
})
}
/// Apply madvise to the entire mapping.
fn advise(&self, advice: Advice) {
let _ = self.mmap.advise(advice);
}
/// Grow the mmap to `new_size` bytes.
///
/// # Safety
/// Caller must ensure no `&[u8]` references to the old mmap data exist.
/// After remap, any pointers derived from the old mapping may be invalid
/// (if the kernel moved the mapping).
pub unsafe fn remap(&mut self, new_size: usize) -> std::io::Result<()> {
unsafe {
self.mmap
.remap(new_size, RemapOptions::new().may_move(true))?;
}
self.file_size = new_size as u64;
Ok(())
}
#[inline]
pub fn file_size(&self) -> u64 {
self.file_size
}
#[inline]
pub fn total_lines(&self) -> usize {
self.line_index.line_count()
}
#[inline]
pub fn get_line(&self, idx: usize) -> Option<String> {
self.line_index
.get_line(&self.mmap, idx)
.map(|s| s.to_owned())
}
#[inline]
pub fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
let start = offset as usize;
let end = start.checked_add(len)?;
if end > self.mmap.len() {
return None;
}
Some(self.mmap[start..end].to_vec())
}
}
// ─── 5 Variants ──────────────────────────────────────────────────────────────
/// Variant 1: Plain mmap, no madvise.
pub struct MmapReaderPlain {
inner: MmapReader,
}
impl MmapReaderPlain {
pub fn remap(&mut self, new_size: usize) -> std::io::Result<()> {
unsafe { self.inner.remap(new_size) }
}
}
impl FileReaderBackend for MmapReaderPlain {
fn name(&self) -> &str {
"mmap_plain"
}
fn open(path: &Path) -> std::io::Result<Self>
where
Self: Sized,
{
install_sigbus_handler();
let inner = MmapReader::open_raw(path)?;
Ok(Self { inner })
}
fn file_size(&self) -> u64 {
self.inner.file_size()
}
fn total_lines(&self) -> usize {
self.inner.total_lines()
}
fn get_line(&self, idx: usize) -> Option<String> {
self.inner.get_line(idx)
}
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
self.inner.read_range(offset, len)
}
fn close(self) {}
}
/// Variant 2: mmap with MADV_SEQUENTIAL — optimal for sequential scan (index build).
pub struct MmapReaderSequential {
inner: MmapReader,
}
impl FileReaderBackend for MmapReaderSequential {
fn name(&self) -> &str {
"mmap_sequential"
}
fn open(path: &Path) -> std::io::Result<Self>
where
Self: Sized,
{
install_sigbus_handler();
let inner = MmapReader::open_raw(path)?;
inner.advise(Advice::Sequential);
Ok(Self { inner })
}
fn file_size(&self) -> u64 {
self.inner.file_size()
}
fn total_lines(&self) -> usize {
self.inner.total_lines()
}
fn get_line(&self, idx: usize) -> Option<String> {
self.inner.get_line(idx)
}
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
self.inner.read_range(offset, len)
}
fn close(self) {}
}
/// Variant 3: mmap with MADV_RANDOM — optimal for random line access after index is built.
pub struct MmapReaderRandom {
inner: MmapReader,
}
impl FileReaderBackend for MmapReaderRandom {
fn name(&self) -> &str {
"mmap_random"
}
fn open(path: &Path) -> std::io::Result<Self>
where
Self: Sized,
{
install_sigbus_handler();
let inner = MmapReader::open_raw(path)?;
inner.advise(Advice::Random);
Ok(Self { inner })
}
fn file_size(&self) -> u64 {
self.inner.file_size()
}
fn total_lines(&self) -> usize {
self.inner.total_lines()
}
fn get_line(&self, idx: usize) -> Option<String> {
self.inner.get_line(idx)
}
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
self.inner.read_range(offset, len)
}
fn close(self) {}
}
/// Variant 4: mmap with MAP_POPULATE — pre-fault all pages at mmap time.
/// Trades higher upfront cost for smoother subsequent access.
pub struct MmapReaderPopulate {
inner: MmapReader,
}
impl FileReaderBackend for MmapReaderPopulate {
fn name(&self) -> &str {
"mmap_populate"
}
fn open(path: &Path) -> std::io::Result<Self>
where
Self: Sized,
{
install_sigbus_handler();
let mut opts = MmapOptions::new();
opts.populate();
let inner = MmapReader::open_with_options(path, &opts)?;
Ok(Self { inner })
}
fn file_size(&self) -> u64 {
self.inner.file_size()
}
fn total_lines(&self) -> usize {
self.inner.total_lines()
}
fn get_line(&self, idx: usize) -> Option<String> {
self.inner.get_line(idx)
}
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
self.inner.read_range(offset, len)
}
fn close(self) {}
}
/// Variant 5: Phase-aware — MADV_SEQUENTIAL during index build, then MADV_RANDOM
/// for line access. Best of both worlds for the read-index-then-query pattern.
pub struct MmapReaderPhaseAware {
inner: MmapReader,
}
impl FileReaderBackend for MmapReaderPhaseAware {
fn name(&self) -> &str {
"mmap_phase_aware"
}
fn open(path: &Path) -> std::io::Result<Self>
where
Self: Sized,
{
install_sigbus_handler();
let inner = MmapReader::open_raw(path)?;
// Index build used sequential streaming via BufReader. Now switch to random.
inner.advise(Advice::Random);
Ok(Self { inner })
}
fn file_size(&self) -> u64 {
self.inner.file_size()
}
fn total_lines(&self) -> usize {
self.inner.total_lines()
}
fn get_line(&self, idx: usize) -> Option<String> {
self.inner.get_line(idx)
}
fn read_range(&self, offset: u64, len: usize) -> Option<Vec<u8>> {
self.inner.read_range(offset, len)
}
fn close(self) {}
}
// ─── Tests ───────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write as _;
use tempfile::NamedTempFile;
fn create_temp_file(content: &[u8]) -> NamedTempFile {
let mut f = NamedTempFile::new().unwrap();
f.write_all(content).unwrap();
f.flush().unwrap();
f
}
#[test]
fn test_plain_open_and_read_lines() {
let f = create_temp_file(b"hello\nworld\nfoo\n");
let reader = MmapReaderPlain::open(f.path()).unwrap();
assert_eq!(reader.name(), "mmap_plain");
assert_eq!(reader.total_lines(), 3);
assert_eq!(reader.get_line(0), Some("hello".to_owned()));
assert_eq!(reader.get_line(1), Some("world".to_owned()));
assert_eq!(reader.get_line(2), Some("foo".to_owned()));
assert_eq!(reader.get_line(3), None);
reader.close();
}
#[test]
fn test_plain_out_of_bounds_returns_none() {
let f = create_temp_file(b"line1\nline2\n");
let reader = MmapReaderPlain::open(f.path()).unwrap();
assert_eq!(reader.total_lines(), 2);
assert_eq!(reader.get_line(100), None);
assert_eq!(reader.get_line(usize::MAX), None);
reader.close();
}
#[test]
fn test_plain_read_range() {
let content = b"hello world\nfoo bar\n";
let f = create_temp_file(content);
let reader = MmapReaderPlain::open(f.path()).unwrap();
let range = reader.read_range(0, 5).unwrap();
assert_eq!(&range, b"hello");
let range = reader.read_range(6, 5).unwrap();
assert_eq!(&range, b"world");
assert_eq!(reader.read_range(0, 1000), None);
reader.close();
}
#[test]
fn test_plain_empty_file() {
let f = create_temp_file(b"");
let reader = MmapReaderPlain::open(f.path()).unwrap();
assert_eq!(reader.total_lines(), 0);
assert_eq!(reader.file_size(), 0);
assert_eq!(reader.get_line(0), None);
reader.close();
}
#[test]
fn test_sequential_variant() {
let f = create_temp_file(b"alpha\nbeta\ngamma\n");
let reader = MmapReaderSequential::open(f.path()).unwrap();
assert_eq!(reader.name(), "mmap_sequential");
assert_eq!(reader.total_lines(), 3);
assert_eq!(reader.get_line(1), Some("beta".to_owned()));
reader.close();
}
#[test]
fn test_random_variant() {
let f = create_temp_file(b"one\ntwo\nthree\n");
let reader = MmapReaderRandom::open(f.path()).unwrap();
assert_eq!(reader.name(), "mmap_random");
assert_eq!(reader.total_lines(), 3);
assert_eq!(reader.get_line(2), Some("three".to_owned()));
reader.close();
}
#[test]
fn test_populate_variant() {
let f = create_temp_file(b"x\ny\nz\n");
let reader = MmapReaderPopulate::open(f.path()).unwrap();
assert_eq!(reader.name(), "mmap_populate");
assert_eq!(reader.total_lines(), 3);
assert_eq!(reader.get_line(0), Some("x".to_owned()));
reader.close();
}
#[test]
fn test_phase_aware_variant() {
let f = create_temp_file(b"a\nb\nc\n");
let reader = MmapReaderPhaseAware::open(f.path()).unwrap();
assert_eq!(reader.name(), "mmap_phase_aware");
assert_eq!(reader.total_lines(), 3);
assert_eq!(reader.get_line(1), Some("b".to_owned()));
reader.close();
}
#[test]
fn test_file_size() {
let content = b"hello world";
let f = create_temp_file(content);
let reader = MmapReaderPlain::open(f.path()).unwrap();
assert_eq!(reader.file_size(), content.len() as u64);
reader.close();
}
#[test]
fn test_sigbus_flag_api() {
reset_sigbus_flag();
assert!(!sigbus_flag());
}
#[test]
fn test_concurrent_open_installs_handler_once() {
let f = create_temp_file(b"line1\nline2\nline3\n");
let path = f.path().to_owned();
let num_threads = 8;
let handles: Vec<_> = (0..num_threads)
.map(|thread_id| {
let path = path.clone();
std::thread::spawn(move || {
let reader = match thread_id % 5 {
0 => {
let r = MmapReaderPlain::open(&path).unwrap();
assert_eq!(r.total_lines(), 3);
r.close();
"plain"
}
1 => {
let r = MmapReaderSequential::open(&path).unwrap();
assert_eq!(r.total_lines(), 3);
r.close();
"sequential"
}
2 => {
let r = MmapReaderRandom::open(&path).unwrap();
assert_eq!(r.total_lines(), 3);
r.close();
"random"
}
3 => {
let r = MmapReaderPopulate::open(&path).unwrap();
assert_eq!(r.total_lines(), 3);
r.close();
"populate"
}
_ => {
let r = MmapReaderPhaseAware::open(&path).unwrap();
assert_eq!(r.total_lines(), 3);
r.close();
"phase_aware"
}
};
reader.to_owned()
})
})
.collect();
let results: Vec<String> = handles
.into_iter()
.map(|h| h.join().expect("thread panicked"))
.collect();
assert_eq!(results.len(), num_threads);
}
#[test]
fn test_no_trailing_newline() {
let f = create_temp_file(b"line1\nline2");
let reader = MmapReaderPlain::open(f.path()).unwrap();
assert_eq!(reader.total_lines(), 2);
assert_eq!(reader.get_line(0), Some("line1".to_owned()));
assert_eq!(reader.get_line(1), Some("line2".to_owned()));
reader.close();
}
/// Diagnostic test: measure how long each stage of `open_raw()` takes
/// on a large file (e.g. the 5GB extreme.log).
/// Run with: cargo test -p log-viewer-bench --release -- --nocapture diag_open_stages
#[test]
fn diag_open_stages() {
let path = std::path::Path::new("/tmp/test-logviewer/extreme.log");
if !path.exists() {
eprintln!("SKIP: test file not found");
return;
}
// Stage 1: File::open + metadata
let t0 = std::time::Instant::now();
let file = File::open(path).unwrap();
let file_size = file.metadata().unwrap().len();
let stage1 = t0.elapsed();
eprintln!(
"Stage 1 - File::open + metadata: {:.2}ms ({:.1}GB)",
stage1.as_secs_f64() * 1000.0,
file_size as f64 / 1073741824.0
);
// Stage 2: mmap::map
let t1 = std::time::Instant::now();
let _mmap = unsafe { Mmap::map(&file) }.unwrap();
let stage2 = t1.elapsed();
eprintln!(
"Stage 2 - mmap::map: {:.2}ms",
stage2.as_secs_f64() * 1000.0
);
drop(_mmap);
// Stage 3: LineIndex::from_reader via BufReader (default 8KB buffer)
let t2 = std::time::Instant::now();
let mut reader = BufReader::new(&file);
let line_index = crate::line_index::LineIndex::from_reader(&mut reader).unwrap();
let stage3 = t2.elapsed();
eprintln!(
"Stage 3 - LineIndex::from_reader: {:.2}ms ({} lines, {} sampled_offsets)",
stage3.as_secs_f64() * 1000.0,
line_index.line_count(),
line_index.sampled_offsets.len()
);
// Stage 3b: Try with 1MB buffer
let file2 = File::open(path).unwrap();
let t2b = std::time::Instant::now();
let mut reader2 = BufReader::with_capacity(1024 * 1024, &file2);
let line_index2 = crate::line_index::LineIndex::from_reader(&mut reader2).unwrap();
let stage3b = t2b.elapsed();
eprintln!(
"Stage 3b - LineIndex (1MB buffer): {:.2}ms",
stage3b.as_secs_f64() * 1000.0
);
eprintln!("\n=== 瓶颈分析 ===");
let total_ms = stage1.as_secs_f64() * 1000.0
+ stage2.as_secs_f64() * 1000.0
+ stage3.as_secs_f64() * 1000.0;
let total_dur = stage1 + stage2 + stage3;
eprintln!("Total ~{:.0}ms", total_ms);
eprintln!(
" File::open: {:.1}% ({:.0}ms)",
stage1.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0,
stage1.as_secs_f64() * 1000.0
);
eprintln!(
" mmap::map: {:.1}% ({:.0}ms)",
stage2.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0,
stage2.as_secs_f64() * 1000.0
);
eprintln!(
" from_reader: {:.1}% ({:.0}ms)",
stage3.as_secs_f64() * 1000.0 / total_dur.as_secs_f64() / 1000.0 * 100.0,
stage3.as_secs_f64() * 1000.0
);
// Suppress unused warnings
let _ = line_index2;
}
}