From 1350f659fa5ab1cfda9fae3a8fd0377663363d4f Mon Sep 17 00:00:00 2001 From: dailz Date: Thu, 4 Jun 2026 14:10:51 +0800 Subject: [PATCH] fix(io): eliminate SIGBUS risk in background indexer threads (closes #6) Background threads (spawn_indexer, spawn_visual_height_rebuild) previously held mmap during entire file scan, risking SIGBUS if file was truncated externally. Now uses BufReader streaming scan with mmap created only after scan completes, plus stat validation. Changes: - spawn_indexer: replace mmap scan with BufReader fill_buf/consume loop, create mmap post-scan with fd stat validation - spawn_visual_height_rebuild: replace mmap/FileReader with sequential BufReader scan, discard results on line count mismatch - FileReader::open/reload/update_for_append: add stat-after-mmap check - LineIndex: make fields pub(crate) for direct construction from scan loop - Add 3 regression tests for truncation scenarios --- crates/core/src/io/file_reader.rs | 55 ++++- crates/core/src/io/line_index.rs | 6 +- crates/core/src/io/progressive_reader.rs | 251 ++++++++++++++++++----- 3 files changed, 247 insertions(+), 65 deletions(-) diff --git a/crates/core/src/io/file_reader.rs b/crates/core/src/io/file_reader.rs index 00fca93..e9a03a0 100644 --- a/crates/core/src/io/file_reader.rs +++ b/crates/core/src/io/file_reader.rs @@ -33,12 +33,16 @@ impl FileReader { } else { // SAFETY: 使用只读 Mmap(非 MmapMut),文件以只读方式打开。 // memmap2 内部持有文件描述符,确保 mmap 期间文件不会被关闭。 - // - // ⚠️ Known limitation (Phase 5): 如果文件在 mmap 期间被外部进程截断, - // 访问截断区域的内存会触发 SIGBUS(致命信号,无法恢复)。 - // FileWatcher Phase 将添加文件修改检测和 re-mmap 机制来处理此情况。 - // 在 Phase 5 中,假设打开的文件不会被外部修改。 - Some(unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?) + let m = unsafe { memmap2::Mmap::map(&file) } + .map_err(|e| CoreError::Mmap(e.to_string()))?; + + // Layer 3: mmap 后立即 stat 同一 fd,检测截断(TOCTOU 缓解,非安全证明) + let current_size = file.metadata()?.len(); + if current_size < m.len() as u64 { + None + } else { + Some(m) + } }; // 直接从 mmap 快照构建行索引,确保索引与数据来自同一内存映射, @@ -100,9 +104,14 @@ impl FileReader { self.mmap = None; if file_size > 0 { - self.mmap = Some( - unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?, - ); + let m = + unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?; + let current_size = file.metadata()?.len(); + self.mmap = if current_size < m.len() as u64 { + None + } else { + Some(m) + }; } self.line_index = match &self.mmap { @@ -132,6 +141,12 @@ impl FileReader { let mmap = unsafe { memmap2::Mmap::map(&file) }.map_err(|e| CoreError::Mmap(e.to_string()))?; + let current_size = file.metadata()?.len(); + if current_size < mmap.len() as u64 { + self.line_index = LineIndex::from_bytes(&[]); + return Ok(AppendStatus::Reloaded); + } + self.line_index .extend_from_bytes(&mmap[old_size as usize..], old_size); self.mmap = Some(mmap); @@ -516,4 +531,26 @@ mod tests { ); } } + + #[test] + fn test_open_stat_after_mmap_detects_truncation() { + let content = b"line0\nline1\nline2\nline3\n"; + let f = create_temp_file(content); + + let reader = FileReader::open(f.path()).unwrap(); + assert_eq!(reader.line_count(), 4); + + { + use std::io::Write; + let _ = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(f.path()) + .unwrap(); + } + + let reader = FileReader::open(f.path()).unwrap(); + assert_eq!(reader.line_count(), 0); + assert_eq!(reader.file_size(), 0); + } } diff --git a/crates/core/src/io/line_index.rs b/crates/core/src/io/line_index.rs index eac43b4..c492629 100644 --- a/crates/core/src/io/line_index.rs +++ b/crates/core/src/io/line_index.rs @@ -20,14 +20,14 @@ const BLOCK_SIZE: usize = 256; pub struct LineIndex { // 采样偏移量:每 BLOCK_SIZE 行记录一个起始字节偏移。 // sampled_offsets[i] 存储第 (i * BLOCK_SIZE) 行的字节起始位置。 - sampled_offsets: Vec, + pub(crate) sampled_offsets: Vec, // 文件总行数。 - total_lines: u64, + pub(crate) total_lines: u64, // 文件最后一个字节是否是换行符 \n。 #[allow(dead_code)] - has_trailing_newline: bool, + pub(crate) has_trailing_newline: bool, } impl LineIndex { diff --git a/crates/core/src/io/progressive_reader.rs b/crates/core/src/io/progressive_reader.rs index 49adc02..7652ad4 100644 --- a/crates/core/src/io/progressive_reader.rs +++ b/crates/core/src/io/progressive_reader.rs @@ -1,5 +1,6 @@ use std::cell::RefCell; use std::fmt; +use std::io::BufRead; use std::path::{Path, PathBuf}; use crate::error::{CoreError, Result}; @@ -251,7 +252,7 @@ pub fn spawn_indexer( return; } }; - let file_size = match file.metadata() { + let target_len = match file.metadata() { Ok(m) => m.len(), Err(e) => { let _ = tx.send(IndexerMessage::Error { @@ -262,11 +263,17 @@ pub fn spawn_indexer( } }; - let mmap = if file_size == 0 { - None - } else { - match unsafe { memmap2::Mmap::map(&file) } { - Ok(m) => Some(m), + let mut buf_reader = std::io::BufReader::with_capacity(64 * 1024, file); + let mut sampled_offsets: Vec = vec![0]; + let mut next_line_idx: usize = 1; + let mut newline_count: usize = 0; + let mut chunk_offset: u64 = 0; + let mut last_byte: Option = None; + let mut bytes_since_check: usize = 0; + + loop { + let buf = match buf_reader.fill_buf() { + Ok(b) => b, Err(e) => { let _ = tx.send(IndexerMessage::Error { generation, @@ -274,31 +281,35 @@ pub fn spawn_indexer( }); return; } + }; + if buf.is_empty() { + break; } - }; - let data = mmap.as_deref().unwrap_or(&[]); - - if !data.is_empty() { - let mut newline_count: usize = 0; - let mut chars_since_check: usize = 0; - let mut prev_pos: usize = 0; - - for pos in memchr::memchr_iter(b'\n', data) { - chars_since_check += pos - prev_pos; - prev_pos = pos; - - if chars_since_check >= 1_000_000 { - chars_since_check = 0; - if cancel_rx.try_recv().is_ok() { - return; - } - } + if let Some(&b) = buf.last() { + last_byte = Some(b); + } + for pos in memchr::memchr_iter(b'\n', buf) { newline_count += 1; + if next_line_idx.is_multiple_of(256) { + sampled_offsets.push(chunk_offset + pos as u64 + 1); + } + next_line_idx += 1; + } - if newline_count % 256_000 == 0 { - let percent = (pos as f64 / file_size as f64) * 100.0; + let consumed = buf.len(); + bytes_since_check += consumed; + chunk_offset += consumed as u64; + buf_reader.consume(consumed); + + if bytes_since_check >= 1_000_000 { + bytes_since_check = 0; + if cancel_rx.try_recv().is_ok() { + return; + } + if target_len > 0 { + let percent = (chunk_offset as f64 / target_len as f64) * 100.0; let _ = tx.send(IndexerMessage::Progress { generation, percent, @@ -312,9 +323,64 @@ pub fn spawn_indexer( return; } - let line_index = LineIndex::from_bytes(data); + let line_index = if chunk_offset == 0 { + LineIndex { + sampled_offsets: vec![], + total_lines: 0, + has_trailing_newline: false, + } + } else { + let has_trailing_newline = last_byte == Some(b'\n') && newline_count > 0; + let total_lines = if has_trailing_newline && newline_count > 0 { + newline_count as u64 + } else { + (1 + newline_count) as u64 + }; - let _ = IndexCache::save_with_hash(&path, &line_index, data); + if has_trailing_newline && newline_count > 0 { + let trailing_line_idx = newline_count; + if trailing_line_idx.is_multiple_of(256) { + sampled_offsets.pop(); + } + } + + LineIndex { + sampled_offsets, + total_lines, + has_trailing_newline, + } + }; + + let mmap = if target_len == 0 { + None + } else { + match std::fs::File::open(&path) { + Ok(mmap_file) => match unsafe { memmap2::Mmap::map(&mmap_file) } { + Ok(m) => match mmap_file.metadata() { + Ok(metadata) if metadata.len() >= m.len() as u64 => Some(m), + Ok(_) | Err(_) => None, + }, + Err(e) => { + let _ = tx.send(IndexerMessage::Error { + generation, + message: e.to_string(), + }); + return; + } + }, + Err(e) => { + let _ = tx.send(IndexerMessage::Error { + generation, + message: e.to_string(), + }); + return; + } + } + }; + + if let Some(data) = mmap.as_deref() { + let _ = IndexCache::save_with_hash(&path, &line_index, data); + } let reader = FileReader::from_parts(path, mmap, line_index); @@ -355,26 +421,37 @@ pub fn spawn_visual_height_rebuild( Err(_) => return, }; - let file_size = match file.metadata() { - Ok(m) => m.len(), - Err(_) => return, - }; + let mut reader = std::io::BufReader::with_capacity(64 * 1024, file); + let mut visual_heights = Vec::with_capacity(line_index.line_count()); + let mut line_buf = Vec::new(); - let mmap = if file_size == 0 { - None - } else { - match unsafe { memmap2::Mmap::map(&file) } { - Ok(m) => Some(m), + loop { + if cancel_rx.try_recv().is_ok() { + return; + } + + line_buf.clear(); + match std::io::BufRead::read_until(&mut reader, b'\n', &mut line_buf) { + Ok(0) => break, + Ok(_) => { + let line_text = std::str::from_utf8(&line_buf) + .ok() + .map(|s| s.trim_end_matches(['\r', '\n'])) + .unwrap_or(""); + visual_heights.push(compute_line_visual_height( + line_text, + terminal_width, + json_format, + )); + } Err(_) => return, } - }; + } - if cancel_rx.try_recv().is_ok() { + if visual_heights.len() != line_index.line_count() { return; } - let reader = FileReader::from_parts(path, mmap, line_index); - let visual_heights = compute_visual_heights(&reader, terminal_width, json_format); let index = VisualHeightIndex::build(&visual_heights).with_params(json_format, terminal_width); @@ -1193,18 +1270,20 @@ mod tests { let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); let rx = spawn_indexer(temp.path().to_path_buf(), 1, 80, false, cancel_rx); - let msg = rx.recv_timeout(std::time::Duration::from_secs(10)).unwrap(); - - match msg { - IndexerMessage::Complete { - visual_height_index, - .. - } => { - let idx = visual_height_index.expect("should have visual height index"); - assert_eq!(idx.visual_height_of_line(0), 1); - assert_eq!(idx.visual_height_of_line(1), 1); + loop { + match rx.recv_timeout(std::time::Duration::from_secs(10)).unwrap() { + IndexerMessage::Progress { .. } => continue, + IndexerMessage::Complete { + visual_height_index, + .. + } => { + let idx = visual_height_index.expect("should have visual height index"); + assert_eq!(idx.visual_height_of_line(0), 1); + assert_eq!(idx.visual_height_of_line(1), 1); + break; + } + other => panic!("expected Complete, got {:?}", other), } - other => panic!("expected Complete, got {:?}", other), } } @@ -1258,6 +1337,72 @@ mod tests { idx.extend_from_heights(&[1, 2, 3]); assert_eq!(idx.total_visual_rows(), 6); - assert_eq!(idx.line_count(), 3); + } + + #[test] + fn test_spawn_indexer_file_truncated_during_scan() { + let mut content = Vec::new(); + for i in 0..100_000 { + writeln!(content, "line number {:08}", i).unwrap(); + } + let f = create_temp_file(&content); + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + + let rx = spawn_indexer(f.path().to_path_buf(), 1, 80, false, cancel_rx); + + { + use std::io::Write; + let _ = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(f.path()) + .unwrap(); + } + + let result = rx.recv_timeout(std::time::Duration::from_secs(10)); + match result { + Ok(IndexerMessage::Complete { reader, .. }) => { + assert!(reader.line_count() <= 100_000); + } + Ok(IndexerMessage::Error { .. }) => {} + Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {} + Err(crossbeam_channel::RecvTimeoutError::Timeout) => {} + other => panic!("unexpected: {:?}", other), + } + } + + #[test] + fn test_spawn_visual_height_rebuild_line_count_mismatch_discards() { + let content = b"line0\nline1\nline2\n"; + let f = create_temp_file(content); + let data = std::fs::read(f.path()).unwrap(); + let index = LineIndex::from_bytes(&data); + IndexCache::save(f.path(), &index).unwrap(); + + { + use std::io::Write; + let mut file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(f.path()) + .unwrap(); + file.write_all(b"only_one_line\n").unwrap(); + } + + let (_cancel_tx, cancel_rx) = crossbeam_channel::bounded(1); + let rx = spawn_visual_height_rebuild( + f.path().to_path_buf(), + 1, + 80, + false, + cancel_rx, + ); + + let result = rx.recv_timeout(std::time::Duration::from_secs(5)); + match result { + Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {} + Err(crossbeam_channel::RecvTimeoutError::Timeout) => {} + Ok(_) => panic!("should have been discarded due to line count mismatch"), + } } }