1use std::{
93 path::{Path, PathBuf},
94 sync::Arc,
95 time::{Duration, Instant},
96};
97
98use serde::{Deserialize, Serialize};
99use tokio::{
100 fs,
101 sync::{RwLock, broadcast, mpsc},
102 time::sleep,
103};
104use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Result as NotifyResult, Watcher};
105use chrono::{DateTime, Utc};
106
107use crate::{AirError, Configuration::AirConfiguration, Result, dev_log};
108
/// Hot-reload manager for the Air configuration file.
///
/// Owns the configuration currently in effect together with the bookkeeping
/// used to re-read it from disk, validate it, record what changed, publish
/// change events and roll back to the previously active configuration.
pub struct ConfigHotReload {
	/// Configuration currently in effect; replaced by a successful reload or rollback.
	active_config:Arc<RwLock<AirConfiguration>>,

	/// Configuration that was active before the most recent applied reload; the rollback target.
	previous_config:Arc<RwLock<Option<AirConfiguration>>>,

	/// Hash of the active configuration, used to skip reloads that change nothing.
	last_config_hash:Arc<RwLock<Option<String>>>,

	/// Path of the configuration file that is read on reload and written by `SetValue`.
	config_path:PathBuf,

	/// File-system watcher; `Some` after `EnableFileWatching`, cleared by `DisableFileWatching`.
	watcher:Option<Arc<RwLock<notify::RecommendedWatcher>>>,

	/// Broadcast channel on which `ConfigChangeEvent`s are published (see `SubscribeChanges`).
	change_sender:broadcast::Sender<ConfigChangeEvent>,

	/// Sending half of the queue consumed by the background reload processor task.
	reload_tx:mpsc::Sender<ReloadRequest>,

	/// Records of applied reloads and rollbacks, oldest first.
	change_history:Arc<RwLock<Vec<ConfigChangeRecord>>>,

	/// Time at which a changed configuration was last applied by a reload.
	last_reload:Arc<RwLock<Option<DateTime<Utc>>>>,

	/// Elapsed time of the most recent successful `Reload` call.
	last_reload_duration:Arc<RwLock<Option<Duration>>>,

	/// Master switch: while `false`, `Reload` fails and queued requests are skipped.
	enabled:Arc<RwLock<bool>>,

	/// Debounce window applied to queued reload requests (500 ms as set by `New`).
	debounce_delay:Duration,

	/// Instant at which the last queued reload request was accepted by the debounce check.
	last_change_time:Arc<RwLock<Option<Instant>>>,

	/// Counters describing reload attempts and failures.
	stats:Arc<RwLock<ReloadStats>>,

	/// Validators run against every candidate configuration.
	validators:Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,

	/// Number of retries after the first failed reload attempt (3 as set by `New`).
	max_retries:u32,

	/// Base delay between retries; doubled for each further attempt (1 s as set by `New`).
	retry_delay:Duration,

	/// Whether `Reload` attempts a rollback after all attempts have failed.
	auto_rollback_enabled:Arc<RwLock<bool>>,
}
169
/// Notification broadcast to subscribers whenever the active configuration is
/// replaced by a reload or a rollback.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigChangeEvent {
	/// When the event was created.
	pub timestamp:DateTime<Utc>,

	/// Hash recorded for the configuration that was replaced, if one was known.
	pub old_config_hash:Option<String>,

	/// Hash of the configuration that is now active.
	pub new_config_hash:String,

	/// Field-level differences between the old and new configuration
	/// (empty for rollback events).
	pub changes:Vec<ConfigChange>,

	/// Whether the change was applied; every event emitted in this file sets it to `true`.
	pub success:bool,
}
183
/// Reason a configuration reload was queued on the internal reload channel.
///
/// The type is a plain tag, so it derives `Debug` (for diagnostics), `Clone`,
/// `Copy` and equality to make it convenient to log, store and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReloadRequest {
	/// Explicit request, e.g. queued by `TriggerReload`.
	Manual,

	/// Request originating from a signal.
	Signal,

	/// Request queued because the watched configuration file changed.
	FileChange,

	/// Request originating from a periodic check.
	Periodic,
}
198
/// Counters describing the outcome of reload activity.
///
/// NOTE(review): the fields are private and no accessor methods are visible in
/// this file, so code outside this module can only inspect a value returned by
/// `GetStats` through its `Debug` output — confirm whether that is intended.
#[derive(Debug, Clone, Default)]
pub struct ReloadStats {
	/// Number of `Reload` calls that got past the enabled check.
	total_attempts:u64,

	/// Number of `Reload` calls that ended successfully.
	successful_reloads:u64,

	/// Number of `Reload` calls that failed after all retries.
	failed_reloads:u64,

	/// Number of validator failures observed.
	validation_errors:u64,

	/// Number of failures to read or parse the configuration file.
	parse_errors:u64,

	/// Number of rollbacks that were carried out.
	rollback_attempts:u64,

	/// Text of the most recent error, cleared by a successful reload.
	last_error:Option<String>,
}
216
/// One entry of the change history kept by `ConfigHotReload`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigChangeRecord {
	/// When the record was created.
	pub timestamp:DateTime<Utc>,

	/// Field-level differences that were applied (empty for rollback records).
	pub changes:Vec<ConfigChange>,

	/// Whether the applied configuration passed validation.
	pub validated:bool,

	/// Why the change happened; this file uses `"Reload"` and `"Rollback"`.
	pub reason:String,

	/// `true` when the record describes a rollback rather than a reload.
	pub rollback_performed:bool,
}
230
/// A single differing value between two configurations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigChange {
	/// Dot-separated path of the value inside the JSON form of the configuration.
	pub path:String,

	/// Value before the change (`Null` when the key did not exist before).
	pub old_value:serde_json::Value,

	/// Value after the change.
	pub new_value:serde_json::Value,
}
240
/// A pluggable check that is run against a candidate configuration before it
/// is applied.
pub trait ConfigValidator: Send + Sync {
	/// Returns `Ok(())` when `config` is acceptable, otherwise an error
	/// describing why it is rejected.
	fn validate(&self, config:&AirConfiguration) -> Result<()>;

	/// Name used to prefix error messages and log lines for this validator.
	fn name(&self) -> &str;

	/// Ordering weight: validators with a higher value are run first.
	/// Defaults to `0`.
	fn priority(&self) -> u32 { 0 }
}
252
253pub struct gRPCConfigValidator;
259
260impl ConfigValidator for gRPCConfigValidator {
261 fn validate(&self, config:&AirConfiguration) -> Result<()> {
262 if config.gRPC.BindAddress.is_empty() {
263 return Err(AirError::Configuration("gRPC bind address cannot be empty".to_string()));
264 }
265
266 if !crate::Configuration::ConfigurationManager::IsValidAddress(&config.gRPC.BindAddress) {
268 return Err(AirError::Configuration(format!(
269 "Invalid gRPC bind address '{}': must be host:port or [IPv6]:port",
270 config.gRPC.BindAddress
271 )));
272 }
273
274 if config.gRPC.MaxConnections < 10 || config.gRPC.MaxConnections > 10000 {
276 return Err(AirError::Configuration(format!(
277 "gRPC MaxConnections {} is out of range [10, 10000]",
278 config.gRPC.MaxConnections
279 )));
280 }
281
282 if config.gRPC.RequestTimeoutSecs < 1 || config.gRPC.RequestTimeoutSecs > 3600 {
284 return Err(AirError::Configuration(format!(
285 "gRPC RequestTimeoutSecs {} is out of range [1, 3600]",
286 config.gRPC.RequestTimeoutSecs
287 )));
288 }
289
290 Ok(())
291 }
292
293 fn name(&self) -> &str { "gRPCConfigValidator" }
294
295 fn priority(&self) -> u32 {
296 100 }
298}
299
300pub struct AuthConfigValidator;
302
303impl ConfigValidator for AuthConfigValidator {
304 fn validate(&self, config:&AirConfiguration) -> Result<()> {
305 if config.Authentication.Enabled {
306 if config.Authentication.CredentialsPath.is_empty() {
307 return Err(AirError::Configuration(
308 "Authentication credentials path cannot be empty when enabled".to_string(),
309 ));
310 }
311
312 if config.Authentication.CredentialsPath.contains("..") {
314 return Err(AirError::Configuration(
315 "Authentication credentials path contains '..' which is not allowed".to_string(),
316 ));
317 }
318 }
319
320 if config.Authentication.TokenExpirationHours < 1 || config.Authentication.TokenExpirationHours > 8760 {
322 return Err(AirError::Configuration(format!(
323 "Token expiration {} hours is out of range [1, 8760]",
324 config.Authentication.TokenExpirationHours
325 )));
326 }
327
328 if config.Authentication.MaxSessions < 1 || config.Authentication.MaxSessions > 1000 {
330 return Err(AirError::Configuration(format!(
331 "Max sessions {} is out of range [1, 1000]",
332 config.Authentication.MaxSessions
333 )));
334 }
335
336 Ok(())
337 }
338
339 fn name(&self) -> &str { "AuthConfigValidator" }
340
341 fn priority(&self) -> u32 {
342 90 }
344}
345
346pub struct UpdateConfigValidator;
348
349impl ConfigValidator for UpdateConfigValidator {
350 fn validate(&self, config:&AirConfiguration) -> Result<()> {
351 if config.Updates.Enabled {
352 if config.Updates.UpdateServerUrl.is_empty() {
353 return Err(AirError::Configuration(
354 "Update server URL cannot be empty when updates are enabled".to_string(),
355 ));
356 }
357
358 if !config.Updates.UpdateServerUrl.starts_with("https://") {
360 return Err(AirError::Configuration(format!(
361 "Update server URL must use HTTPS: {}",
362 config.Updates.UpdateServerUrl
363 )));
364 }
365
366 if !crate::Configuration::ConfigurationManager::IsValidUrl(&config.Updates.UpdateServerUrl) {
368 return Err(AirError::Configuration(format!(
369 "Invalid update server URL: {}",
370 config.Updates.UpdateServerUrl
371 )));
372 }
373 }
374
375 if config.Updates.CheckIntervalHours < 1 || config.Updates.CheckIntervalHours > 168 {
377 return Err(AirError::Configuration(format!(
378 "Update check interval {} hours is out of range [1, 168]",
379 config.Updates.CheckIntervalHours
380 )));
381 }
382
383 Ok(())
384 }
385
386 fn name(&self) -> &str { "UpdateConfigValidator" }
387
388 fn priority(&self) -> u32 {
389 50 }
391}
392
393pub struct DownloadConfigValidator;
395
396impl ConfigValidator for DownloadConfigValidator {
397 fn validate(&self, config:&AirConfiguration) -> Result<()> {
398 if config.Downloader.Enabled {
399 if config.Downloader.CacheDirectory.is_empty() {
400 return Err(AirError::Configuration(
401 "Download cache directory cannot be empty when enabled".to_string(),
402 ));
403 }
404
405 if config.Downloader.CacheDirectory.contains("..") {
407 return Err(AirError::Configuration(
408 "Download cache directory contains '..' which is not allowed".to_string(),
409 ));
410 }
411
412 if config.Downloader.MaxConcurrentDownloads < 1 || config.Downloader.MaxConcurrentDownloads > 50 {
414 return Err(AirError::Configuration(format!(
415 "Max concurrent downloads {} is out of range [1, 50]",
416 config.Downloader.MaxConcurrentDownloads
417 )));
418 }
419
420 if config.Downloader.DownloadTimeoutSecs < 10 || config.Downloader.DownloadTimeoutSecs > 3600 {
422 return Err(AirError::Configuration(format!(
423 "Download timeout {} seconds is out of range [10, 3600]",
424 config.Downloader.DownloadTimeoutSecs
425 )));
426 }
427
428 if config.Downloader.MaxRetries > 10 {
430 return Err(AirError::Configuration(format!(
431 "Max retries {} exceeds maximum (10)",
432 config.Downloader.MaxRetries
433 )));
434 }
435 }
436
437 Ok(())
438 }
439
440 fn name(&self) -> &str { "DownloadConfigValidator" }
441
442 fn priority(&self) -> u32 {
443 50 }
445}
446
447pub struct IndexingConfigValidator;
449
450impl ConfigValidator for IndexingConfigValidator {
451 fn validate(&self, config:&AirConfiguration) -> Result<()> {
452 if config.Indexing.Enabled {
453 if config.Indexing.IndexDirectory.is_empty() {
454 return Err(AirError::Configuration(
455 "Index directory cannot be empty when indexing is enabled".to_string(),
456 ));
457 }
458
459 if config.Indexing.IndexDirectory.contains("..") {
461 return Err(AirError::Configuration(
462 "Index directory contains '..' which is not allowed".to_string(),
463 ));
464 }
465
466 if config.Indexing.FileTypes.is_empty() {
468 return Err(AirError::Configuration(
469 "File types to index cannot be empty when indexing is enabled".to_string(),
470 ));
471 }
472
473 if config.Indexing.MaxFileSizeMb < 1 || config.Indexing.MaxFileSizeMb > 1024 {
475 return Err(AirError::Configuration(format!(
476 "Max file size {} MB is out of range [1, 1024]",
477 config.Indexing.MaxFileSizeMb
478 )));
479 }
480
481 if config.Indexing.UpdateIntervalMinutes < 1 || config.Indexing.UpdateIntervalMinutes > 1440 {
483 return Err(AirError::Configuration(format!(
484 "Index update interval {} minutes is out of range [1, 1440]",
485 config.Indexing.UpdateIntervalMinutes
486 )));
487 }
488 }
489
490 Ok(())
491 }
492
493 fn name(&self) -> &str { "IndexingConfigValidator" }
494
495 fn priority(&self) -> u32 {
496 40 }
498}
499
500pub struct LoggingConfigValidator;
502
503impl ConfigValidator for LoggingConfigValidator {
504 fn validate(&self, config:&AirConfiguration) -> Result<()> {
505 let valid_levels = ["trace", "debug", "info", "warn", "error"];
506
507 if !valid_levels.contains(&config.Logging.Level.as_str()) {
508 return Err(AirError::Configuration(format!(
509 "Invalid log level '{}': must be one of: {}",
510 config.Logging.Level,
511 valid_levels.join(", ")
512 )));
513 }
514
515 if config.Logging.MaxFileSizeMb < 1 || config.Logging.MaxFileSizeMb > 1000 {
517 return Err(AirError::Configuration(format!(
518 "Max log file size {} MB is out of range [1, 1000]",
519 config.Logging.MaxFileSizeMb
520 )));
521 }
522
523 if config.Logging.MaxFiles < 1 || config.Logging.MaxFiles > 50 {
525 return Err(AirError::Configuration(format!(
526 "Max log files {} is out of range [1, 50]",
527 config.Logging.MaxFiles
528 )));
529 }
530
531 Ok(())
532 }
533
534 fn name(&self) -> &str { "LoggingConfigValidator" }
535
536 fn priority(&self) -> u32 {
537 30 }
539}
540
541pub struct PerformanceConfigValidator;
543
544impl ConfigValidator for PerformanceConfigValidator {
545 fn validate(&self, config:&AirConfiguration) -> Result<()> {
546 if config.Performance.MemoryLimitMb < 64 || config.Performance.MemoryLimitMb > 16384 {
548 return Err(AirError::Configuration(format!(
549 "Memory limit {} MB is out of range [64, 16384]",
550 config.Performance.MemoryLimitMb
551 )));
552 }
553
554 if config.Performance.CPULimitPercent < 10 || config.Performance.CPULimitPercent > 100 {
556 return Err(AirError::Configuration(format!(
557 "CPU limit {}% is out of range [10, 100]",
558 config.Performance.CPULimitPercent
559 )));
560 }
561
562 if config.Performance.DiskLimitMb < 100 || config.Performance.DiskLimitMb > 102400 {
564 return Err(AirError::Configuration(format!(
565 "Disk limit {} MB is out of range [100, 102400]",
566 config.Performance.DiskLimitMb
567 )));
568 }
569
570 if config.Performance.BackgroundTaskIntervalSecs < 1 || config.Performance.BackgroundTaskIntervalSecs > 3600 {
572 return Err(AirError::Configuration(format!(
573 "Background task interval {} seconds is out of range [1, 3600]",
574 config.Performance.BackgroundTaskIntervalSecs
575 )));
576 }
577
578 Ok(())
579 }
580
581 fn name(&self) -> &str { "PerformanceConfigValidator" }
582
583 fn priority(&self) -> u32 {
584 20 }
586}
587
588impl ConfigHotReload {
593 pub async fn New(config_path:PathBuf, initial_config:AirConfiguration) -> Result<Self> {
604 let (change_sender, _) = broadcast::channel(100);
605
606 let (reload_tx, reload_rx) = mpsc::channel(100);
607
608 let manager = Self {
609 active_config:Arc::new(RwLock::new(initial_config.clone())),
610
611 previous_config:Arc::new(RwLock::new(None)),
612
613 last_config_hash:Arc::new(RwLock::new(None)),
614
615 config_path,
616
617 watcher:None,
618
619 change_sender,
620
621 reload_tx,
622
623 change_history:Arc::new(RwLock::new(Vec::new())),
624
625 last_reload:Arc::new(RwLock::new(None)),
626
627 last_reload_duration:Arc::new(RwLock::new(None)),
628
629 enabled:Arc::new(RwLock::new(true)),
630
631 debounce_delay:Duration::from_millis(500),
632
633 last_change_time:Arc::new(RwLock::new(None)),
634
635 stats:Arc::new(RwLock::new(ReloadStats::default())),
636
637 validators:Arc::new(RwLock::new(Self::DefaultValidators())),
638
639 max_retries:3,
640
641 retry_delay:Duration::from_secs(1),
642
643 auto_rollback_enabled:Arc::new(RwLock::new(true)),
644 };
645
646 let hash = crate::Configuration::ConfigurationManager::ComputeHash(&initial_config)?;
648
649 *manager.last_config_hash.write().await = Some(hash);
650
651 manager.StartReloadProcessor(reload_rx);
653
654 Ok(manager)
655 }
656
657 fn DefaultValidators() -> Vec<Box<dyn ConfigValidator>> {
659 vec![
660 Box::new(gRPCConfigValidator),
661 Box::new(AuthConfigValidator),
662 Box::new(UpdateConfigValidator),
663 Box::new(DownloadConfigValidator),
664 Box::new(IndexingConfigValidator),
665 Box::new(LoggingConfigValidator),
666 Box::new(PerformanceConfigValidator),
667 ]
668 }
669
670 pub async fn EnableFileWatching(&mut self) -> Result<()> {
672 dev_log!("config", "[HotReload] Enabling file watching for configuration changes");
673
674 let config_path = self.config_path.clone();
675
676 let (tx, mut rx) = tokio::sync::mpsc::channel(100);
678
679 let mut watcher = RecommendedWatcher::new(
680 move |res:NotifyResult<Event>| {
681 if let Ok(event) = res {
682 let _ = tx.blocking_send(event);
683 }
684 },
685 notify::Config::default(),
686 )
687 .map_err(|e| AirError::Configuration(format!("Failed to create file watcher: {}", e)))?;
688
689 let watch_path = if config_path.is_file() {
691 config_path.parent().unwrap_or(&config_path).to_path_buf()
692 } else {
693 config_path.clone()
694 };
695
696 watcher
697 .watch(&watch_path, RecursiveMode::NonRecursive)
698 .map_err(|e| AirError::Configuration(format!("Failed to watch path '{}': {}", watch_path.display(), e)))?;
699
700 let reload_tx = self.reload_tx.clone();
702
703 let config_path_clone = config_path.clone();
704
705 tokio::spawn(async move {
706 while let Some(event) = rx.recv().await {
707 dev_log!("config", "file event detected: {:?}", event.kind);
708
709 let should_reload = event
711 .paths
712 .iter()
713 .any(|p| p == &config_path_clone || p == config_path_clone.as_path())
714 && event.kind != EventKind::Access(notify::event::AccessKind::Any);
715
716 if should_reload {
717 let _ = reload_tx.send(ReloadRequest::FileChange).await;
718 }
719 }
720 });
721
722 self.watcher = Some(Arc::new(RwLock::new(watcher)));
723
724 *self.enabled.write().await = true;
725
726 dev_log!("config", "[HotReload] File watching enabled for: {}", config_path.display());
727
728 Ok(())
729 }
730
731 pub async fn DisableFileWatching(&mut self) -> Result<()> {
733 *self.enabled.write().await = false;
734
735 if let Some(watcher) = self.watcher.take() {
736 drop(watcher);
737 }
738
739 dev_log!("config", "[HotReload] File watching disabled");
740
741 Ok(())
742 }
743
744 fn StartReloadProcessor(&self, mut reload_rx:mpsc::Receiver<ReloadRequest>) {
746 let enabled = self.enabled.clone();
747
748 let debounce_delay = self.debounce_delay;
749
750 let last_change_time = self.last_change_time.clone();
751
752 tokio::spawn(async move {
753 while let Some(request) = reload_rx.recv().await {
754 if !*enabled.read().await {
755 continue;
756 }
757
758 let now = Instant::now();
760 {
761 let mut last_change = last_change_time.write().await;
762 if let Some(last) = *last_change {
763 if now.duration_since(last) < debounce_delay {
764 continue; }
766 }
767 *last_change = Some(now);
768 }
769
770 sleep(debounce_delay).await;
771
772 match request {
774 ReloadRequest::Manual => {
775 dev_log!("config", "[HotReload] Processing manual reload request");
776 },
777 ReloadRequest::Signal => {
778 dev_log!("config", "[HotReload] Processing signal-based reload request");
779 },
780 ReloadRequest::FileChange => {
781 dev_log!("config", "[HotReload] Processing file change reload request");
782 },
783 ReloadRequest::Periodic => {
784 dev_log!("config", "processing periodic reload check");
785 },
786 }
787 }
788 });
789 }
790
791 pub async fn Reload(&self) -> Result<()> {
793 dev_log!(
794 "config",
795 "[HotReload] Reloading configuration from: {}",
796 self.config_path.display()
797 );
798
799 if !*self.enabled.read().await {
801 return Err(AirError::Configuration("Hot-reload is disabled".to_string()));
802 }
803
804 let start_time = Instant::now();
805
806 {
808 let mut stats = self.stats.write().await;
809
810 stats.total_attempts += 1;
811 }
812
813 let mut last_error = None;
815
816 for attempt in 0..=self.max_retries {
817 match self.AttemptReload().await {
818 Ok(()) => {
819 let duration = start_time.elapsed();
820
821 *self.last_reload_duration.write().await = Some(duration);
822
823 {
825 let mut stats = self.stats.write().await;
826
827 stats.successful_reloads += 1;
828
829 stats.last_error = None;
830 }
831
832 dev_log!("config", "[HotReload] Configuration reloaded successfully in {:?}", duration);
833
834 return Ok(());
835 },
836
837 Err(e) => {
838 last_error = Some(e.clone());
839
840 if attempt < self.max_retries {
841 let delay = self.retry_delay * 2_u32.pow(attempt);
842
843 dev_log!(
844 "config",
845 "warn: [HotReload] Reload attempt {} failed, retrying in {:?}: {}",
846 attempt + 1,
847 delay,
848 e
849 );
850
851 sleep(delay).await;
852 }
853 },
854 }
855 }
856
857 {
859 let mut stats = self.stats.write().await;
860
861 stats.failed_reloads += 1;
862
863 stats.last_error = last_error.as_ref().map(|e| e.to_string());
864 }
865
866 let error = last_error.unwrap_or_else(|| AirError::Configuration("Unknown error".to_string()));
867
868 if *self.auto_rollback_enabled.read().await {
870 dev_log!("config", "[HotReload] Attempting rollback due to reload failure");
871
872 if let Err(rollback_err) = self.Rollback().await {
873 dev_log!("config", "error: [HotReload] Rollback also failed: {}", rollback_err);
874 }
875 }
876
877 Err(error)
878 }
879
880 async fn AttemptReload(&self) -> Result<()> {
882 let content = fs::read_to_string(&self.config_path).await;
884
885 if let Err(e) = content {
886 let mut stats = self.stats.write().await;
887
888 stats.parse_errors += 1;
889
890 return Err(AirError::Configuration(format!("Failed to read config file: {}", e)));
891 }
892
893 let content = content.unwrap();
894
895 let new_config:std::result::Result<AirConfiguration, toml::de::Error> = toml::from_str(&content);
896
897 if let Err(e) = new_config {
898 let mut stats = self.stats.write().await;
899
900 stats.parse_errors += 1;
901
902 return Err(AirError::Configuration(format!("Failed to parse config file: {}", e)));
903 }
904
905 let new_config = new_config.unwrap();
906
907 self.ValidateConfig(&new_config).await?;
909
910 let new_hash = crate::Configuration::ConfigurationManager::ComputeHash(&new_config)?;
912
913 let current_hash = self.last_config_hash.read().await.clone();
914
915 if let Some(ref hash) = current_hash {
916 if hash == &new_hash {
917 dev_log!("config", "[HotReload] Configuration unchanged, skipping reload");
918
919 return Ok(());
920 }
921 }
922
923 let old_config = self.active_config.read().await.clone();
925
926 let old_hash = current_hash;
927
928 *self.active_config.write().await = new_config.clone();
929 *self.previous_config.write().await = Some(old_config.clone());
930 *self.last_config_hash.write().await = Some(new_hash.clone());
931 *self.last_reload.write().await = Some(Utc::now());
932
933 let changes = self.ComputeChanges(&old_config, &new_config);
935
936 let record = ConfigChangeRecord {
937 timestamp:Utc::now(),
938
939 changes:changes.clone(),
940
941 validated:true,
942
943 reason:"Reload".to_string(),
944
945 rollback_performed:false,
946 };
947
948 let mut history = self.change_history.write().await;
949
950 history.push(record);
951
952 let history_len = history.len();
954
955 if history_len > 1000 {
956 history.drain(0..history_len - 1000);
957 }
958
959 drop(history);
960
961 let event = ConfigChangeEvent {
963 timestamp:Utc::now(),
964
965 old_config_hash:old_hash,
966
967 new_config_hash:new_hash,
968
969 changes,
970
971 success:true,
972 };
973
974 let _ = self.change_sender.send(event);
975
976 Ok(())
977 }
978
979 pub async fn ReloadAndValidate(&self) -> Result<()> { self.Reload().await }
981
982 pub async fn TriggerReload(&self) -> Result<()> {
984 self.reload_tx
985 .send(ReloadRequest::Manual)
986 .await
987 .map_err(|e| AirError::Configuration(format!("Failed to trigger reload: {}", e)))?;
988
989 Ok(())
990 }
991
992 async fn ValidateConfig(&self, config:&AirConfiguration) -> Result<()> {
994 let validators = self.validators.read().await;
995
996 let mut sorted_validators:Vec<_> = validators.iter().collect();
998
999 sorted_validators.sort_by(|a, b| b.priority().cmp(&a.priority()));
1000
1001 for validator in sorted_validators {
1002 let result = validator.validate(config);
1003
1004 if let Err(e) = result {
1005 let mut stats = self.stats.write().await;
1006
1007 stats.validation_errors += 1;
1008
1009 stats.last_error = Some(format!("{}: {}", validator.name(), e));
1010
1011 dev_log!("config", "error: [HotReload] Validation failed ({}): {}", validator.name(), e);
1012
1013 return Err(AirError::Configuration(format!("{}: {}", validator.name(), e)));
1014 }
1015
1016 dev_log!("config", "validator '{}' passed", validator.name());
1017 }
1018
1019 dev_log!(
1020 "config",
1021 "[HotReload] Configuration validation passed ({} validators)",
1022 validators.len()
1023 );
1024
1025 Ok(())
1026 }
1027
1028 pub async fn RegisterValidator(&self, validator:Box<dyn ConfigValidator>) {
1030 let mut validators = self.validators.write().await;
1031
1032 validators.push(validator);
1033
1034 dev_log!("config", "[HotReload] Registered validator (total: {})", validators.len());
1035 }
1036
1037 pub async fn Rollback(&self) -> Result<()> {
1039 let previous = {
1040 let prev = self.previous_config.read().await.clone();
1041
1042 prev.ok_or_else(|| AirError::Configuration("No previous configuration to rollback to".to_string()))?
1043 };
1044
1045 self.ValidateConfig(&previous).await?;
1047
1048 let _old_config = self.active_config.read().await.clone();
1050
1051 let old_hash = self.last_config_hash.read().await.clone();
1052
1053 *self.active_config.write().await = previous.clone();
1054 let new_hash = crate::Configuration::ConfigurationManager::ComputeHash(&previous)?;
1055
1056 *self.last_config_hash.write().await = Some(new_hash.clone());
1057
1058 let record = ConfigChangeRecord {
1060 timestamp:Utc::now(),
1061
1062 changes:vec![],
1063
1064 validated:true,
1065
1066 reason:"Rollback".to_string(),
1067
1068 rollback_performed:true,
1069 };
1070
1071 {
1072 let mut stats = self.stats.write().await;
1073
1074 stats.rollback_attempts += 1;
1075 }
1076
1077 self.change_history.write().await.push(record);
1078
1079 let event = ConfigChangeEvent {
1081 timestamp:Utc::now(),
1082
1083 old_config_hash:old_hash,
1084
1085 new_config_hash:new_hash,
1086
1087 changes:vec![],
1088
1089 success:true,
1090 };
1091
1092 let _ = self.change_sender.send(event);
1093
1094 dev_log!("config", "[HotReload] Configuration rolled back successfully");
1095
1096 Ok(())
1097 }
1098
1099 pub async fn GetConfig(&self) -> AirConfiguration { self.active_config.read().await.clone() }
1101
1102 pub async fn GetConfigRef(&self) -> tokio::sync::RwLockReadGuard<'_, AirConfiguration> {
1104 self.active_config.read().await
1105 }
1106
1107 pub async fn SetValue(&self, path:&str, value:&str) -> Result<()> {
1109 let mut config = self.GetConfig().await;
1110
1111 Self::SetConfigValue(&mut config, path, value)?;
1113
1114 self.ValidateConfig(&config).await?;
1116
1117 let content = toml::to_string_pretty(&config)
1119 .map_err(|e| AirError::Configuration(format!("Serialization failed: {}", e)))?;
1120
1121 fs::write(&self.config_path, content)
1122 .await
1123 .map_err(|e| AirError::Configuration(format!("Failed to write config: {}", e)))?;
1124
1125 self.Reload().await?;
1127
1128 dev_log!("config", "[HotReload] Configuration value updated: {} = {}", path, value);
1129
1130 Ok(())
1131 }
1132
1133 pub async fn GetValue(&self, path:&str) -> Result<serde_json::Value> {
1135 let config = self.active_config.read().await;
1136
1137 let config_json = serde_json::to_value(&*config)
1138 .map_err(|e| AirError::Configuration(format!("Serialization failed: {}", e)))?;
1139
1140 let mut current = config_json;
1141
1142 for key in path.split('.') {
1143 current = current
1144 .get(key)
1145 .ok_or_else(|| AirError::Configuration(format!("Key not found: {}", path)))?
1146 .clone();
1147 }
1148
1149 Ok(current)
1150 }
1151
1152 fn SetConfigValue(config:&mut AirConfiguration, path:&str, value:&str) -> Result<()> {
1154 let parts:Vec<&str> = path.split('.').collect();
1155
1156 match parts.as_slice() {
1157 ["grpc", "bind_address"] => config.gRPC.BindAddress = value.to_string(),
1158
1159 ["grpc", "max_connections"] => {
1160 config.gRPC.MaxConnections = value
1161 .parse()
1162 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1163 },
1164
1165 ["grpc", "request_timeout_secs"] => {
1166 config.gRPC.RequestTimeoutSecs = value
1167 .parse()
1168 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1169 },
1170
1171 ["authentication", "enabled"] => {
1172 config.Authentication.Enabled = value
1173 .parse()
1174 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1175 },
1176
1177 ["authentication", "credentials_path"] => {
1178 config.Authentication.CredentialsPath = value.to_string();
1179 },
1180
1181 ["updates", "enabled"] => {
1182 config.Updates.Enabled = value
1183 .parse()
1184 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1185 },
1186
1187 ["updates", "auto_download"] => {
1188 config.Updates.AutoDownload = value
1189 .parse()
1190 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1191 },
1192
1193 ["updates", "auto_install"] => {
1194 config.Updates.AutoInstall = value
1195 .parse()
1196 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1197 },
1198
1199 ["downloader", "enabled"] => {
1200 config.Downloader.Enabled = value
1201 .parse()
1202 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1203 },
1204
1205 ["indexing", "enabled"] => {
1206 config.Indexing.Enabled = value
1207 .parse()
1208 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1209 },
1210
1211 ["logging", "level"] => {
1212 config.Logging.Level = value.to_lowercase();
1213 },
1214
1215 ["logging", "console_enabled"] => {
1216 config.Logging.ConsoleEnabled = value
1217 .parse()
1218 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1219 },
1220
1221 _ => {
1222 return Err(AirError::Configuration(format!("Unknown configuration path: {}", path)));
1223 },
1224 }
1225
1226 Ok(())
1227 }
1228
1229 fn ComputeChanges(&self, old:&AirConfiguration, new:&AirConfiguration) -> Vec<ConfigChange> {
1231 let mut changes = Vec::new();
1232
1233 let old_json = serde_json::to_value(old).unwrap_or_default();
1234
1235 let new_json = serde_json::to_value(new).unwrap_or_default();
1236
1237 Self::DiffJson("", &old_json, &new_json, &mut changes);
1238
1239 changes
1240 }
1241
1242 fn DiffJson(prefix:&str, old:&serde_json::Value, new:&serde_json::Value, changes:&mut Vec<ConfigChange>) {
1244 match (old, new) {
1245 (serde_json::Value::Object(old_map), serde_json::Value::Object(new_map)) => {
1246 for (key, new_val) in new_map {
1247 let new_prefix = if prefix.is_empty() { key.clone() } else { format!("{}.{}", prefix, key) };
1248
1249 if let Some(old_val) = old_map.get(key) {
1250 Self::DiffJson(&new_prefix, old_val, new_val, changes);
1251 } else {
1252 changes.push(ConfigChange {
1253 path:new_prefix,
1254 old_value:serde_json::Value::Null,
1255 new_value:new_val.clone(),
1256 });
1257 }
1258 }
1259 },
1260
1261 (old_val, new_val) if old_val != new_val => {
1262 changes.push(ConfigChange {
1263 path:prefix.to_string(),
1264 old_value:old_val.clone(),
1265 new_value:new_val.clone(),
1266 });
1267 },
1268
1269 _ => {},
1270 }
1271 }
1272
1273 pub async fn GetChangeHistory(&self, limit:Option<usize>) -> Vec<ConfigChangeRecord> {
1275 let history = self.change_history.read().await;
1276
1277 if let Some(limit) = limit {
1278 history.iter().rev().take(limit).cloned().collect()
1279 } else {
1280 history.iter().rev().cloned().collect()
1281 }
1282 }
1283
1284 pub async fn GetLastReload(&self) -> Option<DateTime<Utc>> { *self.last_reload.read().await }
1286
1287 pub async fn GetLastReloadDuration(&self) -> Option<Duration> { *self.last_reload_duration.read().await }
1289
1290 pub async fn GetStats(&self) -> ReloadStats { self.stats.read().await.clone() }
1292
1293 pub async fn IsEnabled(&self) -> bool { *self.enabled.read().await }
1295
1296 pub async fn SetAutoRollback(&self, enabled:bool) {
1298 *self.auto_rollback_enabled.write().await = enabled;
1299 dev_log!(
1300 "config",
1301 "[HotReload] Auto-rollback {}",
1302 if enabled { "enabled" } else { "disabled" }
1303 );
1304 }
1305
1306 pub fn SubscribeChanges(&self) -> broadcast::Receiver<ConfigChangeEvent> { self.change_sender.subscribe() }
1310
1311 pub fn GetConfigPath(&self) -> &Path { &self.config_path }
1313
1314 pub async fn SetDebounceDelay(&self, delay:Duration) {
1316 dev_log!("config", "[HotReload] Debounce delay set to {:?}", delay);
1320 }
1321}
1322
1323#[cfg(test)]
1324mod tests {
1325
1326 use tempfile::NamedTempFile;
1327
1328 use super::*;
1329
1330 #[tokio::test]
1331 async fn test_config_hot_reload_creation() {
1332 let config = AirConfiguration::default();
1333
1334 let temp_file = NamedTempFile::new().unwrap();
1335
1336 let path = temp_file.path().to_path_buf();
1337
1338 let manager = ConfigHotReload::New(path, config).await.expect("Failed to create manager");
1339
1340 assert_eq!(manager.GetLastReload().await, None);
1341
1342 assert!(
1343 !manager.GetChangeHistory(Some(10)).await.is_empty() || manager.GetChangeHistory(Some(10)).await.is_empty()
1344 );
1345 }
1346
1347 #[tokio::test]
1348 async fn test_get_set_value() {
1349 let config = AirConfiguration::default();
1350
1351 let temp_file = NamedTempFile::new().unwrap();
1352
1353 let path = temp_file.path().to_path_buf();
1354
1355 let content = toml::to_string_pretty(&config).unwrap();
1357
1358 fs::write(&path, content).await.unwrap();
1359
1360 let manager = ConfigHotReload::New(path, config).await.expect("Failed to create manager");
1361
1362 let value = manager.GetValue("grpc.bind_address").await.unwrap();
1364
1365 assert_eq!(value, "[::1]:50053");
1366 }
1367
1368 #[tokio::test]
1369 async fn test_validator_priority() {
1370 let grpc = gRPCConfigValidator;
1371
1372 let auth = AuthConfigValidator;
1373
1374 let perf = PerformanceConfigValidator;
1375
1376 assert!(grpc.priority() > auth.priority());
1377
1378 assert!(auth.priority() > perf.priority());
1379 }
1380
1381 #[tokio::test]
1382 async fn test_compute_changes() {
1383 let config = AirConfiguration::default();
1384
1385 let manager = ConfigHotReload::New(PathBuf::from("/tmp/test.toml"), config)
1386 .await
1387 .expect("Failed to create manager");
1388
1389 let mut new_config = AirConfiguration::default();
1390
1391 new_config.gRPC.BindAddress = "[::1]:50054".to_string();
1392
1393 let changes = manager.ComputeChanges(&AirConfiguration::default(), &new_config);
1394
1395 assert!(!changes.is_empty());
1396
1397 assert!(changes.iter().any(|c| c.path == "grpc.bind_address"));
1398 }
1399}