The reader's line_index and file_size were frozen at open time. Once current_line exceeded the initial 150K lines, get_line_impl returned None for all subsequent reads. With the background thread appending ~10K lines/sec, ~40% of the measured frame latencies were actually the cost of a None return, not real I/O.

- Add PreadReaderCore::refresh_index(&mut self): seek to start, rebuild LineIndex, update file_size, invalidate read cache
- Add PreadReaderPlain::refresh_index forwarding method
- Add ReadCache::invalidate to force a cache miss after reindex
- Rewrite bench_scroll_during_append: time-based refresh (250ms), only record latencies for successful reads, assert max_line > initial
- Add regression tests for refresh_index with appended lines
296 lines
11 KiB
Rust
296 lines
11 KiB
Rust
use std::collections::HashMap;
|
|
|
|
use crate::data_gen;
|
|
use crate::metrics::MetricsCollector;
|
|
use crate::mmap_reader::MmapReaderPlain;
|
|
use crate::pread_reader::PreadReaderPlain;
|
|
use crate::runner::BenchConfig;
|
|
use crate::types::BenchmarkResult;
|
|
use crate::FileReaderBackend;
|
|
|
|
pub fn run(config: &BenchConfig) -> Vec<BenchmarkResult> {
|
|
let mut results = Vec::new();
|
|
|
|
let dir = tempfile::tempdir().expect("Failed to create temp dir");
|
|
results.extend(bench_append_visibility_mmap(config, dir.path()));
|
|
results.extend(bench_append_visibility_pread(config, dir.path()));
|
|
results.extend(bench_remap_cost(config, dir.path()));
|
|
results.extend(bench_scroll_during_append(config, dir.path()));
|
|
results.extend(bench_high_frequency_append(config, dir.path()));
|
|
|
|
results
|
|
}
|
|
|
|
fn bench_append_visibility_mmap(
|
|
config: &BenchConfig,
|
|
dir: &std::path::Path,
|
|
) -> Vec<BenchmarkResult> {
|
|
let path = data_gen::generate_growable_file(dir).expect("Failed to create growable file");
|
|
let append_count: usize = if config.quick_mode { 100 } else { 1000 };
|
|
|
|
let mut reader = MmapReaderPlain::open(&path).expect("Failed to open growable file");
|
|
let original_lines = reader.total_lines();
|
|
let original_size = reader.file_size();
|
|
|
|
data_gen::append_lines(&path, append_count).expect("Failed to append lines");
|
|
|
|
let new_metadata = std::fs::metadata(&path).expect("Failed to read metadata");
|
|
let new_size = new_metadata.len() as usize;
|
|
|
|
let remap_start = std::time::Instant::now();
|
|
reader.remap(new_size).expect("Failed to remap");
|
|
let remap_elapsed = remap_start.elapsed();
|
|
|
|
let new_line_bytes =
|
|
reader.read_range(original_size, (new_size - original_size as usize).min(256));
|
|
let visible = new_line_bytes.is_some();
|
|
|
|
let rss = MetricsCollector::read_rss();
|
|
let faults = MetricsCollector::read_page_faults();
|
|
|
|
let mut extra = HashMap::new();
|
|
extra.insert("original_lines".into(), original_lines as f64);
|
|
extra.insert("appended_lines".into(), append_count as f64);
|
|
extra.insert("new_bytes_visible".into(), visible as u64 as f64);
|
|
extra.insert("original_size".into(), original_size as f64);
|
|
extra.insert("new_size".into(), new_size as f64);
|
|
extra.insert("remap_us".into(), remap_elapsed.as_micros() as f64);
|
|
|
|
reader.close();
|
|
|
|
vec![BenchmarkResult {
|
|
category: "growth".into(),
|
|
test_name: "append_visibility_mmap".into(),
|
|
backend: "mmap".into(),
|
|
variant: "plain".into(),
|
|
latency_us: vec![remap_elapsed.as_micros() as u64],
|
|
rss_kb: rss.vm_rss_kb,
|
|
rss_peak_kb: rss.vm_hwm_kb,
|
|
page_faults: faults.minor_faults + faults.major_faults,
|
|
extra,
|
|
}]
|
|
}
|
|
|
|
fn bench_append_visibility_pread(
|
|
config: &BenchConfig,
|
|
dir: &std::path::Path,
|
|
) -> Vec<BenchmarkResult> {
|
|
let sub_dir = dir.join("pread_growth");
|
|
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create growable file");
|
|
let append_count: usize = if config.quick_mode { 100 } else { 1000 };
|
|
|
|
let reader = PreadReaderPlain::open(&path).expect("Failed to open growable file");
|
|
let original_lines = reader.total_lines();
|
|
reader.close();
|
|
|
|
data_gen::append_lines(&path, append_count).expect("Failed to append lines");
|
|
|
|
let reopen_start = std::time::Instant::now();
|
|
let new_reader = PreadReaderPlain::open(&path).expect("Failed to reopen file");
|
|
let reopen_elapsed = reopen_start.elapsed();
|
|
|
|
let new_lines = new_reader.total_lines();
|
|
let can_read_new = new_lines > original_lines;
|
|
if can_read_new {
|
|
let last_line = new_lines.saturating_sub(1);
|
|
let _ = new_reader.get_line(last_line);
|
|
}
|
|
|
|
let rss = MetricsCollector::read_rss();
|
|
let faults = MetricsCollector::read_page_faults();
|
|
|
|
let mut extra = HashMap::new();
|
|
extra.insert("original_lines".into(), original_lines as f64);
|
|
extra.insert("appended_lines".into(), append_count as f64);
|
|
extra.insert("new_total_lines".into(), new_lines as f64);
|
|
extra.insert("reopen_us".into(), reopen_elapsed.as_micros() as f64);
|
|
|
|
new_reader.close();
|
|
|
|
vec![BenchmarkResult {
|
|
category: "growth".into(),
|
|
test_name: "append_visibility_pread".into(),
|
|
backend: "pread".into(),
|
|
variant: "plain".into(),
|
|
latency_us: vec![reopen_elapsed.as_micros() as u64],
|
|
rss_kb: rss.vm_rss_kb,
|
|
rss_peak_kb: rss.vm_hwm_kb,
|
|
page_faults: faults.minor_faults + faults.major_faults,
|
|
extra,
|
|
}]
|
|
}
|
|
|
|
fn bench_remap_cost(config: &BenchConfig, dir: &std::path::Path) -> Vec<BenchmarkResult> {
|
|
let sub_dir = dir.join("remap_cost");
|
|
let append_count: usize = if config.quick_mode { 100 } else { 1000 };
|
|
let iterations: usize = if config.quick_mode { 5 } else { 20 };
|
|
|
|
let mut latencies = Vec::with_capacity(iterations);
|
|
|
|
for _ in 0..iterations {
|
|
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create file");
|
|
let mut reader = MmapReaderPlain::open(&path).expect("Failed to open file");
|
|
|
|
data_gen::append_lines(&path, append_count).expect("Failed to append");
|
|
|
|
let new_size = std::fs::metadata(&path).expect("metadata").len() as usize;
|
|
let t = std::time::Instant::now();
|
|
reader.remap(new_size).expect("remap failed");
|
|
latencies.push(t.elapsed().as_micros() as u64);
|
|
|
|
reader.close();
|
|
let _ = std::fs::remove_file(&path);
|
|
}
|
|
|
|
let rss = MetricsCollector::read_rss();
|
|
let faults = MetricsCollector::read_page_faults();
|
|
|
|
let mut extra = HashMap::new();
|
|
extra.insert("appended_per_iter".into(), append_count as f64);
|
|
extra.insert("iterations".into(), iterations as f64);
|
|
|
|
vec![BenchmarkResult {
|
|
category: "growth".into(),
|
|
test_name: "remap_cost".into(),
|
|
backend: "mmap".into(),
|
|
variant: "plain".into(),
|
|
latency_us: latencies,
|
|
rss_kb: rss.vm_rss_kb,
|
|
rss_peak_kb: rss.vm_hwm_kb,
|
|
page_faults: faults.minor_faults + faults.major_faults,
|
|
extra,
|
|
}]
|
|
}
|
|
|
|
fn bench_scroll_during_append(config: &BenchConfig, dir: &std::path::Path) -> Vec<BenchmarkResult> {
|
|
let sub_dir = dir.join("scroll_append");
|
|
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create growable file");
|
|
let duration_secs: u64 = if config.quick_mode { 2 } else { 10 };
|
|
let append_rate: usize = if config.quick_mode { 1000 } else { 10000 };
|
|
|
|
let bg_path = path.clone();
|
|
let bg_handle = std::thread::spawn(move || {
|
|
let batch_size = 100;
|
|
let batch_interval =
|
|
std::time::Duration::from_micros(1_000_000 / (append_rate / batch_size).max(1) as u64);
|
|
let start = std::time::Instant::now();
|
|
while start.elapsed().as_secs() < duration_secs {
|
|
data_gen::append_lines(&bg_path, batch_size).ok();
|
|
std::thread::sleep(batch_interval);
|
|
}
|
|
});
|
|
|
|
let mut reader = PreadReaderPlain::open(&path).expect("Failed to open file");
|
|
let initial_lines = reader.total_lines();
|
|
let mut frame_latencies = Vec::new();
|
|
let mut current_line = 0usize;
|
|
let mut refresh_count: u64 = 0;
|
|
let mut none_count: u64 = 0;
|
|
let scroll_start = std::time::Instant::now();
|
|
let mut last_refresh = std::time::Instant::now();
|
|
let refresh_interval = std::time::Duration::from_millis(250);
|
|
|
|
while scroll_start.elapsed().as_secs() < duration_secs {
|
|
if last_refresh.elapsed() >= refresh_interval {
|
|
reader.refresh_index().ok();
|
|
refresh_count += 1;
|
|
last_refresh = std::time::Instant::now();
|
|
continue;
|
|
}
|
|
|
|
if let Some(_line) = reader.get_line(current_line) {
|
|
let t = std::time::Instant::now();
|
|
frame_latencies.push(t.elapsed().as_micros() as u64);
|
|
current_line += 1;
|
|
} else {
|
|
none_count += 1;
|
|
std::thread::sleep(std::time::Duration::from_millis(1));
|
|
}
|
|
}
|
|
|
|
let max_line = current_line;
|
|
assert!(
|
|
max_line > initial_lines,
|
|
"benchmark never read past initial {initial_lines} lines (max={max_line})"
|
|
);
|
|
|
|
reader.close();
|
|
bg_handle.join().ok();
|
|
|
|
let rss = MetricsCollector::read_rss();
|
|
let faults = MetricsCollector::read_page_faults();
|
|
|
|
let mut extra = HashMap::new();
|
|
extra.insert("duration_secs".into(), duration_secs as f64);
|
|
extra.insert("append_rate_per_sec".into(), append_rate as f64);
|
|
extra.insert("frames_rendered".into(), frame_latencies.len() as f64);
|
|
extra.insert("refresh_count".into(), refresh_count as f64);
|
|
extra.insert("none_count".into(), none_count as f64);
|
|
extra.insert("initial_lines".into(), initial_lines as f64);
|
|
extra.insert("max_line_seen".into(), max_line as f64);
|
|
|
|
vec![BenchmarkResult {
|
|
category: "growth".into(),
|
|
test_name: "scroll_during_append".into(),
|
|
backend: "pread".into(),
|
|
variant: "plain".into(),
|
|
latency_us: frame_latencies,
|
|
rss_kb: rss.vm_rss_kb,
|
|
rss_peak_kb: rss.vm_hwm_kb,
|
|
page_faults: faults.minor_faults + faults.major_faults,
|
|
extra,
|
|
}]
|
|
}
|
|
|
|
fn bench_high_frequency_append(
|
|
config: &BenchConfig,
|
|
dir: &std::path::Path,
|
|
) -> Vec<BenchmarkResult> {
|
|
let sub_dir = dir.join("high_freq_append");
|
|
let path = data_gen::generate_growable_file(&sub_dir).expect("Failed to create growable file");
|
|
let duration_secs: u64 = if config.quick_mode { 3 } else { 30 };
|
|
let append_rate: usize = if config.quick_mode { 1000 } else { 10000 };
|
|
let batch_size: usize = 100;
|
|
let batches_per_sec = append_rate / batch_size;
|
|
let total_batches = (duration_secs as usize * batches_per_sec).max(1);
|
|
|
|
let mut detect_latencies = Vec::with_capacity(total_batches);
|
|
|
|
for _ in 0..total_batches {
|
|
data_gen::append_lines(&path, batch_size).expect("Failed to append");
|
|
|
|
let t = std::time::Instant::now();
|
|
if let Ok(reader) = PreadReaderPlain::open(&path) {
|
|
let total = reader.total_lines();
|
|
let _ = reader.get_line(total.saturating_sub(1));
|
|
reader.close();
|
|
}
|
|
detect_latencies.push(t.elapsed().as_micros() as u64);
|
|
|
|
std::thread::sleep(std::time::Duration::from_micros(
|
|
1_000_000 / batches_per_sec as u64,
|
|
));
|
|
}
|
|
|
|
let rss = MetricsCollector::read_rss();
|
|
let faults = MetricsCollector::read_page_faults();
|
|
|
|
let mut extra = HashMap::new();
|
|
extra.insert("duration_secs".into(), duration_secs as f64);
|
|
extra.insert("append_rate_per_sec".into(), append_rate as f64);
|
|
extra.insert("batch_size".into(), batch_size as f64);
|
|
extra.insert("total_batches".into(), total_batches as f64);
|
|
|
|
vec![BenchmarkResult {
|
|
category: "growth".into(),
|
|
test_name: "high_frequency_append".into(),
|
|
backend: "pread".into(),
|
|
variant: "plain".into(),
|
|
latency_us: detect_latencies,
|
|
rss_kb: rss.vm_rss_kb,
|
|
rss_peak_kb: rss.vm_hwm_kb,
|
|
page_faults: faults.minor_faults + faults.major_faults,
|
|
extra,
|
|
}]
|
|
}
|