AirLibrary/Indexing/Store/
UpdateIndex.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
71
72use tokio::{
73 sync::{RwLock, Semaphore},
74 time::Instant,
75};
76
77use crate::{
78 AirError,
79 Configuration::IndexingConfig,
80 Indexing::State::CreateState::{FileIndex, FileMetadata},
81 Result,
82 dev_log,
83};
84
85pub async fn UpdateSingleFile(
87 index:&mut FileIndex,
88
89 file_path:&PathBuf,
90
91 config:&IndexingConfig,
92) -> Result<Option<FileMetadata>> {
93 let start_time = Instant::now();
94
95 if !file_path.exists() {
97 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
99
100 dev_log!("indexing", "[UpdateIndex] Removed deleted file: {}", file_path.display());
101
102 return Ok(None);
103 }
104
105 let current_metadata = std::fs::metadata(file_path)
107 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
108
109 let current_modified = current_metadata
110 .modified()
111 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
112
113 let _current_modified_time = chrono::DateTime::<chrono::Utc>::from(current_modified);
114
115 let needs_update = match index.files.get(file_path) {
117 Some(old_metadata) => {
118 let checksum = crate::Indexing::Scan::ScanFile::CalculateChecksum(
120 &tokio::fs::read(file_path).await.unwrap_or_default(),
121 );
122
123 old_metadata.checksum != checksum
124 },
125
126 None => {
127 true
129 },
130 };
131
132 if !needs_update {
133 dev_log!("indexing", "file unchanged: {}", file_path.display());
134
135 return Ok(index.files.get(file_path).cloned());
136 }
137
138 use crate::Indexing::{Scan::ScanFile::IndexFileInternal, State::UpdateState::UpdateIndexMetadata};
140
141 let (metadata, symbols) = IndexFileInternal(file_path, config, &[]).await?;
142
143 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
145
146 crate::Indexing::State::UpdateState::AddFileToIndex(index, file_path.clone(), metadata.clone(), symbols)?;
147
148 UpdateIndexMetadata(index)?;
150
151 let elapsed = start_time.elapsed();
152
153 dev_log!(
154 "indexing",
155 "updated {} in {}ms ({} symbols)",
156 file_path.display(),
157 elapsed.as_millis(),
158 metadata.symbol_count
159 );
160
161 Ok(Some(metadata))
162}
163
164pub async fn UpdateFileContent(index:&mut FileIndex, file_path:&PathBuf, metadata:&FileMetadata) -> Result<()> {
166 if !metadata.mime_type.starts_with("text/") && !metadata.mime_type.contains("json") {
168 return Ok(());
169 }
170
171 let content = tokio::fs::read_to_string(file_path)
172 .await
173 .map_err(|e| AirError::FileSystem(format!("Failed to read file content: {}", e)))?;
174
175 for (_, files) in index.content_index.iter_mut() {
177 files.retain(|p| p != file_path);
178 }
179
180 let tokens = crate::Indexing::Process::ProcessContent::TokenizeContent(&content);
182
183 for token in tokens {
184 if token.len() > 2 {
185 index
187 .content_index
188 .entry(token)
189 .or_insert_with(Vec::new)
190 .push(file_path.clone());
191 }
192 }
193
194 Ok(())
195}
196
197pub async fn UpdateFilesBatch(
199 index:&mut FileIndex,
200
201 file_paths:Vec<PathBuf>,
202
203 config:&IndexingConfig,
204) -> Result<UpdateBatchResult> {
205 let start_time = Instant::now();
206
207 let mut updated_count = 0u32;
208
209 let mut removed_count = 0u32;
210
211 let mut error_count = 0u32;
212
213 let mut total_size = 0u64;
214
215 for file_path in file_paths {
216 match UpdateSingleFile(index, &file_path, config).await {
217 Ok(Some(metadata)) => {
218 updated_count += 1;
219
220 total_size += metadata.size;
221 },
222
223 Ok(None) => {
224 removed_count += 1;
225 },
226
227 Err(e) => {
228 dev_log!(
229 "indexing",
230 "warn: [UpdateIndex] Failed to update file {}: {}",
231 file_path.display(),
232 e
233 );
234
235 error_count += 1;
236 },
237 }
238 }
239
240 crate::Indexing::State::UpdateState::UpdateIndexMetadata(index)?;
242
243 Ok(UpdateBatchResult {
244 updated_count,
245 removed_count,
246 error_count,
247 total_size,
248 duration_seconds:start_time.elapsed().as_secs_f64(),
249 })
250}
251
/// Summary of a batch update or full rebuild of the index.
#[derive(Clone, Debug)]
pub struct UpdateBatchResult {
	/// Number of files for which an update returned metadata.
	pub updated_count:u32,

	/// Number of files that were removed from the index.
	pub removed_count:u32,

	/// Number of errors encountered while processing.
	pub error_count:u32,

	/// Sum of the `size` values of all files counted in `updated_count`.
	pub total_size:u64,

	/// Wall-clock time the operation took, in seconds.
	pub duration_seconds:f64,
}
265
266pub struct DebouncedUpdate {
268 file_path:PathBuf,
269
270 last_seen:Instant,
271
272 index:*const RwLock<FileIndex>,
273
274 config:IndexingConfig,
275
276 duration:Duration,
277
278 pending:bool,
279}
280
281unsafe impl Send for DebouncedUpdate {}
282
283impl DebouncedUpdate {
284 pub fn new(file_path:PathBuf, index:&RwLock<FileIndex>, config:&IndexingConfig, duration:Duration) -> Self {
285 Self {
286 file_path,
287
288 last_seen:Instant::now(),
289
290 index:index as *const RwLock<FileIndex>,
291
292 config:config.clone(),
293
294 duration,
295
296 pending:false,
297 }
298 }
299
300 pub async fn trigger(&mut self) {
301 self.last_seen = Instant::now();
302
303 self.pending = true;
304 }
305
306 pub async fn process_if_ready(&mut self) -> Result<bool> {
307 if !self.pending {
308 return Ok(false);
309 }
310
311 if self.last_seen.elapsed() >= self.duration {
312 self.pending = false;
313
314 let index_ref = unsafe { &*self.index };
316
317 let mut index = index_ref.write().await;
318
319 match UpdateSingleFile(&mut index, &self.file_path, &self.config).await {
320 Ok(_) => {
321 dev_log!(
322 "indexing",
323 "[UpdateIndex] Debounced update completed: {}",
324 self.file_path.display()
325 );
326
327 return Ok(true);
328 },
329
330 Err(e) => {
331 dev_log!("indexing", "warn: [UpdateIndex] Debounced update failed: {}", e);
332
333 return Err(e);
334 },
335 }
336 }
337
338 Ok(false)
339 }
340
341 pub fn clear_pending(&mut self) { self.pending = false; }
342}
343
344pub async fn ProcessWatcherEvent(
346 index:&mut FileIndex,
347
348 event:notify::Event,
349
350 config:&IndexingConfig,
351) -> Result<WatcherEventResult> {
352 let mut updated = 0u32;
353
354 let mut removed = 0u32;
355
356 for file_path in event.paths {
357 match event.kind {
358 notify::EventKind::Create(notify::event::CreateKind::File) => {
359 dev_log!("indexing", "[UpdateIndex] File created: {}", file_path.display());
360
361 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
362 updated += 1;
363 }
364 },
365
366 notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
367 | notify::EventKind::Modify(notify::event::ModifyKind::Name(notify::event::RenameMode::Both)) => {
368 dev_log!("indexing", "[UpdateIndex] File modified: {}", file_path.display());
369
370 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
371 updated += 1;
372 }
373 },
374
375 notify::EventKind::Remove(notify::event::RemoveKind::File) => {
376 dev_log!("indexing", "[UpdateIndex] File removed: {}", file_path.display());
377
378 if super::super::State::UpdateState::RemoveFileFromIndex(index, &file_path).is_ok() {
379 removed += 1;
380 }
381 },
382
383 _ => {},
384 }
385 }
386
387 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
389
390 Ok(WatcherEventResult { updated, removed })
391}
392
/// Outcome of applying a single watcher event to the index.
#[derive(Clone, Debug)]
pub struct WatcherEventResult {
	/// Number of paths counted as updated.
	pub updated:u32,

	/// Number of paths counted as removed.
	pub removed:u32,
}
400
401pub async fn CleanupRemovedFiles(index:&mut FileIndex) -> Result<u32> {
403 let mut paths_to_remove = Vec::new();
404
405 let all_paths:Vec<_> = index.files.keys().cloned().collect();
406
407 for path in all_paths {
408 if !path.exists() {
409 paths_to_remove.push(path);
410 }
411 }
412
413 for path in &paths_to_remove {
414 super::super::State::UpdateState::RemoveFileFromIndex(index, path)?;
415 }
416
417 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
419
420 dev_log!("indexing", "[UpdateIndex] Cleaned up {} removed files", paths_to_remove.len());
421
422 Ok(paths_to_remove.len() as u32)
423}
424
425pub async fn RebuildIndex(
427 index:&mut FileIndex,
428
429 directories:Vec<String>,
430
431 patterns:Vec<String>,
432
433 config:&IndexingConfig,
434) -> Result<UpdateBatchResult> {
435 let start_time = Instant::now();
436
437 index.files.clear();
439
440 index.content_index.clear();
441
442 index.symbol_index.clear();
443
444 index.file_symbols.clear();
445
446 let (files_to_index, scan_result) =
448 crate::Indexing::Scan::ScanDirectory::ScanDirectoriesParallel(directories, patterns, config, 10).await?;
449
450 let semaphore = Arc::new(Semaphore::new(config.MaxParallelIndexing as usize));
452
453 let index_arc = Arc::new(RwLock::new(index.clone()));
454
455 let mut tasks = Vec::new();
456
457 for file_path in files_to_index {
458 let permit = semaphore.clone().acquire_owned().await.unwrap();
459
460 let _index_ref = index_arc.clone();
462
463 let config_clone = config.clone();
464
465 let task = tokio::spawn(async move {
466 let _permit = permit;
467
468 crate::Indexing::Scan::ScanFile::IndexFileInternal(&file_path, &config_clone, &[]).await
469 });
470
471 tasks.push(task);
472 }
473
474 let mut updated_count = 0u32;
475
476 let mut total_size = 0u64;
477
478 for task in tasks {
479 match task.await {
480 Ok(Ok((metadata, symbols))) => {
481 let file_size = metadata.size;
482
483 super::super::State::UpdateState::AddFileToIndex(index, metadata.path.clone(), metadata, symbols)?;
484
485 updated_count += 1;
486
487 total_size += file_size;
488 },
489
490 Ok(Err(e)) => {
491 dev_log!("indexing", "warn: [UpdateIndex] Rebuild task failed: {}", e);
492 },
493
494 Err(e) => {
495 dev_log!("indexing", "warn: [UpdateIndex] Rebuild task join failed: {}", e);
496 },
497 }
498 }
499
500 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
502
503 Ok(UpdateBatchResult {
504 updated_count,
505 removed_count:0,
506 error_count:scan_result.errors,
507 total_size,
508 duration_seconds:start_time.elapsed().as_secs_f64(),
509 })
510}
511
512pub async fn ValidateAndRepairIndex(index:&mut FileIndex) -> Result<RepairResult> {
514 let start_time = Instant::now();
515
516 let mut repaired_files = 0u32;
517
518 let removed_orphans;
519
520 match super::super::State::UpdateState::ValidateIndexConsistency(index) {
522 Ok(()) => {},
523
524 Err(e) => {
525 dev_log!("indexing", "warn: [UpdateIndex] Index validation failed: {}", e);
526
527 repaired_files += 1;
528 },
529 }
530
531 removed_orphans = super::super::State::UpdateState::CleanupOrphanedEntries(index)?;
533
534 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
536
537 Ok(RepairResult {
538 repaired_files,
539 removed_orphans,
540 duration_seconds:start_time.elapsed().as_secs_f64(),
541 })
542}
543
/// Outcome of a validate-and-repair pass over the index.
#[derive(Clone, Debug)]
pub struct RepairResult {
	/// Number of repairs recorded during the pass.
	pub repaired_files:u32,

	/// Number of orphaned entries that were removed.
	pub removed_orphans:u32,

	/// Wall-clock time the pass took, in seconds.
	pub duration_seconds:f64,
}