An elastic, scalable, and stable telemetry pipeline for AI clusters with a custom message queue implementation.
The system consists of four main components:
- Custom Message Queue: High-performance, distributed message broker
- Telemetry Streamer: Reads CSV data and streams telemetry periodically
- Telemetry Collector: Consumes, parses, and persists telemetry data
- API Gateway: REST API with auto-generated OpenAPI specification
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    CSV Files    │────▶│    Streamers    │────▶│  Message Queue  │────▶│   Collectors    │
│                 │     │   (scalable)    │     │  (partitioned)  │     │   (scalable)    │
└─────────────────┘     └─────────────────┘     └─────────────────┘     └─────────────────┘
                                                                                 │
                                                                                 ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    REST API     │◀────│   API Gateway   │◀────│    SQLite DB    │◀────│   Persistence   │
│   (OpenAPI)     │     │   (scalable)    │     │                 │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘     └─────────────────┘
- Go 1.21+
- Docker
- Kubernetes cluster (optional)
- Helm 3+ (optional)
- Build all components:
make build
- Start message queue:
make dev-messagequeue
# Or: ./bin/messagequeue --port=9090
- Start collector (in another terminal):
make dev-collector
# Or: ./bin/collector --messagequeue-addr=localhost:9090
- Start streamer (in another terminal):
make dev-streamer
# Or: ./bin/streamer --messagequeue-addr=localhost:9090
- Start API gateway (in another terminal):
make dev-gateway
# Or: ./bin/gateway --database-path=data/telemetry.db
- Access the API:
# View API documentation
open http://localhost:8080/swagger/index.html
# Query telemetry data
curl http://localhost:8080/api/v1/telemetry
# Get hosts
curl http://localhost:8080/api/v1/telemetry/hosts
# Get aggregations
curl "http://localhost:8080/api/v1/telemetry/aggregations?group_by=host_id&time_range=1h"- Kubernetes cluster (v1.19+)
- Helm 3.0+
- kubectl configured to access your cluster
- Docker registry (or use local registry)
# Build all Docker images
make docker
# Tag and push images to your registry (if using remote registry)
# Update imageRegistry in values.yaml accordingly
docker tag localhost:5000/messagequeue:latest your-registry.com/messagequeue:latest
docker tag localhost:5000/streamer:latest your-registry.com/streamer:latest
docker tag localhost:5000/collector:latest your-registry.com/collector:latest
docker tag localhost:5000/gateway:latest your-registry.com/gateway:latest
docker push your-registry.com/messagequeue:latest
docker push your-registry.com/streamer:latest
docker push your-registry.com/collector:latest
docker push your-registry.com/gateway:latest
Create a custom values-production.yaml file:
global:
  imageRegistry: "your-registry.com"   # Update with your registry
  imagePullPolicy: Always
  storageClass: "fast-ssd"             # Update with your storage class

# Adjust resource limits for production
messagequeue:
  resources:
    limits:
      cpu: 1000m
      memory: 1Gi
    requests:
      cpu: 500m
      memory: 512Mi

collector:
  persistence:
    size: 50Gi   # Increase storage for production
  config:
    batchSize: 100
    workerCount: 4

gateway:
  service:
    type: LoadBalancer   # or NodePort/ClusterIP as needed
  ingress:
    enabled: true
    hosts:
      - host: telemetry-api.yourdomain.com
        paths:
          - path: /
            pathType: Prefix

# Validate the Helm chart
helm lint ./helm/telemetry-pipeline
# Install with default values
helm install telemetry-pipeline ./helm/telemetry-pipeline
# Or install with custom values
helm install telemetry-pipeline ./helm/telemetry-pipeline -f values-production.yaml
# Install to specific namespace
kubectl create namespace telemetry
helm install telemetry-pipeline ./helm/telemetry-pipeline -n telemetry
# Check deployment status
helm status telemetry-pipeline
# Check all pods are running
kubectl get pods -l app.kubernetes.io/name=telemetry-pipeline
# Check services
kubectl get services -l app.kubernetes.io/name=telemetry-pipeline
# Check persistent volumes (for collector)
kubectl get pvc -l app.kubernetes.io/name=telemetry-pipeline
# View logs
kubectl logs -l app.kubernetes.io/component=messagequeue -f
kubectl logs -l app.kubernetes.io/component=collector -f
kubectl logs -l app.kubernetes.io/component=streamer -f
kubectl logs -l app.kubernetes.io/component=gateway -f
# Option 1: Port forward (for testing)
kubectl port-forward svc/telemetry-pipeline-gateway 8080:80
# Option 2: Use LoadBalancer external IP
kubectl get svc telemetry-pipeline-gateway
export GATEWAY_IP=$(kubectl get svc telemetry-pipeline-gateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl http://${GATEWAY_IP}/api/v1/telemetry
# Option 3: Use Ingress (if configured)
curl http://telemetry-api.yourdomain.com/api/v1/telemetry
# Upgrade deployment
helm upgrade telemetry-pipeline ./helm/telemetry-pipeline -f values-production.yaml
# Rollback to previous version
helm rollback telemetry-pipeline 1
# Uninstall
helm uninstall telemetry-pipeline
# View Helm history
helm history telemetry-pipeline
# Get values
helm get values telemetry-pipeline
- Technology: Custom gRPC-based implementation
- Features:
- Partitioned topics for horizontal scaling
- Consumer groups for load balancing
- In-memory storage with TTL-based cleanup
- Prometheus metrics integration
- Ports: 9090 (gRPC)
- Scaling: Single instance (can be extended for clustering)
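For illustration, here is a minimal producer sketch against the broker described above. The client type, RPC name, request fields, and import path are assumptions made for this example; the authoritative definitions are generated from proto/telemetry.proto via make proto.

```go
// Illustrative only: MessageQueueClient, PublishRequest, and the import path
// are hypothetical names; the real service is defined in proto/telemetry.proto.
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "github.com/example/telemetry-pipeline/proto" // hypothetical import path
)

func main() {
	conn, err := grpc.Dial("localhost:9090",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial message queue: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Publish one message to a partitioned topic; the broker assigns the
	// partition, and consumer groups split partitions among collectors.
	client := pb.NewMessageQueueClient(conn)
	_, err = client.Publish(ctx, &pb.PublishRequest{
		Topic:   "telemetry",
		Key:     "host-01",
		Payload: []byte(`{"gpu_id":"gpu-0","utilization":87.5}`),
	})
	if err != nil {
		log.Fatalf("publish: %v", err)
	}
}
```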
- Technology: Go application reading CSV files
- Features:
- Configurable batch sizes and intervals
- Loop-based continuous streaming
- Graceful error handling and retries
- Prometheus metrics integration
- Scaling: 1-10 instances via HPA
- Configuration: CSV file path, streaming interval, batch size
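The streaming loop amounts to reading CSV rows, slicing them into batches, and publishing on a fixed interval. Below is a standard-library-only sketch of that shape; the real implementation in internal/streamer/streamer.go adds the message queue client, retries, and metrics.

```go
// Minimal sketch of the streamer's read-batch-publish loop.
// publish is a stand-in for the real message queue client.
package main

import (
	"encoding/csv"
	"log"
	"os"
	"time"
)

func main() {
	f, err := os.Open("data/telemetry.csv") // mirrors CSV_FILE
	if err != nil {
		log.Fatalf("open csv: %v", err)
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		log.Fatalf("read csv: %v", err)
	}

	const batchSize = 10                      // mirrors BATCH_SIZE (default: 10)
	ticker := time.NewTicker(5 * time.Second) // mirrors STREAM_INTERVAL (default: 5s)
	defer ticker.Stop()

	i := 0
	for {
		<-ticker.C
		end := i + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		publish(rows[i:end]) // stand-in for the gRPC publish call
		i = end
		if i >= len(rows) {
			i = 0 // LOOP_DATA=true: start over from the beginning
		}
	}
}

func publish(batch [][]string) {
	log.Printf("published batch of %d rows", len(batch))
}
```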
- Technology: Go application with SQLite persistence
- Features:
- Multi-worker concurrent processing
- Transactional batch inserts
- Automatic database schema management
- Prometheus metrics integration
- Storage: SQLite database with persistent volumes
- Scaling: 1-10 instances via HPA
- Configuration: Worker count, batch size, polling interval
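The persistence path hinges on transactional batch inserts: each consumed batch is written inside a single transaction so a failing row never leaves partial data behind. A condensed sketch using database/sql follows; the column names and the SQLite driver choice are assumptions, and the actual schema lives in internal/collector/collector.go.

```go
// Sketch of a transactional batch insert. Column names and the sqlite3
// driver are assumptions for illustration.
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // CGO-based SQLite driver (assumed)
)

type Telemetry struct {
	HostID      string
	GPUID       string
	Timestamp   string
	Utilization float64
}

func insertBatch(db *sql.DB, batch []Telemetry) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Prepared statement reused for every row in the batch.
	stmt, err := tx.Prepare(
		"INSERT INTO telemetry (host_id, gpu_id, timestamp, utilization) VALUES (?, ?, ?, ?)")
	if err != nil {
		tx.Rollback()
		return err
	}
	defer stmt.Close()

	for _, t := range batch {
		if _, err := stmt.Exec(t.HostID, t.GPUID, t.Timestamp, t.Utilization); err != nil {
			tx.Rollback() // abort the whole batch on any failure
			return err
		}
	}
	return tx.Commit() // all rows become visible atomically
}

func main() {
	db, err := sql.Open("sqlite3", "data/telemetry.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS telemetry (
		host_id TEXT, gpu_id TEXT, timestamp TEXT, utilization REAL)`); err != nil {
		log.Fatal(err)
	}

	batch := []Telemetry{{HostID: "host-01", GPUID: "gpu-0", Timestamp: "2024-01-01T00:00:00Z", Utilization: 87.5}}
	if err := insertBatch(db, batch); err != nil {
		log.Fatalf("insert batch: %v", err)
	}
}
```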
- Technology: Gin HTTP framework with Swagger
- Features:
- RESTful API with OpenAPI 3.0 specification
- Filtering, pagination, and aggregation
- CORS support for web applications
- Prometheus metrics integration
- Ports: 8080 (HTTP)
- Scaling: 1-5 instances via HPA
- Documentation: Available at /swagger/index.html
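In Gin, the list endpoint reduces to query-parameter parsing plus a paginated read. The sketch below mirrors the parameters documented in the API section; queryTelemetry is a placeholder rather than the gateway's real data access layer.

```go
// Sketch of the gateway's list endpoint with filtering and pagination.
// queryTelemetry is a placeholder for the real database access layer.
package main

import (
	"log"
	"net/http"
	"strconv"

	"github.com/gin-gonic/gin"
)

func main() {
	r := gin.Default()

	r.GET("/api/v1/telemetry", func(c *gin.Context) {
		hostID := c.Query("host_id") // optional filter
		page, _ := strconv.Atoi(c.DefaultQuery("page", "1"))
		pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "100"))
		if pageSize > 1000 {
			pageSize = 1000 // enforce the documented maximum
		}

		rows := queryTelemetry(hostID, page, pageSize)
		c.JSON(http.StatusOK, gin.H{"data": rows, "page": page, "page_size": pageSize})
	})

	r.GET("/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	log.Fatal(r.Run(":8080"))
}

func queryTelemetry(hostID string, page, pageSize int) []map[string]any {
	return nil // placeholder
}
```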
Message Queue:
- PORT: gRPC server port (default: 9090)
- MAX_PARTITIONS: Number of partitions per topic (default: 4)
- MAX_MESSAGES_PER_PARTITION: Max messages per partition (default: 10000)
- MESSAGE_TTL: Message time-to-live (default: 24h)
- LOG_LEVEL: Logging level (default: info)
Streamer:
- CSV_FILE: Path to CSV file (default: data/telemetry.csv)
- MESSAGEQUEUE_ADDR: Message queue address (default: localhost:9090)
- TOPIC: Topic name (default: telemetry)
- STREAM_INTERVAL: Streaming interval (default: 5s)
- BATCH_SIZE: Batch size (default: 10)
- LOOP_DATA: Loop through data (default: true)
Collector:
- MESSAGEQUEUE_ADDR: Message queue address (default: localhost:9090)
- TOPIC: Topic name (default: telemetry)
- CONSUMER_GROUP: Consumer group name (default: collectors)
- DATABASE_PATH: SQLite database path (default: data/telemetry.db)
- POLL_INTERVAL: Polling interval (default: 2s)
- BATCH_SIZE: Batch size (default: 50)
- WORKER_COUNT: Number of workers (default: 2)
Gateway:
- PORT: HTTP server port (default: 8080)
- DATABASE_PATH: SQLite database path (default: data/telemetry.db)
- CORS_ENABLED: Enable CORS (default: true)
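All of these are plain environment variables with defaults, so they can be overridden per deployment without code changes. As an illustration, a component might resolve them roughly like this (the helper below is a sketch, not the components' actual configuration code):

```go
// Illustrative helper for resolving an environment variable with a default;
// the real components may use flags, environment variables, or both.
package main

import (
	"log"
	"os"
)

func getenv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	addr := getenv("MESSAGEQUEUE_ADDR", "localhost:9090")
	topic := getenv("TOPIC", "telemetry")
	log.Printf("connecting to %s, topic %s", addr, topic)
}
```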
The system exposes comprehensive Prometheus metrics:
Message Queue:
- telemetry_pipeline_messagequeue_messages_published_total
- telemetry_pipeline_messagequeue_messages_consumed_total
- telemetry_pipeline_messagequeue_queue_size
- telemetry_pipeline_messagequeue_publish_duration_seconds
Streamer:
- telemetry_pipeline_streamer_messages_sent_total
- telemetry_pipeline_streamer_errors_total
- telemetry_pipeline_streamer_batch_size
- telemetry_pipeline_streamer_last_processed_timestamp
Collector:
- telemetry_pipeline_collector_messages_processed_total
- telemetry_pipeline_collector_errors_total
- telemetry_pipeline_collector_database_size_bytes
- telemetry_pipeline_collector_active_workers
Gateway:
- telemetry_pipeline_gateway_http_requests_total
- telemetry_pipeline_gateway_http_request_duration_seconds
- telemetry_pipeline_gateway_http_requests_in_flight
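Each component registers its metrics with the Prometheus Go client and exposes them over HTTP for scraping. Below is a sketch of how one of the counters above could be registered; the label names and listen port are illustrative assumptions.

```go
// Sketch: registering and exposing one of the counters listed above with
// the Prometheus Go client. Label names and the port are assumptions.
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var messagesProcessed = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "telemetry_pipeline_collector_messages_processed_total",
		Help: "Total number of telemetry messages processed by the collector.",
	},
	[]string{"topic"}, // label names are illustrative
)

func main() {
	// Increment the counter wherever a message is successfully processed.
	messagesProcessed.WithLabelValues("telemetry").Inc()

	// Expose all registered metrics for Prometheus to scrape.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```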
All components expose health check endpoints:
- Message Queue: TCP check on port 9090
- Collector: Process check + database file existence
- Gateway: HTTP /health endpoint
- Streamer: Process check
All components use structured JSON logging with configurable levels:
- debug: Detailed debugging information
- info: General operational messages (default)
- warn: Warning conditions
- error: Error conditions requiring attention
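On Go 1.21+ this maps naturally onto the standard library's log/slog JSON handler. The sketch below shows one way to wire LOG_LEVEL into a JSON logger; the components' actual logging setup may differ.

```go
// Sketch: structured JSON logging with the level taken from LOG_LEVEL.
package main

import (
	"log/slog"
	"os"
)

func newLogger() *slog.Logger {
	level := slog.LevelInfo // default: info
	switch os.Getenv("LOG_LEVEL") {
	case "debug":
		level = slog.LevelDebug
	case "warn":
		level = slog.LevelWarn
	case "error":
		level = slog.LevelError
	}
	handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})
	return slog.New(handler)
}

func main() {
	logger := newLogger()
	logger.Info("collector started", "workers", 2, "batch_size", 50)
	logger.Debug("only visible when LOG_LEVEL=debug")
}
```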
The system supports automatic scaling based on CPU and memory usage:
- Streamers: 1-10 replicas (70% CPU, 80% memory thresholds)
- Collectors: 1-10 replicas (70% CPU, 80% memory thresholds)
- Gateway: 1-5 replicas (70% CPU, 80% memory thresholds)
- Message Queue: Single instance (can be extended for clustering)
# Scale streamers
kubectl scale deployment telemetry-pipeline-streamer --replicas=5
# Scale collectors
kubectl scale deployment telemetry-pipeline-collector --replicas=3
# Scale gateway
kubectl scale deployment telemetry-pipeline-gateway --replicas=2
# Install dependencies
go mod tidy
# Generate protobuf files
make proto
# Build all binaries
make build
# Run tests
make test
# Clean build artifacts
make clean
# Build all Docker images
make docker
# Build specific component
make docker-messagequeue
make docker-streamer
make docker-collector
make docker-gateway
- Update protobuf definition (proto/telemetry.proto):
message TelemetryData {
// ... existing fields ...
double new_metric = 12;
}
- Regenerate protobuf files:
make proto
- Update CSV parser (internal/streamer/streamer.go):
// Add parsing logic for new field
newMetric, err := strconv.ParseFloat(row[11], 64)
- Update database schema (internal/collector/collector.go):
ALTER TABLE telemetry ADD COLUMN new_metric REAL;
Retrieve telemetry data with optional filters.
Query Parameters:
- host_id: Filter by host ID
- gpu_id: Filter by GPU ID
- start_time: Start time (RFC3339 format)
- end_time: End time (RFC3339 format)
- page: Page number (default: 1)
- page_size: Page size (default: 100, max: 1000)
Example:
curl "http://localhost:8080/api/v1/telemetry?host_id=host-01&page_size=50"Get list of all unique host IDs.
Get list of GPU IDs for a specific host.
Get aggregated telemetry statistics.
Query Parameters:
- group_by: Group by field (host_id or gpu_id, default: host_id)
- time_range: Time range (1h, 24h, 7d, default: 1h)
Get overall system statistics for the last 24 hours.
Health check endpoint returning system status.
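For programmatic access, the same endpoints can be called from Go just as easily as with curl. The sketch below queries the aggregations endpoint; the response is decoded generically because the authoritative schema is the OpenAPI spec served at /swagger/index.html.

```go
// Minimal client for the aggregations endpoint. The response is decoded into
// a generic map; consult the OpenAPI spec for the exact schema.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
)

func main() {
	q := url.Values{}
	q.Set("group_by", "host_id")
	q.Set("time_range", "1h")

	resp, err := http.Get("http://localhost:8080/api/v1/telemetry/aggregations?" + q.Encode())
	if err != nil {
		log.Fatalf("request failed: %v", err)
	}
	defer resp.Body.Close()

	var body map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		log.Fatalf("decode response: %v", err)
	}
	fmt.Printf("aggregations: %v\n", body)
}
```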
# Check Helm release status
helm status telemetry-pipeline
# View Helm installation logs
helm install telemetry-pipeline ./helm/telemetry-pipeline --debug --dry-run
# Check for invalid template syntax
helm lint ./helm/telemetry-pipeline
# Validate values file
helm template telemetry-pipeline ./helm/telemetry-pipeline -f values-production.yaml
# Check if images exist in registry
docker pull your-registry.com/messagequeue:latest
# Check image pull secrets (if using private registry)
kubectl get secrets
kubectl create secret docker-registry regcred \
--docker-server=your-registry.com \
--docker-username=<username> \
--docker-password=<password>
# Update values.yaml to use pull secrets
global:
  imagePullSecrets:
    - name: regcred
# Check if message queue is running
kubectl get pods -l app.kubernetes.io/component=messagequeue
# Check service endpoints
kubectl get endpoints telemetry-pipeline-messagequeue
# Test connectivity from another pod
kubectl run debug --image=busybox -it --rm -- telnet telemetry-pipeline-messagequeue 9090
# Check logs
kubectl logs -l app.kubernetes.io/component=messagequeue
# Check if PVC is mounted correctly
kubectl describe pvc telemetry-pipeline-collector-data
# Check storage class is available
kubectl get storageclass
# Check collector logs
kubectl logs -l app.kubernetes.io/component=collector
# Verify database file permissions
kubectl exec -it <collector-pod> -- ls -la /data/
# Check if LoadBalancer is supported in your cluster
kubectl get svc telemetry-pipeline-gateway
# Use NodePort instead of LoadBalancer
gateway:
  service:
    type: NodePort
# Or use port-forward for testing
kubectl port-forward svc/telemetry-pipeline-gateway 8080:80
# Check resource limits
kubectl describe pod <pod-name>
# Reduce batch sizes in configuration
# For streamer: reduce --batch-size
# For collector: reduce --batch-size or --worker-count
# Verify database has data
kubectl exec -it <collector-pod> -- sqlite3 /data/telemetry.db "SELECT COUNT(*) FROM telemetry;"
# Check if gateway can access database
kubectl logs -l app.kubernetes.io/component=gateway
Enable debug logging for detailed troubleshooting:
# For local development
./bin/streamer --log-level=debug
# For Kubernetes (edit values.yaml)
streamer:
  config:
    logLevel: debug
This project is licensed under the MIT License - see the LICENSE file for details.
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
For questions, issues, or support:
- Create an issue in this repository
- Contact the development team
- Check the troubleshooting section