
AirLibrary/Resilience/mod.rs

1//! # Resilience Patterns Module
2//!
3//! Provides robust resilience patterns for external service calls:
4//! - Exponential backoff retry logic with jitter
5//! - Circuit breaker pattern for fault isolation
6//! - Bulkhead pattern for resource isolation
7//! - Timeout management with cascading deadlines
8//!
9//! ## Responsibilities
10//!
11//! ### Retry Patterns
12//! - Exponential backoff with jitter for distributed systems
13//! - Adaptive retry policies based on error classification
14//! - Retry budget management for service rate limiting
15//! - Panic recovery for background retry tasks
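//!
//! A minimal sketch of the retry primitives defined later in this module
//! (marked `ignore`; the service name is illustrative):
//!
//! ```ignore
//! let manager = RetryManager::new(RetryPolicy::default());
//!
//! // Classify an error string and derive an adaptive backoff delay for attempt 1.
//! assert_eq!(manager.ClassifyError("gateway_timeout"), ErrorClass::Transient);
//! let delay = manager.CalculateAdaptiveRetryDelay("gateway_timeout", 1);
//!
//! // Check the per-service retry budget before scheduling another attempt.
//! if manager.CanRetry("catalog-service").await {
//!     tokio::time::sleep(delay).await;
//! }
//! ```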
16//!
17//! ### Circuit Breaker
18//! - Automatic fault detection and isolation
19//! - State consistency validation across transitions
20//! - Event publishing for telemetry integration
21//! - Half-open state monitoring for recovery testing
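//!
//! A usage sketch of the circuit breaker defined later in this module (the
//! service name is illustrative):
//!
//! ```ignore
//! let breaker = CircuitBreaker::new("search-api".to_string(), CircuitBreakerConfig::default());
//!
//! breaker.RecordFailure().await;
//!
//! if breaker.GetState().await == CircuitState::Open {
//!     // Fail fast until the recovery timeout elapses, then probe via the half-open state.
//!     if breaker.AttemptRecovery().await {
//!         breaker.RecordSuccess().await;
//!     }
//! }
//! ```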
22//!
23//! ### Bulkhead Pattern
24//! - Concurrent request limiting for resource protection
25//! - Queue management with overflow protection
26//! - Load monitoring and metrics collection
27//! - Timeout validation for all operations
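//!
//! A sketch of bulkhead-protected execution (the bulkhead name and the wrapped
//! operation are illustrative):
//!
//! ```ignore
//! let bulkhead = BulkheadExecutor::new("thumbnails".to_string(), BulkheadConfig::default());
//!
//! let result: Result<u32, String> = bulkhead
//!     .Execute(async {
//!         // The guarded operation; it is rejected or timed out when the bulkhead is saturated.
//!         Ok(42)
//!     })
//!     .await;
//!
//! let _utilization = bulkhead.GetUtilization().await; // percentage of permits in use
//! ```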
28//!
29//! ### Timeout Management
30//! - Cascading deadline propagation
31//! - Global deadline coordination
32//! - Operation timeout enforcement
33//! - Panic-safe timeout cancellation
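//!
//! A sketch of cascading deadlines with the timeout manager defined later in
//! this module (`some_call` is a placeholder for the guarded operation):
//!
//! ```ignore
//! use std::time::{Duration, Instant};
//!
//! // The whole request must finish within 10s; each operation gets at most 3s.
//! let deadline = Instant::now() + Duration::from_secs(10);
//! let timeouts = TimeoutManager::with_deadline(deadline, Duration::from_secs(3));
//!
//! if !timeouts.IsExceeded() {
//!     // Effective timeout is min(operation timeout, time remaining until the deadline).
//!     let budget = timeouts.EffectiveTimeout();
//!     let _ = tokio::time::timeout(budget, some_call()).await;
//! }
//! ```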
34//!
35//! ## Integration with Mountain
36//!
37//! Resilience patterns directly support Mountain's stability by:
38//! - preventing cascading failures through circuit breaker isolation
39//! - managing load through bulkhead resource limits
40//! - providing event publishing for Mountain's telemetry dashboard
41//! - enabling adaptive retry behavior for improved service availability
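//!
//! As a sketch of the telemetry hook, a consumer can subscribe to circuit
//! breaker transitions through the broadcast channel exposed below (the
//! forwarding task is illustrative):
//!
//! ```ignore
//! let breaker = CircuitBreaker::new("search-api".to_string(), CircuitBreakerConfig::default());
//! let mut events = breaker.GetEventTransmitter().subscribe();
//!
//! tokio::spawn(async move {
//!     while let Ok(event) = events.recv().await {
//!         // Forward Closed/Open/HalfOpen transitions to the dashboard.
//!         println!("{}: {:?} -> {:?}", event.name, event.FromState, event.ToState);
//!     }
//! });
//! ```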
42//!
43//! ## VSCode Stability References
44//!
//! Similar patterns are used in VSCode for:
46//! - External service resilience (telemetry, updates, extensions)
47//! - Editor process isolation and recovery
48//! - Background task fault tolerance
49//!
50//! Reference:
51//! vs/base/common/errors
52//!
//! ## Future Enhancements
54//!
55//! - [DISTRIBUTED TRACING] Integrate with Tracing module for retry/circuit span
56//! correlation
57//! - [CUSTOM METRICS] Add detailed bulkhead load metrics to Metrics module
58//! - [EVENT PUBLISHING] Extend circuit breaker events with OpenTelemetry
59//! support
60//! - [ADAPTIVE POLICIES] Enhance retry policies with machine learning-based
61//! error prediction
//! - [METRICS INTEGRATION] Export resilience metrics to Mountain's telemetry UI
//!
//! ## Sensitive Data Handling
64//!
65//! This module does not process sensitive data directly but should:
66//! - Redact error messages before logging/event publishing
67//! - Avoid including request payloads in resilience events
68//! - Sanitize service names before publishing to telemetry
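//!
//! ## Example
//!
//! An end-to-end sketch combining retry, circuit breaking, and bulkhead
//! isolation through the orchestrator defined in this module (the service name
//! and the wrapped request are illustrative):
//!
//! ```ignore
//! let policy = RetryPolicy::default();
//! let orchestrator = ResilienceOrchestrator::new(policy.clone());
//!
//! // The request factory returns a boxed future so each retry gets a fresh call.
//! let make_request = || -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>> {
//!     Box::pin(async { Ok("payload".to_string()) })
//! };
//!
//! let result = orchestrator
//!     .ExecuteResilient(
//!         "metadata-service",
//!         &policy,
//!         CircuitBreakerConfig::default(),
//!         BulkheadConfig::default(),
//!         make_request,
//!     )
//!     .await;
//! ```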
69
70use std::{
71	collections::HashMap,
72	sync::Arc,
73	time::{Duration, Instant},
74};
75
76use tokio::sync::{Mutex, RwLock, broadcast};
77use serde::{Deserialize, Serialize};
78
79use crate::dev_log;
80
81/// Error classification for adaptive retry policies
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
83pub enum ErrorClass {
84	/// Transient errors (network timeouts, temporary failures)
85	Transient,
86
87	/// Non-retryable errors (authentication, invalid requests)
88	NonRetryable,
89
90	/// Rate limit errors (429 Too Many Requests)
91	RateLimited,
92
93	/// Server errors (500-599)
94	ServerError,
95
96	/// Unknown error classification
97	Unknown,
98}
99
100/// Retry policy configuration
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct RetryPolicy {
103	/// Maximum number of retry attempts
104	pub MaxRetries:u32,
105
106	/// Initial retry interval (in milliseconds)
107	pub InitialIntervalMs:u64,
108
109	/// Maximum retry interval (in milliseconds)
110	pub MaxIntervalMs:u64,
111
112	/// Exponential backoff multiplier
113	pub BackoffMultiplier:f64,
114
115	/// Jitter percentage (0-1)
116	pub JitterFactor:f64,
117
118	/// Retry budget per service (max retries per minute)
119	pub BudgetPerMinute:u32,
120
121	/// Adaptive error classification for intelligent retry behavior
122	pub ErrorClassification:HashMap<String, ErrorClass>,
123}
124
125impl Default for RetryPolicy {
126	fn default() -> Self {
127		let mut ErrorClassification = HashMap::new();
128
129		// Default error classifications
130		ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);
131
132		ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);
133
134		ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);
135
136		ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);
137
138		ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);
139
140		ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);
141
142		ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);
143
144		ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);
145
146		ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);
147
148		ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);
149
150		ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);
151
152		Self {
153			MaxRetries:3,
154
155			InitialIntervalMs:1000,
156
157			MaxIntervalMs:32000,
158
159			BackoffMultiplier:2.0,
160
161			JitterFactor:0.1,
162
163			BudgetPerMinute:100,
164
165			ErrorClassification,
166		}
167	}
168}
169
170/// Retry budget tracker
171#[derive(Debug, Clone)]
172struct RetryBudget {
173	Attempts:Vec<Instant>,
174
175	MaxPerMinute:u32,
176}
177
178impl RetryBudget {
179	fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }
180
	fn can_retry(&mut self) -> bool {
		let Now = Instant::now();

		// Remove attempts older than 1 minute (duration_since avoids Instant underflow
		// early in the process lifetime)
		self.Attempts.retain(|&attempt| Now.duration_since(attempt) < Duration::from_secs(60));
188
189		if self.Attempts.len() < self.MaxPerMinute as usize {
190			self.Attempts.push(Now);
191
192			true
193		} else {
194			false
195		}
196	}
197}
198
199/// Retry manager with budget tracking and adaptive policies
200pub struct RetryManager {
201	Policy:RetryPolicy,
202
203	Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,
204
205	EventTx:Arc<broadcast::Sender<RetryEvent>>,
206}
207
208/// Events published by retry operations for metrics and telemetry integration
209#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct RetryEvent {
211	pub Service:String,
212
213	pub Attempt:u32,
214
215	pub ErrorClass:ErrorClass,
216
217	pub DelayMs:u64,
218
219	pub Success:bool,
220
221	pub ErrorMessage:Option<String>,
222}
223
224impl RetryManager {
225	/// Create a new retry manager
226	pub fn new(policy:RetryPolicy) -> Self {
227		let (EventTx, _) = broadcast::channel(1000);
228
229		Self {
230			Policy:policy,
231
232			Budgets:Arc::new(Mutex::new(HashMap::new())),
233
234			EventTx:Arc::new(EventTx),
235		}
236	}
237
238	/// Get the retry event transmitter for subscription
239	pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }
240
241	/// Calculate next retry delay with exponential backoff and jitter
242	pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
243		if Attempt == 0 {
244			return Duration::from_millis(0);
245		}
246
247		let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
248			.min(self.Policy.MaxIntervalMs as f64) as u64;
249
250		// Add jitter
251		let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;
252
253		let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;
254
255		let FinalDelay = BaseDelay + RandomJitter;
256
257		Duration::from_millis(FinalDelay)
258	}
259
260	/// Calculate adaptive retry delay based on error classification
261	pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
262		let ErrorClass = self
263			.Policy
264			.ErrorClassification
265			.get(ErrorType)
266			.copied()
267			.unwrap_or(ErrorClass::Unknown);
268
269		match ErrorClass {
270			ErrorClass::RateLimited => {
271				// Longer delays with linear backoff for rate limits
272				// 5s, 10s, 15s...
273				let delay = (attempt + 1) * 5000;
274
275				Duration::from_millis(delay as u64)
276			},
277
278			ErrorClass::ServerError => {
279				// Aggressive backoff for server errors
280				let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);
281
282				Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
283			},
284
285			ErrorClass::Transient => {
286				// Standard exponential backoff
287				self.CalculateRetryDelay(attempt)
288			},
289
290			ErrorClass::NonRetryable | ErrorClass::Unknown => {
291				// Minimal delay for non-retryable errors (should fail quickly)
292				Duration::from_millis(100)
293			},
294		}
295	}
296
297	/// Classify an error for adaptive retry behavior
298	pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
299		let ErrorLower = ErrorMessage.to_lowercase();
300
301		for (pattern, class) in &self.Policy.ErrorClassification {
302			if ErrorLower.contains(pattern) {
303				return *class;
304			}
305		}
306
307		ErrorClass::Unknown
308	}
309
310	/// Check if retry is possible within budget
311	/// Validates budget state before allowing retry
312	pub async fn CanRetry(&self, service:&str) -> bool {
313		let mut budgets = self.Budgets.lock().await;
314
315		let budget = budgets
316			.entry(service.to_string())
317			.or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));
318
319		budget.can_retry()
320	}
321
322	/// Publish a retry event for telemetry integration
323	pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }
324
325	/// Validate retry policy configuration
326	pub fn ValidatePolicy(&self) -> Result<(), String> {
327		if self.Policy.MaxRetries == 0 {
328			return Err("MaxRetries must be greater than 0".to_string());
329		}
330
331		if self.Policy.InitialIntervalMs == 0 {
332			return Err("InitialIntervalMs must be greater than 0".to_string());
333		}
334
335		if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
336			return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
337		}
338
339		if self.Policy.BackoffMultiplier <= 1.0 {
340			return Err("BackoffMultiplier must be greater than 1.0".to_string());
341		}
342
343		if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
344			return Err("JitterFactor must be between 0 and 1".to_string());
345		}
346
347		if self.Policy.BudgetPerMinute == 0 {
348			return Err("BudgetPerMinute must be greater than 0".to_string());
349		}
350
351		Ok(())
352	}
353}
354
355/// Circuit breaker states
356#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
357pub enum CircuitState {
358	/// Circuit is closed (normal operation)
359	Closed,
360
361	/// Circuit is open (failing fast)
362	Open,
363
364	/// Circuit is half-open (testing recovery)
365	HalfOpen,
366}
367
368/// Circuit breaker configuration
369#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
370pub struct CircuitBreakerConfig {
371	/// Failure threshold before tripping
372	pub FailureThreshold:u32,
373
374	/// Success threshold before closing
375	pub SuccessThreshold:u32,
376
377	/// Timeout before attempting recovery (in seconds)
378	pub TimeoutSecs:u64,
379}
380
381impl Default for CircuitBreakerConfig {
382	fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
383}
384
385/// Circuit breaker events for metrics and telemetry integration
386#[derive(Debug, Clone, Serialize, Deserialize)]
387pub struct CircuitEvent {
388	pub name:String,
389
390	pub FromState:CircuitState,
391
392	pub ToState:CircuitState,
393
394	pub timestamp:u64,
395
396	pub reason:String,
397}
398
399/// Circuit breaker for fault isolation with state consistency validation and
400/// event publishing
401pub struct CircuitBreaker {
402	Name:String,
403
404	State:Arc<RwLock<CircuitState>>,
405
406	Config:CircuitBreakerConfig,
407
408	FailureCount:Arc<RwLock<u32>>,
409
410	SuccessCount:Arc<RwLock<u32>>,
411
412	LastFailureTime:Arc<RwLock<Option<Instant>>>,
413
414	EventTx:Arc<broadcast::Sender<CircuitEvent>>,
415
416	StateTransitionCounter:Arc<RwLock<u32>>,
417}
418
419impl CircuitBreaker {
420	/// Create a new circuit breaker with event publishing
421	pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
422		let (EventTx, _) = broadcast::channel(1000);
423
424		Self {
425			Name:name.clone(),
426
427			State:Arc::new(RwLock::new(CircuitState::Closed)),
428
429			Config,
430
431			FailureCount:Arc::new(RwLock::new(0)),
432
433			SuccessCount:Arc::new(RwLock::new(0)),
434
435			LastFailureTime:Arc::new(RwLock::new(None)),
436
437			EventTx:Arc::new(EventTx),
438
439			StateTransitionCounter:Arc::new(RwLock::new(0)),
440		}
441	}
442
443	/// Get the circuit breaker event transmitter for subscription
444	pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }
445
	/// Get the current circuit state
447	pub async fn GetState(&self) -> CircuitState { *self.State.read().await }
448
449	/// Validate state consistency across all counters
450	pub async fn ValidateState(&self) -> Result<(), String> {
451		let state = *self.State.read().await;
452
453		let failures = *self.FailureCount.read().await;
454
455		let successes = *self.SuccessCount.read().await;
456
457		match state {
458			CircuitState::Closed => {
459				if successes != 0 {
460					return Err(format!("Inconsistent state: Closed but has {} successes", successes));
461				}
462
463				if failures >= self.Config.FailureThreshold {
464					dev_log!(
465						"resilience",
466						"warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
467						failures,
468						self.Config.FailureThreshold
469					);
470				}
471			},
472
473			CircuitState::Open => {
474				if failures < self.Config.FailureThreshold {
475					dev_log!(
476						"resilience",
477						"warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
478						failures,
479						self.Config.FailureThreshold
480					);
481				}
482			},
483
484			CircuitState::HalfOpen => {
485				if successes >= self.Config.SuccessThreshold {
486					return Err(format!(
487						"Inconsistent state: HalfOpen but has {} successes (should be Closed)",
488						successes
489					));
490				}
491			},
492		}
493
494		Ok(())
495	}
496
497	/// Transition state with validation and event publishing
498	async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
499		let CurrentState = self.GetState().await;
500
501		if CurrentState == NewState {
502			// No transition needed
503			return Ok(());
504		}
505
506		// Validate the proposed transition
507		match (CurrentState, NewState) {
508			(CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
509
510				// Valid transitions
511			},
512
513			(CircuitState::Open, CircuitState::HalfOpen) => {
514
515				// Valid transition through recovery
516			},
517
518			(CircuitState::HalfOpen, CircuitState::Closed) => {
519
520				// Valid recovery transition
521			},
522
523			_ => {
524				return Err(format!(
525					"Invalid state transition from {:?} to {:?} for {}",
526					CurrentState, NewState, self.Name
527				));
528			},
529		}
530
531		// Publish state transition event
532		let event = CircuitEvent {
533			name:self.Name.clone(),
534
535			FromState:CurrentState,
536
537			ToState:NewState,
538
539			timestamp:crate::Utility::CurrentTimestamp(),
540
541			reason:reason.to_string(),
542		};
543
544		let _ = self.EventTx.send(event);
545
546		// Transition state
547		*self.State.write().await = NewState;
548
549		// Increment transition counter
550		*self.StateTransitionCounter.write().await += 1;
551
552		dev_log!(
553			"resilience",
554			"[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
555			self.Name,
556			CurrentState,
557			NewState,
558			reason
559		);
560
561		// Validate new state consistency
562		self.ValidateState().await.map_err(|e| {
563			dev_log!(
564				"resilience",
565				"error: [CircuitBreaker] State validation failed after transition: {}",
566				e
567			);
568			e
569		})?;
570
571		Ok(())
572	}
573
	/// Record a successful call
	pub async fn RecordSuccess(&self) {
		let state = self.GetState().await;

		match state {
			CircuitState::Closed => {
				// Reset counters
				*self.FailureCount.write().await = 0;
			},

			CircuitState::HalfOpen => {
				// Increment the success count, releasing the write guard before any
				// state transition (TransitionState re-reads the counters internally)
				let successes = {
					let mut SuccessCount = self.SuccessCount.write().await;

					*SuccessCount += 1;

					*SuccessCount
				};

				if successes >= self.Config.SuccessThreshold {
					// Reset counters, then close the circuit
					*self.FailureCount.write().await = 0;

					*self.SuccessCount.write().await = 0;

					let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
				}
			},

			_ => {},
		}
	}
603
	/// Record a failed call
	pub async fn RecordFailure(&self) {
		let State = self.GetState().await;

		*self.LastFailureTime.write().await = Some(Instant::now());

		match State {
			CircuitState::Closed => {
				// Increment the failure count, releasing the write guard before any
				// state transition (TransitionState re-reads the counters internally)
				let failures = {
					let mut FailureCount = self.FailureCount.write().await;

					*FailureCount += 1;

					*FailureCount
				};

				if failures >= self.Config.FailureThreshold {
					// Open the circuit
					let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;

					*self.SuccessCount.write().await = 0;
				}
			},

			CircuitState::HalfOpen => {
				// Return to the open state
				let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;

				*self.SuccessCount.write().await = 0;
			},

			_ => {},
		}
	}
635
	/// Attempt to transition to half-open if the recovery timeout has elapsed
638	pub async fn AttemptRecovery(&self) -> bool {
639		let state = self.GetState().await;
640
641		if state != CircuitState::Open {
642			return state == CircuitState::HalfOpen;
643		}
644
645		if let Some(last_failure) = *self.LastFailureTime.read().await {
646			if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
647				let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;
648
649				*self.SuccessCount.write().await = 0;
650
651				return true;
652			}
653		}
654
655		false
656	}
657
658	/// Get circuit breaker statistics for metrics
659	pub async fn GetStatistics(&self) -> CircuitStatistics {
660		CircuitStatistics {
661			Name:self.Name.clone(),
662
663			State:self.GetState().await,
664
665			Failures:*self.FailureCount.read().await,
666
667			Successes:*self.SuccessCount.read().await,
668
669			StateTransitions:*self.StateTransitionCounter.read().await,
670
671			LastFailureTime:*self.LastFailureTime.read().await,
672		}
673	}
674
675	/// Validate circuit breaker configuration
	pub fn ValidateConfig(config:&CircuitBreakerConfig) -> Result<(), String> {
677		if config.FailureThreshold == 0 {
678			return Err("FailureThreshold must be greater than 0".to_string());
679		}
680
681		if config.SuccessThreshold == 0 {
682			return Err("SuccessThreshold must be greater than 0".to_string());
683		}
684
685		if config.TimeoutSecs == 0 {
686			return Err("TimeoutSecs must be greater than 0".to_string());
687		}
688
689		Ok(())
690	}
691}
692
693/// Circuit breaker statistics for metrics export
#[derive(Debug, Clone, Serialize)]
// Serialized field names are pinned to snake_case so the derived Serialize output
// matches the keys expected by the manual Deserialize impl below
pub struct CircuitStatistics {
	#[serde(rename = "name")]
	pub Name:String,

	#[serde(rename = "state")]
	pub State:CircuitState,

	#[serde(rename = "failures")]
	pub Failures:u32,

	#[serde(rename = "successes")]
	pub Successes:u32,

	#[serde(rename = "state_transitions")]
	pub StateTransitions:u32,

	#[serde(skip_serializing)]
	pub LastFailureTime:Option<Instant>,
}
709
710impl<'de> Deserialize<'de> for CircuitStatistics {
711	fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
712	where
713		D: serde::Deserializer<'de>, {
714		use serde::de::{self, Visitor};
715
716		struct CircuitStatisticsVisitor;
717
718		impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
719			type Value = CircuitStatistics;
720
721			fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
722				formatter.write_str("struct CircuitStatistics")
723			}
724
725			fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
726			where
727				A: de::MapAccess<'de>, {
728				let mut Name = None;
729
730				let mut State = None;
731
732				let mut Failures = None;
733
734				let mut Successes = None;
735
736				let mut StateTransitions = None;
737
738				while let Some(key) = map.next_key::<String>()? {
739					match key.as_str() {
740						"name" => Name = Some(map.next_value()?),
741
742						"state" => State = Some(map.next_value()?),
743
744						"failures" => Failures = Some(map.next_value()?),
745
746						"successes" => Successes = Some(map.next_value()?),
747
748						"state_transitions" => StateTransitions = Some(map.next_value()?),
749
750						_ => {
751							map.next_value::<de::IgnoredAny>()?;
752						},
753					}
754				}
755
756				Ok(CircuitStatistics {
757					Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
758
759					State:State.ok_or_else(|| de::Error::missing_field("state"))?,
760
761					Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
762
763					Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
764
765					StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
766
767					LastFailureTime:None,
768				})
769			}
770		}
771
772		const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];
773
774		Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
775	}
776}
777
778impl Clone for CircuitBreaker {
779	fn clone(&self) -> Self {
780		Self {
781			Name:self.Name.clone(),
782
783			State:self.State.clone(),
784
785			Config:self.Config.clone(),
786
787			FailureCount:self.FailureCount.clone(),
788
789			SuccessCount:self.SuccessCount.clone(),
790
791			LastFailureTime:self.LastFailureTime.clone(),
792
793			EventTx:self.EventTx.clone(),
794
795			StateTransitionCounter:self.StateTransitionCounter.clone(),
796		}
797	}
798}
799
800/// Bulkhead configuration
801#[derive(Debug, Clone, Serialize, Deserialize)]
802pub struct BulkheadConfig {
803	/// Maximum concurrent requests
804	pub max_concurrent:usize,
805
806	/// Maximum queue size
807	pub max_queue:usize,
808
809	/// Request timeout (in seconds)
810	pub timeout_secs:u64,
811}
812
813impl Default for BulkheadConfig {
814	fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
815}
816
817/// Bulkhead statistics for metrics export
818#[derive(Debug, Clone, Serialize, Deserialize)]
819pub struct BulkheadStatistics {
820	pub name:String,
821
822	pub current_concurrent:u32,
823
824	pub current_queue:u32,
825
826	pub max_concurrent:usize,
827
828	pub max_queue:usize,
829
830	pub total_rejected:u64,
831
832	pub total_completed:u64,
833
834	pub total_timed_out:u64,
835}
836
837/// Bulkhead semaphore for resource isolation with metrics and panic recovery
838pub struct BulkheadExecutor {
839	name:String,
840
841	semaphore:Arc<tokio::sync::Semaphore>,
842
843	config:BulkheadConfig,
844
845	current_requests:Arc<RwLock<u32>>,
846
847	queue_size:Arc<RwLock<u32>>,
848
849	total_rejected:Arc<RwLock<u64>>,
850
851	total_completed:Arc<RwLock<u64>>,
852
853	total_timed_out:Arc<RwLock<u64>>,
854}
855
856impl BulkheadExecutor {
857	/// Create a new bulkhead executor with metrics tracking
858	pub fn new(name:String, config:BulkheadConfig) -> Self {
859		Self {
860			name:name.clone(),
861
862			semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
863
864			config,
865
866			current_requests:Arc::new(RwLock::new(0)),
867
868			queue_size:Arc::new(RwLock::new(0)),
869
870			total_rejected:Arc::new(RwLock::new(0)),
871
872			total_completed:Arc::new(RwLock::new(0)),
873
874			total_timed_out:Arc::new(RwLock::new(0)),
875		}
876	}
877
878	/// Validate bulkhead configuration
879	pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
880		if config.max_concurrent == 0 {
881			return Err("max_concurrent must be greater than 0".to_string());
882		}
883
884		if config.max_queue == 0 {
885			return Err("max_queue must be greater than 0".to_string());
886		}
887
888		if config.timeout_secs == 0 {
889			return Err("timeout_secs must be greater than 0".to_string());
890		}
891
892		Ok(())
893	}
894
	/// Execute an operation under bulkhead protection (queue bound, concurrency permit, and timeout)
896	pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
897	where
898		F: std::future::Future<Output = Result<R, String>>, {
899		async {
900			// Validate timeout
901			if self.config.timeout_secs == 0 {
902				return Err("Bulkhead timeout must be greater than 0".to_string());
903			}
904
905			// Check queue size
906			let queue = *self.queue_size.read().await;
907
908			if queue >= self.config.max_queue as u32 {
909				*self.total_rejected.write().await += 1;
910
911				dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);
912
913				return Err("Bulkhead queue full".to_string());
914			}
915
916			// Increment queue size
917			*self.queue_size.write().await += 1;
918
			// Acquire a permit with timeout. The permit is bound (not discarded) so the
			// semaphore actually bounds concurrency for the duration of the call.
			let _Permit = match tokio::time::timeout(
				Duration::from_secs(self.config.timeout_secs),
				self.semaphore.acquire(),
			)
			.await
			{
				Ok(Ok(permit)) => permit,

				Ok(Err(e)) => {
					*self.queue_size.write().await -= 1;

					return Err(format!("Bulkhead semaphore error: {}", e));
				},

				Err(_) => {
					*self.queue_size.write().await -= 1;

					*self.total_timed_out.write().await += 1;

					dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);

					return Err("Bulkhead timeout waiting for permit".to_string());
				},
			};

			// Permit acquired: leave the queue and count the request as in flight
			*self.queue_size.write().await -= 1;

			*self.current_requests.write().await += 1;

			// Execute with timeout (no catch_unwind to avoid interior mutability issues)
			let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;

			let execution_result:Result<R, String> = match execution_result {
				Ok(Ok(value)) => Ok(value),

				Ok(Err(e)) => Err(e),

				Err(_) => {
					*self.total_timed_out.write().await += 1;

					Err("Bulkhead execution timeout".to_string())
				},
			};

			// Request finished: leave the in-flight set and record completion
			*self.current_requests.write().await -= 1;

			if execution_result.is_ok() {
				*self.total_completed.write().await += 1;
			}

			execution_result
972		}
973		.await
974	}
975
	/// Get the current load (in-flight requests, queued requests)
977	pub async fn GetLoad(&self) -> (u32, u32) {
978		async {
979			let current = *self.current_requests.read().await;
980
981			let queue = *self.queue_size.read().await;
982
983			(current, queue)
984		}
985		.await
986	}
987
988	/// Get bulkhead statistics for metrics
989	pub async fn GetStatistics(&self) -> BulkheadStatistics {
990		async {
991			BulkheadStatistics {
992				name:self.name.clone(),
993
994				current_concurrent:*self.current_requests.read().await,
995
996				current_queue:*self.queue_size.read().await,
997
998				max_concurrent:self.config.max_concurrent,
999
1000				max_queue:self.config.max_queue,
1001
1002				total_rejected:*self.total_rejected.read().await,
1003
1004				total_completed:*self.total_completed.read().await,
1005
1006				total_timed_out:*self.total_timed_out.read().await,
1007			}
1008		}
1009		.await
1010	}
1011
1012	/// Calculate utilization percentage
1013	pub async fn GetUtilization(&self) -> f64 {
1014		let (current, _) = self.GetLoad().await;
1015
1016		if self.config.max_concurrent == 0 {
1017			return 0.0;
1018		}
1019
1020		(current as f64 / self.config.max_concurrent as f64) * 100.0
1021	}
1022}
1023
1024impl Clone for BulkheadExecutor {
1025	fn clone(&self) -> Self {
1026		Self {
1027			name:self.name.clone(),
1028
1029			semaphore:self.semaphore.clone(),
1030
1031			config:self.config.clone(),
1032
1033			current_requests:self.current_requests.clone(),
1034
1035			queue_size:self.queue_size.clone(),
1036
1037			total_rejected:self.total_rejected.clone(),
1038
1039			total_completed:self.total_completed.clone(),
1040
1041			total_timed_out:self.total_timed_out.clone(),
1042		}
1043	}
1044}
1045
1046/// Timeout manager for cascading deadlines with validation
1047#[derive(Debug, Clone)]
1048pub struct TimeoutManager {
1049	global_deadline:Option<Instant>,
1050
1051	operation_timeout:Duration,
1052}
1053
1054impl TimeoutManager {
1055	/// Create a new timeout manager
1056	pub fn new(operation_timeout:Duration) -> Self { Self { global_deadline:None, operation_timeout } }
1057
1058	/// Create with global deadline
1059	pub fn with_deadline(global_deadline:Instant, operation_timeout:Duration) -> Self {
1060		Self { global_deadline:Some(global_deadline), operation_timeout }
1061	}
1062
1063	/// Validate timeout configuration
1064	pub fn ValidateTimeout(timeout:Duration) -> Result<(), String> {
1065		if timeout.is_zero() {
1066			return Err("Timeout must be greater than 0".to_string());
1067		}
1068
1069		if timeout.as_secs() > 3600 {
1070			return Err("Timeout cannot exceed 1 hour".to_string());
1071		}
1072
1073		Ok(())
1074	}
1075
1076	/// Validate timeout as Result for error handling
1077	pub fn ValidateTimeoutResult(timeout:Duration) -> Result<Duration, String> {
1078		if timeout.is_zero() {
1079			return Err("Timeout must be greater than 0".to_string());
1080		}
1081
1082		if timeout.as_secs() > 3600 {
1083			return Err("Timeout cannot exceed 1 hour".to_string());
1084		}
1085
1086		Ok(timeout)
1087	}
1088
1089	/// Get remaining time until deadline
1090	pub fn remaining(&self) -> Option<Duration> {
1091		self.global_deadline.map(|deadline| {
1092			deadline
1093				.checked_duration_since(Instant::now())
1094				.unwrap_or(Duration::from_secs(0))
1095		})
1096	}
1097
1098	/// Get remaining time with panic recovery
1099	pub fn Remaining(&self) -> Option<Duration> {
1100		std::panic::catch_unwind(|| self.remaining()).unwrap_or_else(|e| {
1101			dev_log!("resilience", "error: [TimeoutManager] Panic in Remaining: {:?}", e);
1102			None
1103		})
1104	}
1105
1106	/// Get effective timeout (minimum of operation timeout and remaining time)
1107	pub fn effective_timeout(&self) -> Duration {
1108		match self.remaining() {
1109			Some(remaining) => self.operation_timeout.min(remaining),
1110
1111			None => self.operation_timeout,
1112		}
1113	}
1114
1115	/// Get effective timeout with validation
1116	pub fn EffectiveTimeout(&self) -> Duration {
1117		std::panic::catch_unwind(|| {
1118			let timeout = self.effective_timeout();
1119
1120			match Self::ValidateTimeoutResult(timeout) {
1121				Ok(valid_timeout) => valid_timeout,
1122
1123				Err(_) => Duration::from_secs(30),
1124			}
1125		})
1126		.unwrap_or_else(|e| {
1127			dev_log!("resilience", "error: [TimeoutManager] Panic in EffectiveTimeout: {:?}", e);
1128			Duration::from_secs(30)
1129		})
1130	}
1131
1132	/// Check if deadline has been exceeded
1133	pub fn is_exceeded(&self) -> bool { self.global_deadline.map_or(false, |deadline| Instant::now() >= deadline) }
1134
1135	/// Check if deadline has been exceeded with panic recovery
1136	pub fn IsExceeded(&self) -> bool {
1137		std::panic::catch_unwind(|| self.is_exceeded()).unwrap_or_else(|e| {
1138			dev_log!("resilience", "error: [TimeoutManager] Panic in IsExceeded: {:?}", e);
1139			true // Fail safe: assume exceeded
1140		})
1141	}
1142
1143	/// Get the global deadline
1144	pub fn GetGlobalDeadline(&self) -> Option<Instant> { self.global_deadline }
1145
1146	/// Get the operation timeout
1147	pub fn GetOperationTimeout(&self) -> Duration { self.operation_timeout }
1148}
1149
1150/// Resilience orchestrator combining all patterns
1151pub struct ResilienceOrchestrator {
1152	retry_manager:Arc<RetryManager>,
1153
1154	circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,
1155
1156	bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
1157}
1158
1159impl ResilienceOrchestrator {
1160	/// Create a new resilience orchestrator
1161	pub fn new(retry_policy:RetryPolicy) -> Self {
1162		Self {
1163			retry_manager:Arc::new(RetryManager::new(retry_policy)),
1164
1165			circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
1166
1167			bulkheads:Arc::new(RwLock::new(HashMap::new())),
1168		}
1169	}
1170
	/// Get or create the circuit breaker for a service (the config is applied only on first creation)
1172	pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
1173		let mut breakers = self.circuit_breakers.write().await;
1174
1175		Arc::new(
1176			breakers
1177				.entry(service.to_string())
1178				.or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
1179				.clone(),
1180		)
1181	}
1182
	/// Get or create the bulkhead for a service (the config is applied only on first creation)
1184	pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
1185		let mut bulkheads = self.bulkheads.write().await;
1186
1187		Arc::new(
1188			bulkheads
1189				.entry(service.to_string())
1190				.or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
1191				.clone(),
1192		)
1193	}
1194
1195	/// Get all circuit breaker statistics
1196	pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
1197		let breakers = self.circuit_breakers.read().await;
1198
1199		let mut stats = Vec::new();
1200
1201		for breaker in breakers.values() {
1202			stats.push(breaker.GetStatistics().await);
1203		}
1204
1205		stats
1206	}
1207
1208	/// Get all bulkhead statistics
1209	pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
1210		let bulkheads = self.bulkheads.read().await;
1211
1212		let mut stats = Vec::new();
1213
1214		for bulkhead in bulkheads.values() {
1215			stats.push(bulkhead.GetStatistics().await);
1216		}
1217
1218		stats
1219	}
1220
1221	/// Execute with full resilience and event publishing
1222	pub async fn ExecuteResilient<F, R>(
1223		&self,
1224
1225		service:&str,
1226
1227		retry_policy:&RetryPolicy,
1228
1229		circuit_config:CircuitBreakerConfig,
1230
1231		bulkhead_config:BulkheadConfig,
1232
1233		f:F,
1234	) -> Result<R, String>
1235	where
1236		F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
1237		// Validate configurations
1238		if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
1239			return Err(format!("Invalid circuit breaker config: {}", e));
1240		}
1241
1242		if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
1243			return Err(format!("Invalid bulkhead config: {}", e));
1244		}
1245
1246		let breaker = self.GetCircuitBreaker(service, circuit_config).await;
1247
1248		let bulkhead = self.GetBulkhead(service, bulkhead_config).await;
1249
1250		// Check circuit state
1251		if breaker.GetState().await == CircuitState::Open {
1252			if !breaker.AttemptRecovery().await {
1253				return Err("Circuit breaker is open".to_string());
1254			}
1255		}
1256
1257		// Execute with bulkhead protection and retry logic
1258		let mut Attempt = 0;
1259
1262		loop {
1263			let result = bulkhead.Execute(f()).await;
1264
1265			match result {
1266				Ok(Value) => {
1267					breaker.RecordSuccess().await;
1268
1269					// Publish retry success event
1270					let Event = RetryEvent {
1271						Service:service.to_string(),
1272
1273						Attempt,
1274
1275						ErrorClass:ErrorClass::Unknown,
1276
1277						DelayMs:0,
1278
1279						Success:true,
1280
1281						ErrorMessage:None,
1282					};
1283
1284					self.retry_manager.PublishRetryEvent(Event);
1285
1286					return Ok(Value);
1287				},
1288
1289				Err(E) => {
1290					let ErrorClass = self.retry_manager.ClassifyError(&E);
1291
1292					breaker.RecordFailure().await;
1293
1294					// Publish retry failure event
1295					let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
1296
1297					let Event = RetryEvent {
1298						Service:service.to_string(),
1299
1300						Attempt,
1301
1302						ErrorClass,
1303
1304						DelayMs:Delay.as_millis() as u64,
1305
1306						Success:false,
1307
1308						ErrorMessage:Some(self.redact_sensitive_data(&E)),
1309					};
1310
1311					self.retry_manager.PublishRetryEvent(Event);
1312
1313					if Attempt < retry_policy.MaxRetries
1314						&& ErrorClass != ErrorClass::NonRetryable
1315						&& self.retry_manager.CanRetry(service).await
1316					{
1317						let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
1318
1319						dev_log!(
1320							"resilience",
1321							"[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
1322							service,
1323							Attempt + 1,
1324							retry_policy.MaxRetries,
1325							Delay,
1326							self.redact_sensitive_data(&E)
1327						);
1328
1329						tokio::time::sleep(Delay).await;
1330
1331						Attempt += 1;
1332					} else {
1333						return Err(E);
1334					}
1335				},
1336			}
1337		}
1338	}
1339
1340	/// Redact sensitive data from error messages before logging/event
1341	/// publishing
1342	fn redact_sensitive_data(&self, message:&str) -> String {
1343		let mut redacted = message.to_string();
1344
1345		// Redact common patterns - simplified to avoid escaping issues
1346		let patterns = vec![
1347			(r"(?i)password[=:]\S+", "password=[REDACTED]"),
1348			(r"(?i)token[=:]\S+", "token=[REDACTED]"),
1349			(r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
1350			(r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
1351			(
1352				r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
1353				"Authorization: Bearer [REDACTED]",
1354			),
1355			(r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
1356			(r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
1357		];
1358
1359		for (pattern, replacement) in patterns {
1360			if let Ok(re) = regex::Regex::new(pattern) {
1361				redacted = re.replace_all(&redacted, replacement).to_string();
1362			}
1363		}
1364
1365		redacted
1366	}
1367
1368	/// Validate all configurations
1369	pub fn ValidateConfigurations(
1370		&self,
1371
1372		_RetryPolicy:&RetryPolicy,
1373
1374		CircuitConfig:&CircuitBreakerConfig,
1375
1376		BulkheadConfig:&BulkheadConfig,
1377	) -> Result<(), String> {
1378		self.retry_manager.ValidatePolicy()?;
1379
1380		CircuitBreaker::ValidateConfig(CircuitConfig)?;
1381
1382		BulkheadExecutor::ValidateConfig(BulkheadConfig)?;
1383
1384		TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;
1385
1386		Ok(())
1387	}
1388}
1389
1390impl Clone for ResilienceOrchestrator {
1391	fn clone(&self) -> Self {
1392		Self {
1393			retry_manager:self.retry_manager.clone(),
1394
1395			circuit_breakers:self.circuit_breakers.clone(),
1396
1397			bulkheads:self.bulkheads.clone(),
1398		}
1399	}
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404
1405	use super::*;
1406
1407	#[test]
1408	fn test_retry_delay_calculation() {
1409		let policy = RetryPolicy::default();
1410
1411		let manager = RetryManager::new(policy);
1412
1413		let delay_1 = manager.CalculateRetryDelay(1);
1414
1415		let delay_2 = manager.CalculateRetryDelay(2);
1416
1417		// delay_2 should be roughly double delay_1 (with some jitter)
1418		assert!(delay_2 >= delay_1);
1419	}
1420
1421	#[test]
1422	fn test_adaptive_retry_delay() {
1423		let policy = RetryPolicy::default();
1424
1425		let manager = RetryManager::new(policy);
1426
1427		// Rate limited errors should have longer delays
1428		let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);
1429
1430		let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);
1431
1432		assert!(rate_limit_delay >= transient_delay);
1433	}
1434
1435	#[test]
1436	fn test_error_classification() {
1437		let policy = RetryPolicy::default();
1438
1439		let manager = RetryManager::new(policy);
1440
1441		assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);
1442
1443		assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);
1444
1445		assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);
1446
1447		assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
1448	}
1449
1450	#[test]
1451	fn test_policy_validation() {
1452		let policy = RetryPolicy::default();
1453
1454		let manager = RetryManager::new(policy);
1455
1456		assert!(manager.ValidatePolicy().is_ok());
1457
1458		let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };
1459
1460		let invalid_manager = RetryManager::new(invalid_policy);
1461
1462		assert!(invalid_manager.ValidatePolicy().is_err());
1463	}
1464
1465	#[tokio::test]
1466	async fn test_circuit_breaker_state_transitions() {
1467		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };
1468
1469		let breaker = CircuitBreaker::new("test".to_string(), config);
1470
1471		assert_eq!(breaker.GetState().await, CircuitState::Closed);
1472
1473		breaker.RecordFailure().await;
1474
1475		assert_eq!(breaker.GetState().await, CircuitState::Closed);
1476
1477		breaker.RecordFailure().await;
1478
1479		assert_eq!(breaker.GetState().await, CircuitState::Open);
1480
		// Wait for the recovery timeout (TimeoutSecs = 1) to elapse before probing recovery
		tokio::time::sleep(Duration::from_millis(1100)).await;

		assert!(breaker.AttemptRecovery().await);
1482
1483		assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);
1484
1485		breaker.RecordSuccess().await;
1486
1487		assert_eq!(breaker.GetState().await, CircuitState::Closed);
1488	}
1489
1490	#[tokio::test]
1491	async fn test_circuit_breaker_validation() {
1492		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };
1493
1494		let breaker = CircuitBreaker::new("test".to_string(), config);
1495
1496		// Validate initial state
1497		assert!(breaker.ValidateState().await.is_ok());
1498
1499		// Trigger state transition to open
1500		breaker.RecordFailure().await;
1501
1502		breaker.RecordFailure().await;
1503
1504		let validate_result = breaker.ValidateState().await;
1505
		// After two recorded failures the breaker is Open with failures >= threshold, which is consistent
		assert!(validate_result.is_ok());
1508	}
1509
1510	#[test]
1511	fn test_circuit_breaker_config_validation() {
1512		let valid_config = CircuitBreakerConfig::default();
1513
1514		assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());
1515
1516		let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };
1517
1518		assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
1519	}
1520
1521	#[tokio::test]
1522	async fn test_bulkhead_resource_isolation() {
1523		let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };
1524
1525		let bulkhead = BulkheadExecutor::new("test".to_string(), config);
1526
1527		let (_current, _queue) = bulkhead.GetLoad().await;
1528
1529		assert_eq!(_current, 0);
1530
1531		assert_eq!(_queue, 0);
1532
1533		let stats = bulkhead.GetStatistics().await;
1534
1535		assert_eq!(stats.current_concurrent, 0);
1536
1537		assert_eq!(stats.current_queue, 0);
1538
1539		assert_eq!(stats.max_concurrent, 2);
1540
1541		assert_eq!(stats.max_queue, 5);
1542	}
1543
1544	#[tokio::test]
1545	async fn test_bulkhead_utilization() {
1546		let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };
1547
1548		let bulkhead = BulkheadExecutor::new("test".to_string(), config);
1549
1550		let utilization = bulkhead.GetUtilization().await;
1551
1552		assert_eq!(utilization, 0.0);
1553	}
1554
1555	#[test]
1556	fn test_bulkhead_config_validation() {
1557		let valid_config = BulkheadConfig::default();
1558
1559		assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());
1560
1561		let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };
1562
1563		assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
1564	}
1565
1566	#[test]
1567	fn test_timeout_manager() {
1568		let manager = TimeoutManager::new(Duration::from_secs(30));
1569
1570		assert!(!manager.IsExceeded());
1571
1572		assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));
1573
1574		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());
1575
1576		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
1577	}
1578
1579	#[test]
1580	fn test_timeout_manager_with_deadline() {
1581		let deadline = Instant::now() + Duration::from_secs(60);
1582
1583		let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));
1584
1585		let remaining = manager.Remaining();
1586
1587		assert!(remaining.is_some());
1588
1589		assert!(remaining.unwrap() <= Duration::from_secs(60));
1590	}
1591}