Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 211 additions & 94 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

pub mod utils;

use futures::{future, stream, StreamExt};
use std::collections::HashSet;
use std::time::Duration;

Expand Down Expand Up @@ -51,7 +52,7 @@ use crate::HTTP_CLIENT;
use super::base_path_without_preceding_slash;
use super::ingest::PostError;
use super::logstream::error::StreamError;
use super::modal::{IndexerMetadata, IngestorMetadata};
use super::modal::{IndexerMetadata, IngestorMetadata, Metadata};
use super::rbac::RBACError;
use super::role::RoleError;

Expand Down Expand Up @@ -541,72 +542,118 @@ pub async fn send_retention_cleanup_request(
}

pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::Anyhow(err)
})?;
// Get ingestor and indexer metadata concurrently
let (ingestor_result, indexer_result) =
future::join(get_ingestor_info(), get_indexer_info()).await;

let mut infos = vec![];
// Handle ingestor metadata result
let ingestor_metadata = ingestor_result
.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
})
.map_err(|err| StreamError::Anyhow(err.into()))?;

for ingestor in ingestor_infos {
let uri = Url::parse(&format!(
"{}{}/about",
ingestor.domain_name,
base_path_without_preceding_slash()
))
.expect("should always be a valid url");
// Handle indexer metadata result
let indexer_metadata = indexer_result
.map_err(|err| {
error!("Fatal: failed to get indexer info: {:?}", err);
PostError::Invalid(err)
})
.map_err(|err| StreamError::Anyhow(err.into()))?;

// Fetch info for both node types concurrently
let (ingestor_infos, indexer_infos) = future::join(
fetch_servers_info(ingestor_metadata),
fetch_servers_info(indexer_metadata),
)
.await;

// Combine results from both node types
let mut infos = Vec::new();
infos.extend(ingestor_infos?);
infos.extend(indexer_infos?);

let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, ingestor.token.clone())
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;
Ok(actix_web::HttpResponse::Ok().json(infos))
}

let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
let status = Some(resp.status().to_string());
/// Fetches info for a single server (ingestor or indexer)
async fn fetch_server_info<T: Metadata>(server: &T) -> Result<utils::ClusterInfo, StreamError> {
let uri = Url::parse(&format!(
"{}{}/about",
server.domain_name(),
base_path_without_preceding_slash()
))
.expect("should always be a valid url");

let resp_data = resp.bytes().await.map_err(|err| {
error!("Fatal: failed to parse ingestor info to bytes: {:?}", err);
StreamError::Network(err)
})?;
let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, server.token().to_owned())
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

let sp = serde_json::from_slice::<JsonValue>(&resp_data)
.map_err(|err| {
error!("Fatal: failed to parse ingestor info: {:?}", err);
StreamError::SerdeError(err)
})?
.get("staging")
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
"staging",
)))?
.as_str()
.ok_or(StreamError::SerdeError(SerdeError::custom(
"staging path not a string/ not provided",
)))?
.to_string();

(true, sp, None, status)
} else {
(
false,
"".to_owned(),
resp.as_ref().err().map(|e| e.to_string()),
resp.unwrap_err().status().map(|s| s.to_string()),
)
};

infos.push(utils::ClusterInfo::new(
&ingestor.domain_name,
reachable,
staging_path,
PARSEABLE.storage.get_endpoint(),
error,
status,
));
let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
let status = Some(resp.status().to_string());

let resp_data = resp.bytes().await.map_err(|err| {
error!("Fatal: failed to parse server info to bytes: {:?}", err);
StreamError::Network(err)
})?;

let sp = serde_json::from_slice::<JsonValue>(&resp_data)
.map_err(|err| {
error!("Fatal: failed to parse server info: {:?}", err);
StreamError::SerdeError(err)
})?
.get("staging")
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
"staging",
)))?
.as_str()
.ok_or(StreamError::SerdeError(SerdeError::custom(
"staging path not a string/ not provided",
)))?
.to_string();

(true, sp, None, status)
} else {
(
false,
"".to_owned(),
resp.as_ref().err().map(|e| e.to_string()),
resp.unwrap_err().status().map(|s| s.to_string()),
)
};

Ok(utils::ClusterInfo::new(
server.domain_name(),
reachable,
staging_path,
PARSEABLE.storage.get_endpoint(),
error,
status,
))
}

/// Fetches info for multiple servers in parallel.
///
/// All servers are queried concurrently; the first per-server error (from
/// response decoding, see [`fetch_server_info`]) aborts with `Err`.
async fn fetch_servers_info<T: Metadata>(
    servers: Vec<T>,
) -> Result<Vec<utils::ClusterInfo>, StreamError> {
    // Allow as many in-flight requests as there are servers, but never pass
    // 0 to `buffer_unordered` (a zero-capacity buffer polls nothing).
    let concurrency = servers.len().max(1);
    let results = stream::iter(servers)
        .map(|server| async move { fetch_server_info(&server).await })
        .buffer_unordered(concurrency)
        .collect::<Vec<_>>()
        .await;

    // Fallible collect: short-circuits on the first error.
    results.into_iter().collect()
}

pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
Expand Down Expand Up @@ -702,60 +749,130 @@ pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, P
Ok((msg, StatusCode::OK))
}

async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
/// Fetches metrics from a server (ingestor or indexer)
async fn fetch_server_metrics<T>(server: &T) -> Result<Option<Metrics>, PostError>
where
T: Metadata + Send + Sync + 'static,
{
// Format the metrics URL
let uri = Url::parse(&format!(
"{}{}/metrics",
server.domain_name(),
base_path_without_preceding_slash()
))
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in server metadata: {}", err))
})?;

let mut dresses = vec![];

for ingestor in ingestor_metadata {
let uri = Url::parse(&format!(
"{}{}/metrics",
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

// add a check to see if the ingestor is live
if !check_liveness(&ingestor.domain_name).await {
warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
// Check if the server is live
if !check_liveness(server.domain_name()).await {
warn!("Server {} is not live", server.domain_name());
return Ok(None);
}

let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;
// Fetch metrics
let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, server.token())
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

if let Ok(res) = res {
match res {
Ok(res) => {
let text = res.text().await.map_err(PostError::NetworkError)?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| PostError::CustomError(err.to_string()))?
.samples;
let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)

let metrics = Metrics::from_prometheus_samples(sample, server)
.await
.map_err(|err| {
error!("Fatal: failed to get ingestor metrics: {:?}", err);
error!("Fatal: failed to get server metrics: {:?}", err);
PostError::Invalid(err.into())
})?;
dresses.push(ingestor_metrics);
} else {

Ok(Some(metrics))
}
Err(_) => {
warn!(
"Failed to fetch metrics from ingestor: {}\n",
&ingestor.domain_name,
"Failed to fetch metrics from server: {}\n",
server.domain_name()
);
Ok(None)
}
}
}

/// Fetches metrics from multiple servers in parallel.
///
/// Servers that were skipped (not live / request failed → `Ok(None)`) are
/// dropped from the output; the first hard error aborts with `Err`.
async fn fetch_servers_metrics<T>(servers: Vec<T>) -> Result<Vec<Metrics>, PostError>
where
    T: Metadata + Send + Sync + 'static,
{
    // Allow as many in-flight requests as there are servers, but never pass
    // 0 to `buffer_unordered` (a zero-capacity buffer polls nothing).
    let concurrency = servers.len().max(1);
    let results = stream::iter(servers)
        .map(|server| async move { fetch_server_metrics(&server).await })
        .buffer_unordered(concurrency)
        .collect::<Vec<_>>()
        .await;

    // Result<Option<_>, _> -> Option<Result<_, _>> via `transpose`, then a
    // fallible collect that short-circuits on the first error.
    results
        .into_iter()
        .filter_map(Result::transpose)
        .collect()
}

/// Fetches metrics for the whole cluster: ingestors and indexers, in parallel.
///
/// Metadata loading and per-node-type metric scrapes are each joined
/// concurrently; any metadata or scrape error aborts the whole fetch.
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
    // Load ingestor and indexer metadata concurrently.
    let (ingestor_result, indexer_result) =
        future::join(get_ingestor_info(), get_indexer_info()).await;

    let ingestor_metadata = ingestor_result.map_err(|err| {
        error!("Fatal: failed to get ingestor info: {:?}", err);
        PostError::Invalid(err)
    })?;

    let indexer_metadata = indexer_result.map_err(|err| {
        error!("Fatal: failed to get indexer info: {:?}", err);
        PostError::Invalid(err)
    })?;

    // Scrape metrics from ingestors and indexers concurrently.
    let (ingestor_metrics, indexer_metrics) = future::join(
        fetch_servers_metrics(ingestor_metadata),
        fetch_servers_metrics(indexer_metadata),
    )
    .await;

    // Propagate errors with `?` instead of match-and-return, then combine.
    let mut all_metrics = ingestor_metrics?;
    all_metrics.extend(indexer_metrics?);

    Ok(all_metrics)
}

pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
Expand Down
24 changes: 24 additions & 0 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,30 @@ impl IndexerMetadata {
}
}

/// Common accessors for cluster-node metadata (implemented by both
/// `IngestorMetadata` and `IndexerMetadata`), letting cluster routines
/// address either node type generically.
pub trait Metadata {
    /// Base address of the node; used to build request URIs (e.g. `/about`, `/metrics`).
    fn domain_name(&self) -> &str;
    /// Authorization token sent in the `Authorization` header of requests to this node.
    fn token(&self) -> &str;
}

/// Expose ingestor metadata fields through the generic [`Metadata`] accessors.
impl Metadata for IngestorMetadata {
    fn domain_name(&self) -> &str {
        &self.domain_name
    }

    fn token(&self) -> &str {
        &self.token
    }
}

/// Expose indexer metadata fields through the generic [`Metadata`] accessors.
impl Metadata for IndexerMetadata {
    fn domain_name(&self) -> &str {
        &self.domain_name
    }

    fn token(&self) -> &str {
        &self.token
    }
}
#[cfg(test)]
mod test {
use actix_web::body::MessageBody;
Expand Down
Loading
Loading