1616 *
1717 */
1818
19- use std:: sync:: { Arc , LazyLock , atomic:: AtomicBool } ;
19+ use std:: collections:: VecDeque ;
20+ use std:: sync:: { Arc , LazyLock , Mutex , atomic:: AtomicBool } ;
2021
2122use actix_web:: {
2223 body:: MessageBody ,
@@ -34,8 +35,75 @@ use tracing::{info, trace, warn};
3435use crate :: analytics:: { SYS_INFO , refresh_sys_info} ;
3536use crate :: parseable:: PARSEABLE ;
3637
38+ #[ derive( Debug , Clone ) ]
39+ struct ResourceSample {
40+ cpu_usage : f32 ,
41+ memory_usage : f32 ,
42+ timestamp : std:: time:: Instant ,
43+ }
44+
45+ /// Structure to maintain rolling average of resource utilization
46+ struct ResourceHistory {
47+ samples : VecDeque < ResourceSample > ,
48+ window_duration : Duration ,
49+ }
50+
51+ impl ResourceHistory {
52+ fn new ( window_duration : Duration ) -> Self {
53+ Self {
54+ samples : VecDeque :: new ( ) ,
55+ window_duration,
56+ }
57+ }
58+
59+ fn add_sample ( & mut self , cpu_usage : f32 , memory_usage : f32 ) {
60+ let now = std:: time:: Instant :: now ( ) ;
61+ let sample = ResourceSample {
62+ cpu_usage,
63+ memory_usage,
64+ timestamp : now,
65+ } ;
66+
67+ // Add new sample
68+ self . samples . push_back ( sample) ;
69+
70+ // Remove old samples outside the window
71+ let cutoff_time = now - self . window_duration ;
72+ while let Some ( front) = self . samples . front ( ) {
73+ if front. timestamp < cutoff_time {
74+ self . samples . pop_front ( ) ;
75+ } else {
76+ break ;
77+ }
78+ }
79+ }
80+
81+ fn get_average ( & self ) -> Option < ( f32 , f32 ) > {
82+ if self . samples . is_empty ( ) {
83+ return None ;
84+ }
85+
86+ let count = self . samples . len ( ) as f32 ;
87+ let ( total_cpu, total_memory) =
88+ self . samples
89+ . iter ( )
90+ . fold ( ( 0.0 , 0.0 ) , |( cpu_acc, mem_acc) , sample| {
91+ ( cpu_acc + sample. cpu_usage , mem_acc + sample. memory_usage )
92+ } ) ;
93+
94+ Some ( ( total_cpu / count, total_memory / count) )
95+ }
96+
97+ fn sample_count ( & self ) -> usize {
98+ self . samples . len ( )
99+ }
100+ }
101+
37102static RESOURCE_CHECK_ENABLED : LazyLock < Arc < AtomicBool > > =
38- LazyLock :: new ( || Arc :: new ( AtomicBool :: new ( false ) ) ) ;
103+ LazyLock :: new ( || Arc :: new ( AtomicBool :: new ( true ) ) ) ;
104+
105+ static RESOURCE_HISTORY : LazyLock < Arc < Mutex < ResourceHistory > > > =
106+ LazyLock :: new ( || Arc :: new ( Mutex :: new ( ResourceHistory :: new ( Duration :: from_secs ( 120 ) ) ) ) ) ;
39107
40108/// Spawn a background task to monitor system resources
41109pub fn spawn_resource_monitor ( shutdown_rx : tokio:: sync:: oneshot:: Receiver < ( ) > ) {
@@ -48,9 +116,13 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
48116 let memory_threshold = PARSEABLE . options . memory_utilization_threshold ;
49117
50118 info ! (
51- "Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%" ,
119+ "Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}% (2-minute rolling average) " ,
52120 cpu_threshold, memory_threshold
53121 ) ;
122+
123+ // Calculate minimum samples needed for a reliable 2-minute average
124+ let min_samples_for_decision = std:: cmp:: max ( 1 , 120 / resource_check_interval as usize ) ;
125+
54126 loop {
55127 select ! {
56128 _ = check_interval. tick( ) => {
@@ -65,32 +137,61 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
65137 ( used_memory, total_memory, cpu_usage)
66138 } ) . await . unwrap( ) ;
67139
68- let mut resource_ok = true ;
69-
70140 // Calculate memory usage percentage
71141 let memory_usage = if total_memory > 0.0 {
72142 ( used_memory / total_memory) * 100.0
73143 } else {
74144 0.0
75145 } ;
76146
77- // Log current resource usage every few checks for debugging
78- info! ( "Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)" ,
79- cpu_usage , memory_usage ,
80- used_memory / 1024.0 / 1024.0 / 1024.0 ,
81- total_memory / 1024.0 / 1024.0 / 1024.0 ) ;
147+ // Add current sample to history
148+ {
149+ let mut history = RESOURCE_HISTORY . lock ( ) . unwrap ( ) ;
150+ history . add_sample ( cpu_usage , memory_usage ) ;
151+ }
82152
83- // Check memory utilization
84- if memory_usage > memory_threshold {
85- warn!( "High memory usage detected: {:.1}% (threshold: {:.1}%)" ,
86- memory_usage, memory_threshold) ;
153+ // Get rolling averages
154+ let ( avg_cpu, avg_memory, sample_count) = {
155+ let history = RESOURCE_HISTORY . lock( ) . unwrap( ) ;
156+ if let Some ( ( cpu_avg, mem_avg) ) = history. get_average( ) {
157+ ( cpu_avg, mem_avg, history. sample_count( ) )
158+ } else {
159+ ( cpu_usage, memory_usage, 1 ) // Fallback to current values if no history
160+ }
161+ } ;
162+
163+ // Log current and average resource usage
164+ info!(
165+ "Resource usage - Current: CPU {:.1}%, Memory {:.1}% | 2-min avg: CPU {:.1}%, Memory {:.1}% (samples: {})" ,
166+ cpu_usage, memory_usage, avg_cpu, avg_memory, sample_count
167+ ) ;
168+
169+ // Only make decisions based on rolling average if we have enough samples
170+ let ( decision_cpu, decision_memory) = if sample_count >= min_samples_for_decision {
171+ ( avg_cpu, avg_memory)
172+ } else {
173+ // For the first few minutes, use current values but be more conservative
174+ info!( "Still warming up resource history (need {} samples, have {})" , min_samples_for_decision, sample_count) ;
175+ ( cpu_usage, memory_usage)
176+ } ;
177+
178+ let mut resource_ok = true ;
179+
180+ // Check memory utilization against rolling average
181+ if decision_memory > memory_threshold {
182+ warn!(
183+ "High memory usage detected: 2-min avg {:.1}% (threshold: {:.1}%, current: {:.1}%)" ,
184+ decision_memory, memory_threshold, memory_usage
185+ ) ;
87186 resource_ok = false ;
88187 }
89188
90- // Check CPU utilization
91- if cpu_usage > cpu_threshold {
92- warn!( "High CPU usage detected: {:.1}% (threshold: {:.1}%)" ,
93- cpu_usage, cpu_threshold) ;
189+ // Check CPU utilization against rolling average
190+ if decision_cpu > cpu_threshold {
191+ warn!(
192+ "High CPU usage detected: 2-min avg {:.1}% (threshold: {:.1}%, current: {:.1}%)" ,
193+ decision_cpu, cpu_threshold, cpu_usage
194+ ) ;
94195 resource_ok = false ;
95196 }
96197
@@ -100,9 +201,9 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
100201 // Log state changes
101202 if previous_state != resource_ok {
102203 if resource_ok {
103- info!( "Resource utilization back to normal - requests will be accepted" ) ;
204+ info!( "Resource utilization back to normal (2-min avg: CPU {:.1}%, Memory {:.1}%) - requests will be accepted" , avg_cpu , avg_memory ) ;
104205 } else {
105- warn!( "Resource utilization too high - requests will be rejected" ) ;
206+ warn!( "Resource utilization too high (2-min avg: CPU {:.1}%, Memory {:.1}%) - requests will be rejected" , avg_cpu , avg_memory ) ;
106207 }
107208 }
108209 } ,
@@ -116,17 +217,17 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
116217}
117218
118219/// Middleware to check system resource utilization before processing requests
119- /// Returns 503 Service Unavailable if resources are over-utilized
220+ /// Returns 503 Service Unavailable if resources are over-utilized (based on 2-minute rolling average)
120221pub async fn check_resource_utilization_middleware (
121222 req : ServiceRequest ,
122223 next : Next < impl MessageBody > ,
123224) -> Result < ServiceResponse < impl MessageBody > , Error > {
124225 let resource_ok = RESOURCE_CHECK_ENABLED . load ( std:: sync:: atomic:: Ordering :: SeqCst ) ;
125226
126227 if !resource_ok {
127- let error_msg = "Server resources over-utilized" ;
228+ let error_msg = "Server resources over-utilized (based on 2-minute rolling average) " ;
128229 warn ! (
129- "Rejecting request to {} due to resource constraints" ,
230+ "Rejecting request to {} due to resource constraints (2-minute average above threshold) " ,
130231 req. path( )
131232 ) ;
132233 return Err ( ErrorServiceUnavailable ( error_msg) ) ;
@@ -135,3 +236,66 @@ pub async fn check_resource_utilization_middleware(
135236 // Continue processing the request if resource utilization is within limits
136237 next. call ( req) . await
137238}
239+
240+ #[ cfg( test) ]
241+ mod tests {
242+ use super :: * ;
243+ use std:: time:: Duration ;
244+
245+ #[ test]
246+ fn test_resource_history_basic ( ) {
247+ let mut history = ResourceHistory :: new ( Duration :: from_secs ( 60 ) ) ;
248+
249+ // Add some samples
250+ history. add_sample ( 50.0 , 60.0 ) ;
251+ history. add_sample ( 70.0 , 80.0 ) ;
252+
253+ let ( avg_cpu, avg_memory) = history. get_average ( ) . unwrap ( ) ;
254+ assert_eq ! ( avg_cpu, 60.0 ) ; // (50 + 70) / 2
255+ assert_eq ! ( avg_memory, 70.0 ) ; // (60 + 80) / 2
256+ assert_eq ! ( history. sample_count( ) , 2 ) ;
257+ }
258+
259+ #[ test]
260+ fn test_resource_history_window_cleanup ( ) {
261+ let mut history = ResourceHistory :: new ( Duration :: from_millis ( 100 ) ) ;
262+
263+ // Add samples
264+ history. add_sample ( 50.0 , 60.0 ) ;
265+ std:: thread:: sleep ( Duration :: from_millis ( 50 ) ) ;
266+ history. add_sample ( 70.0 , 80.0 ) ;
267+
268+ // Both samples should be present
269+ assert_eq ! ( history. sample_count( ) , 2 ) ;
270+
271+ // Wait for first sample to expire
272+ std:: thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
273+ history. add_sample ( 90.0 , 100.0 ) ;
274+
275+ // Old samples should be cleaned up, only recent samples remain
276+ assert ! ( history. sample_count( ) <= 2 ) ;
277+
278+ let ( avg_cpu, avg_memory) = history. get_average ( ) . unwrap ( ) ;
279+ // Should be average of recent samples only
280+ assert ! ( avg_cpu >= 70.0 ) ;
281+ assert ! ( avg_memory >= 80.0 ) ;
282+ }
283+
284+ #[ test]
285+ fn test_resource_history_empty ( ) {
286+ let history = ResourceHistory :: new ( Duration :: from_secs ( 60 ) ) ;
287+ assert ! ( history. get_average( ) . is_none( ) ) ;
288+ assert_eq ! ( history. sample_count( ) , 0 ) ;
289+ }
290+
291+ #[ test]
292+ fn test_resource_history_single_sample ( ) {
293+ let mut history = ResourceHistory :: new ( Duration :: from_secs ( 60 ) ) ;
294+ history. add_sample ( 75.5 , 85.3 ) ;
295+
296+ let ( avg_cpu, avg_memory) = history. get_average ( ) . unwrap ( ) ;
297+ assert_eq ! ( avg_cpu, 75.5 ) ;
298+ assert_eq ! ( avg_memory, 85.3 ) ;
299+ assert_eq ! ( history. sample_count( ) , 1 ) ;
300+ }
301+ }
0 commit comments