diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index bc695e2..d193ff0 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -18,7 +18,7 @@ directories.workspace = true xxhash-rust.workspace = true bincode.workspace = true unicode-width.workspace = true +tempfile.workspace = true [dev-dependencies] insta.workspace = true -tempfile.workspace = true diff --git a/crates/core/src/io/index_cache.rs b/crates/core/src/io/index_cache.rs index b5e1455..e9f9674 100644 --- a/crates/core/src/io/index_cache.rs +++ b/crates/core/src/io/index_cache.rs @@ -6,8 +6,44 @@ use crate::io::line_index::LineIndex; pub struct IndexCache; +/// Write `buf` to `dest` atomically using a unique temporary file in the same directory. +/// +/// Each call creates its own temp file via `tempfile::Builder`, eliminating collisions +/// when multiple threads (or processes) save to the same cache path concurrently. +/// The temp file is created in `dest.parent()` so the final `rename` stays on the +/// same filesystem and remains atomic. +fn write_cache_atomically(dest: &Path, buf: &[u8]) -> std::io::Result<()> { + let dir = dest.parent().ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "cache destination has no parent directory", + ) + })?; + + let mut tmp = tempfile::Builder::new() + .prefix("index-cache-") + .suffix(".tmp") + .tempfile_in(dir)?; + + tmp.as_file_mut().write_all(buf)?; + tmp.as_file_mut().sync_all()?; + + tmp.persist(dest).map(|_| ()).map_err(|e| e.error) +} + +fn encode_cache(file_hash: u64, index: &LineIndex) -> std::io::Result> { + let index_bytes = bincode::serialize(index) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; + + let mut buf = Vec::with_capacity(1 + 8 + index_bytes.len()); + buf.push(CACHE_VERSION); + buf.extend_from_slice(&file_hash.to_le_bytes()); + buf.extend_from_slice(&index_bytes); + Ok(buf) +} + impl IndexCache { - /// Save a `LineIndex` to disk using atomic write (write to .tmp, then rename). + /// Save a `LineIndex` to disk using atomic write (unique temp file, then rename). /// /// The file hash is derived from `data` (the same byte slice used to build the index), /// avoiding TOCTOU issues from re-reading the file from disk. @@ -21,23 +57,8 @@ impl IndexCache { })?; let file_hash = compute_data_hash(data); - let index_bytes = bincode::serialize(index) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; - - let mut buf = Vec::with_capacity(1 + 8 + index_bytes.len()); - buf.push(CACHE_VERSION); - buf.extend_from_slice(&file_hash.to_le_bytes()); - buf.extend_from_slice(&index_bytes); - - let tmp_path = dest.with_extension("index.tmp"); - { - let mut f = std::fs::File::create(&tmp_path)?; - f.write_all(&buf)?; - f.sync_all()?; - } - std::fs::rename(&tmp_path, &dest)?; - - Ok(()) + let buf = encode_cache(file_hash, index)?; + write_cache_atomically(&dest, &buf) } /// Save a `LineIndex` to disk, computing the hash by re-reading the file. @@ -50,23 +71,8 @@ impl IndexCache { })?; let file_hash = compute_file_hash(file_path)?; - let index_bytes = bincode::serialize(index) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; - - let mut buf = Vec::with_capacity(1 + 8 + index_bytes.len()); - buf.push(CACHE_VERSION); - buf.extend_from_slice(&file_hash.to_le_bytes()); - buf.extend_from_slice(&index_bytes); - - let tmp_path = dest.with_extension("index.tmp"); - { - let mut f = std::fs::File::create(&tmp_path)?; - f.write_all(&buf)?; - f.sync_all()?; - } - std::fs::rename(&tmp_path, &dest)?; - - Ok(()) + let buf = encode_cache(file_hash, index)?; + write_cache_atomically(&dest, &buf) } /// Load a cached `LineIndex` from disk. @@ -305,4 +311,77 @@ mod tests { assert_ne!(h1, h2, "hash should change when content changes"); } + + #[test] + fn test_concurrent_save_with_hash_no_corruption() { + use std::sync::{Arc, Barrier}; + + let file = make_test_file(300); + let data = std::fs::read(file.path()).unwrap(); + + let num_threads = 8; + let iterations = 50; + let barrier = Arc::new(Barrier::new(num_threads)); + let path = file.path().to_path_buf(); + + let handles: Vec<_> = (0..num_threads) + .map(|_| { + let barrier = Arc::clone(&barrier); + let path = path.clone(); + let data = data.clone(); + std::thread::spawn(move || { + let index = LineIndex::from_bytes(&data); + barrier.wait(); + for _ in 0..iterations { + IndexCache::save_with_hash(&path, &index, &data) + .expect("concurrent save_with_hash should succeed"); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("thread should not panic"); + } + + let loaded = IndexCache::load(file.path()).expect("final cache should load successfully"); + let expected = LineIndex::from_bytes(&data); + assert_eq!(loaded.line_count(), expected.line_count()); + assert_eq!( + loaded.sampled_offsets(), + expected.sampled_offsets(), + "concurrent writes must not produce interleaved data" + ); + } + + #[test] + fn test_concurrent_save_same_dest_all_succeed() { + use std::sync::{Arc, Barrier}; + + let dir = tempfile::tempdir().unwrap(); + let dest = dir.path().join("test.index"); + + let payloads: Vec> = (0..8).map(|i| vec![i; 64 * 1024]).collect(); + let barrier = Arc::new(Barrier::new(8)); + + let handles: Vec<_> = payloads + .into_iter() + .map(|payload| { + let barrier = Arc::clone(&barrier); + let dest = dest.clone(); + std::thread::spawn(move || { + barrier.wait(); + write_cache_atomically(&dest, &payload) + .expect("concurrent atomic write should succeed"); + }) + }) + .collect(); + + for h in handles { + h.join().expect("thread should not panic"); + } + + let final_data = std::fs::read(&dest).expect("dest file should exist"); + assert_eq!(final_data.len(), 64 * 1024, "final file must be exactly one payload"); + } } \ No newline at end of file