AirLibrary/Indexing/Background/
StartWatcher.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69 sync::{Mutex, RwLock, Semaphore},
70 task::JoinHandle,
71};
72
73use crate::{AirError, ApplicationState::ApplicationState, Indexing::State::CreateState::FileIndex, Result, dev_log};
74
/// Number of permits handed to the `indexing_semaphore` that
/// `BackgroundIndexerContext::new` creates.
const MAX_WATCH_PROCESSORS:usize = 5;
77
/// Shared state used by the file watcher, the debounce processor and the
/// periodic background indexing task defined in this file.
///
/// Every field is reference-counted so the context (or individual fields)
/// can be cloned into spawned tasks and watcher callbacks.
pub struct BackgroundIndexerContext {
    /// Application-wide state; this file reads `Configuration.Indexing` from it.
    pub app_state:Arc<ApplicationState>,

    /// The file index handed to the debounced handler when pending changes are processed.
    pub file_index:Arc<RwLock<FileIndex>>,

    /// Corruption flag. While it is `true` the watcher drops incoming events and the
    /// debounce/background loops skip their work. Initialised to `false`; nothing in
    /// the visible part of this file sets it to `true`.
    pub corruption_detected:Arc<Mutex<bool>>,

    /// The active filesystem watcher, or `None` when no watcher is running.
    pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,

    /// Semaphore created with `MAX_WATCH_PROCESSORS` permits. It is not acquired anywhere
    /// in the visible code — presumably meant to bound concurrent indexing work
    /// (NOTE(review): confirm against callers).
    pub indexing_semaphore:Arc<Semaphore>,

    /// Collects raw filesystem changes and later processes them in batches.
    pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
}
98
99impl BackgroundIndexerContext {
100 pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
101 Self {
102 app_state,
103
104 file_index,
105
106 corruption_detected:Arc::new(Mutex::new(false)),
107
108 file_watcher:Arc::new(Mutex::new(None)),
109
110 indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
111
112 debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
113 }
114 }
115}
116
117pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
125 use notify::Watcher;
126
127 let index = context.file_index.clone();
128
129 let corruption_flag = context.corruption_detected.clone();
130
131 let config = context.app_state.Configuration.Indexing.clone();
132
133 let debounced_handler = context.debounced_handler.clone();
134
135 let mut watcher:notify::RecommendedWatcher = Watcher::new(
137 move |res:std::result::Result<notify::Event, notify::Error>| {
138 if let Ok(event) = res {
139 if *corruption_flag.blocking_lock() {
141 dev_log!(
142 "indexing",
143 "warn: [StartWatcher] Skipping file event - index marked as corrupted"
144 );
145 return;
146 }
147
148 let index = index.clone();
149 let _index = index.clone();
151 let debounced_handler = debounced_handler.clone();
152 let _config_clone = config.clone();
153
154 tokio::spawn(async move {
155 if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
157 for path in &event.paths {
158 if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
159 path,
160 &crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
161 ) {
162 debounced_handler.AddChange(path.clone(), change_type).await;
163 }
164 }
165 }
166 });
167 }
168 },
169 notify::Config::default(),
170 )
171 .map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
172
173 for path in &paths {
175 if path.exists() {
176 match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
177 Ok(()) => {
178 watcher
179 .watch(path, notify::RecursiveMode::Recursive)
180 .map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
181
182 dev_log!("indexing", "[StartWatcher] Watching path: {}", path.display());
183 },
184
185 Err(e) => {
186 dev_log!(
187 "indexing",
188 "warn: [StartWatcher] Skipping invalid watch path {}: {}",
189 path.display(),
190 e
191 );
192 },
193 }
194 }
195 }
196
197 *context.file_watcher.lock().await = Some(watcher);
198
199 dev_log!(
200 "indexing",
201 "[StartWatcher] File watcher started successfully for {} paths",
202 paths.len()
203 );
204
205 Ok(())
206}
207
208pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
210 tokio::spawn(async move {
211 dev_log!("indexing", "[StartWatcher] Debounce processor started");
212 let interval = Duration::from_millis(100); let debounce_cutoff = Duration::from_millis(500);
215
216 loop {
217 tokio::time::sleep(interval).await;
218 {
219 if *context.corruption_detected.lock().await {
221 dev_log!("indexing", "warn: [StartWatcher] Index corrupted, pausing debounce processing");
222 tokio::time::sleep(Duration::from_secs(5)).await;
223 continue;
224 }
225
226 let config = context.app_state.Configuration.Indexing.clone();
228
229 match context
230 .debounced_handler
231 .ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
232 .await
233 {
234 Ok(changes) => {
235 if !changes.is_empty() {
236 dev_log!("indexing", "[StartWatcher] Processed {} debounced changes", changes.len());
237 }
238 },
239 Err(e) => {
240 dev_log!("indexing", "error: [StartWatcher] Failed to process pending changes: {}", e);
241 },
242 }
243 }
244 }
245 })
246}
247
248pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
250 let config = &context.app_state.Configuration.Indexing;
251
252 if !config.Enabled {
253 dev_log!("indexing", "[StartWatcher] Background indexing disabled in configuration");
254
255 return Err(AirError::Configuration("Background indexing is disabled".to_string()));
256 }
257
258 let handle = tokio::spawn(BackgroundTask(context));
259
260 dev_log!("indexing", "[StartWatcher] Background tasks started");
261
262 Ok(handle)
263}
264
265pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
267 dev_log!("indexing", "[StartWatcher] Stopping background tasks"); }
269
270pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
272 if let Some(watcher) = context.file_watcher.lock().await.take() {
273 drop(watcher);
274
275 dev_log!("indexing", "[StartWatcher] File watcher stopped");
276 }
277}
278
279async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
281 let config = context.app_state.Configuration.Indexing.clone();
282
283 let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
284
285 let mut interval = tokio::time::interval(interval);
286
287 dev_log!(
288 "indexing",
289 "[StartWatcher] Background indexing configured for {} minute intervals",
290 config.UpdateIntervalMinutes
291 );
292
293 loop {
294 interval.tick().await;
295
296 {
297 if *context.corruption_detected.lock().await {
299 dev_log!("indexing", "warn: [StartWatcher] Index corrupted, skipping background update");
300
301 continue;
302 }
303
304 dev_log!("indexing", "[StartWatcher] Running periodic background index...");
305
306 let directories = config.IndexDirectory.clone();
308
309 if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
310 {
311 dev_log!("indexing", "error: [StartWatcher] Background indexing failed: {}", e);
312 }
313 }
314 }
315}
316
317pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
319 let is_running = context.file_watcher.lock().await.is_some();
320
321 let pending_count = context.debounced_handler.PendingCount().await;
322
323 let is_corrupted = *context.corruption_detected.lock().await;
324
325 WatcherStatus { is_running, pending_count, is_corrupted }
326}
327
/// Point-in-time view of the watcher, as produced by `GetWatcherStatus`.
#[derive(Debug, Clone)]
pub struct WatcherStatus {
    /// `true` when a watcher is currently stored in the context.
    pub is_running:bool,

    /// Value reported by the debounced handler's `PendingCount`.
    pub pending_count:usize,

    /// Current value of the context's corruption flag.
    pub is_corrupted:bool,
}
337
338pub async fn StartAll(
340 context:Arc<BackgroundIndexerContext>,
341
342 watch_paths:Vec<PathBuf>,
343) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
344 let watcher_handle = if config_watch_enabled(&context) {
345 match StartFileWatcher(&context, watch_paths).await {
346 Ok(()) => {
347 Some(StartDebounceProcessor(context.clone()))
349 },
350
351 Err(e) => {
352 dev_log!("indexing", "error: [StartWatcher] Failed to start file watcher: {}", e);
353
354 None
355 },
356 }
357 } else {
358 None
359 };
360
361 let background_handle = match StartBackgroundTasks(context.clone()).await {
362 Ok(handle) => Some(handle),
363
364 Err(e) => {
365 dev_log!("indexing", "warn: [StartWatcher] Failed to start background tasks: {}", e);
366
367 None
368 },
369 };
370
371 Ok((watcher_handle, background_handle))
372}
373
374pub async fn StopAll(context:&BackgroundIndexerContext) {
376 StopBackgroundTasks(context).await;
377
378 StopFileWatcher(context).await;
379}
380
381fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }