Skip to main content

AirLibrary/Indexing/Watch/
WatchFile.rs

1//! # WatchFile
2//!
3//! ## File: Indexing/Watch/WatchFile.rs
4//!
5//! ## Role in Air Architecture
6//!
7//! Provides file watching functionality for the File Indexer service,
8//! handling file system events for incremental index updates.
9//!
10//! ## Primary Responsibility
11//!
12//! Handle file system change events and trigger index updates for
13//! created, modified, and deleted files.
14//!
15//! ## Secondary Responsibilities
16//!
17//! - File creation event handling
18//! - File modification event handling
19//! - File deletion event handling
20//! - Directory change event handling
21//! - Event debouncing for rapid changes
22//!
23//! ## Dependencies
24//!
25//! **External Crates:**
26//! - `notify` - File system watching
27//! - `tokio` - Async runtime for event handling
28//!
29//! **Internal Modules:**
30//! - `crate::Result` - Error handling type
31//! - `crate::AirError` - Error types
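//! - `crate::Indexing::State::CreateState::FileIndex` - Index structure definitions
//! - `crate::Indexing::State::UpdateState` - Removal of entries from the index
//! - `crate::Indexing::Store::UpdateIndex` - Index update operations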
34//!
35//! ## Dependents
36//!
37//! - `Indexing::Background::StartWatcher` - Watcher setup and management
38//! - `Indexing::mod::FileIndexer` - Main file indexer implementation
39//!
40//! ## VSCode Pattern Reference
41//!
42//! Inspired by VSCode's file watching in
43//! `src/vs/base/node/watcher/`
44//!
45//! ## Security Considerations
46//!
47//! - Path validation before watching
48//! - Symbolic link following disabled
49//! - Permission checking on watch paths
50//!
51//! ## Performance Considerations
52//!
53//! - Event debouncing prevents excessive updates
54//! - Batch processing of multiple events
55//! - Efficient event filtering
56//!
57//! ## Error Handling Strategy
58//!
59//! Event operations log warnings for individual errors and continue,
60//! ensuring a single event failure doesn't stop the watcher.
61//!
62//! ## Thread Safety
63//!
64//! Event handlers acquire write locks on shared state and process
65//! events asynchronously to avoid blocking the watcher loop.
66
67use std::path::PathBuf;
68
69use tokio::sync::{Mutex, RwLock};
70
71use crate::{AirError, Configuration::IndexingConfig, Indexing::State::CreateState::FileIndex, Result, dev_log};
72
73/// Handle file watcher event for incremental indexing
74///
75/// This function processes file system events and updates the index
76/// accordingly:
77/// - File Created: Index the new file
78/// - File Modified: Re-index the modified file
79/// - File Removed: Remove from index
80pub async fn HandleFileEvent(event:notify::Event, index_arc:&RwLock<FileIndex>, config:&IndexingConfig) -> Result<()> {
81	match event.kind {
82		notify::EventKind::Create(notify::event::CreateKind::File) => {
83			for path in event.paths {
84				dev_log!("indexing", "[WatchFile] File created: {}", path.display());
85
86				let mut index = index_arc.write().await;
87
88				if let Err(e) = crate::Indexing::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config).await {
89					dev_log!(
90						"indexing",
91						"warn: [WatchFile] Failed to index new file {}: {}",
92						path.display(),
93						e
94					);
95				}
96			}
97		},
98
99		notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
100		| notify::EventKind::Modify(notify::event::ModifyKind::Name(notify::event::RenameMode::Both)) => {
101			for path in event.paths {
102				dev_log!("indexing", "[WatchFile] File modified: {}", path.display());
103
104				let mut index = index_arc.write().await;
105
106				if let Err(e) = crate::Indexing::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config).await {
107					dev_log!(
108						"indexing",
109						"warn: [WatchFile] Failed to re-index modified file {}: {}",
110						path.display(),
111						e
112					);
113				}
114			}
115		},
116
117		notify::EventKind::Remove(notify::event::RemoveKind::File) => {
118			for path in event.paths {
119				dev_log!("indexing", "[WatchFile] File removed: {}", path.display());
120
121				let mut index = index_arc.write().await;
122
123				if let Err(e) = crate::Indexing::State::UpdateState::RemoveFileFromIndex(&mut index, &path) {
124					dev_log!(
125						"indexing",
126						"warn: [WatchFile] Failed to remove file from index {}: {}",
127						path.display(),
128						e
129					);
130				}
131			}
132		},
133
134		notify::EventKind::Create(notify::event::CreateKind::Folder) => {
135			for path in event.paths {
136				dev_log!("indexing", "[WatchFile] Directory created: {}", path.display()); // Directories themselves don't need indexing, just their
137				// contents
138			}
139		},
140
141		notify::EventKind::Remove(notify::event::RemoveKind::Folder) => {
142			for path in event.paths {
143				dev_log!("indexing", "[WatchFile] Directory removed: {}", path.display()); // Remove all files from this directory
144				let mut index = index_arc.write().await;
145
146				let mut paths_to_remove = Vec::new();
147
148				for indexed_path in index.files.keys() {
149					if indexed_path.starts_with(&path) {
150						paths_to_remove.push(indexed_path.clone());
151					}
152				}
153
154				for indexed_path in paths_to_remove {
155					if let Err(e) = crate::Indexing::State::UpdateState::RemoveFileFromIndex(&mut index, &indexed_path)
156					{
157						dev_log!(
158							"indexing",
159							"warn: [WatchFile] Failed to remove file {}: {}",
160							indexed_path.display(),
161							e
162						);
163					}
164				}
165			}
166		},
167
168		_ => {
169			// Ignore other event types
170			dev_log!("indexing", "ignored event kind: {:?}", event.kind);
171		},
172	}
173
174	Ok(())
175}
176
/// Debounced file change handler
///
/// Prevents rapid successive changes from causing excessive re-indexing.
/// Changes are buffered per path; repeated changes to the same path are
/// merged into a single pending entry until they are processed or cleared.
pub struct DebouncedEventHandler {
	/// Buffered changes keyed by the affected path, guarded by an async mutex
	/// so the handler can be used through a shared reference.
	pending_changes:Mutex<std::collections::HashMap<PathBuf, FileChangeInfo>>,
}
183
184impl DebouncedEventHandler {
185	pub fn new() -> Self { Self { pending_changes:Mutex::new(std::collections::HashMap::new()) } }
186
187	/// Add a file change event
188	pub async fn AddChange(&self, path:PathBuf, change_type:FileChangeType) {
189		let mut pending = self.pending_changes.lock().await;
190
191		let now = std::time::Instant::now();
192
193		match pending.get_mut(&path) {
194			Some(change_info) => {
195				change_info.last_seen = now;
196
197				change_info.change_type = change_type.max(change_info.change_type);
198
199				change_info.suppressed_count += 1;
200			},
201
202			None => {
203				pending.insert(
204					path.clone(),
205					FileChangeInfo { path:path.clone(), change_type, last_seen:now, suppressed_count:0 },
206				);
207			},
208		}
209	}
210
211	/// Process pending changes older than the specified duration
212	pub async fn ProcessPendingChanges(
213		&self,
214
215		age_cutoff:std::time::Duration,
216
217		index_arc:&RwLock<FileIndex>,
218
219		config:&IndexingConfig,
220	) -> Result<Vec<ProcessedChange>> {
221		let mut processed = Vec::new();
222
223		let expired_paths = {
224			let mut pending = self.pending_changes.lock().await;
225
226			let mut expired = Vec::new();
227
228			for (path, change_info) in pending.iter() {
229				if change_info.last_seen.elapsed() >= age_cutoff {
230					expired.push((path.clone(), change_info.clone()));
231				}
232			}
233
234			// Remove expired entries
235			for (path, _) in &expired {
236				pending.remove(path);
237			}
238
239			expired
240		};
241
242		for (path, change_info) in expired_paths {
243			dev_log!(
244				"indexing",
245				"[WatchFile] Processing debounced change for {} (suppressed: {})",
246				path.display(),
247				change_info.suppressed_count
248			);
249
250			let result = match change_info.change_type {
251				FileChangeType::Created => {
252					let mut index = index_arc.write().await;
253
254					crate::Indexing::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config)
255						.await
256						.map(|_| ProcessedChangeResult::Success)
257						.unwrap_or(ProcessedChangeResult::Failed)
258				},
259
260				FileChangeType::Modified => {
261					let mut index = index_arc.write().await;
262
263					super::super::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config)
264						.await
265						.map(|_| ProcessedChangeResult::Success)
266						.unwrap_or(ProcessedChangeResult::Failed)
267				},
268
269				FileChangeType::Removed => {
270					let mut index = index_arc.write().await;
271
272					crate::Indexing::State::UpdateState::RemoveFileFromIndex(&mut index, &path)
273						.map(|_| ProcessedChangeResult::Success)
274						.unwrap_or(ProcessedChangeResult::Failed)
275				},
276			};
277
278			processed.push(ProcessedChange {
279				path,
280				change_type:change_info.change_type,
281				suppressed_count:change_info.suppressed_count,
282				result,
283			});
284		}
285
286		Ok(processed)
287	}
288
289	/// Clear all pending changes
290	pub async fn ClearPending(&self) -> usize {
291		let mut pending = self.pending_changes.lock().await;
292
293		let count = pending.len();
294
295		pending.clear();
296
297		count
298	}
299
300	/// Get the number of pending changes
301	pub async fn PendingCount(&self) -> usize {
302		let pending = self.pending_changes.lock().await;
303
304		pending.len()
305	}
306}
307
308impl Default for DebouncedEventHandler {
309	fn default() -> Self { Self::new() }
310}
311
/// Kind of change recorded for a path while it is being debounced.
///
/// The declaration order matters for the derived `PartialOrd`/`Ord`:
/// `Created < Modified < Removed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FileChangeType {
	/// The path was created.
	Created,

	/// The path was modified.
	Modified,

	/// The path was removed.
	Removed,
}

impl FileChangeType {
	/// Return whichever of the two change types has the higher precedence.
	///
	/// `Removed` outranks `Modified`, which outranks `Created`; the argument
	/// order does not influence the result.
	pub fn max(self, other:Self) -> Self {
		let either_is = |wanted:Self| self == wanted || other == wanted;

		if either_is(Self::Removed) {
			Self::Removed
		} else if either_is(Self::Modified) {
			Self::Modified
		} else {
			Self::Created
		}
	}
}
334
/// File change information for debouncing
///
/// One instance exists per path that currently has a pending change.
#[derive(Debug, Clone)]
struct FileChangeInfo {
	/// The affected path; a copy of the key it is stored under in the pending map.
	#[allow(dead_code)]
	path:PathBuf,

	/// The change type currently recorded for this path after merging.
	change_type:FileChangeType,

	/// When the most recent change for this path was recorded.
	last_seen:std::time::Instant,

	/// Number of additional events merged into this entry after the first one.
	suppressed_count:usize,
}
347
/// Result of processing a debounced change
#[derive(Debug, Clone)]
pub enum ProcessedChangeResult {
	/// The index operation for the change returned `Ok`.
	Success,

	/// The index operation for the change returned an error.
	Failed,
}
355
/// Describes a processed file change
///
/// Produced by `DebouncedEventHandler::ProcessPendingChanges`, one per
/// pending entry that was applied to the index.
#[derive(Debug, Clone)]
pub struct ProcessedChange {
	/// The path the change was applied to.
	pub path:PathBuf,

	/// The change type that was applied after merging.
	pub change_type:FileChangeType,

	/// How many follow-up events were merged into this change while it was pending.
	pub suppressed_count:usize,

	/// Whether applying the change to the index succeeded.
	pub result:ProcessedChangeResult,
}
367
368/// Convert notify event kind to FileChangeType
369pub fn EventKindToChangeType(kind:notify::EventKind) -> Option<FileChangeType> {
370	match kind {
371		notify::EventKind::Create(_) => Some(FileChangeType::Created),
372
373		notify::EventKind::Modify(_) => Some(FileChangeType::Modified),
374
375		notify::EventKind::Remove(_) => Some(FileChangeType::Removed),
376
377		_ => None,
378	}
379}
380
/// Check if a path should be watched (not in ignored paths)
///
/// Returns `false` as soon as one entry of `ignored_patterns` matches `path`,
/// `true` otherwise. A pattern matches when:
/// - it contains no path separator and equals one whole component of the
///   path, where `*` in the pattern matches any run of characters within that
///   component (`target` matches `/p/target/x` but not `/p/targets.rs`;
///   `*.pyc` matches `/p/a.pyc`), or
/// - it contains a path separator and occurs as a substring of the path.
///
/// Empty patterns never match. Matching is case-sensitive.
pub fn ShouldWatchPath(path:&PathBuf, ignored_patterns:&[String]) -> bool {
	!ignored_patterns.iter().any(|pattern| MatchesIgnorePattern(path, pattern))
}

/// Decide whether a single ignore pattern applies to `path`.
fn MatchesIgnorePattern(path:&std::path::Path, pattern:&str) -> bool {
	if pattern.is_empty() {
		// An empty pattern would otherwise be a substring of every path.
		return false;
	}

	if pattern.contains(|c:char| c == '/' || c == '\\') {
		// Multi-component patterns cannot be compared per component; keep
		// plain substring matching for them.
		return path.to_string_lossy().contains(pattern);
	}

	path.components()
		.any(|component| MatchesWildcard(pattern, &component.as_os_str().to_string_lossy()))
}

/// Match `name` against `pattern`, where `*` stands for any (possibly empty)
/// run of characters and everything else must match literally.
fn MatchesWildcard(pattern:&str, name:&str) -> bool {
	let (prefix, tail) = match pattern.split_once('*') {
		Some(parts) => parts,

		// No wildcard: the whole name has to be identical.
		None => return pattern == name,
	};

	let (middle, suffix) = match tail.rsplit_once('*') {
		Some((middle, suffix)) => (Some(middle), suffix),

		None => (None, tail),
	};

	// The literal text before the first and after the last `*` is anchored.
	let mut remaining = match name.strip_prefix(prefix).and_then(|rest| rest.strip_suffix(suffix)) {
		Some(rest) => rest,

		None => return false,
	};

	// Literal pieces between wildcards must appear in order in what is left.
	if let Some(middle) = middle {
		for piece in middle.split('*') {
			match remaining.find(piece) {
				Some(at) => remaining = &remaining[at + piece.len()..],

				None => return false,
			}
		}
	}

	true
}
394
/// Build the default list of ignore patterns for file watching.
///
/// The list covers dependency and build output directories, version control
/// metadata, Python caches and virtual environments, editor settings, OS
/// metadata files and temporary/swap files. A fresh `Vec` is allocated on
/// every call, so callers are free to extend or modify the result.
pub fn GetDefaultIgnoredPatterns() -> Vec<String> {
	const DEFAULT_PATTERNS:&[&str] = &[
		"node_modules",
		"target",
		".git",
		".svn",
		".hg",
		".bzr",
		"dist",
		"build",
		".next",
		".nuxt",
		"__pycache__",
		"*.pyc",
		".venv",
		"venv",
		"env",
		".env",
		".idea",
		".vscode",
		".DS_Store",
		"Thumbs.db",
		"*.swp",
		"*.tmp",
	];

	DEFAULT_PATTERNS.iter().map(|pattern| String::from(*pattern)).collect()
}
422
423/// Validate that a watch path exists and is accessible
424pub fn ValidateWatchPath(path:&PathBuf) -> Result<()> {
425	if !path.exists() {
426		return Err(AirError::FileSystem(format!("Watch path does not exist: {}", path.display())));
427	}
428
429	if !path.is_dir() {
430		return Err(AirError::FileSystem(format!(
431			"Watch path is not a directory: {}",
432			path.display()
433		)));
434	}
435
436	// Check read access
437	std::fs::read_dir(path)
438		.map_err(|e| AirError::FileSystem(format!("Cannot access watch path {}: {}", path.display(), e)))?;
439
440	Ok(())
441}