2 Commits

Author SHA1 Message Date
dailz
9baec5ab69 fix(bench): refresh PreadReader index periodically in scroll_during_append (closes #36)
The reader's line_index and file_size were frozen at open time.
After current_line exceeded the initial 150K lines, get_line_impl
returned None for all subsequent reads. With the background thread
appending ~10K lines/sec, ~40% of measured frame latencies were
actually the cost of a None return, not real I/O.

- Add PreadReaderCore::refresh_index(&mut self): seek to start,
  rebuild LineIndex, update file_size, invalidate read cache
- Add PreadReaderPlain::refresh_index forwarding method
- Add ReadCache::invalidate to force cache miss after reindex
- Rewrite bench_scroll_during_append: time-based refresh (250ms),
  only record latencies for successful reads, assert max_line > initial
- Add regression tests for refresh_index with appended lines
2026-06-05 14:40:32 +08:00
dailz
6dd87d2872 fix(bench): wrap file writes with BufWriter to reduce syscall overhead
Add BufWriter::with_capacity(64KB) to generate_test_file,
generate_growable_file, and append_lines in data_gen.rs.

Previously each writeln! triggered an individual write syscall,
making 5GB/74M-line benchmark data generation extremely slow.
BufWriter batches writes into 64KB chunks, reducing syscalls
by ~1000x.

Explicit flush()? + drop before subsequent reads ensures data
visibility and propagates flush errors (BufWriter::drop swallows
them).

Closes #35
2026-06-05 14:01:35 +08:00
3 changed files with 115 additions and 10 deletions

View File

@@ -1,5 +1,5 @@
use std::fs; use std::fs;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
pub struct TestFileInfo { pub struct TestFileInfo {
@@ -51,7 +51,7 @@ fn generate_test_file(path: &Path) -> std::io::Result<TestFileInfo> {
fs::create_dir_all(parent)?; fs::create_dir_all(parent)?;
} }
let mut file = fs::File::create(path)?; let mut file = BufWriter::with_capacity(64 * 1024, fs::File::create(path)?);
let target_lines: u64 = 74_000_000; let target_lines: u64 = 74_000_000;
for i in 0..target_lines { for i in 0..target_lines {
writeln!( writeln!(
@@ -63,6 +63,8 @@ fn generate_test_file(path: &Path) -> std::io::Result<TestFileInfo> {
i * 7 i * 7
)?; )?;
} }
file.flush()?;
drop(file);
get_file_info(path) get_file_info(path)
} }
@@ -72,7 +74,7 @@ pub fn generate_growable_file(dir: &Path) -> std::io::Result<PathBuf> {
fs::create_dir_all(dir)?; fs::create_dir_all(dir)?;
let path = dir.join("growable.log"); let path = dir.join("growable.log");
let mut file = fs::File::create(&path)?; let mut file = BufWriter::with_capacity(64 * 1024, fs::File::create(&path)?);
for i in 0..150_000u64 { for i in 0..150_000u64 {
writeln!( writeln!(
file, file,
@@ -82,13 +84,17 @@ pub fn generate_growable_file(dir: &Path) -> std::io::Result<PathBuf> {
i i
)?; )?;
} }
file.flush()?;
Ok(path) Ok(path)
} }
/// Append `count` lines to the file /// Append `count` lines to the file
pub fn append_lines(path: &Path, count: usize) -> std::io::Result<()> { pub fn append_lines(path: &Path, count: usize) -> std::io::Result<()> {
let mut file = fs::OpenOptions::new().append(true).open(path)?; let mut file = BufWriter::with_capacity(
64 * 1024,
fs::OpenOptions::new().append(true).open(path)?,
);
let existing_lines = count_existing_lines(path).unwrap_or(0); let existing_lines = count_existing_lines(path).unwrap_or(0);
for i in 0..count { for i in 0..count {
writeln!( writeln!(
@@ -97,6 +103,7 @@ pub fn append_lines(path: &Path, count: usize) -> std::io::Result<()> {
existing_lines + i as u64 existing_lines + i as u64
)?; )?;
} }
file.flush()?;
Ok(()) Ok(())
} }

View File

@@ -10,7 +10,7 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::{BufReader, Seek as _, SeekFrom};
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::path::Path; use std::path::Path;
@@ -43,6 +43,10 @@ impl ReadCache {
/// Read `len` bytes starting at `offset`. Returns a slice into the cache. /// Read `len` bytes starting at `offset`. Returns a slice into the cache.
/// On cache hit (range fully within cached block), no syscall needed. /// On cache hit (range fully within cached block), no syscall needed.
/// On miss, performs a `read_exact_at` syscall. /// On miss, performs a `read_exact_at` syscall.
// NOTE(review): the `///` comment immediately above this method describes
// `get` (it talks about `offset`, `len` and returning a slice into the
// cache). As placed, it attaches to `invalidate`; it should be moved down so
// that it sits directly above `get`.
//
// Purpose of this method: discard the cached block so the next lookup misses.
fn invalidate(&mut self) {
// With a cached length of zero, the hit test in `get`
// (`end <= self.buf_offset + self.buf_len`) fails for every non-empty
// range. `buf_offset` is intentionally left as-is; only the length is reset.
self.buf_len = 0;
}
fn get(&mut self, file: &File, offset: u64, len: usize) -> std::io::Result<&[u8]> { fn get(&mut self, file: &File, offset: u64, len: usize) -> std::io::Result<&[u8]> {
let end = offset + len as u64; let end = offset + len as u64;
if offset >= self.buf_offset && end <= self.buf_offset + self.buf_len as u64 { if offset >= self.buf_offset && end <= self.buf_offset + self.buf_len as u64 {
@@ -189,6 +193,19 @@ impl PreadReaderCore {
pub fn total_lines(&self) -> usize { pub fn total_lines(&self) -> usize {
self.line_index.line_count() self.line_index.line_count()
} }
/// Rebuilds the line index from the file's current contents.
///
/// The file length is sampled first, the handle is rewound to offset 0, and
/// the whole file is re-scanned through `LineIndex::from_reader`. The stored
/// `file_size` and `line_index` are replaced, and the read cache invalidated,
/// only after every fallible step has succeeded.
///
/// # Errors
///
/// Returns the `std::io::Error` produced by the metadata query, the seek, or
/// the index scan. In that case the previously stored size, index and cache
/// are left untouched (the file cursor may already have been moved).
pub fn refresh_index(&mut self) -> std::io::Result<()> {
    // NOTE(review): the size is sampled *before* the re-scan. If another
    // writer appends in between, the rebuilt index may describe bytes past
    // `file_size` — confirm that readers tolerate this.
    let size_now = self.file.metadata()?.len();

    // The scan must start from the beginning regardless of where the shared
    // cursor was left by earlier buffered reads.
    self.file.seek(SeekFrom::Start(0))?;

    let mut scanner = BufReader::new(&self.file);
    let rebuilt = LineIndex::from_reader(&mut scanner)?;
    drop(scanner);

    // Commit phase: nothing below can fail.
    self.file_size = size_now;
    self.line_index = rebuilt;
    self.cache.get_mut().invalidate();
    Ok(())
}
} }
// ─── 3 Variants ─────────────────────────────────────────────────────────────── // ─── 3 Variants ───────────────────────────────────────────────────────────────
@@ -198,6 +215,12 @@ pub struct PreadReaderPlain {
inner: PreadReaderCore, inner: PreadReaderCore,
} }
impl PreadReaderPlain {
/// Re-indexes the underlying file by delegating to the wrapped
/// `PreadReaderCore::refresh_index` on `inner`.
///
/// Any `std::io::Error` reported by the core is returned unchanged.
pub fn refresh_index(&mut self) -> std::io::Result<()> {
self.inner.refresh_index()
}
}
impl FileReaderBackend for PreadReaderPlain { impl FileReaderBackend for PreadReaderPlain {
fn name(&self) -> &str { fn name(&self) -> &str {
"pread_plain" "pread_plain"
@@ -441,6 +464,55 @@ mod tests {
reader.close(); reader.close();
} }
/// Regression test: lines appended after `open` stay invisible until
/// `refresh_index` is called, and become readable afterwards.
#[test]
fn test_refresh_index_sees_appended_lines() {
    use std::io::Write as _;

    let tmp = tempfile::tempdir().unwrap();
    let log_path = tmp.path().join("refresh_test.log");

    // Phase 1: seed the file with three lines and open the reader on it.
    std::fs::write(&log_path, b"alpha\nbeta\ngamma\n").unwrap();

    let mut reader = PreadReaderPlain::open(&log_path).unwrap();
    assert_eq!(reader.total_lines(), 3);
    assert_eq!(reader.get_line(0).as_deref(), Some("alpha"));
    assert_eq!(reader.get_line(3), None, "should be out of bounds before append");

    // Phase 2: append two more lines through a separate, short-lived handle.
    std::fs::OpenOptions::new()
        .append(true)
        .open(&log_path)
        .unwrap()
        .write_all(b"delta\nepsilon\n")
        .unwrap();

    // The reader's index predates the append, so line 3 is still unknown.
    assert_eq!(reader.get_line(3), None, "stale index before refresh");

    reader.refresh_index().unwrap();
    assert_eq!(reader.total_lines(), 5);

    // After the refresh both new lines resolve and the end moves to 5.
    let expected_tail = [(3usize, Some("delta")), (4, Some("epsilon")), (5, None)];
    for (idx, want) in expected_tail {
        assert_eq!(reader.get_line(idx).as_deref(), want);
    }

    reader.close();
}
/// Refreshing an unchanged file must leave the line count and the line
/// contents exactly as they were.
#[test]
fn test_refresh_index_no_change_is_noop() {
    let tmp = create_temp_file(b"one\ntwo\n");
    let mut reader = PreadReaderPlain::open(tmp.path()).unwrap();

    let lines_before = reader.total_lines();
    assert_eq!(lines_before, 2);

    reader.refresh_index().unwrap();

    let lines_after = reader.total_lines();
    assert_eq!(lines_after, 2);
    assert_eq!(reader.get_line(0).as_deref(), Some("one"));
    assert_eq!(reader.get_line(1).as_deref(), Some("two"));

    reader.close();
}
#[test] #[test]
fn test_truncation_graceful_error() { fn test_truncation_graceful_error() {
// Verify that truncated file access returns None/Err instead of panicking // Verify that truncated file access returns None/Err instead of panicking

View File

@@ -180,17 +180,39 @@ fn bench_scroll_during_append(config: &BenchConfig, dir: &std::path::Path) -> Ve
} }
}); });
let reader = PreadReaderPlain::open(&path).expect("Failed to open file"); let mut reader = PreadReaderPlain::open(&path).expect("Failed to open file");
let initial_lines = reader.total_lines();
let mut frame_latencies = Vec::new(); let mut frame_latencies = Vec::new();
let mut current_line = 0usize; let mut current_line = 0usize;
let mut refresh_count: u64 = 0;
let mut none_count: u64 = 0;
let scroll_start = std::time::Instant::now(); let scroll_start = std::time::Instant::now();
let mut last_refresh = std::time::Instant::now();
let refresh_interval = std::time::Duration::from_millis(250);
while scroll_start.elapsed().as_secs() < duration_secs { while scroll_start.elapsed().as_secs() < duration_secs {
if last_refresh.elapsed() >= refresh_interval {
reader.refresh_index().ok();
refresh_count += 1;
last_refresh = std::time::Instant::now();
continue;
}
if let Some(_line) = reader.get_line(current_line) {
let t = std::time::Instant::now(); let t = std::time::Instant::now();
let _ = reader.get_line(current_line);
frame_latencies.push(t.elapsed().as_micros() as u64); frame_latencies.push(t.elapsed().as_micros() as u64);
current_line += 1; current_line += 1;
} else {
none_count += 1;
std::thread::sleep(std::time::Duration::from_millis(1));
} }
}
let max_line = current_line;
assert!(
max_line > initial_lines,
"benchmark never read past initial {initial_lines} lines (max={max_line})"
);
reader.close(); reader.close();
bg_handle.join().ok(); bg_handle.join().ok();
@@ -202,6 +224,10 @@ fn bench_scroll_during_append(config: &BenchConfig, dir: &std::path::Path) -> Ve
extra.insert("duration_secs".into(), duration_secs as f64); extra.insert("duration_secs".into(), duration_secs as f64);
extra.insert("append_rate_per_sec".into(), append_rate as f64); extra.insert("append_rate_per_sec".into(), append_rate as f64);
extra.insert("frames_rendered".into(), frame_latencies.len() as f64); extra.insert("frames_rendered".into(), frame_latencies.len() as f64);
extra.insert("refresh_count".into(), refresh_count as f64);
extra.insert("none_count".into(), none_count as f64);
extra.insert("initial_lines".into(), initial_lines as f64);
extra.insert("max_line_seen".into(), max_line as f64);
vec![BenchmarkResult { vec![BenchmarkResult {
category: "growth".into(), category: "growth".into(),