|
17 | 17 | */ |
18 | 18 |
|
19 | 19 | use self::error::StreamError; |
20 | | -use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; |
| 20 | +use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats}; |
21 | 21 | use super::query::update_schema_when_distributed; |
22 | 22 | use crate::event::format::override_data_type; |
23 | 23 | use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; |
@@ -257,64 +257,26 @@ pub async fn get_stats( |
257 | 257 | let stats = stats::get_current_stats(&stream_name, "json") |
258 | 258 | .ok_or_else(|| StreamNotFound(stream_name.clone()))?; |
259 | 259 |
|
260 | | - let ingestor_stats: Option<Vec<QueriedStats>> = None; |
261 | | - |
262 | | - let hash_map = PARSEABLE.streams.read().expect("Readable"); |
263 | | - let stream_meta = &hash_map |
264 | | - .get(&stream_name) |
265 | | - .ok_or_else(|| StreamNotFound(stream_name.clone()))? |
266 | | - .metadata |
267 | | - .read() |
268 | | - .expect(LOCK_EXPECT); |
269 | | - |
270 | 260 | let time = Utc::now(); |
271 | 261 |
|
272 | | - let stats = match &stream_meta.first_event_at { |
273 | | - Some(_) => { |
274 | | - let ingestion_stats = IngestionStats::new( |
275 | | - stats.current_stats.events, |
276 | | - format!("{} {}", stats.current_stats.ingestion, "Bytes"), |
277 | | - stats.lifetime_stats.events, |
278 | | - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), |
279 | | - stats.deleted_stats.events, |
280 | | - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), |
281 | | - "json", |
282 | | - ); |
283 | | - let storage_stats = StorageStats::new( |
284 | | - format!("{} {}", stats.current_stats.storage, "Bytes"), |
285 | | - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), |
286 | | - format!("{} {}", stats.deleted_stats.storage, "Bytes"), |
287 | | - "parquet", |
288 | | - ); |
289 | | - |
290 | | - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) |
291 | | - } |
292 | | - |
293 | | - None => { |
294 | | - let ingestion_stats = IngestionStats::new( |
295 | | - stats.current_stats.events, |
296 | | - format!("{} {}", stats.current_stats.ingestion, "Bytes"), |
297 | | - stats.lifetime_stats.events, |
298 | | - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), |
299 | | - stats.deleted_stats.events, |
300 | | - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), |
301 | | - "json", |
302 | | - ); |
303 | | - let storage_stats = StorageStats::new( |
304 | | - format!("{} {}", stats.current_stats.storage, "Bytes"), |
305 | | - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), |
306 | | - format!("{} {}", stats.deleted_stats.storage, "Bytes"), |
307 | | - "parquet", |
308 | | - ); |
309 | | - |
310 | | - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) |
311 | | - } |
312 | | - }; |
313 | | - let stats = if let Some(mut ingestor_stats) = ingestor_stats { |
314 | | - ingestor_stats.push(stats); |
315 | | - merge_quried_stats(ingestor_stats) |
316 | | - } else { |
317 | | - stats |
| 262 | + let stats = { |
| 263 | + let ingestion_stats = IngestionStats::new( |
| 264 | + stats.current_stats.events, |
| 265 | + format!("{} Bytes", stats.current_stats.ingestion), |
| 266 | + stats.lifetime_stats.events, |
| 267 | + format!("{} Bytes", stats.lifetime_stats.ingestion), |
| 268 | + stats.deleted_stats.events, |
| 269 | + format!("{} Bytes", stats.deleted_stats.ingestion), |
| 270 | + "json", |
| 271 | + ); |
| 272 | + let storage_stats = StorageStats::new( |
| 273 | + format!("{} Bytes", stats.current_stats.storage), |
| 274 | + format!("{} Bytes", stats.lifetime_stats.storage), |
| 275 | + format!("{} Bytes", stats.deleted_stats.storage), |
| 276 | + "parquet", |
| 277 | + ); |
| 278 | + |
| 279 | + QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) |
318 | 280 | }; |
319 | 281 |
|
320 | 282 | let stats = serde_json::to_value(stats)?; |
|
0 commit comments