diff --git a/crates/core/src/watcher/file_watcher.rs b/crates/core/src/watcher/file_watcher.rs index 001a391..c80aec5 100644 --- a/crates/core/src/watcher/file_watcher.rs +++ b/crates/core/src/watcher/file_watcher.rs @@ -1,2 +1,226 @@ -pub struct FileWatcher {/* TODO */} +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use crossbeam_channel::{bounded, Receiver, Sender}; +use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; + +use crate::error::Result; + +// ─── FileEvent ────────────────────────────────────────────────────────────── +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FileEvent { + Appended { new_size: u64 }, + Truncated { new_size: u64 }, + Rotated { new_inode: u64 }, +} + +// ─── get_inode ────────────────────────────────────────────────────────────── + +#[cfg(unix)] +fn get_inode(path: &Path) -> std::io::Result { + use std::os::unix::fs::MetadataExt; + Ok(std::fs::metadata(path)?.ino()) +} + +#[cfg(not(unix))] +fn get_inode(_path: &Path) -> std::io::Result { + Ok(0) // rotation detection not supported on non-Unix +} + +// ─── WatchState ───────────────────────────────────────────────────────────── +struct WatchState { + last_size: u64, + last_inode: u64, +} + +// ─── FileWatcher ──────────────────────────────────────────────────────────── +pub struct FileWatcher { + rx: Receiver, + _watcher: RecommendedWatcher, +} + +impl FileWatcher { + pub fn watch(path: &Path) -> Result { + let (tx, rx): (Sender, Receiver) = bounded(100); + + let initial_size = std::fs::metadata(path)?.len(); + let initial_inode = get_inode(path).unwrap_or(0); + let state = Arc::new(Mutex::new(WatchState { + last_size: initial_size, + last_inode: initial_inode, + })); + let watch_path: PathBuf = path.to_path_buf(); + + let mut watcher = + notify::recommended_watcher(move |res: std::result::Result| { + let event = match res { + Ok(e) => e, + Err(_) => return, + }; + + match event.kind { + EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any => {} + _ => return, + } + + if !event.paths.iter().any(|p| p == &watch_path) { + return; + } + + let current_inode = get_inode(&watch_path).unwrap_or(0); + let current_size = std::fs::metadata(&watch_path).map(|m| m.len()).unwrap_or(0); + + let mut st = state.lock().unwrap(); + + if current_inode != 0 && st.last_inode != 0 && current_inode != st.last_inode { + let _ = tx.send(FileEvent::Rotated { + new_inode: current_inode, + }); + st.last_inode = current_inode; + st.last_size = current_size; + } else if current_size > st.last_size { + let _ = tx.send(FileEvent::Appended { + new_size: current_size, + }); + st.last_size = current_size; + st.last_inode = current_inode; + } else if current_size < st.last_size { + let _ = tx.send(FileEvent::Truncated { + new_size: current_size, + }); + st.last_size = current_size; + st.last_inode = current_inode; + } + })?; + + watcher.watch(path, RecursiveMode::NonRecursive)?; + + Ok(Self { + rx, + _watcher: watcher, + }) + } + + pub fn try_recv(&self) -> Option { + self.rx.try_recv().ok() + } +} + +// ─── SmartFollow ──────────────────────────────────────────────────────────── + pub struct SmartFollow {/* TODO */} + +// ─── Tests ────────────────────────────────────────────────────────────────── +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use std::thread; + use std::time::Duration; + + fn collect_events(watcher: &FileWatcher, timeout_ms: u64) -> Vec { + let mut events = Vec::new(); + let deadline = std::time::Instant::now() + Duration::from_millis(timeout_ms); + while std::time::Instant::now() < deadline { + if let Some(e) = watcher.try_recv() { + events.push(e); + } + thread::sleep(Duration::from_millis(50)); + } + // final drain + while let Some(e) = watcher.try_recv() { + events.push(e); + } + events + } + + #[test] + fn test_watcher_append() { + let dir = tempfile::tempdir().expect("create temp dir"); + let path = dir.path().join("test.log"); + + std::fs::write(&path, b"hello\n").expect("write initial"); + + let watcher = FileWatcher::watch(&path).expect("start watcher"); + + { + let mut f = std::fs::OpenOptions::new() + .append(true) + .open(&path) + .expect("open for append"); + f.write_all(b"world\n").expect("append data"); + } + + let events = collect_events(&watcher, 1000); + let appended: Vec<&FileEvent> = events + .iter() + .filter(|e| matches!(e, FileEvent::Appended { .. })) + .collect(); + assert!( + !appended.is_empty(), + "should detect append event, got: {events:?}" + ); + } + + #[test] + fn test_watcher_truncate() { + let dir = tempfile::tempdir().expect("create temp dir"); + let path = dir.path().join("test.log"); + + std::fs::write(&path, b"hello world this is a long line\n").expect("write initial"); + + let watcher = FileWatcher::watch(&path).expect("start watcher"); + + // File::create truncates to 0 bytes + let _ = std::fs::File::create(&path).expect("truncate file"); + + let events = collect_events(&watcher, 1000); + let truncated: Vec<&FileEvent> = events + .iter() + .filter(|e| matches!(e, FileEvent::Truncated { .. })) + .collect(); + assert!( + !truncated.is_empty(), + "should detect truncate event, got: {events:?}" + ); + } + + #[test] + fn test_watcher_idle() { + let dir = tempfile::tempdir().expect("create temp dir"); + let path = dir.path().join("idle.log"); + + std::fs::write(&path, b"static content\n").expect("write"); + + let watcher = FileWatcher::watch(&path).expect("start watcher"); + + thread::sleep(Duration::from_millis(300)); + assert_eq!(watcher.try_recv(), None, "no events on idle file"); + } + + #[test] + fn test_get_inode() { + let dir = tempfile::tempdir().expect("create temp dir"); + let path = dir.path().join("inode_test.txt"); + std::fs::write(&path, b"test").expect("write file"); + + let inode = get_inode(&path).expect("get inode"); + #[cfg(unix)] + assert!(inode > 0, "inode should be positive on Unix"); + #[cfg(not(unix))] + assert_eq!(inode, 0, "inode should be 0 on non-Unix"); + } + + #[test] + fn test_file_event_equality() { + let a = FileEvent::Appended { new_size: 100 }; + let b = FileEvent::Appended { new_size: 100 }; + assert_eq!(a, b); + + let c = FileEvent::Truncated { new_size: 0 }; + assert_ne!(a, c); + + let d = FileEvent::Rotated { new_inode: 42 }; + assert_ne!(a, d); + } +}