Files
logViewer/crates/core/src/io/index_cache.rs
dailz 2cebbd94c4 fix: concurrent cache save uses unique temp files (#17)
Replace deterministic .index.tmp path with per-save unique temp files
via tempfile::Builder. Eliminates race condition where background
indexer thread and TUI main thread could collide on the same temp path,
causing data truncation or corrupt cache writes.

Changes:
- Add write_cache_atomically() helper using tempfile::Builder
- Refactor save_with_hash() and save() to use the helper
- Extract encode_cache() to deduplicate serialization logic
- Move tempfile from [dev-dependencies] to [dependencies]
- Add 2 concurrent tests validating no corruption under parallel writes

Fixes #17
2026-06-09 16:13:39 +08:00

387 lines
13 KiB
Rust

use std::io::{Read as _, Write as _};
use std::path::Path;
use crate::io::cache_util::{cache_path, CACHE_VERSION};
use crate::io::line_index::LineIndex;
/// Stateless namespace for the on-disk line-index cache.
///
/// The type has no fields; saving and loading are exposed as associated
/// functions on its `impl` block rather than as methods on an instance.
pub struct IndexCache;
/// Atomically replace `dest` with the contents of `buf`.
///
/// The bytes are first staged in a uniquely named temporary file created with
/// `tempfile::Builder` inside `dest`'s parent directory, flushed to disk, and then
/// renamed over `dest`. Because every call stages into its own file, concurrent
/// savers targeting the same cache path never write into each other's staging file,
/// and keeping the staging file next to `dest` keeps the rename on one filesystem.
///
/// # Errors
///
/// Returns `InvalidInput` when `dest` has no parent directory, or the underlying
/// I/O error if creating, writing, syncing, or persisting the temp file fails.
fn write_cache_atomically(dest: &Path, buf: &[u8]) -> std::io::Result<()> {
    let parent = match dest.parent() {
        Some(p) => p,
        None => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "cache destination has no parent directory",
            ))
        }
    };

    let mut builder = tempfile::Builder::new();
    builder.prefix("index-cache-").suffix(".tmp");
    let mut staged = builder.tempfile_in(parent)?;

    {
        // Write and fsync before the rename so `dest` never points at partial data.
        let handle = staged.as_file_mut();
        handle.write_all(buf)?;
        handle.sync_all()?;
    }

    match staged.persist(dest) {
        Ok(_) => Ok(()),
        // Keep only the I/O error; dropping the rest of the failure value
        // releases the staging file it still owns.
        Err(failure) => Err(failure.error),
    }
}
/// Serialize a cache record into a single byte buffer.
///
/// Layout: one `CACHE_VERSION` byte, the `file_hash` as 8 little-endian bytes,
/// then the `bincode`-serialized `index`.
///
/// # Errors
///
/// Returns `InvalidData` carrying the serializer's message if `bincode` fails.
fn encode_cache(file_hash: u64, index: &LineIndex) -> std::io::Result<Vec<u8>> {
    let payload = match bincode::serialize(index) {
        Ok(bytes) => bytes,
        Err(err) => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                err.to_string(),
            ))
        }
    };

    let hash_bytes = file_hash.to_le_bytes();
    let mut encoded = Vec::with_capacity(1 + hash_bytes.len() + payload.len());
    encoded.push(CACHE_VERSION);
    encoded.extend_from_slice(&hash_bytes);
    encoded.extend_from_slice(&payload);
    Ok(encoded)
}
impl IndexCache {
    /// Persist `index` for `file_path`, fingerprinting the source from `data`.
    ///
    /// `data` is expected to be the same byte slice the index was built from, so the
    /// stored hash is computed from memory instead of re-reading the file (which
    /// could have changed in the meantime). The record is written atomically.
    ///
    /// # Errors
    ///
    /// Returns `NotFound` when no cache path can be determined for `file_path`,
    /// or any error from encoding or writing the cache record.
    pub fn save_with_hash(
        file_path: &Path,
        index: &LineIndex,
        data: &[u8],
    ) -> std::io::Result<()> {
        let target = match cache_path(file_path) {
            Some(p) => p,
            None => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    "cannot determine cache path",
                ))
            }
        };
        let fingerprint = compute_data_hash(data);
        let encoded = encode_cache(fingerprint, index)?;
        write_cache_atomically(&target, &encoded)
    }

    /// Persist `index` for `file_path`, fingerprinting the source by reading it from disk.
    ///
    /// Prefer [`Self::save_with_hash`] when the source bytes are already in memory:
    /// this variant hashes whatever is on disk now, which may no longer be what the
    /// index was built from.
    ///
    /// # Errors
    ///
    /// Returns `NotFound` when no cache path can be determined for `file_path`,
    /// or any error from hashing the file, encoding, or writing the cache record.
    pub fn save(file_path: &Path, index: &LineIndex) -> std::io::Result<()> {
        let target = match cache_path(file_path) {
            Some(p) => p,
            None => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    "cannot determine cache path",
                ))
            }
        };
        let fingerprint = compute_file_hash(file_path)?;
        let encoded = encode_cache(fingerprint, index)?;
        write_cache_atomically(&target, &encoded)
    }

    /// Load the cached `LineIndex` for `file_path`, if a valid one exists.
    ///
    /// Returns `None` when the cache path cannot be determined, the cache file cannot
    /// be read, the record is shorter than its header, the version byte differs from
    /// `CACHE_VERSION`, the stored hash does not match the file's current hash (or the
    /// file cannot be hashed), or the payload fails to deserialize.
    pub fn load(file_path: &Path) -> Option<LineIndex> {
        // Header = 1 version byte followed by an 8-byte little-endian file hash.
        const HEADER_LEN: usize = 1 + 8;

        let cache_file = cache_path(file_path)?;
        let raw = std::fs::read(&cache_file).ok()?;

        // `get` yields `None` for records too short to hold a full header.
        let header = raw.get(..HEADER_LEN)?;
        if header[0] != CACHE_VERSION {
            return None;
        }

        let mut hash_bytes = [0u8; 8];
        hash_bytes.copy_from_slice(&header[1..]);
        let recorded = u64::from_le_bytes(hash_bytes);

        // Only hash the source file once the cheap header checks have passed.
        let actual = compute_file_hash(file_path).ok()?;
        if actual != recorded {
            return None;
        }

        bincode::deserialize(&raw[HEADER_LEN..]).ok()
    }
}
/// Fingerprint an in-memory buffer with the same scheme as `compute_file_hash`.
///
/// The digest is xxh3 over, in order: the first `min(4096, len)` bytes, the last
/// `min(4096, len)` bytes, and the length as 8 little-endian bytes. For buffers
/// shorter than two windows the head and tail regions overlap.
///
/// Returns 0 for empty data.
pub fn compute_data_hash(data: &[u8]) -> u64 {
    if data.is_empty() {
        return 0;
    }

    let total = data.len();
    // Head and tail windows are the same size and never exceed the buffer.
    let window = total.min(4096);

    let mut digest = xxhash_rust::xxh3::Xxh3::new();
    digest.update(&data[..window]);
    digest.update(&data[total - window..]);
    digest.update(&(total as u64).to_le_bytes());
    digest.digest()
}
/// Compute a fast fingerprint of the file at `file_path`.
///
/// The digest is xxh3 over, in order: the first `min(4096, size)` bytes, the last
/// `min(4096, size)` bytes, and the file size as 8 little-endian bytes. This is the
/// same byte layout `compute_data_hash` hashes for an in-memory buffer, so the two
/// agree for identical content. For files shorter than two windows the head and
/// tail regions overlap.
///
/// The size, head, and tail are all taken from a single open handle so they
/// describe the same file even if the path is replaced while hashing.
///
/// Returns `Ok(0)` for empty files.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened, inspected,
/// seeked, or read (including `UnexpectedEof` if it shrinks while being read).
fn compute_file_hash(file_path: &Path) -> std::io::Result<u64> {
    const WINDOW: u64 = 4096;

    let mut file = std::fs::File::open(file_path)?;
    let file_size = file.metadata()?.len();
    if file_size == 0 {
        return Ok(0);
    }

    // Clamp in u64 *before* narrowing so the cast cannot truncate on 32-bit targets.
    let window = file_size.min(WINDOW) as usize;

    let mut head = vec![0u8; window];
    file.read_exact(&mut head)?;

    // The tail always starts `window` bytes before the measured end of the file;
    // `window <= file_size`, so the subtraction cannot underflow.
    let tail_start = file_size - window as u64;
    let mut tail = vec![0u8; window];
    std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(tail_start))?;
    file.read_exact(&mut tail)?;

    let mut hasher_state = xxhash_rust::xxh3::Xxh3::new();
    hasher_state.update(&head);
    hasher_state.update(&tail);
    hasher_state.update(&file_size.to_le_bytes());
    Ok(hasher_state.digest())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write as _;
    use tempfile::NamedTempFile;

    /// Create a temp file containing `lines` lines of the form `line N`.
    fn make_test_file(lines: usize) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        for i in 0..lines {
            writeln!(file, "line {}", i).unwrap();
        }
        file.flush().unwrap();
        file
    }

    /// A saved index loads back with the same observable contents.
    #[test]
    fn test_cache_roundtrip() {
        let file = make_test_file(300);
        let data = std::fs::read(file.path()).unwrap();
        let index = LineIndex::from_bytes(&data);
        IndexCache::save(file.path(), &index).expect("save should succeed");
        let loaded = IndexCache::load(file.path()).expect("load should return Some");
        assert_eq!(loaded.line_count(), index.line_count());
        assert_eq!(loaded.sampled_offsets(), index.sampled_offsets());
        assert_eq!(loaded.has_trailing_newline(), index.has_trailing_newline());
    }

    /// Changing the source file after saving invalidates the cache.
    #[test]
    fn test_cache_invalidation_file_modified() {
        let file = make_test_file(300);
        let data = std::fs::read(file.path()).unwrap();
        let index = LineIndex::from_bytes(&data);
        IndexCache::save(file.path(), &index).expect("save should succeed");
        // Append to file, changing its content
        {
            let mut f = std::fs::OpenOptions::new()
                .append(true)
                .open(file.path())
                .unwrap();
            writeln!(f, "extra line").unwrap();
        }
        let loaded = IndexCache::load(file.path());
        assert!(
            loaded.is_none(),
            "cache should be invalidated after file modification"
        );
    }

    /// A truncated cache record is rejected instead of being partially decoded.
    #[test]
    fn test_cache_corruption() {
        let file = make_test_file(300);
        let data = std::fs::read(file.path()).unwrap();
        let index = LineIndex::from_bytes(&data);
        IndexCache::save(file.path(), &index).expect("save should succeed");
        // Corrupt the cache file
        let cache_file = cache_path(file.path()).expect("cache path");
        let mut cache_data = std::fs::read(&cache_file).unwrap();
        // Truncate the file (remove last 10 bytes)
        let new_len = cache_data.len().saturating_sub(10);
        cache_data.truncate(new_len);
        std::fs::write(&cache_file, &cache_data).unwrap();
        let loaded = IndexCache::load(file.path());
        assert!(loaded.is_none(), "corrupt cache should return None");
    }

    /// A record whose version byte differs from `CACHE_VERSION` is rejected.
    #[test]
    fn test_cache_version_mismatch() {
        let file = make_test_file(300);
        let data = std::fs::read(file.path()).unwrap();
        let index = LineIndex::from_bytes(&data);
        IndexCache::save(file.path(), &index).expect("save should succeed");
        // Modify first byte (version)
        let cache_file = cache_path(file.path()).expect("cache path");
        let mut cache_data = std::fs::read(&cache_file).unwrap();
        cache_data[0] = cache_data[0].wrapping_add(1);
        std::fs::write(&cache_file, &cache_data).unwrap();
        let loaded = IndexCache::load(file.path());
        assert!(loaded.is_none(), "version mismatch should return None");
    }

    /// An empty source file round-trips to an index with zero lines.
    #[test]
    fn test_cache_empty_file() {
        let file = NamedTempFile::new().unwrap();
        let index = LineIndex::from_bytes(b"");
        IndexCache::save(file.path(), &index).expect("save should succeed");
        let loaded = IndexCache::load(file.path()).expect("load should return Some");
        assert_eq!(loaded.line_count(), 0);
    }

    /// Loading for a source path that does not exist yields `None`.
    #[test]
    fn test_cache_nonexistent_source() {
        let loaded = IndexCache::load(Path::new("/nonexistent/file.log"));
        assert!(loaded.is_none(), "nonexistent file should return None");
    }

    #[test]
    fn test_compute_file_hash_empty() {
        let file = NamedTempFile::new().unwrap();
        let hash = compute_file_hash(file.path()).unwrap();
        assert_eq!(hash, 0, "empty file should hash to 0");
    }

    #[test]
    fn test_compute_file_hash_deterministic() {
        let mut file = NamedTempFile::new().unwrap();
        write!(file, "hello world").unwrap();
        file.flush().unwrap();
        let h1 = compute_file_hash(file.path()).unwrap();
        let h2 = compute_file_hash(file.path()).unwrap();
        assert_eq!(h1, h2, "same file must produce same hash");
    }

    #[test]
    fn test_compute_file_hash_changes_on_content_change() {
        let mut file = NamedTempFile::new().unwrap();
        write!(file, "version 1").unwrap();
        file.flush().unwrap();
        let h1 = compute_file_hash(file.path()).unwrap();
        // Overwrite with different content
        let mut file2 = std::fs::File::create(file.path()).unwrap();
        write!(file2, "version 2 with more data").unwrap();
        file2.flush().unwrap();
        let h2 = compute_file_hash(file.path()).unwrap();
        assert_ne!(h1, h2, "hash should change when content changes");
    }

    /// `save_with_hash` stores the in-memory hash while `load` validates against the
    /// on-disk hash, so the two functions must agree for identical content. Sizes are
    /// chosen around the 4096-byte window and the 8192-byte head+tail boundary.
    #[test]
    fn test_data_hash_matches_file_hash() {
        for &len in &[0usize, 1, 4095, 4096, 4097, 8191, 8192, 8193, 20_000] {
            let data: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();
            let mut file = NamedTempFile::new().unwrap();
            file.write_all(&data).unwrap();
            file.flush().unwrap();
            assert_eq!(
                compute_data_hash(&data),
                compute_file_hash(file.path()).unwrap(),
                "in-memory and on-disk hashes must agree for {} bytes",
                len
            );
        }
    }

    /// Many threads saving the same index concurrently leave a loadable, intact cache.
    #[test]
    fn test_concurrent_save_with_hash_no_corruption() {
        use std::sync::{Arc, Barrier};
        let file = make_test_file(300);
        let data = std::fs::read(file.path()).unwrap();
        let num_threads = 8;
        let iterations = 50;
        let barrier = Arc::new(Barrier::new(num_threads));
        let path = file.path().to_path_buf();
        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let barrier = Arc::clone(&barrier);
                let path = path.clone();
                let data = data.clone();
                std::thread::spawn(move || {
                    let index = LineIndex::from_bytes(&data);
                    // Release all writers at once to maximize contention.
                    barrier.wait();
                    for _ in 0..iterations {
                        IndexCache::save_with_hash(&path, &index, &data)
                            .expect("concurrent save_with_hash should succeed");
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().expect("thread should not panic");
        }
        let loaded = IndexCache::load(file.path()).expect("final cache should load successfully");
        let expected = LineIndex::from_bytes(&data);
        assert_eq!(loaded.line_count(), expected.line_count());
        assert_eq!(
            loaded.sampled_offsets(),
            expected.sampled_offsets(),
            "concurrent writes must not produce interleaved data"
        );
    }

    /// Concurrent atomic writes of distinct payloads to one destination all succeed,
    /// and the surviving file is exactly one writer's payload with no staging files
    /// left behind.
    #[test]
    fn test_concurrent_save_same_dest_all_succeed() {
        use std::sync::{Arc, Barrier};
        let dir = tempfile::tempdir().unwrap();
        let dest = dir.path().join("test.index");
        // Each writer fills its payload with its own byte value so mixing is detectable.
        let payloads: Vec<Vec<u8>> = (0..8).map(|i| vec![i; 64 * 1024]).collect();
        let barrier = Arc::new(Barrier::new(8));
        let handles: Vec<_> = payloads
            .into_iter()
            .map(|payload| {
                let barrier = Arc::clone(&barrier);
                let dest = dest.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    write_cache_atomically(&dest, &payload)
                        .expect("concurrent atomic write should succeed");
                })
            })
            .collect();
        for h in handles {
            h.join().expect("thread should not panic");
        }
        let final_data = std::fs::read(&dest).expect("dest file should exist");
        assert_eq!(final_data.len(), 64 * 1024, "final file must be exactly one payload");

        // All payloads share a length, so the length check alone cannot detect
        // interleaving; the content must come from a single writer.
        let first = final_data[0];
        assert!(first < 8, "final file must contain one of the written payloads");
        assert!(
            final_data.iter().all(|&b| b == first),
            "final file must not mix bytes from different writers"
        );

        // Every staging file was renamed onto `dest`, so nothing else may remain.
        let entries: Vec<_> = std::fs::read_dir(dir.path())
            .expect("temp dir should be readable")
            .map(|entry| entry.expect("dir entry should be readable").file_name())
            .collect();
        assert_eq!(
            entries.len(),
            1,
            "no temp files may be left behind: {:?}",
            entries
        );
    }
}