fix(io): eliminate SIGBUS risk in background indexer threads (closes #6)

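Background threads (spawn_indexer, spawn_visual_height_rebuild) previously
held an mmap for the entire file scan, risking SIGBUS if the file was
truncated externally. Both now scan the file by streaming it through a
BufReader. spawn_indexer creates its mmap only after the scan completes and
validates it with a stat on the same file; this stat check narrows the
truncation race (a TOCTOU mitigation) rather than proving the mapping safe.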

Changes:
- spawn_indexer: replace mmap scan with BufReader fill_buf/consume loop,
  create mmap post-scan with fd stat validation
- spawn_visual_height_rebuild: replace mmap/FileReader with sequential
  BufReader scan, discard results on line count mismatch
- FileReader::open/reload/update_for_append: add stat-after-mmap check
- LineIndex: make fields pub(crate) for direct construction from scan loop
- Add 3 regression tests for truncation scenarios
This commit is contained in:
dailz
2026-06-04 14:10:51 +08:00
parent 1bb6b2e9f3
commit 1350f659fa
3 changed files with 247 additions and 65 deletions

View File

@@ -33,12 +33,16 @@ impl FileReader {
} else { } else {
// SAFETY: 使用只读 Mmap非 MmapMut文件以只读方式打开。 // SAFETY: 使用只读 Mmap非 MmapMut文件以只读方式打开。
// memmap2 内部持有文件描述符,确保 mmap 期间文件不会被关闭。 // memmap2 内部持有文件描述符,确保 mmap 期间文件不会被关闭。
// let m = unsafe { memmap2::Mmap::map(&file) }
// ⚠️ Known limitation (Phase 5): 如果文件在 mmap 期间被外部进程截断, .map_err(|e| CoreError::Mmap(e.to_string()))?;
// 访问截断区域的内存会触发 SIGBUS致命信号无法恢复
// FileWatcher Phase 将添加文件修改检测和 re-mmap 机制来处理此情况。 // Layer 3: mmap 后立即 stat 同一 fd检测截断TOCTOU 缓解,非安全证明)
// 在 Phase 5 中,假设打开的文件不会被外部修改。 let current_size = file.metadata()?.len();
Some(unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?) if current_size < m.len() as u64 {
None
} else {
Some(m)
}
}; };
// 直接从 mmap 快照构建行索引,确保索引与数据来自同一内存映射, // 直接从 mmap 快照构建行索引,确保索引与数据来自同一内存映射,
@@ -100,9 +104,14 @@ impl FileReader {
self.mmap = None; self.mmap = None;
if file_size > 0 { if file_size > 0 {
self.mmap = Some( let m =
unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?, unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?;
); let current_size = file.metadata()?.len();
self.mmap = if current_size < m.len() as u64 {
None
} else {
Some(m)
};
} }
self.line_index = match &self.mmap { self.line_index = match &self.mmap {
@@ -132,6 +141,12 @@ impl FileReader {
let mmap = let mmap =
unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?; unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?;
let current_size = file.metadata()?.len();
if current_size < mmap.len() as u64 {
self.line_index = LineIndex::from_bytes(&[]);
return Ok(AppendStatus::Reloaded);
}
self.line_index self.line_index
.extend_from_bytes(&mmap[old_size as usize..], old_size); .extend_from_bytes(&mmap[old_size as usize..], old_size);
self.mmap = Some(mmap); self.mmap = Some(mmap);
@@ -516,4 +531,26 @@ mod tests {
); );
} }
} }
// Regression test: after a file has been truncated to zero bytes, reopening it
// must yield an empty reader (no lines, size 0) instead of stale content.
//
// NOTE(review): truncating to length 0 presumably exercises the empty-file
// path of `FileReader::open` rather than the post-mmap size check itself —
// confirm against the part of `open` that handles a zero-length file.
#[test]
fn test_open_stat_after_mmap_detects_truncation() {
    let content = b"line0\nline1\nline2\nline3\n";
    let f = create_temp_file(content);

    // Keep the first reader in its own scope so it, and the mapping it holds,
    // is released before the file is truncated. Shadowing alone would keep it
    // alive until the end of the test, and truncating a file that still has a
    // live mapping fails on Windows.
    {
        let reader = FileReader::open(f.path()).unwrap();
        assert_eq!(reader.line_count(), 4);
    }

    // Opening with `truncate(true)` cuts the file to zero length; the handle
    // itself is not needed, so it is dropped immediately.
    drop(
        std::fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(f.path())
            .unwrap(),
    );

    let reader = FileReader::open(f.path()).unwrap();
    assert_eq!(reader.line_count(), 0);
    assert_eq!(reader.file_size(), 0);
}
} }

View File

@@ -20,14 +20,14 @@ const BLOCK_SIZE: usize = 256;
pub struct LineIndex { pub struct LineIndex {
// 采样偏移量:每 BLOCK_SIZE 行记录一个起始字节偏移。 // 采样偏移量:每 BLOCK_SIZE 行记录一个起始字节偏移。
// sampled_offsets[i] 存储第 (i * BLOCK_SIZE) 行的字节起始位置。 // sampled_offsets[i] 存储第 (i * BLOCK_SIZE) 行的字节起始位置。
sampled_offsets: Vec<u64>, pub(crate) sampled_offsets: Vec<u64>,
// 文件总行数。 // 文件总行数。
total_lines: u64, pub(crate) total_lines: u64,
// 文件最后一个字节是否是换行符 \n。 // 文件最后一个字节是否是换行符 \n。
#[allow(dead_code)] #[allow(dead_code)]
has_trailing_newline: bool, pub(crate) has_trailing_newline: bool,
} }
impl LineIndex { impl LineIndex {

View File

@@ -1,5 +1,6 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::io::BufRead;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use crate::error::{CoreError, Result}; use crate::error::{CoreError, Result};
@@ -251,7 +252,7 @@ pub fn spawn_indexer(
return; return;
} }
}; };
let file_size = match file.metadata() { let target_len = match file.metadata() {
Ok(m) => m.len(), Ok(m) => m.len(),
Err(e) => { Err(e) => {
let _ = tx.send(IndexerMessage::Error { let _ = tx.send(IndexerMessage::Error {
@@ -262,11 +263,17 @@ pub fn spawn_indexer(
} }
}; };
let mmap = if file_size == 0 { let mut buf_reader = std::io::BufReader::with_capacity(64 * 1024, file);
None let mut sampled_offsets: Vec<u64> = vec![0];
} else { let mut next_line_idx: usize = 1;
match unsafe { memmap2::Mmap::map(&file) } { let mut newline_count: usize = 0;
Ok(m) => Some(m), let mut chunk_offset: u64 = 0;
let mut last_byte: Option<u8> = None;
let mut bytes_since_check: usize = 0;
loop {
let buf = match buf_reader.fill_buf() {
Ok(b) => b,
Err(e) => { Err(e) => {
let _ = tx.send(IndexerMessage::Error { let _ = tx.send(IndexerMessage::Error {
generation, generation,
@@ -274,31 +281,35 @@ pub fn spawn_indexer(
}); });
return; return;
} }
};
if buf.is_empty() {
break;
} }
};
let data = mmap.as_deref().unwrap_or(&[]); if let Some(&b) = buf.last() {
last_byte = Some(b);
if !data.is_empty() { }
let mut newline_count: usize = 0;
let mut chars_since_check: usize = 0;
let mut prev_pos: usize = 0;
for pos in memchr::memchr_iter(b'\n', data) {
chars_since_check += pos - prev_pos;
prev_pos = pos;
if chars_since_check >= 1_000_000 {
chars_since_check = 0;
if cancel_rx.try_recv().is_ok() {
return;
}
}
for pos in memchr::memchr_iter(b'\n', buf) {
newline_count += 1; newline_count += 1;
if next_line_idx.is_multiple_of(256) {
sampled_offsets.push(chunk_offset + pos as u64 + 1);
}
next_line_idx += 1;
}
if newline_count % 256_000 == 0 { let consumed = buf.len();
let percent = (pos as f64 / file_size as f64) * 100.0; bytes_since_check += consumed;
chunk_offset += consumed as u64;
buf_reader.consume(consumed);
if bytes_since_check >= 1_000_000 {
bytes_since_check = 0;
if cancel_rx.try_recv().is_ok() {
return;
}
if target_len > 0 {
let percent = (chunk_offset as f64 / target_len as f64) * 100.0;
let _ = tx.send(IndexerMessage::Progress { let _ = tx.send(IndexerMessage::Progress {
generation, generation,
percent, percent,
@@ -312,9 +323,64 @@ pub fn spawn_indexer(
return; return;
} }
let line_index = LineIndex::from_bytes(data); let line_index = if chunk_offset == 0 {
LineIndex {
sampled_offsets: vec![],
total_lines: 0,
has_trailing_newline: false,
}
} else {
let has_trailing_newline = last_byte == Some(b'\n') && newline_count > 0;
let total_lines = if has_trailing_newline && newline_count > 0 {
newline_count as u64
} else {
(1 + newline_count) as u64
};
let _ = IndexCache::save_with_hash(&path, &line_index, data); if has_trailing_newline && newline_count > 0 {
let trailing_line_idx = newline_count;
if trailing_line_idx.is_multiple_of(256) {
sampled_offsets.pop();
}
}
LineIndex {
sampled_offsets,
total_lines,
has_trailing_newline,
}
};
let mmap = if target_len == 0 {
None
} else {
match std::fs::File::open(&path) {
Ok(mmap_file) => match unsafe { memmap2::Mmap::map(&mmap_file) } {
Ok(m) => match mmap_file.metadata() {
Ok(metadata) if metadata.len() >= m.len() as u64 => Some(m),
Ok(_) | Err(_) => None,
},
Err(e) => {
let _ = tx.send(IndexerMessage::Error {
generation,
message: e.to_string(),
});
return;
}
},
Err(e) => {
let _ = tx.send(IndexerMessage::Error {
generation,
message: e.to_string(),
});
return;
}
}
};
if let Some(data) = mmap.as_deref() {
let _ = IndexCache::save_with_hash(&path, &line_index, data);
}
let reader = FileReader::from_parts(path, mmap, line_index); let reader = FileReader::from_parts(path, mmap, line_index);
@@ -355,26 +421,37 @@ pub fn spawn_visual_height_rebuild(
Err(_) => return, Err(_) => return,
}; };
let file_size = match file.metadata() { let mut reader = std::io::BufReader::with_capacity(64 * 1024, file);
Ok(m) => m.len(), let mut visual_heights = Vec::with_capacity(line_index.line_count());
Err(_) => return, let mut line_buf = Vec::new();
};
let mmap = if file_size == 0 { loop {
None if cancel_rx.try_recv().is_ok() {
} else { return;
match unsafe { memmap2::Mmap::map(&file) } { }
Ok(m) => Some(m),
line_buf.clear();
match std::io::BufRead::read_until(&mut reader, b'\n', &mut line_buf) {
Ok(0) => break,
Ok(_) => {
let line_text = std::str::from_utf8(&line_buf)
.ok()
.map(|s| s.trim_end_matches(['\r', '\n']))
.unwrap_or("");
visual_heights.push(compute_line_visual_height(
line_text,
terminal_width,
json_format,
));
}
Err(_) => return, Err(_) => return,
} }
}; }
if cancel_rx.try_recv().is_ok() { if visual_heights.len() != line_index.line_count() {
return; return;
} }
let reader = FileReader::from_parts(path, mmap, line_index);
let visual_heights = compute_visual_heights(&reader, terminal_width, json_format);
let index = let index =
VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width); VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width);
@@ -1193,18 +1270,20 @@ mod tests {
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
let rx = spawn_indexer(temp.path().to_path_buf(), 1, 80, false, cancel_rx); let rx = spawn_indexer(temp.path().to_path_buf(), 1, 80, false, cancel_rx);
let msg = rx.recv_timeout(std::time::Duration::from_secs(10)).unwrap(); loop {
match rx.recv_timeout(std::time::Duration::from_secs(10)).unwrap() {
match msg { IndexerMessage::Progress { .. } => continue,
IndexerMessage::Complete { IndexerMessage::Complete {
visual_height_index, visual_height_index,
.. ..
} => { } => {
let idx = visual_height_index.expect("should have visual height index"); let idx = visual_height_index.expect("should have visual height index");
assert_eq!(idx.visual_height_of_line(0), 1); assert_eq!(idx.visual_height_of_line(0), 1);
assert_eq!(idx.visual_height_of_line(1), 1); assert_eq!(idx.visual_height_of_line(1), 1);
break;
}
other => panic!("expected Complete, got {:?}", other),
} }
other => panic!("expected Complete, got {:?}", other),
} }
} }
@@ -1258,6 +1337,72 @@ mod tests {
idx.extend_from_heights(&[1, 2, 3]); idx.extend_from_heights(&[1, 2, 3]);
assert_eq!(idx.total_visual_rows(), 6); assert_eq!(idx.total_visual_rows(), 6);
assert_eq!(idx.line_count(), 3); }
// Regression test: truncating the file while the background indexer may still
// be scanning it must not crash the process. The indexer may finish with a
// (possibly shorter) index, report an error, or drop its sender; all three are
// acceptable. A hang is not.
#[test]
fn test_spawn_indexer_file_truncated_during_scan() {
    // Roughly 2 MB of input, so the scan spans many read chunks.
    let mut content = Vec::new();
    for i in 0..100_000 {
        writeln!(content, "line number {:08}", i).unwrap();
    }
    let f = create_temp_file(&content);

    // `_cancel_tx` is kept alive for the whole test; no cancellation is sent.
    let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
    let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx);

    // Truncate the file to zero length while the indexer thread is running.
    // The handle is only needed for its truncating side effect.
    drop(
        std::fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(f.path())
            .unwrap(),
    );

    // The indexer emits `Progress` messages while scanning, so the first
    // message is not necessarily the terminal one. Skip progress updates and
    // judge only the final outcome.
    let result = loop {
        match rx.recv_timeout(std::time::Duration::from_secs(10)) {
            Ok(IndexerMessage::Progress { .. }) => continue,
            terminal => break terminal,
        }
    };

    match result {
        Ok(IndexerMessage::Complete { reader, .. }) => {
            // The scan raced with the truncation, so any count up to the
            // original number of lines is valid.
            assert!(reader.line_count() <= 100_000);
        }
        Ok(IndexerMessage::Error { .. }) => {}
        Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {}
        // A timeout means the indexer neither finished nor gave up; treating
        // it as success would hide a hung background thread.
        Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
            panic!("indexer produced no terminal message within 10 seconds")
        }
        other => panic!("unexpected: {:?}", other),
    }
}
// Regression test: a visual-height rebuild must send nothing when the file no
// longer has the number of lines recorded in the saved line index.
#[test]
fn test_spawn_visual_height_rebuild_line_count_mismatch_discards() {
// Build and persist a line index for the original three-line content.
// NOTE(review): presumably spawn_visual_height_rebuild loads this saved index
// to obtain the expected line count — confirm against its implementation.
let content = b"line0\nline1\nline2\n";
let f = create_temp_file(content);
let data = std::fs::read(f.path()).unwrap();
let index = LineIndex::from_bytes(&data);
IndexCache::save(f.path(), &index).unwrap();
// Replace the file contents with a single line so the on-disk line count
// no longer matches the index saved above.
{
use std::io::Write;
let mut file = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(f.path())
.unwrap();
file.write_all(b"only_one_line\n").unwrap();
}
// `_cancel_tx` stays alive for the whole test; no cancellation is sent.
let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
let rx = spawn_visual_height_rebuild(
f.path().to_path_buf(),
1,
80,
false,
cancel_rx,
);
// Either the sender is dropped without a message or nothing arrives within
// five seconds; receiving any message means the mismatched result was not
// discarded.
let result = rx.recv_timeout(std::time::Duration::from_secs(5));
match result {
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {}
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {}
Ok(_) => panic!("should have been discarded due to line count mismatch"),
}
} }
} }