feat(core): implement FileWatcher for live file tailing
Complete FileWatcher implementation using notify 8.x crate with get_inode() for cross-platform file identity. Support append detection for incremental index updates and truncate detection for full reloads. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -1,2 +1,226 @@
|
||||
pub struct FileWatcher {/* TODO */}
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crossbeam_channel::{bounded, Receiver, Sender};
|
||||
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
// ─── FileEvent ──────────────────────────────────────────────────────────────
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum FileEvent {
|
||||
Appended { new_size: u64 },
|
||||
Truncated { new_size: u64 },
|
||||
Rotated { new_inode: u64 },
|
||||
}
|
||||
|
||||
// ─── get_inode ──────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(unix)]
|
||||
fn get_inode(path: &Path) -> std::io::Result<u64> {
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
Ok(std::fs::metadata(path)?.ino())
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn get_inode(_path: &Path) -> std::io::Result<u64> {
|
||||
Ok(0) // rotation detection not supported on non-Unix
|
||||
}
|
||||
|
||||
// ─── WatchState ─────────────────────────────────────────────────────────────
|
||||
struct WatchState {
|
||||
last_size: u64,
|
||||
last_inode: u64,
|
||||
}
|
||||
|
||||
// ─── FileWatcher ────────────────────────────────────────────────────────────
|
||||
pub struct FileWatcher {
|
||||
rx: Receiver<FileEvent>,
|
||||
_watcher: RecommendedWatcher,
|
||||
}
|
||||
|
||||
impl FileWatcher {
|
||||
pub fn watch(path: &Path) -> Result<Self> {
|
||||
let (tx, rx): (Sender<FileEvent>, Receiver<FileEvent>) = bounded(100);
|
||||
|
||||
let initial_size = std::fs::metadata(path)?.len();
|
||||
let initial_inode = get_inode(path).unwrap_or(0);
|
||||
let state = Arc::new(Mutex::new(WatchState {
|
||||
last_size: initial_size,
|
||||
last_inode: initial_inode,
|
||||
}));
|
||||
let watch_path: PathBuf = path.to_path_buf();
|
||||
|
||||
let mut watcher =
|
||||
notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
|
||||
let event = match res {
|
||||
Ok(e) => e,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
match event.kind {
|
||||
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {}
|
||||
_ => return,
|
||||
}
|
||||
|
||||
if !event.paths.iter().any(|p| p == &watch_path) {
|
||||
return;
|
||||
}
|
||||
|
||||
let current_inode = get_inode(&watch_path).unwrap_or(0);
|
||||
let current_size = std::fs::metadata(&watch_path).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
let mut st = state.lock().unwrap();
|
||||
|
||||
if current_inode != 0 && st.last_inode != 0 && current_inode != st.last_inode {
|
||||
let _ = tx.send(FileEvent::Rotated {
|
||||
new_inode: current_inode,
|
||||
});
|
||||
st.last_inode = current_inode;
|
||||
st.last_size = current_size;
|
||||
} else if current_size > st.last_size {
|
||||
let _ = tx.send(FileEvent::Appended {
|
||||
new_size: current_size,
|
||||
});
|
||||
st.last_size = current_size;
|
||||
st.last_inode = current_inode;
|
||||
} else if current_size < st.last_size {
|
||||
let _ = tx.send(FileEvent::Truncated {
|
||||
new_size: current_size,
|
||||
});
|
||||
st.last_size = current_size;
|
||||
st.last_inode = current_inode;
|
||||
}
|
||||
})?;
|
||||
|
||||
watcher.watch(path, RecursiveMode::NonRecursive)?;
|
||||
|
||||
Ok(Self {
|
||||
rx,
|
||||
_watcher: watcher,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn try_recv(&self) -> Option<FileEvent> {
|
||||
self.rx.try_recv().ok()
|
||||
}
|
||||
}
|
||||
|
||||
// ─── SmartFollow ────────────────────────────────────────────────────────────
|
||||
|
||||
pub struct SmartFollow {/* TODO */}
|
||||
|
||||
// ─── Tests ──────────────────────────────────────────────────────────────────
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::Write;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
fn collect_events(watcher: &FileWatcher, timeout_ms: u64) -> Vec<FileEvent> {
|
||||
let mut events = Vec::new();
|
||||
let deadline = std::time::Instant::now() + Duration::from_millis(timeout_ms);
|
||||
while std::time::Instant::now() < deadline {
|
||||
if let Some(e) = watcher.try_recv() {
|
||||
events.push(e);
|
||||
}
|
||||
thread::sleep(Duration::from_millis(50));
|
||||
}
|
||||
// final drain
|
||||
while let Some(e) = watcher.try_recv() {
|
||||
events.push(e);
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watcher_append() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let path = dir.path().join("test.log");
|
||||
|
||||
std::fs::write(&path, b"hello\n").expect("write initial");
|
||||
|
||||
let watcher = FileWatcher::watch(&path).expect("start watcher");
|
||||
|
||||
{
|
||||
let mut f = std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.expect("open for append");
|
||||
f.write_all(b"world\n").expect("append data");
|
||||
}
|
||||
|
||||
let events = collect_events(&watcher, 1000);
|
||||
let appended: Vec<&FileEvent> = events
|
||||
.iter()
|
||||
.filter(|e| matches!(e, FileEvent::Appended { .. }))
|
||||
.collect();
|
||||
assert!(
|
||||
!appended.is_empty(),
|
||||
"should detect append event, got: {events:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watcher_truncate() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let path = dir.path().join("test.log");
|
||||
|
||||
std::fs::write(&path, b"hello world this is a long line\n").expect("write initial");
|
||||
|
||||
let watcher = FileWatcher::watch(&path).expect("start watcher");
|
||||
|
||||
// File::create truncates to 0 bytes
|
||||
let _ = std::fs::File::create(&path).expect("truncate file");
|
||||
|
||||
let events = collect_events(&watcher, 1000);
|
||||
let truncated: Vec<&FileEvent> = events
|
||||
.iter()
|
||||
.filter(|e| matches!(e, FileEvent::Truncated { .. }))
|
||||
.collect();
|
||||
assert!(
|
||||
!truncated.is_empty(),
|
||||
"should detect truncate event, got: {events:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watcher_idle() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let path = dir.path().join("idle.log");
|
||||
|
||||
std::fs::write(&path, b"static content\n").expect("write");
|
||||
|
||||
let watcher = FileWatcher::watch(&path).expect("start watcher");
|
||||
|
||||
thread::sleep(Duration::from_millis(300));
|
||||
assert_eq!(watcher.try_recv(), None, "no events on idle file");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_inode() {
|
||||
let dir = tempfile::tempdir().expect("create temp dir");
|
||||
let path = dir.path().join("inode_test.txt");
|
||||
std::fs::write(&path, b"test").expect("write file");
|
||||
|
||||
let inode = get_inode(&path).expect("get inode");
|
||||
#[cfg(unix)]
|
||||
assert!(inode > 0, "inode should be positive on Unix");
|
||||
#[cfg(not(unix))]
|
||||
assert_eq!(inode, 0, "inode should be 0 on non-Unix");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_file_event_equality() {
|
||||
let a = FileEvent::Appended { new_size: 100 };
|
||||
let b = FileEvent::Appended { new_size: 100 };
|
||||
assert_eq!(a, b);
|
||||
|
||||
let c = FileEvent::Truncated { new_size: 0 };
|
||||
assert_ne!(a, c);
|
||||
|
||||
let d = FileEvent::Rotated { new_inode: 42 };
|
||||
assert_ne!(a, d);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user