Skip to content

refactor: moving server serving from lib and using a trait for health check telemetry #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/apollo-mcp-server/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub enum ServerError {
StartupError(#[from] JoinError),

#[error("Failed to initialize MCP server")]
McpInitializeError(#[from] rmcp::service::ServerInitializeError<std::io::Error>),
McpInitializeError(#[from] Box<rmcp::service::ServerInitializeError<std::io::Error>>),

#[error(transparent)]
UrlParseError(ParseError),
Expand Down
27 changes: 12 additions & 15 deletions crates/apollo-mcp-server/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
atomic::{AtomicBool, Ordering},
},
time::Duration,
};

use crate::telemetry::Telemetry;
use axum::http::StatusCode;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -112,15 +113,13 @@ pub struct HealthCheck {
config: HealthCheckConfig,
live: Arc<AtomicBool>,
ready: Arc<AtomicBool>,
rejected: Arc<AtomicUsize>,
ticker: Arc<tokio::task::JoinHandle<()>>,
}

impl HealthCheck {
pub fn new(config: HealthCheckConfig) -> Self {
pub fn new(config: HealthCheckConfig, telemetry: Arc<dyn Telemetry>) -> Self {
let live = Arc::new(AtomicBool::new(true)); // Start as live
let ready = Arc::new(AtomicBool::new(true)); // Start as ready
let rejected = Arc::new(AtomicUsize::new(0));
let ready = Arc::new(AtomicBool::new(true)); // Start as ready;

let allowed = config.readiness.allowed;
let sampling_interval = config.readiness.interval.sampling;
Expand All @@ -130,20 +129,20 @@ impl HealthCheck {
.unready
.unwrap_or(2 * sampling_interval);

let my_rejected = rejected.clone();
let my_ready = ready.clone();
let telemetry_clone = Arc::clone(&telemetry);

let ticker = tokio::spawn(async move {
loop {
let start = Instant::now() + sampling_interval;
let mut interval = tokio::time::interval_at(start, sampling_interval);
loop {
interval.tick().await;
if my_rejected.load(Ordering::Relaxed) > allowed {
if telemetry_clone.errors() > allowed {
debug!("Health check readiness threshold exceeded, marking as unready");
my_ready.store(false, Ordering::SeqCst);
tokio::time::sleep(recovery_interval).await;
my_rejected.store(0, Ordering::Relaxed);
telemetry_clone.set_error_count(0);
my_ready.store(true, Ordering::SeqCst);
debug!("Health check readiness restored");
break;
Expand All @@ -156,15 +155,10 @@ impl HealthCheck {
config,
live,
ready,
rejected,
ticker: Arc::new(ticker),
}
}

/// Records one rejection by atomically incrementing the `rejected` counter.
///
/// The increment uses `Ordering::Relaxed`; the readiness ticker spawned in
/// `new` loads this counter with the same ordering and compares it against
/// the configured `allowed` threshold.
pub fn record_rejection(&self) {
self.rejected.fetch_add(1, Ordering::Relaxed);
}

/// Returns a shared reference to the `HealthCheckConfig` this health check
/// was constructed with.
pub fn config(&self) -> &HealthCheckConfig {
&self.config
}
Expand Down Expand Up @@ -215,6 +209,7 @@ impl Drop for HealthCheck {
#[cfg(test)]
mod tests {
use super::*;
use crate::telemetry::InMemoryTelemetry;
use tokio::time::{Duration, sleep};

#[test]
Expand All @@ -234,15 +229,17 @@ mod tests {
config.readiness.interval.sampling = Duration::from_millis(50);
config.readiness.interval.unready = Some(Duration::from_millis(100));

let health_check = HealthCheck::new(config);
let mock_telemetry: Arc<dyn Telemetry> = Arc::new(InMemoryTelemetry::new());

let health_check = HealthCheck::new(config, Arc::clone(&mock_telemetry));

// Should be live and ready initially
assert!(health_check.live.load(Ordering::SeqCst));
assert!(health_check.ready.load(Ordering::SeqCst));

// Record rejections beyond threshold
for _ in 0..5 {
health_check.record_rejection();
mock_telemetry.record_error();
}

// Wait for the ticker to process
Expand Down
4 changes: 4 additions & 0 deletions crates/apollo-mcp-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ pub mod operations;
pub mod sanitize;
pub(crate) mod schema_tree_shake;
pub mod server;
pub mod server_config;
pub mod telemetry;

pub mod server_handler;
45 changes: 36 additions & 9 deletions crates/apollo-mcp-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use std::path::PathBuf;

use crate::runtime::Serve;
use apollo_mcp_registry::platform_api::operation_collections::collection_poller::CollectionSource;
use apollo_mcp_registry::uplink::persisted_queries::ManifestSource;
use apollo_mcp_registry::uplink::schema::SchemaSource;
use apollo_mcp_server::custom_scalar_map::CustomScalarMap;
use apollo_mcp_server::errors::ServerError;
use apollo_mcp_server::operations::OperationSource;
use apollo_mcp_server::server::Server;
use apollo_mcp_server::server_config::ServerConfig;
use apollo_mcp_server::server_handler::ApolloMcpServerHandler;
use apollo_mcp_server::telemetry::{InMemoryTelemetry, Telemetry};
use clap::Parser;
use clap::builder::Styles;
use clap::builder::styling::{AnsiColor, Effects};
use runtime::IdOrDefault;
use runtime::logging::Logging;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

mod runtime;
Expand Down Expand Up @@ -109,11 +115,15 @@ async fn main() -> anyhow::Result<()> {
.then(|| config.graphos.graph_ref())
.transpose()?;

Ok(Server::builder()
.transport(config.transport)
.schema_source(schema_source)
.operation_source(operation_source)
.endpoint(config.endpoint.into_inner())
let telemetry: Option<Arc<dyn Telemetry>> = config
.health_check
.enabled
.then(|| Arc::new(InMemoryTelemetry::new()) as Arc<dyn Telemetry>);
let server_handler =
ApolloMcpServerHandler::new(config.headers.clone(), config.endpoint.clone(), telemetry);
let cancellation_token = CancellationToken::new();

let server_config = ServerConfig::builder()
.maybe_explorer_graph_ref(explorer_graph_ref)
.headers(config.headers)
.execute_introspection(config.introspection.execute.enabled)
Expand All @@ -133,8 +143,25 @@ async fn main() -> anyhow::Result<()> {
)
.search_leaf_depth(config.introspection.search.leaf_depth)
.index_memory_bytes(config.introspection.search.index_memory_bytes)
.health_check(config.health_check)
.build();

Server::builder()
.schema_source(schema_source)
.operation_source(operation_source)
.server_handler(Arc::new(RwLock::new(server_handler.clone())))
.cancellation_token(cancellation_token.child_token())
.server_config(server_config)
.build()
.start()
.await?)
.await?;

Serve::serve(
server_handler,
config.transport,
cancellation_token,
config.health_check,
)
.await?;

Ok(())
}
2 changes: 2 additions & 0 deletions crates/apollo-mcp-server/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod operation_source;
mod overrides;
mod schema_source;
mod schemas;
mod serve;

use std::path::Path;

Expand All @@ -22,6 +23,7 @@ use figment::{
};
pub use operation_source::{IdOrDefault, OperationSource};
pub use schema_source::SchemaSource;
pub use serve::Serve;

/// Separator to use when drilling down into nested options in the env figment
const ENV_NESTED_SEPARATOR: &str = "__";
Expand Down
7 changes: 0 additions & 7 deletions crates/apollo-mcp-server/src/runtime/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ use url::Url;
#[derive(Debug)]
pub struct Endpoint(Url);

// Inherent methods for the `Endpoint` newtype, which wraps a single `Url`.
impl Endpoint {
/// Unwrap the endpoint into its inner URL
///
/// Consumes `self` and returns the wrapped `Url` by value.
pub fn into_inner(self) -> Url {
self.0
}
}

impl Default for Endpoint {
fn default() -> Self {
Self(defaults::endpoint())
Expand Down
Loading