1use std::{collections::HashMap, sync::Arc};
93
94use serde::{Deserialize, Serialize};
95use tokio::sync::{Mutex, RwLock};
96use systemstat::{Platform, System};
97
98use crate::{AirError, Configuration::AirConfiguration, Result, Utility, dev_log};
99
/// Shared runtime state of the daemon.
///
/// Every mutable collection sits behind an `Arc` and a Tokio async lock
/// (`RwLock` or `Mutex`); the methods of this type take the lock they need
/// for the duration of a single call.
#[derive(Debug)]
pub struct ApplicationState {
    /// Configuration handed to [`ApplicationState::New`]; shared read-only.
    pub Configuration:Arc<AirConfiguration>,

    /// Latest recorded status, keyed by service name.
    pub ServiceStatus:Arc<RwLock<HashMap<String, ServiceStatus>>>,

    /// Requests currently being tracked, keyed by request id.
    pub ActiveRequest:Arc<Mutex<HashMap<String, RequestStatus>>>,

    /// Request counters and the running average response time.
    pub Metrics:Arc<RwLock<PerformanceMetrics>>,

    /// Most recent resource-usage sample.
    pub Resources:Arc<RwLock<ResourceUsage>>,

    /// Registered client connections, keyed by connection id.
    pub Connection:Arc<RwLock<HashMap<String, ConnectionInfo>>>,

    /// Join handles of registered background tasks; they are aborted by
    /// `StopAllBackgroundTasks`.
    pub BackgroundTask:Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
127pub enum ServiceStatus {
128 Starting,
129
130 Running,
131
132 Stopping,
133
134 Stopped,
135
136 Error(String),
137}
138
/// Tracking record for one in-flight request.
#[derive(Debug, Clone)]
pub struct RequestStatus {
    /// Identifier of the request; also the key in `ActiveRequest`.
    pub RequestId:String,

    /// Name of the service the request was registered for.
    pub Service:String,

    /// Value of `Utility::CurrentTimestamp()` at registration time.
    pub StartedAt:u64,

    /// Current state of the request.
    pub Status:RequestState,

    /// Optional progress; `UpdateRequestStatus` only accepts values in
    /// `0.0..=1.0`.
    pub Progress:Option<f32>,
}
152
/// State of a tracked request.
///
/// `PartialEq`/`Eq` are derived so states can be compared with `==` and
/// used in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestState {
    /// State assigned by `RegisterRequest`.
    Pending,

    /// The request is being worked on.
    InProgress,

    /// The request finished successfully.
    Completed,

    /// The request failed; the payload is a free-form error description.
    Failed(String),

    /// The request was cancelled; this is what `IsRequestCancelled` checks.
    Cancelled,
}
166
/// Aggregate request counters maintained by `UpdateMetrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Number of requests recorded so far.
    pub TotalRequest:u64,

    /// Number of recorded requests flagged as successful.
    pub SuccessfulRequest:u64,

    /// Number of recorded requests flagged as failed.
    pub FailedRequest:u64,

    /// Exponentially weighted moving average of the response time, in the
    /// same unit as the `ResponseTime` argument of `UpdateMetrics`.
    pub AverageResponseTime:f64,

    /// Initialised to 0; NOTE(review): nothing in this part of the file
    /// updates it — confirm where uptime is maintained.
    pub UptimeSeconds:u64,

    /// Value of `Utility::CurrentTimestamp()` at the last update.
    pub LastUpdated:u64,
}
182
/// Resource-usage sample written by `UpdateResourceUsage`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// `total - free` memory divided by 1024 twice (presumably bytes to
    /// MiB — confirm the unit returned by the `systemstat` memory query).
    pub MemoryUsageMb:f64,

    /// Sum of the user, nice and system CPU load fractions times 100.
    pub CPUUsagePercent:f64,

    /// Initialised to 0.0; NOTE(review): not updated by the code visible
    /// here.
    pub DiskUsageMb:f64,

    /// Initialised to 0.0; NOTE(review): not updated by the code visible
    /// here.
    pub NetworkUsageMbps:f64,

    /// Value of `Utility::CurrentTimestamp()` at the last update.
    pub LastUpdated:u64,
}
196
/// Registry entry describing one client connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
    /// Identifier of the connection; also the key in the registry map.
    pub ConnectionId:String,

    /// Identifier of the client that owns the connection.
    pub ClientId:String,

    /// Version string supplied by the client at registration.
    pub ClientVersion:String,

    /// Protocol version supplied at registration; registration rejects 0.
    pub ProtocolVersion:u32,

    /// Value of `Utility::CurrentTimestamp()` at registration or at the
    /// last heartbeat (compared against millisecond thresholds elsewhere in
    /// this file).
    pub LastHeartbeat:u64,

    /// Set to `true` on registration and on every heartbeat.
    pub IsActive:bool,

    /// Kind of peer on the other end of the connection.
    pub ConnectionType:ConnectionType,
}
214
215#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
217pub enum ConnectionType {
218 MountainMain,
219
220 MountainWorker,
221
222 Cocoon,
223
224 Wind,
225
226 External,
227}
228
/// Summary produced by `GetConnectionHealthReport`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionHealthReport {
    /// Number of connections in the registry when the report was built.
    pub TotalConnection:usize,

    /// Connections that are not stale and are flagged active.
    pub HealthyConnection:usize,

    /// Connections whose last heartbeat is older than the staleness
    /// threshold.
    pub StaleConnection:usize,

    /// Connection count keyed by the `Debug` rendering of the connection
    /// type.
    pub ConnectionByType:HashMap<String, usize>,

    /// Value of `Utility::CurrentTimestamp()` when the report was built.
    pub LastChecked:u64,
}
242
243impl ApplicationState {
244 pub async fn New(Configuration:Arc<AirConfiguration>) -> Result<Self> {
246 let State = Self {
247 Configuration,
248
249 ServiceStatus:Arc::new(RwLock::new(HashMap::new())),
250
251 ActiveRequest:Arc::new(Mutex::new(HashMap::new())),
252
253 Metrics:Arc::new(RwLock::new(PerformanceMetrics {
254 TotalRequest:0,
255 SuccessfulRequest:0,
256 FailedRequest:0,
257 AverageResponseTime:0.0,
258 UptimeSeconds:0,
259 LastUpdated:Utility::CurrentTimestamp(),
260 })),
261
262 Resources:Arc::new(RwLock::new(ResourceUsage {
263 MemoryUsageMb:0.0,
264 CPUUsagePercent:0.0,
265 DiskUsageMb:0.0,
266 NetworkUsageMbps:0.0,
267 LastUpdated:Utility::CurrentTimestamp(),
268 })),
269
270 Connection:Arc::new(RwLock::new(HashMap::new())),
271
272 BackgroundTask:Arc::new(Mutex::new(Vec::new())),
273 };
274
275 State.InitializeServiceStatus().await?;
277
278 Ok(State)
279 }
280
281 async fn InitializeServiceStatus(&self) -> Result<()> {
283 let mut Status = self.ServiceStatus.write().await;
284
285 Status.insert("authentication".to_string(), ServiceStatus::Starting);
286
287 Status.insert("updates".to_string(), ServiceStatus::Starting);
288
289 Status.insert("downloader".to_string(), ServiceStatus::Starting);
290
291 Status.insert("indexing".to_string(), ServiceStatus::Starting);
292
293 Status.insert("grpc".to_string(), ServiceStatus::Starting);
294
295 Status.insert("connections".to_string(), ServiceStatus::Starting);
296
297 Ok(())
298 }
299
300 pub async fn RegisterConnection(
303 &self,
304
305 ConnectionId:String,
306
307 ClientId:String,
308
309 ClientVersion:String,
310
311 ProtocolVersion:u32,
312
313 ConnectionType:ConnectionType,
314 ) -> Result<()> {
315 if ConnectionId.is_empty() {
317 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
318 }
319
320 if ClientId.is_empty() {
322 return Err(AirError::Configuration("Client ID cannot be empty".to_string()));
323 }
324
325 if ProtocolVersion == 0 {
327 return Err(AirError::Configuration("Protocol version must be greater than 0".to_string()));
328 }
329
330 let mut Connection = self.Connection.write().await;
331
332 if Connection.contains_key(&ConnectionId) {
334 return Err(AirError::Configuration(format!("Connection {} already exists", ConnectionId)));
335 }
336
337 if matches!(ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) {
339 let ClientConnCount = Connection
341 .values()
342 .filter(|c| {
343 c.ClientId == ClientId
344 && matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker)
345 })
346 .count();
347
348 const MAX_CONN_PER_CLIENT:usize = 10;
349
350 if ClientConnCount >= MAX_CONN_PER_CLIENT {
351 return Err(AirError::ResourceLimit(format!(
352 "Client {} exceeds maximum connection limit ({})",
353 ClientId, MAX_CONN_PER_CLIENT
354 )));
355 }
356 }
357
358 Connection.insert(
359 ConnectionId.clone(),
360 ConnectionInfo {
361 ConnectionId:ConnectionId.clone(),
362 ClientId:ClientId.clone(),
363 ClientVersion,
364 ProtocolVersion,
365 LastHeartbeat:Utility::CurrentTimestamp(),
366 IsActive:true,
367 ConnectionType:ConnectionType.clone(),
368 },
369 );
370
371 dev_log!(
372 "lifecycle",
373 "Connection registered: {} - {} ({:?})",
374 ConnectionId,
375 ClientId,
376 ConnectionType
377 );
378
379 Ok(())
380 }
381
382 pub async fn UpdateHeartbeat(&self, ConnectionId:&str) -> Result<()> {
385 if ConnectionId.is_empty() {
386 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
387 }
388
389 let mut Connection = self.Connection.write().await;
390
391 if let Some(Connection) = Connection.get_mut(ConnectionId) {
392 let CurrentTime = Utility::CurrentTimestamp();
393
394 const MAX_HEARTBEAT_INTERVAL:u64 = 120000; if CurrentTime - Connection.LastHeartbeat > MAX_HEARTBEAT_INTERVAL {
398 dev_log!(
399 "lifecycle",
400 "warn: Long heartbeat interval for connection {}: {}ms",
401 ConnectionId,
402 CurrentTime - Connection.LastHeartbeat
403 );
404 }
405
406 Connection.LastHeartbeat = CurrentTime;
407
408 Connection.IsActive = true;
409
410 dev_log!(
411 "lifecycle",
412 "Heartbeat updated for connection: {} (client: {})",
413 ConnectionId,
414 Connection.ClientId
415 );
416 } else {
417 return Err(AirError::Internal(format!("Connection {} not found", ConnectionId)));
418 }
419
420 Ok(())
421 }
422
423 pub async fn RemoveConnection(&self, ConnectionId:&str) -> Result<()> {
426 if ConnectionId.is_empty() {
427 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
428 }
429
430 let mut Connection = self.Connection.write().await;
431
432 if let Some(Connection) = Connection.remove(ConnectionId) {
433 dev_log!(
434 "lifecycle",
435 "Connection removed: {} (client: {}, type: {:?})",
436 ConnectionId,
437 Connection.ClientId,
438 Connection.ConnectionType
439 );
440
441 drop(Connection); } else {
448 dev_log!(
449 "lifecycle",
450 "warn: Attempted to remove non-existent connection: {}",
451 ConnectionId
452 );
453 }
454
455 Ok(())
456 }
457
458 pub async fn GetActiveConnectionCount(&self) -> usize {
460 let Connection = self.Connection.read().await;
461
462 Connection.values().filter(|c| c.IsActive).count()
463 }
464
465 pub async fn GetConnectionCountByType(&self, ConnectionType:ConnectionType) -> usize {
467 let Connection = self.Connection.read().await;
468
469 Connection
470 .values()
471 .filter(|c| c.ConnectionType == ConnectionType && c.IsActive)
472 .count()
473 }
474
475 pub async fn GetConnectionsByType(&self, ConnectionType:ConnectionType) -> Vec<ConnectionInfo> {
477 let Connection = self.Connection.read().await;
478
479 Connection
480 .values()
481 .filter(|c| c.ConnectionType == ConnectionType)
482 .cloned()
483 .collect()
484 }
485
486 pub async fn GetNextMountainConnection(&self) -> Result<ConnectionInfo> {
489 let Connection = self.Connection.read().await;
490
491 let MountainConnection:Vec<_> = Connection
492 .values()
493 .filter(|c| {
494 matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) && c.IsActive
495 })
496 .collect();
497
498 if MountainConnection.is_empty() {
499 return Err(AirError::ServiceUnavailable(
500 "No active Mountain connections available".to_string(),
501 ));
502 }
503
504 let Selected = MountainConnection[0].clone();
510
511 Ok(Selected)
512 }
513
514 pub async fn CleanupStaleConnections(&self, TimeoutSeconds:u64) -> Result<usize> {
518 let mut Connection = self.Connection.write().await;
519
520 let CurrentTime = Utility::CurrentTimestamp();
521
522 let TimeoutMs = TimeoutSeconds * 1000;
523
524 let mut RemovedCount = 0;
525
526 let mut RemovedByType:HashMap<String, usize> = HashMap::new();
527
528 Connection.retain(|Id, Connection| {
529 if CurrentTime - Connection.LastHeartbeat > TimeoutMs {
530 dev_log!(
531 "lifecycle",
532 "warn: Removing stale connection: {} - {} ({:?}) - idle: {}ms",
533 Id,
534 Connection.ClientId,
535 Connection.ConnectionType,
536 CurrentTime - Connection.LastHeartbeat
537 );
538
539 *RemovedByType.entry(format!("{:?}", Connection.ConnectionType)).or_insert(0) += 1;
540
541 RemovedCount += 1;
542 false
543 } else {
544 true
545 }
546 });
547
548 if RemovedCount > 0 {
549 dev_log!("lifecycle", "Cleaned up {} stale connections", RemovedCount);
550
551 for (ConnType, Count) in RemovedByType {
552 dev_log!("lifecycle", " - {} connections: {}", ConnType, Count);
553 }
554 }
555
556 Ok(RemovedCount)
557 }
558
559 pub async fn RegisterBackgroundTask(&self, TaskItem:tokio::task::JoinHandle<()>) -> Result<()> {
561 let mut BackgroundTask = self.BackgroundTask.lock().await;
562
563 BackgroundTask.push(TaskItem);
564
565 dev_log!("lifecycle", "Background task registered. Total tasks: {}", BackgroundTask.len());
566
567 Ok(())
568 }
569
570 pub async fn StopAllBackgroundTasks(&self) -> Result<()> {
572 let mut BackgroundTask = self.BackgroundTask.lock().await;
573
574 let TaskCount = BackgroundTask.len();
575
576 dev_log!("lifecycle", "Stopping {} background tasks", TaskCount);
577
578 for TaskItem in BackgroundTask.drain(..) {
580 TaskItem.abort();
581 }
582
583 dev_log!("lifecycle", "Stopped all {} background tasks", TaskCount);
584
585 Ok(())
586 }
587
588 pub async fn UpdateServiceStatus(&self, Service:&str, Status:ServiceStatus) -> Result<()> {
590 if Service.is_empty() {
591 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
592 }
593
594 let mut ServiceStatus = self.ServiceStatus.write().await;
595
596 let StatusClone = Status.clone();
597
598 ServiceStatus.insert(Service.to_string(), Status);
599
600 dev_log!("lifecycle", "Service status updated: {} -> {:?}", Service, StatusClone);
601
602 Ok(())
603 }
604
605 pub async fn GetServiceStatus(&self, Service:&str) -> Option<ServiceStatus> {
607 let ServiceStatus = self.ServiceStatus.read().await;
608
609 ServiceStatus.get(Service).cloned()
610 }
611
612 pub async fn GetAllServiceStatuses(&self) -> HashMap<String, ServiceStatus> {
614 let ServiceStatus = self.ServiceStatus.read().await;
615
616 ServiceStatus.clone()
617 }
618
619 pub async fn RegisterRequest(&self, RequestId:String, Service:String) -> Result<()> {
621 if RequestId.is_empty() {
622 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
623 }
624
625 if Service.is_empty() {
626 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
627 }
628
629 let mut Request = self.ActiveRequest.lock().await;
630
631 if Request.contains_key(&RequestId) {
633 return Err(AirError::Configuration(format!("Request {} already exists", RequestId)));
634 }
635
636 Request.insert(
637 RequestId.clone(),
638 RequestStatus {
639 RequestId:RequestId.clone(),
640 Service,
641 StartedAt:Utility::CurrentTimestamp(),
642 Status:RequestState::Pending,
643 Progress:None,
644 },
645 );
646
647 dev_log!("lifecycle", "Request registered: {}", RequestId);
648
649 Ok(())
650 }
651
652 pub async fn UpdateRequestStatus(&self, RequestId:&str, Status:RequestState, Progress:Option<f32>) -> Result<()> {
654 if RequestId.is_empty() {
655 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
656 }
657
658 if let Some(p) = Progress {
660 if !(0.0..=1.0).contains(&p) {
661 return Err(AirError::Configuration("Progress must be between 0.0 and 1.0".to_string()));
662 }
663 }
664
665 let mut Request = self.ActiveRequest.lock().await;
666
667 if let Some(Request) = Request.get_mut(RequestId) {
668 Request.Status = Status;
669
670 Request.Progress = Progress;
671 } else {
672 return Err(AirError::Internal(format!("Request {} not found", RequestId)));
673 }
674
675 Ok(())
676 }
677
678 pub async fn RemoveRequest(&self, RequestId:&str) -> Result<()> {
680 if RequestId.is_empty() {
681 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
682 }
683
684 let mut request = self.ActiveRequest.lock().await;
685
686 if request.remove(RequestId).is_some() {
687 dev_log!("lifecycle", "Request removed: {}", RequestId);
688 }
689
690 Ok(())
691 }
692
693 pub async fn UpdateMetrics(&self, Success:bool, ResponseTime:u64) -> Result<()> {
695 let mut Metrics = self.Metrics.write().await;
696
697 Metrics.TotalRequest += 1;
698
699 if Success {
700 Metrics.SuccessfulRequest += 1;
701 } else {
702 Metrics.FailedRequest += 1;
703 }
704
705 let Alpha = 0.1; Metrics.AverageResponseTime = Alpha * (ResponseTime as f64) + (1.0 - Alpha) * Metrics.AverageResponseTime;
708
709 Metrics.LastUpdated = Utility::CurrentTimestamp();
710
711 Ok(())
712 }
713
714 pub async fn UpdateResourceUsage(&self) -> Result<()> {
716 let Sys = System::new();
717
718 let MemoryUsage = if let Ok(Memory) = Sys.memory() {
720 (Memory.total.as_u64() - Memory.free.as_u64()) as f64 / 1024.0 / 1024.0
721 } else {
722 dev_log!("lifecycle", "warn: Failed to get memory usage");
723
724 0.0
725 };
726
727 let CPUUsage = if let Ok(CPU) = Sys.cpu_load_aggregate() {
729 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
730
731 if let Ok(CPU) = CPU.done() {
732 (CPU.user + CPU.nice + CPU.system) as f64 * 100.0
733 } else {
734 dev_log!("lifecycle", "warn: Failed to get CPU usage after sampling");
735
736 0.0
737 }
738 } else {
739 dev_log!("lifecycle", "warn: Failed to start CPU load sampling");
740
741 0.0
742 };
743
744 let mut Resources = self.Resources.write().await;
746
747 Resources.MemoryUsageMb = MemoryUsage;
748
749 Resources.CPUUsagePercent = CPUUsage;
750
751 Resources.LastUpdated = Utility::CurrentTimestamp();
752
753 Ok(())
754 }
755
756 pub async fn GetMetrics(&self) -> PerformanceMetrics {
758 let metrics = self.Metrics.read().await;
759
760 metrics.clone()
761 }
762
763 pub async fn GetResourceUsage(&self) -> ResourceUsage {
765 let Resources = self.Resources.read().await;
766
767 Resources.clone()
768 }
769
770 pub async fn GetActiveRequestCount(&self) -> usize {
772 let Request = self.ActiveRequest.lock().await;
773
774 Request.len()
775 }
776
777 pub async fn IsRequestCancelled(&self, RequestId:&str) -> bool {
779 let Request = self.ActiveRequest.lock().await;
780
781 if let Some(Request) = Request.get(RequestId) {
782 matches!(Request.Status, RequestState::Cancelled)
783 } else {
784 false
785 }
786 }
787
788 pub async fn GetConfiguration(&self) -> Arc<AirConfiguration> { self.Configuration.clone() }
790
791 pub async fn UpdateConfiguration(
793 &self,
794
795 Section:String,
796
797 Updates:std::collections::HashMap<String, String>,
798 ) -> Result<()> {
799 dev_log!("lifecycle", "[ApplicationState] Updating configuration section: {}", Section);
800
801 if Section.is_empty() {
803 return Err(AirError::Configuration("Configuration section cannot be empty".to_string()));
804 }
805
806 if Updates.is_empty() {
808 return Err(AirError::Configuration("Configuration updates cannot be empty".to_string()));
809 }
810
811 match Section.as_str() {
819 "grpc" => {
820 dev_log!("lifecycle", "Updating gRPC configuration: {:?}", Updates);
821 },
822
823 "updates" => {
824 dev_log!("lifecycle", "Updating updates configuration: {:?}", Updates);
825 },
826
827 "downloader" => {
828 dev_log!("lifecycle", "Updating downloader configuration: {:?}", Updates);
829 },
830
831 "indexing" => {
832 dev_log!("lifecycle", "Updating indexing configuration: {:?}", Updates);
833 },
834
835 "daemon" => {
836 dev_log!("lifecycle", "Updating daemon configuration: {:?}", Updates);
837 },
838
839 _ => {
840 return Err(AirError::Configuration(format!("Unknown configuration section: {}", Section)));
841 },
842 }
843
844 Ok(())
845 }
846
847 pub async fn SetResourceLimits(
849 &self,
850
851 MemoryLimitMb:Option<u64>,
852
853 CPULimitPercent:Option<f64>,
854
855 DiskLimitMb:Option<u64>,
856 ) -> Result<()> {
857 dev_log!(
858 "lifecycle",
859 "[ApplicationState] Setting resource limits memory={:?}, CPU={:?}, disk={:?}",
860 MemoryLimitMb,
861 CPULimitPercent,
862 DiskLimitMb
863 );
864
865 if let Some(CPU) = CPULimitPercent {
867 if !(0.0..=100.0).contains(&CPU) {
868 return Err(AirError::ResourceLimit("CPU limit must be between 0 and 100".to_string()));
869 }
870 }
871
872 if let Some(Memory) = MemoryLimitMb {
874 if Memory == 0 {
875 return Err(AirError::ResourceLimit("Memory limit must be greater than 0".to_string()));
876 }
877 }
878
879 if let Some(Disk) = DiskLimitMb {
881 if Disk == 0 {
882 return Err(AirError::ResourceLimit("Disk limit must be greater than 0".to_string()));
883 }
884 }
885
886 if MemoryLimitMb.is_some() {
896 dev_log!("lifecycle", "Memory limit set: {} MB", MemoryLimitMb.unwrap());
897 }
898
899 if CPULimitPercent.is_some() {
900 dev_log!("lifecycle", "CPU limit set: {}%", CPULimitPercent.unwrap());
901 }
902
903 if DiskLimitMb.is_some() {
904 dev_log!("lifecycle", "Disk limit set: {} MB", DiskLimitMb.unwrap());
905 }
906
907 Ok(())
908 }
909
910 pub async fn CheckResourceLimits(&self) -> Result<bool> {
912 let _Resources = self.Resources.read().await;
913
914 Ok(false)
918 }
919
920 pub async fn GetConnectionHealthReport(&self) -> ConnectionHealthReport {
922 let Connection = self.Connection.read().await;
923
924 let CurrentTime = Utility::CurrentTimestamp();
925
926 let mut Healthy = 0;
927
928 let mut Stale = 0;
929
930 let mut ByType:HashMap<String, usize> = HashMap::new();
931
932 for ConnectionItem in Connection.values() {
933 let IsStale = CurrentTime - ConnectionItem.LastHeartbeat > 120000; if IsStale {
936 Stale += 1;
937 } else if ConnectionItem.IsActive {
938 Healthy += 1;
939 }
940
941 *ByType.entry(format!("{:?}", ConnectionItem.ConnectionType)).or_insert(0) += 1;
942 }
943
944 ConnectionHealthReport {
945 TotalConnection:Connection.len(),
946
947 HealthyConnection:Healthy,
948
949 StaleConnection:Stale,
950
951 ConnectionByType:ByType,
952
953 LastChecked:CurrentTime,
954 }
955 }
956}