use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};

use tokio::sync::{Mutex, RwLock, broadcast};
use serde::{Deserialize, Serialize};

use crate::dev_log;

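/// Classification of an error, used to decide whether and how an operation should be retried.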
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorClass {
    /// Temporary failures (timeouts, dropped connections) that are worth retrying.
    Transient,
    /// Failures that will not succeed on retry (bad credentials, missing resources).
    NonRetryable,
    /// The upstream service is throttling requests.
    RateLimited,
    /// Server-side failures such as internal errors or unavailability.
    ServerError,
    /// Anything that could not be classified.
    Unknown,
}

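/// Retry configuration: backoff parameters, a per-service retry budget, and error-classification patterns.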
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retry attempts per operation.
    pub MaxRetries:u32,
    /// Initial backoff interval in milliseconds.
    pub InitialIntervalMs:u64,
    /// Upper bound on the backoff interval in milliseconds.
    pub MaxIntervalMs:u64,
    /// Multiplier applied to the interval after each attempt.
    pub BackoffMultiplier:f64,
    /// Fraction of the base delay (0.0..=1.0) added as random jitter.
    pub JitterFactor:f64,
    /// Maximum number of retries allowed per service per minute.
    pub BudgetPerMinute:u32,
    /// Maps error-message patterns to an [`ErrorClass`].
    pub ErrorClassification:HashMap<String, ErrorClass>,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        let mut ErrorClassification = HashMap::new();

        ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);
        ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);
        ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);
        ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);
        ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);
        ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);
        ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);
        ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);
        ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);
        ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);
        ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);

        Self {
            MaxRetries:3,
            InitialIntervalMs:1000,
            MaxIntervalMs:32000,
            BackoffMultiplier:2.0,
            JitterFactor:0.1,
            BudgetPerMinute:100,
            ErrorClassification,
        }
    }
}

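/// Sliding one-minute window that caps how many retries a single service may consume.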
#[derive(Debug, Clone)]
struct RetryBudget {
    Attempts:Vec<Instant>,
    MaxPerMinute:u32,
}

impl RetryBudget {
    fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }

    fn can_retry(&mut self) -> bool {
        let Now = Instant::now();
        let OneMinuteAgo = Now - Duration::from_secs(60);

        // Drop attempts that have fallen out of the one-minute window.
        self.Attempts.retain(|&attempt| attempt > OneMinuteAgo);

        if self.Attempts.len() < self.MaxPerMinute as usize {
            self.Attempts.push(Now);

            true
        } else {
            false
        }
    }
}

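/// Coordinates retries: computes backoff delays, classifies errors, enforces per-service budgets, and publishes [`RetryEvent`]s.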
pub struct RetryManager {
    Policy:RetryPolicy,
    Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,
    EventTx:Arc<broadcast::Sender<RetryEvent>>,
}

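/// Event published for every attempt outcome so observers can track retry behaviour.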
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
    pub Service:String,
    pub Attempt:u32,
    pub ErrorClass:ErrorClass,
    pub DelayMs:u64,
    pub Success:bool,
    pub ErrorMessage:Option<String>,
}

impl RetryManager {
    pub fn new(policy:RetryPolicy) -> Self {
        let (EventTx, _) = broadcast::channel(1000);

        Self {
            Policy:policy,
            Budgets:Arc::new(Mutex::new(HashMap::new())),
            EventTx:Arc::new(EventTx),
        }
    }

    pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }

    pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
        if Attempt == 0 {
            return Duration::from_millis(0);
        }

        let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
            .min(self.Policy.MaxIntervalMs as f64) as u64;

        let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;
        let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;
        let FinalDelay = BaseDelay + RandomJitter;

        Duration::from_millis(FinalDelay)
    }

    pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
        // Exact key lookup first, then fall back to substring classification so that full
        // error messages (not only canonical keys) still select the right strategy.
        let ErrorClass = self
            .Policy
            .ErrorClassification
            .get(ErrorType)
            .copied()
            .unwrap_or_else(|| self.ClassifyError(ErrorType));

        match ErrorClass {
            ErrorClass::RateLimited => {
                // Linear backoff in 5-second steps while the upstream is throttling.
                let delay = (attempt + 1) * 5000;

                Duration::from_millis(delay as u64)
            },

            ErrorClass::ServerError => {
                // Plain exponential backoff capped at the configured maximum.
                let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);

                Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
            },

            ErrorClass::Transient => self.CalculateRetryDelay(attempt),

            ErrorClass::NonRetryable | ErrorClass::Unknown => Duration::from_millis(100),
        }
    }

    pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
        // Normalise to lowercase with underscores so that human-readable messages such as
        // "rate limit exceeded" still match classification keys like "rate_limit_exceeded".
        let ErrorLower = ErrorMessage.to_lowercase().replace(' ', "_");

        for (pattern, class) in &self.Policy.ErrorClassification {
            if ErrorLower.contains(pattern) {
                return *class;
            }
        }

        ErrorClass::Unknown
    }

    pub async fn CanRetry(&self, service:&str) -> bool {
        let mut budgets = self.Budgets.lock().await;

        let budget = budgets
            .entry(service.to_string())
            .or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));

        budget.can_retry()
    }

    pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }

    pub fn ValidatePolicy(&self) -> Result<(), String> {
        if self.Policy.MaxRetries == 0 {
            return Err("MaxRetries must be greater than 0".to_string());
        }

        if self.Policy.InitialIntervalMs == 0 {
            return Err("InitialIntervalMs must be greater than 0".to_string());
        }

        if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
            return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
        }

        if self.Policy.BackoffMultiplier <= 1.0 {
            return Err("BackoffMultiplier must be greater than 1.0".to_string());
        }

        if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
            return Err("JitterFactor must be between 0 and 1".to_string());
        }

        if self.Policy.BudgetPerMinute == 0 {
            return Err("BudgetPerMinute must be greater than 0".to_string());
        }

        Ok(())
    }
}

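/// State of a circuit breaker.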
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CircuitState {
    /// Normal operation; requests flow through.
    Closed,
    /// Failing fast; requests are rejected until the recovery timeout elapses.
    Open,
    /// Probing recovery; a limited number of requests are allowed through.
    HalfOpen,
}

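/// Thresholds and timeout that govern circuit breaker state transitions.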
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures required to open the circuit.
    pub FailureThreshold:u32,
    /// Successes required in half-open state before the circuit closes again.
    pub SuccessThreshold:u32,
    /// How long the circuit stays open before a recovery probe is allowed.
    pub TimeoutSecs:u64,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitEvent {
    pub name:String,
    pub FromState:CircuitState,
    pub ToState:CircuitState,
    pub timestamp:u64,
    pub reason:String,
}

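/// Per-service circuit breaker: opens after repeated failures, probes recovery while half-open, and publishes a [`CircuitEvent`] on every transition.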
pub struct CircuitBreaker {
    Name:String,
    State:Arc<RwLock<CircuitState>>,
    Config:CircuitBreakerConfig,
    FailureCount:Arc<RwLock<u32>>,
    SuccessCount:Arc<RwLock<u32>>,
    LastFailureTime:Arc<RwLock<Option<Instant>>>,
    EventTx:Arc<broadcast::Sender<CircuitEvent>>,
    StateTransitionCounter:Arc<RwLock<u32>>,
}

impl CircuitBreaker {
    pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
        let (EventTx, _) = broadcast::channel(1000);

        Self {
            Name:name.clone(),
            State:Arc::new(RwLock::new(CircuitState::Closed)),
            Config,
            FailureCount:Arc::new(RwLock::new(0)),
            SuccessCount:Arc::new(RwLock::new(0)),
            LastFailureTime:Arc::new(RwLock::new(None)),
            EventTx:Arc::new(EventTx),
            StateTransitionCounter:Arc::new(RwLock::new(0)),
        }
    }

    pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }

    pub async fn GetState(&self) -> CircuitState { *self.State.read().await }

    pub async fn ValidateState(&self) -> Result<(), String> {
        let state = *self.State.read().await;
        let failures = *self.FailureCount.read().await;
        let successes = *self.SuccessCount.read().await;

        match state {
            CircuitState::Closed => {
                if successes != 0 {
                    return Err(format!("Inconsistent state: Closed but has {} successes", successes));
                }

                if failures >= self.Config.FailureThreshold {
                    dev_log!(
                        "resilience",
                        "warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
                        failures,
                        self.Config.FailureThreshold
                    );
                }
            },

            CircuitState::Open => {
                if failures < self.Config.FailureThreshold {
                    dev_log!(
                        "resilience",
                        "warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
                        failures,
                        self.Config.FailureThreshold
                    );
                }
            },

            CircuitState::HalfOpen => {
                if successes >= self.Config.SuccessThreshold {
                    return Err(format!(
                        "Inconsistent state: HalfOpen but has {} successes (should be Closed)",
                        successes
                    ));
                }
            },
        }

        Ok(())
    }

    async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
        let CurrentState = self.GetState().await;

        if CurrentState == NewState {
            return Ok(());
        }

        // Only these transitions are legal; anything else is rejected.
        match (CurrentState, NewState) {
            (CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {},
            (CircuitState::Open, CircuitState::HalfOpen) => {},
            (CircuitState::HalfOpen, CircuitState::Closed) => {},
            _ => {
                return Err(format!(
                    "Invalid state transition from {:?} to {:?} for {}",
                    CurrentState, NewState, self.Name
                ));
            },
        }

        let event = CircuitEvent {
            name:self.Name.clone(),
            FromState:CurrentState,
            ToState:NewState,
            timestamp:crate::Utility::CurrentTimestamp(),
            reason:reason.to_string(),
        };

        let _ = self.EventTx.send(event);

        *self.State.write().await = NewState;
        *self.StateTransitionCounter.write().await += 1;

        dev_log!(
            "resilience",
            "[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
            self.Name,
            CurrentState,
            NewState,
            reason
        );

        self.ValidateState().await.map_err(|e| {
            dev_log!(
                "resilience",
                "error: [CircuitBreaker] State validation failed after transition: {}",
                e
            );
            e
        })?;

        Ok(())
    }

    pub async fn RecordSuccess(&self) {
        let state = self.GetState().await;

        match state {
            CircuitState::Closed => {
                *self.FailureCount.write().await = 0;
            },

            CircuitState::HalfOpen => {
                let mut SuccessCount = self.SuccessCount.write().await;
                *SuccessCount += 1;

                let ThresholdReached = *SuccessCount >= self.Config.SuccessThreshold;

                if ThresholdReached {
                    *SuccessCount = 0;
                }

                // Release the write lock before transitioning; TransitionState re-reads the
                // counters during validation and would otherwise deadlock.
                drop(SuccessCount);

                if ThresholdReached {
                    let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;

                    *self.FailureCount.write().await = 0;
                }
            },

            _ => {},
        }
    }

    pub async fn RecordFailure(&self) {
        let State = self.GetState().await;

        *self.LastFailureTime.write().await = Some(Instant::now());

        match State {
            CircuitState::Closed => {
                let mut FailureCount = self.FailureCount.write().await;
                *FailureCount += 1;

                let ThresholdReached = *FailureCount >= self.Config.FailureThreshold;

                // Release the write lock before transitioning; TransitionState re-reads the
                // counters during validation and would otherwise deadlock.
                drop(FailureCount);

                if ThresholdReached {
                    let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;

                    *self.SuccessCount.write().await = 0;
                }
            },

            CircuitState::HalfOpen => {
                let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;

                *self.SuccessCount.write().await = 0;
            },

            _ => {},
        }
    }

    pub async fn AttemptRecovery(&self) -> bool {
        let state = self.GetState().await;

        if state != CircuitState::Open {
            return state == CircuitState::HalfOpen;
        }

        if let Some(last_failure) = *self.LastFailureTime.read().await {
            if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
                let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;

                *self.SuccessCount.write().await = 0;

                return true;
            }
        }

        false
    }

    pub async fn GetStatistics(&self) -> CircuitStatistics {
        CircuitStatistics {
            Name:self.Name.clone(),
            State:self.GetState().await,
            Failures:*self.FailureCount.read().await,
            Successes:*self.SuccessCount.read().await,
            StateTransitions:*self.StateTransitionCounter.read().await,
            LastFailureTime:*self.LastFailureTime.read().await,
        }
    }

    pub fn ValidateConfig(config:&CircuitBreakerConfig) -> Result<(), String> {
        if config.FailureThreshold == 0 {
            return Err("FailureThreshold must be greater than 0".to_string());
        }

        if config.SuccessThreshold == 0 {
            return Err("SuccessThreshold must be greater than 0".to_string());
        }

        if config.TimeoutSecs == 0 {
            return Err("TimeoutSecs must be greater than 0".to_string());
        }

        Ok(())
    }
}

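/// Point-in-time snapshot of a circuit breaker's counters and state.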
#[derive(Debug, Clone, Serialize)]
// Serialized field names are snake_case so they round-trip with the manual Deserialize impl below.
#[serde(rename_all = "snake_case")]
pub struct CircuitStatistics {
    pub Name:String,
    pub State:CircuitState,
    pub Failures:u32,
    pub Successes:u32,
    pub StateTransitions:u32,
    #[serde(skip_serializing)]
    pub LastFailureTime:Option<Instant>,
}

// Manual Deserialize: `LastFailureTime` is an `Instant`, which cannot be deserialized,
// so it is skipped on the wire and restored as `None`.
impl<'de> Deserialize<'de> for CircuitStatistics {
    fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>, {
        use serde::de::{self, Visitor};

        struct CircuitStatisticsVisitor;

        impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
            type Value = CircuitStatistics;

            fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("struct CircuitStatistics")
            }

            fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
            where
                A: de::MapAccess<'de>, {
                let mut Name = None;
                let mut State = None;
                let mut Failures = None;
                let mut Successes = None;
                let mut StateTransitions = None;

                while let Some(key) = map.next_key::<String>()? {
                    match key.as_str() {
                        "name" => Name = Some(map.next_value()?),
                        "state" => State = Some(map.next_value()?),
                        "failures" => Failures = Some(map.next_value()?),
                        "successes" => Successes = Some(map.next_value()?),
                        "state_transitions" => StateTransitions = Some(map.next_value()?),
                        _ => {
                            map.next_value::<de::IgnoredAny>()?;
                        },
                    }
                }

                Ok(CircuitStatistics {
                    Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
                    State:State.ok_or_else(|| de::Error::missing_field("state"))?,
                    Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
                    Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
                    StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
                    LastFailureTime:None,
                })
            }
        }

        const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];

        Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
    }
}

impl Clone for CircuitBreaker {
    fn clone(&self) -> Self {
        Self {
            Name:self.Name.clone(),
            State:self.State.clone(),
            Config:self.Config.clone(),
            FailureCount:self.FailureCount.clone(),
            SuccessCount:self.SuccessCount.clone(),
            LastFailureTime:self.LastFailureTime.clone(),
            EventTx:self.EventTx.clone(),
            StateTransitionCounter:self.StateTransitionCounter.clone(),
        }
    }
}

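/// Limits for a bulkhead: maximum concurrent executions, maximum queued waiters, and a timeout applied to both queueing and execution.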
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadConfig {
    pub max_concurrent:usize,
    pub max_queue:usize,
    pub timeout_secs:u64,
}

impl Default for BulkheadConfig {
    fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadStatistics {
    pub name:String,
    pub current_concurrent:u32,
    pub current_queue:u32,
    pub max_concurrent:usize,
    pub max_queue:usize,
    pub total_rejected:u64,
    pub total_completed:u64,
    pub total_timed_out:u64,
}

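/// Bulkhead that isolates a service behind a semaphore-bounded concurrency limit and a bounded wait queue.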
pub struct BulkheadExecutor {
    name:String,
    semaphore:Arc<tokio::sync::Semaphore>,
    config:BulkheadConfig,
    current_requests:Arc<RwLock<u32>>,
    queue_size:Arc<RwLock<u32>>,
    total_rejected:Arc<RwLock<u64>>,
    total_completed:Arc<RwLock<u64>>,
    total_timed_out:Arc<RwLock<u64>>,
}

impl BulkheadExecutor {
    pub fn new(name:String, config:BulkheadConfig) -> Self {
        Self {
            name:name.clone(),
            semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
            config,
            current_requests:Arc::new(RwLock::new(0)),
            queue_size:Arc::new(RwLock::new(0)),
            total_rejected:Arc::new(RwLock::new(0)),
            total_completed:Arc::new(RwLock::new(0)),
            total_timed_out:Arc::new(RwLock::new(0)),
        }
    }

    pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
        if config.max_concurrent == 0 {
            return Err("max_concurrent must be greater than 0".to_string());
        }

        if config.max_queue == 0 {
            return Err("max_queue must be greater than 0".to_string());
        }

        if config.timeout_secs == 0 {
            return Err("timeout_secs must be greater than 0".to_string());
        }

        Ok(())
    }

    pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
    where
        F: std::future::Future<Output = Result<R, String>>, {
        if self.config.timeout_secs == 0 {
            return Err("Bulkhead timeout must be greater than 0".to_string());
        }

        // Reject immediately if the wait queue is already full.
        let queue = *self.queue_size.read().await;

        if queue >= self.config.max_queue as u32 {
            *self.total_rejected.write().await += 1;

            dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);

            return Err("Bulkhead queue full".to_string());
        }

        *self.queue_size.write().await += 1;

        // Wait for a concurrency permit; the permit must stay alive until the work finishes.
        let _Permit =
            match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
                .await
            {
                Ok(Ok(permit)) => {
                    *self.queue_size.write().await -= 1;

                    permit
                },

                Ok(Err(e)) => {
                    *self.queue_size.write().await -= 1;

                    return Err(format!("Bulkhead semaphore error: {}", e));
                },

                Err(_) => {
                    *self.queue_size.write().await -= 1;

                    *self.total_timed_out.write().await += 1;

                    dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);

                    return Err("Bulkhead timeout waiting for permit".to_string());
                },
            };

        *self.current_requests.write().await += 1;

        let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;

        *self.current_requests.write().await -= 1;

        let execution_result:Result<R, String> = match execution_result {
            Ok(Ok(value)) => Ok(value),
            Ok(Err(e)) => Err(e),
            Err(_) => {
                *self.total_timed_out.write().await += 1;

                Err("Bulkhead execution timeout".to_string())
            },
        };

        if execution_result.is_ok() {
            *self.total_completed.write().await += 1;
        }

        execution_result
    }

    pub async fn GetLoad(&self) -> (u32, u32) {
        let current = *self.current_requests.read().await;
        let queue = *self.queue_size.read().await;

        (current, queue)
    }

    pub async fn GetStatistics(&self) -> BulkheadStatistics {
        BulkheadStatistics {
            name:self.name.clone(),
            current_concurrent:*self.current_requests.read().await,
            current_queue:*self.queue_size.read().await,
            max_concurrent:self.config.max_concurrent,
            max_queue:self.config.max_queue,
            total_rejected:*self.total_rejected.read().await,
            total_completed:*self.total_completed.read().await,
            total_timed_out:*self.total_timed_out.read().await,
        }
    }

    pub async fn GetUtilization(&self) -> f64 {
        let (current, _) = self.GetLoad().await;

        if self.config.max_concurrent == 0 {
            return 0.0;
        }

        (current as f64 / self.config.max_concurrent as f64) * 100.0
    }
}

impl Clone for BulkheadExecutor {
    fn clone(&self) -> Self {
        Self {
            name:self.name.clone(),
            semaphore:self.semaphore.clone(),
            config:self.config.clone(),
            current_requests:self.current_requests.clone(),
            queue_size:self.queue_size.clone(),
            total_rejected:self.total_rejected.clone(),
            total_completed:self.total_completed.clone(),
            total_timed_out:self.total_timed_out.clone(),
        }
    }
}

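/// Tracks an optional global deadline alongside a per-operation timeout and derives the effective timeout for each call.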
#[derive(Debug, Clone)]
pub struct TimeoutManager {
    global_deadline:Option<Instant>,
    operation_timeout:Duration,
}

impl TimeoutManager {
    pub fn new(operation_timeout:Duration) -> Self { Self { global_deadline:None, operation_timeout } }

    pub fn with_deadline(global_deadline:Instant, operation_timeout:Duration) -> Self {
        Self { global_deadline:Some(global_deadline), operation_timeout }
    }

    pub fn ValidateTimeout(timeout:Duration) -> Result<(), String> {
        if timeout.is_zero() {
            return Err("Timeout must be greater than 0".to_string());
        }

        if timeout.as_secs() > 3600 {
            return Err("Timeout cannot exceed 1 hour".to_string());
        }

        Ok(())
    }

    pub fn ValidateTimeoutResult(timeout:Duration) -> Result<Duration, String> {
        if timeout.is_zero() {
            return Err("Timeout must be greater than 0".to_string());
        }

        if timeout.as_secs() > 3600 {
            return Err("Timeout cannot exceed 1 hour".to_string());
        }

        Ok(timeout)
    }

    pub fn remaining(&self) -> Option<Duration> {
        self.global_deadline.map(|deadline| {
            deadline
                .checked_duration_since(Instant::now())
                .unwrap_or(Duration::from_secs(0))
        })
    }

    pub fn Remaining(&self) -> Option<Duration> {
        std::panic::catch_unwind(|| self.remaining()).unwrap_or_else(|e| {
            dev_log!("resilience", "error: [TimeoutManager] Panic in Remaining: {:?}", e);
            None
        })
    }

    pub fn effective_timeout(&self) -> Duration {
        match self.remaining() {
            Some(remaining) => self.operation_timeout.min(remaining),
            None => self.operation_timeout,
        }
    }

    pub fn EffectiveTimeout(&self) -> Duration {
        std::panic::catch_unwind(|| {
            let timeout = self.effective_timeout();

            match Self::ValidateTimeoutResult(timeout) {
                Ok(valid_timeout) => valid_timeout,
                Err(_) => Duration::from_secs(30),
            }
        })
        .unwrap_or_else(|e| {
            dev_log!("resilience", "error: [TimeoutManager] Panic in EffectiveTimeout: {:?}", e);
            Duration::from_secs(30)
        })
    }

    pub fn is_exceeded(&self) -> bool { self.global_deadline.map_or(false, |deadline| Instant::now() >= deadline) }

    pub fn IsExceeded(&self) -> bool {
        std::panic::catch_unwind(|| self.is_exceeded()).unwrap_or_else(|e| {
            dev_log!("resilience", "error: [TimeoutManager] Panic in IsExceeded: {:?}", e);
            true
        })
    }

    pub fn GetGlobalDeadline(&self) -> Option<Instant> { self.global_deadline }

    pub fn GetOperationTimeout(&self) -> Duration { self.operation_timeout }
}

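/// Front door for resilience: combines retries, circuit breakers, and bulkheads per service (see [`ResilienceOrchestrator::ExecuteResilient`]).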
pub struct ResilienceOrchestrator {
    retry_manager:Arc<RetryManager>,
    circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,
    bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
}

impl ResilienceOrchestrator {
    pub fn new(retry_policy:RetryPolicy) -> Self {
        Self {
            retry_manager:Arc::new(RetryManager::new(retry_policy)),
            circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
            bulkheads:Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
        let mut breakers = self.circuit_breakers.write().await;

        Arc::new(
            breakers
                .entry(service.to_string())
                .or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
                .clone(),
        )
    }

    pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
        let mut bulkheads = self.bulkheads.write().await;

        Arc::new(
            bulkheads
                .entry(service.to_string())
                .or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
                .clone(),
        )
    }

    pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
        let breakers = self.circuit_breakers.read().await;
        let mut stats = Vec::new();

        for breaker in breakers.values() {
            stats.push(breaker.GetStatistics().await);
        }

        stats
    }

    pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
        let bulkheads = self.bulkheads.read().await;
        let mut stats = Vec::new();

        for bulkhead in bulkheads.values() {
            stats.push(bulkhead.GetStatistics().await);
        }

        stats
    }

    pub async fn ExecuteResilient<F, R>(
        &self,
        service:&str,
        retry_policy:&RetryPolicy,
        circuit_config:CircuitBreakerConfig,
        bulkhead_config:BulkheadConfig,
        f:F,
    ) -> Result<R, String>
    where
        F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
        if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
            return Err(format!("Invalid circuit breaker config: {}", e));
        }

        if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
            return Err(format!("Invalid bulkhead config: {}", e));
        }

        let breaker = self.GetCircuitBreaker(service, circuit_config).await;
        let bulkhead = self.GetBulkhead(service, bulkhead_config).await;

        if breaker.GetState().await == CircuitState::Open {
            if !breaker.AttemptRecovery().await {
                return Err("Circuit breaker is open".to_string());
            }
        }

        let mut Attempt = 0;

        loop {
            let result = bulkhead.Execute(f()).await;

            match result {
                Ok(Value) => {
                    breaker.RecordSuccess().await;

                    let Event = RetryEvent {
                        Service:service.to_string(),
                        Attempt,
                        ErrorClass:ErrorClass::Unknown,
                        DelayMs:0,
                        Success:true,
                        ErrorMessage:None,
                    };

                    self.retry_manager.PublishRetryEvent(Event);

                    return Ok(Value);
                },

                Err(E) => {
                    let Class = self.retry_manager.ClassifyError(&E);

                    breaker.RecordFailure().await;

                    let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);

                    let Event = RetryEvent {
                        Service:service.to_string(),
                        Attempt,
                        ErrorClass:Class,
                        DelayMs:Delay.as_millis() as u64,
                        Success:false,
                        ErrorMessage:Some(self.redact_sensitive_data(&E)),
                    };

                    self.retry_manager.PublishRetryEvent(Event);

                    if Attempt < retry_policy.MaxRetries
                        && Class != ErrorClass::NonRetryable
                        && self.retry_manager.CanRetry(service).await
                    {
                        dev_log!(
                            "resilience",
                            "[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
                            service,
                            Attempt + 1,
                            retry_policy.MaxRetries,
                            Delay,
                            self.redact_sensitive_data(&E)
                        );

                        tokio::time::sleep(Delay).await;

                        Attempt += 1;
                    } else {
                        return Err(E);
                    }
                },
            }
        }
    }

    /// Best-effort removal of credential-looking substrings before an error message is logged or published.
    fn redact_sensitive_data(&self, message:&str) -> String {
        let mut redacted = message.to_string();

        let patterns = vec![
            (r"(?i)password[=:]\S+", "password=[REDACTED]"),
            (r"(?i)token[=:]\S+", "token=[REDACTED]"),
            (r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
            (r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
            (
                r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
                "Authorization: Bearer [REDACTED]",
            ),
            (r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
            (r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
        ];

        for (pattern, replacement) in patterns {
            if let Ok(re) = regex::Regex::new(pattern) {
                redacted = re.replace_all(&redacted, replacement).to_string();
            }
        }

        redacted
    }

    pub fn ValidateConfigurations(
        &self,
        _RetryPolicy:&RetryPolicy,
        CircuitConfig:&CircuitBreakerConfig,
        BulkheadConfig:&BulkheadConfig,
    ) -> Result<(), String> {
        self.retry_manager.ValidatePolicy()?;

        CircuitBreaker::ValidateConfig(CircuitConfig)?;

        BulkheadExecutor::ValidateConfig(BulkheadConfig)?;

        TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;

        Ok(())
    }
}

impl Clone for ResilienceOrchestrator {
    fn clone(&self) -> Self {
        Self {
            retry_manager:self.retry_manager.clone(),
            circuit_breakers:self.circuit_breakers.clone(),
            bulkheads:self.bulkheads.clone(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_retry_delay_calculation() {
        let policy = RetryPolicy::default();
        let manager = RetryManager::new(policy);

        let delay_1 = manager.CalculateRetryDelay(1);
        let delay_2 = manager.CalculateRetryDelay(2);

        assert!(delay_2 >= delay_1);
    }

    #[test]
    fn test_adaptive_retry_delay() {
        let policy = RetryPolicy::default();
        let manager = RetryManager::new(policy);

        let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);
        let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

        assert!(rate_limit_delay >= transient_delay);
    }

    #[test]
    fn test_error_classification() {
        let policy = RetryPolicy::default();
        let manager = RetryManager::new(policy);

        assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);
        assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);
        assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);
        assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
    }

    #[test]
    fn test_policy_validation() {
        let policy = RetryPolicy::default();
        let manager = RetryManager::new(policy);

        assert!(manager.ValidatePolicy().is_ok());

        let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };
        let invalid_manager = RetryManager::new(invalid_policy);

        assert!(invalid_manager.ValidatePolicy().is_err());
    }

    #[tokio::test]
    async fn test_circuit_breaker_state_transitions() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };
        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;
        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;
        assert_eq!(breaker.GetState().await, CircuitState::Open);

        // Let the 1-second recovery timeout elapse so the breaker is allowed to probe again.
        tokio::time::sleep(Duration::from_millis(1100)).await;

        assert!(breaker.AttemptRecovery().await);
        assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

        breaker.RecordSuccess().await;
        assert_eq!(breaker.GetState().await, CircuitState::Closed);
    }

    #[tokio::test]
    async fn test_circuit_breaker_validation() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };
        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert!(breaker.ValidateState().await.is_ok());

        breaker.RecordFailure().await;
        breaker.RecordFailure().await;

        // After reaching the failure threshold the breaker is Open, which is a consistent state.
        assert!(breaker.ValidateState().await.is_ok());
    }

    #[test]
    fn test_circuit_breaker_config_validation() {
        let valid_config = CircuitBreakerConfig::default();
        assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

        let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };
        assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
    }

    #[tokio::test]
    async fn test_bulkhead_resource_isolation() {
        let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };
        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let (current, queue) = bulkhead.GetLoad().await;
        assert_eq!(current, 0);
        assert_eq!(queue, 0);

        let stats = bulkhead.GetStatistics().await;
        assert_eq!(stats.current_concurrent, 0);
        assert_eq!(stats.current_queue, 0);
        assert_eq!(stats.max_concurrent, 2);
        assert_eq!(stats.max_queue, 5);
    }

    #[tokio::test]
    async fn test_bulkhead_utilization() {
        let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };
        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let utilization = bulkhead.GetUtilization().await;
        assert_eq!(utilization, 0.0);
    }

    #[test]
    fn test_bulkhead_config_validation() {
        let valid_config = BulkheadConfig::default();
        assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

        let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };
        assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
    }

    #[test]
    fn test_timeout_manager() {
        let manager = TimeoutManager::new(Duration::from_secs(30));

        assert!(!manager.IsExceeded());
        assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());
        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
    }

    #[test]
    fn test_timeout_manager_with_deadline() {
        let deadline = Instant::now() + Duration::from_secs(60);
        let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

        let remaining = manager.Remaining();
        assert!(remaining.is_some());
        assert!(remaining.unwrap() <= Duration::from_secs(60));
    }
}