fix: audit fixes for 4 medium-severity bugs: index_cache tail hash, read_cache doc, mutex poison recovery, Remove event handling
This commit is contained in:
@@ -86,11 +86,12 @@ fn compute_file_hash(file_path: &Path) -> std::io::Result<u64> {
|
||||
let mut tf = std::io::BufReader::new(file);
|
||||
tf.read_exact(&mut tail)?;
|
||||
} else {
|
||||
// File is small enough that head already covers everything;
|
||||
// tail overlaps with head — just take the last tail_size bytes
|
||||
let start = head.len().saturating_sub(tail_size);
|
||||
tail = head[start..].to_vec();
|
||||
tail.resize(tail_size, 0);
|
||||
// File fits within head+tail window. Seek to read the real tail
|
||||
// for correctness — approximating from head misses bytes beyond head_size.
|
||||
let tail_start = file_size.saturating_sub(tail_size as u64);
|
||||
let mut file = std::fs::File::open(file_path)?;
|
||||
std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(tail_start))?;
|
||||
file.read_exact(&mut tail)?;
|
||||
}
|
||||
|
||||
let mut hasher_state = xxhash_rust::xxh3::Xxh3::new();
|
||||
@@ -242,4 +243,4 @@ mod tests {
|
||||
|
||||
assert_ne!(h1, h2, "hash should change when content changes");
|
||||
}
|
||||
}
|
||||
}
|
||||
435
crates/core/src/io/read_cache.rs
Normal file
435
crates/core/src/io/read_cache.rs
Normal file
@@ -0,0 +1,435 @@
|
||||
// NOTE: This module is implemented and tested but not yet integrated into
|
||||
// the production read path (FileReader uses mmap for line access).
|
||||
// It is kept as a building block for a future pread-based reader that
|
||||
// would avoid the SIGBUS risk of mmap.
|
||||
// ─── read_cache.rs ─────────────────────────────────────────────────────────
|
||||
// 16-slot LRU read cache with 4KB page-aligned keys.
|
||||
// Reduces syscalls by caching recently-read 4KB blocks.
|
||||
// Cross-block reads are handled via a spill buffer (not cached).
|
||||
// ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::os::unix::fs::FileExt;
|
||||
|
||||
/// Number of block slots the cache holds before it starts evicting.
const LRU_SLOTS: usize = 16;

/// Size in bytes of one cached block. Block keys are file offsets rounded
/// down to a multiple of this value, so it must stay a power of two.
pub const BLOCK_ALIGN: usize = 4 * 1024;
|
||||
|
||||
/// One cached block of file data together with its bookkeeping.
struct CacheSlot {
    /// Block-aligned file offset this slot was last filled from.
    block_offset: u64,
    /// Number of valid bytes at the start of `buf`; `0` marks the slot as empty.
    len: usize,
    /// Value of the cache's tick counter when this slot was last used.
    last_access: u64,
    /// Backing storage for the block's bytes.
    buf: Vec<u8>,
}
|
||||
|
||||
pub struct LruReadCache {
|
||||
slots: [CacheSlot; LRU_SLOTS],
|
||||
spill_buf: Vec<u8>,
|
||||
spill_len: usize,
|
||||
tick: u64,
|
||||
}
|
||||
|
||||
pub type ReadCache = LruReadCache;
|
||||
|
||||
impl Default for LruReadCache {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
slots: std::array::from_fn(|_| CacheSlot {
|
||||
buf: vec![0u8; BLOCK_ALIGN],
|
||||
block_offset: 0,
|
||||
len: 0,
|
||||
last_access: 0,
|
||||
}),
|
||||
spill_buf: Vec::new(),
|
||||
spill_len: 0,
|
||||
tick: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LruReadCache {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Read `len` bytes starting at `offset`. Returns a slice into the cache
|
||||
/// on a hit, or fills a cache slot on a miss. Cross-block reads go through
|
||||
/// the spill buffer and are not cached.
|
||||
pub fn get(&mut self, file: &File, offset: u64, len: usize) -> io::Result<&[u8]> {
|
||||
let aligned_key = offset & !(BLOCK_ALIGN as u64 - 1);
|
||||
let request_end = offset.saturating_add(len as u64);
|
||||
let block_end = aligned_key + BLOCK_ALIGN as u64;
|
||||
|
||||
if request_end > block_end {
|
||||
self.spill_buf.resize(len, 0);
|
||||
let bytes_read = file.read_at(&mut self.spill_buf[..len], offset)?;
|
||||
if bytes_read == 0 {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "read 0 bytes"));
|
||||
}
|
||||
if bytes_read < len {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
|
||||
}
|
||||
self.spill_len = len;
|
||||
return Ok(&self.spill_buf[..len]);
|
||||
}
|
||||
|
||||
let hit_idx = self.slots.iter().position(|slot| {
|
||||
slot.block_offset == aligned_key && request_end <= slot.block_offset + slot.len as u64
|
||||
});
|
||||
|
||||
if let Some(idx) = hit_idx {
|
||||
self.slots[idx].last_access = self.tick;
|
||||
self.tick += 1;
|
||||
let start = (offset - self.slots[idx].block_offset) as usize;
|
||||
return Ok(&self.slots[idx].buf[start..start + len]);
|
||||
}
|
||||
|
||||
let mut evict_idx = 0;
|
||||
let mut min_access = self.slots[0].last_access;
|
||||
for (i, slot) in self.slots.iter().enumerate() {
|
||||
if slot.last_access < min_access {
|
||||
min_access = slot.last_access;
|
||||
evict_idx = i;
|
||||
}
|
||||
}
|
||||
|
||||
let slot = &mut self.slots[evict_idx];
|
||||
let bytes_read = file.read_at(&mut slot.buf, aligned_key)?;
|
||||
|
||||
// Note: get(file, 0, 0) on an empty file now returns Err (old code returned Ok(&[])).
|
||||
// No callers pass len == 0, so this is a safe semantic change.
|
||||
if bytes_read == 0 {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "read 0 bytes"));
|
||||
}
|
||||
|
||||
slot.block_offset = aligned_key;
|
||||
slot.len = bytes_read;
|
||||
slot.last_access = self.tick;
|
||||
self.tick += 1;
|
||||
|
||||
if request_end > aligned_key + bytes_read as u64 {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
|
||||
}
|
||||
|
||||
let start = (offset - slot.block_offset) as usize;
|
||||
Ok(&slot.buf[start..start + len])
|
||||
}
|
||||
|
||||
/// Invalidate all cache slots and the spill buffer.
|
||||
pub fn clear(&mut self) {
|
||||
for slot in &mut self.slots {
|
||||
slot.len = 0;
|
||||
}
|
||||
self.spill_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// One block expressed as a file offset.
    const BLOCK: u64 = BLOCK_ALIGN as u64;

    /// Writes `data` to a fresh temporary file and returns its handle.
    fn make_file(data: &[u8]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(data).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Creates a temporary file holding `data`, opens it for reading and
    /// pairs it with an empty cache. The `NamedTempFile` must be kept alive
    /// by the caller for as long as the file is needed.
    fn fixture(data: &[u8]) -> (NamedTempFile, File, ReadCache) {
        let tmp = make_file(data);
        let file = File::open(tmp.path()).unwrap();
        (tmp, file, ReadCache::new())
    }

    /// True when some slot holds valid data for the block at `block_offset`.
    fn holds_block(cache: &ReadCache, block_offset: u64) -> bool {
        cache
            .slots
            .iter()
            .any(|s| s.block_offset == block_offset && s.len > 0)
    }

    #[test]
    fn cache_hit_returns_same_data() {
        // The same range requested twice must yield the same bytes.
        let (_tmp, file, mut cache) = fixture(b"Hello, World! This is a test of the cache.");

        let first = cache.get(&file, 0, 13).unwrap().to_vec();
        let second = cache.get(&file, 0, 13).unwrap().to_vec();

        assert_eq!(first, second);
        assert_eq!(first, b"Hello, World!".to_vec());
    }

    #[test]
    fn cache_miss_reads_correct_data() {
        // Two disjoint ranges of the same small file.
        let (_tmp, file, mut cache) = fixture(b"0123456789ABCDEFGHIJ");

        let head = cache.get(&file, 0, 10).unwrap().to_vec();
        assert_eq!(head, b"0123456789".to_vec());

        let tail = cache.get(&file, 10, 10).unwrap().to_vec();
        assert_eq!(tail, b"ABCDEFGHIJ".to_vec());
    }

    #[test]
    fn cross_block_read_uses_spill_buffer() {
        // A request straddling the 4KB boundary still returns the right bytes.
        let (_tmp, file, mut cache) = fixture(&vec![0xABu8; 8192]);

        // Populate block 0 first.
        let inside = cache.get(&file, 0, 100).unwrap().to_vec();
        assert_eq!(inside, vec![0xABu8; 100]);

        // offset=4000, len=200 touches both [0,4096) and [4096,8192).
        let straddling = cache.get(&file, 4000, 200).unwrap().to_vec();
        assert_eq!(straddling, vec![0xABu8; 200]);
    }

    #[test]
    fn empty_file_read_fails() {
        let (_tmp, file, mut cache) = fixture(b"");

        // There is not even one byte to read.
        assert!(cache.get(&file, 0, 1).is_err());
    }

    #[test]
    fn clear_invalidates_cache() {
        let (_tmp, file, mut cache) = fixture(b"original data here");

        let before = cache.get(&file, 0, 10).unwrap().to_vec();
        assert_eq!(before, b"original d".to_vec());

        cache.clear();

        // The same range must still be readable after invalidation.
        let after = cache.get(&file, 0, 10).unwrap().to_vec();
        assert_eq!(after, b"original d".to_vec());
    }

    // ─── LRU-specific tests ───────────────────────────────────────────────

    #[test]
    fn lru_multi_block_hit() {
        // Three distinct blocks are read, then read again; both passes must agree.
        let (tmp, file, mut cache) = fixture(&vec![0u8; BLOCK_ALIGN * 4]);

        // Overwrite the first three blocks with the patterns 1, 2 and 3.
        drop(file);
        {
            use std::io::Seek;
            let mut writer = std::fs::OpenOptions::new()
                .write(true)
                .open(tmp.path())
                .unwrap();
            for (index, &fill) in [1u8, 2, 3].iter().enumerate() {
                writer
                    .seek(std::io::SeekFrom::Start((BLOCK_ALIGN * index) as u64))
                    .unwrap();
                writer.write_all(&[fill; BLOCK_ALIGN]).unwrap();
            }
        }
        let file = File::open(tmp.path()).unwrap();

        let offsets = [0, BLOCK, 2 * BLOCK];
        let first_pass: Vec<Vec<u8>> = offsets
            .iter()
            .map(|&off| cache.get(&file, off, 16).unwrap().to_vec())
            .collect();

        assert_eq!(first_pass[0], vec![1u8; 16]);
        assert_eq!(first_pass[1], vec![2u8; 16]);
        assert_eq!(first_pass[2], vec![3u8; 16]);

        let second_pass: Vec<Vec<u8>> = offsets
            .iter()
            .map(|&off| cache.get(&file, off, 16).unwrap().to_vec())
            .collect();

        assert_eq!(second_pass[0], first_pass[0]);
        assert_eq!(second_pass[1], first_pass[1]);
        assert_eq!(second_pass[2], first_pass[2]);
    }

    #[test]
    fn lru_eviction_order() {
        // Read 16 blocks, touch block 0 again, then read a 17th block:
        // afterwards block 1 must be gone and block 0 must be present.
        let (_tmp, file, mut cache) = fixture(&vec![0u8; BLOCK_ALIGN * 20]);

        for block in 0..16u64 {
            cache.get(&file, block * BLOCK, 1).unwrap();
        }

        // Make block 0 more recent than block 1.
        cache.get(&file, 0, 1).unwrap();

        // One more distinct block forces a replacement.
        cache.get(&file, 16 * BLOCK, 1).unwrap();

        assert!(!holds_block(&cache, BLOCK), "block 1 should have been evicted");
        assert!(holds_block(&cache, 0), "block 0 should still be cached");
    }

    #[test]
    fn lru_clear_all_slots() {
        // After clear() no slot and no spill data may be marked valid.
        let (_tmp, file, mut cache) = fixture(&vec![0x42u8; BLOCK_ALIGN * 4]);

        for block in 0..3u64 {
            cache.get(&file, block * BLOCK, 1).unwrap();
        }

        cache.clear();

        for slot in cache.slots.iter() {
            assert_eq!(slot.len, 0);
        }
        assert_eq!(cache.spill_len, 0);

        // Reading afterwards goes back to the file and still works.
        let byte = cache.get(&file, 0, 1).unwrap();
        assert_eq!(byte[0], 0x42);
    }

    #[test]
    fn lru_aligned_keys() {
        // Offsets 100 and 200 both round down to block 0.
        let (_tmp, file, mut cache) = fixture(&vec![0xEEu8; BLOCK_ALIGN]);

        for offset in [100u64, 200] {
            let bytes = cache.get(&file, offset, 10).unwrap().to_vec();
            assert_eq!(bytes, vec![0xEEu8; 10]);
        }

        // Exactly one slot may hold block 0.
        let holders = cache
            .slots
            .iter()
            .filter(|s| s.block_offset == 0 && s.len > 0)
            .count();
        assert_eq!(holders, 1, "only one slot should hold block 0");
    }

    #[test]
    fn lru_cross_block_uses_spill_buffer() {
        // File is 0xAA×4096 followed by 0xBB×4096; bytes 4090..4110 are
        // six 0xAA followed by fourteen 0xBB.
        let mut data = vec![0xAAu8; BLOCK_ALIGN];
        data.extend_from_slice(&vec![0xBBu8; BLOCK_ALIGN]);
        let (_tmp, file, mut cache) = fixture(&data);

        let straddling = cache.get(&file, 4090, 20).unwrap().to_vec();
        let (left, right) = straddling.split_at(6);
        assert_eq!(left, &[0xAAu8; 6]);
        assert_eq!(right, &[0xBBu8; 14]);

        // Nothing may have landed in the slots.
        for slot in cache.slots.iter() {
            assert!(
                slot.len == 0,
                "cross-block data should not be cached in slots"
            );
        }
    }

    #[test]
    fn lru_partial_last_block() {
        // 5000-byte file: the second block only holds 904 bytes.
        let (_tmp, file, mut cache) = fixture(&vec![0x77u8; 5000]);

        let whole_tail = cache.get(&file, 4096, 904).unwrap().to_vec();
        assert_eq!(whole_tail.len(), 904);
        assert_eq!(whole_tail, vec![0x77u8; 904]);

        // A shorter read from the same block.
        let prefix = cache.get(&file, 4096, 100).unwrap().to_vec();
        assert_eq!(prefix.len(), 100);
        assert_eq!(prefix, vec![0x77u8; 100]);
    }

    #[test]
    fn lru_short_file_overread() {
        // Asking for 100 bytes of a 1-byte file is an error, not a panic.
        let (_tmp, file, mut cache) = fixture(b"X");

        assert!(cache.get(&file, 0, 100).is_err());
    }

    #[test]
    fn lru_empty_file_returns_error() {
        // Asking for 1 byte of an empty file is an error.
        let (_tmp, file, mut cache) = fixture(b"");

        assert!(cache.get(&file, 0, 1).is_err());
    }

    #[test]
    fn long_line_spans_multiple_blocks() {
        // Two lines of 4090 payload bytes plus '\n' each: 'A'… then 'B'….
        let mut data = vec![b'A'; 4090];
        data.push(b'\n');
        data.extend_from_slice(&vec![b'B'; 4090]);
        data.push(b'\n');
        let (_tmp, file, mut cache) = fixture(&data);

        // First line: offset 0, len 4091, entirely inside block 0.
        let line1 = cache.get(&file, 0, 4091).unwrap().to_vec();
        assert_eq!(&line1[..4090], &[b'A'; 4090]);
        assert_eq!(line1[4090], b'\n');

        // Second line: offset 4091, len 4091, starts in block 0 and ends in block 1.
        let line2 = cache.get(&file, 4091, 4091).unwrap().to_vec();
        assert_eq!(&line2[..4090], &[b'B'; 4090]);
        assert_eq!(line2[4090], b'\n');
    }
}
|
||||
@@ -12,6 +12,7 @@ pub enum FileEvent {
|
||||
Appended { new_size: u64 },
|
||||
Truncated { new_size: u64 },
|
||||
Rotated { new_inode: u64 },
|
||||
Removed,
|
||||
}
|
||||
|
||||
// ─── get_inode ──────────────────────────────────────────────────────────────
|
||||
@@ -60,6 +61,10 @@ impl FileWatcher {
|
||||
|
||||
match event.kind {
|
||||
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
||||
EventKind::Remove(_) => {
|
||||
let _ = tx.send(FileEvent::Removed);
|
||||
return;
|
||||
}
|
||||
_ => return,
|
||||
}
|
||||
|
||||
@@ -70,7 +75,12 @@ impl FileWatcher {
|
||||
let current_inode = get_inode(&watch_path).unwrap_or(0);
|
||||
let current_size = std::fs::metadata(&watch_path).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
let mut st = state.lock().unwrap();
|
||||
let mut st = state.lock().unwrap_or_else(|poison| {
|
||||
// Recover from poisoned mutex — state only tracks last_size
|
||||
// and last_inode for event dedup. Stale values at worst
|
||||
// cause a duplicate event, which is harmless.
|
||||
poison.into_inner()
|
||||
});
|
||||
|
||||
if current_inode != 0 && st.last_inode != 0 && current_inode != st.last_inode {
|
||||
let _ = tx.send(FileEvent::Rotated {
|
||||
@@ -223,4 +233,4 @@ mod tests {
|
||||
let d = FileEvent::Rotated { new_inode: 42 };
|
||||
assert_ne!(a, d);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -848,6 +848,9 @@ impl App {
|
||||
// Don't auto-switch; old content preserved.
|
||||
// User can reload manually if desired.
|
||||
}
|
||||
FileEvent::Removed => {
|
||||
self.loading_state = AppLoadingState::Error("File has been deleted".into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2651,4 +2654,4 @@ plain text line
|
||||
}));
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user