
This project implements a modular streaming anomaly detection pipeline for HDFS logs using Kafka for streaming and MLflow for experiment tracking.
The system supports multiple anomaly detection backends (currently RandomForest and Autoencoder) and provides services for real-time inference and incremental retraining.
## Features

- Modular pipeline – interchangeable detection models (RandomForest, Autoencoder, future extensions).
- Dual-mode input:
  - Raw logs → block-wise event distributions.
  - Preprocessed features (CSV).
- Aggregator service to convert raw logs into per-block event distributions.
- Streaming inference service with threshold-based anomaly detection.
- Retrainer service for incremental or batch updates.
- Metrics tracking (ROC AUC, Precision, Recall, F1, Confusion Matrix).
- Scalable Kafka-based architecture.
## Kafka Topics

| Topic | Partitions | Purpose |
|---|---|---|
| `raw_logs` | 50 | Raw HDFS logs (key = BlockId). |
| `aggregated_events` | 1 | Aggregated event distributions per block. |
| `anomalies` | 1 | Blocks flagged as anomalous. |
| `hdfs_logs` | 1 | Preprocessed block features (optional for test streaming). |
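The topics can be created ahead of time. Below is a minimal sketch using `kafka-python`'s admin client, assuming a broker reachable at `localhost:9092` (adjust for the docker-compose network):

```python
# Sketch: create the four topics listed above with kafka-python's admin client.
# Broker address and replication factor are assumptions for a local setup.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
topics = [
    NewTopic("raw_logs", num_partitions=50, replication_factor=1),
    NewTopic("aggregated_events", num_partitions=1, replication_factor=1),
    NewTopic("anomalies", num_partitions=1, replication_factor=1),
    NewTopic("hdfs_logs", num_partitions=1, replication_factor=1),
]
admin.create_topics(new_topics=topics)
```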
## Raw Log Producer

- Reads raw `HDFS.log` line-by-line.
- Extracts block IDs (`blk_*`).
- Streams logs to `raw_logs` (Kafka).
- Supports throttling to simulate real-time streaming.
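A minimal sketch of this producer logic, assuming `kafka-python` and the default broker address; the real `services/raw_producer.py` may differ in details such as the throttle rate:

```python
# Sketch: stream HDFS.log line-by-line, keyed by block ID so that all
# logs for a block land on the same raw_logs partition.
import re
import time
from kafka import KafkaProducer

BLOCK_RE = re.compile(r"(blk_-?\d+)")  # block IDs look like blk_123 or blk_-123

producer = KafkaProducer(bootstrap_servers="localhost:9092")

with open("HDFS_v1/HDFS.log") as f:
    for line in f:
        match = BLOCK_RE.search(line)
        if not match:
            continue  # skip lines without a block ID
        block_id = match.group(1)
        producer.send("raw_logs",
                      key=block_id.encode(),
                      value=line.rstrip("\n").encode())
        time.sleep(0.001)  # throttle to simulate real-time streaming

producer.flush()
```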
## Aggregator

- Consumes from `raw_logs`.
- Buffers logs per block and maps them to event IDs (E1–E29 + UNKNOWN).
- Flushes based on:
  - Min logs per block (normal flush).
  - Timeout (delayed block closure).
- Publishes event occurrence vectors to `aggregated_events`.
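A condensed sketch of the buffer-and-flush loop described above. The JSON message shape, the thresholds, and the `map_to_event_id` placeholder are assumptions; the real service matches lines against the HDFS event templates:

```python
# Sketch: buffer raw logs per block, map lines to event IDs, and flush
# either when a block has enough logs or when it times out.
import json
import time
from collections import Counter, defaultdict
from kafka import KafkaConsumer, KafkaProducer

MIN_LOGS, TIMEOUT_S = 20, 30.0                      # illustrative thresholds
EVENT_IDS = [f"E{i}" for i in range(1, 30)] + ["UNKNOWN"]

def map_to_event_id(line: str) -> str:
    # Placeholder: the real service matches each line against the
    # E1-E29 HDFS templates; unmatched lines become UNKNOWN.
    return "UNKNOWN"

consumer = KafkaConsumer("raw_logs", bootstrap_servers="localhost:9092")
producer = KafkaProducer(bootstrap_servers="localhost:9092")
buffers, last_seen = defaultdict(Counter), {}

def flush(block_id):
    counts = buffers.pop(block_id)
    last_seen.pop(block_id, None)
    vector = [counts[e] for e in EVENT_IDS]         # event occurrence vector
    producer.send("aggregated_events",
                  key=block_id.encode(),
                  value=json.dumps({"block": block_id,
                                    "events": vector}).encode())

for msg in consumer:
    block_id = msg.key.decode()
    buffers[block_id][map_to_event_id(msg.value.decode())] += 1
    last_seen[block_id] = time.time()
    if sum(buffers[block_id].values()) >= MIN_LOGS:
        flush(block_id)                             # normal flush
    now = time.time()
    for stale in [b for b, t in last_seen.items() if now - t > TIMEOUT_S]:
        flush(stale)                                # delayed block closure
```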
## Streaming Inference

- Consumes `aggregated_events`.
- Loads a chosen model:
  - RandomForest (`random_forest_hdfs.pkl`)
  - Autoencoder (`autoencoder_hdfs.pth`)
- Detects anomalies based on:
  - RF: probability threshold (default = 0.75).
  - AE: reconstruction error percentile (configurable).
- Publishes anomalies to `anomalies`.
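A sketch of the RandomForest inference path, assuming the model was saved with `joblib`/pickle and that messages carry the JSON shape used in the aggregator sketch above:

```python
# Sketch: score each aggregated event vector with the RF model and
# forward blocks above the probability threshold to the anomalies topic.
import json
import joblib
import numpy as np
from kafka import KafkaConsumer, KafkaProducer

THRESHOLD = 0.75  # default probability threshold from the text
model = joblib.load("models/random_forest_hdfs.pkl")

consumer = KafkaConsumer("aggregated_events", bootstrap_servers="localhost:9092")
producer = KafkaProducer(bootstrap_servers="localhost:9092")

for msg in consumer:
    record = json.loads(msg.value)
    x = np.asarray(record["events"], dtype=float).reshape(1, -1)
    p_anomaly = model.predict_proba(x)[0, 1]  # assumes class 1 = anomaly
    if p_anomaly >= THRESHOLD:
        producer.send("anomalies", key=msg.key,
                      value=json.dumps({"block": record["block"],
                                        "score": float(p_anomaly)}).encode())
```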
## Labelled Producer

- Streams preprocessed CSV test data (`Event_occurrence_matrix_test.csv`).
- Supports mini-batches and delayed streaming for evaluation.
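A sketch of mini-batch CSV streaming with `pandas`; the batch size, delay, and message encoding are illustrative choices, not the exact `services/labelled_producer.py` behavior:

```python
# Sketch: stream the preprocessed test CSV in mini-batches with a delay
# between batches to simulate live traffic.
import json
import time
import pandas as pd
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

for batch in pd.read_csv("preprocessed/Event_occurrence_matrix_test.csv",
                         chunksize=100):              # mini-batches of 100 rows
    # to_json/loads round-trip converts numpy scalars to plain JSON types
    for rec in json.loads(batch.to_json(orient="records")):
        producer.send("hdfs_logs", value=json.dumps(rec).encode())
    time.sleep(1.0)                                   # delayed streaming

producer.flush()
```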
## Retrainer

- Consumes `raw_logs` or `hdfs_logs`.
- Buffers rows until `batch_size` is reached.
- Retrains models:
  - RF: refits on the combined dataset.
  - AE: continues training with new batches.
- Updates the model checkpoint in `models/`.
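A sketch of the batch RF retraining cycle; `BATCH_SIZE` and the assumption that each message carries an `events` vector and a ground-truth `label` are illustrative:

```python
# Sketch: buffer incoming rows, then refit the RF on all data seen so far
# and overwrite the checkpoint in models/.
import json
import joblib
import numpy as np
from kafka import KafkaConsumer

BATCH_SIZE = 500  # illustrative; the service reads this from config
consumer = KafkaConsumer("hdfs_logs", bootstrap_servers="localhost:9092")
X_hist, y_hist, buf = [], [], []

for msg in consumer:
    buf.append(json.loads(msg.value))
    if len(buf) < BATCH_SIZE:
        continue
    X_hist.extend(r["events"] for r in buf)
    y_hist.extend(r["label"] for r in buf)
    buf.clear()
    model = joblib.load("models/random_forest_hdfs.pkl")
    model.fit(np.asarray(X_hist), np.asarray(y_hist))   # refit on combined data
    joblib.dump(model, "models/random_forest_hdfs.pkl") # update checkpoint
```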
## Data Flow

```
Raw HDFS Logs
      |
      v
Raw Log Producer → Kafka raw_logs (50 partitions)
      |
      v
Aggregator
  - Buffer logs per block
  - Map → event IDs
  - Flush (min logs / timeout)
      |
      v
Kafka aggregated_events
      |
      v
Streaming Inference
  - RandomForest or Autoencoder
  - Threshold anomaly detection
  - Send to Kafka anomalies
      |
      v
Streaming Retrainer
  - Batch updates
  - Save new model in models/
```
## Training

RandomForest:

```bash
python utils/train.py
```

Autoencoder:

```bash
python utils/train_ae.py
```
## Running

Start the stack and follow the service logs:

```bash
docker-compose up -d
docker-compose logs -f inference
docker-compose logs -f aggregate-consumer
```
## Evaluation

- RandomForest: Precision, Recall.
- Autoencoder: reconstruction error distribution, ROC AUC, percentile thresholding.
- Default evaluation uses `sklearn.metrics` and reports a confusion matrix plus a ROC curve (see the sketch below).
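A minimal sketch of this evaluation path with placeholder labels and scores; the 95th-percentile threshold mirrors the AE-style thresholding described above:

```python
# Sketch: compute the reported metrics with sklearn.metrics.
# y_true and scores are stand-in data, not real results.
import numpy as np
from sklearn.metrics import (confusion_matrix, precision_score,
                             recall_score, f1_score, roc_auc_score)

y_true = np.array([0, 0, 1, 0, 1])             # ground-truth block labels
scores = np.array([0.1, 0.2, 0.9, 0.3, 0.7])   # RF probability or AE recon error

threshold = np.percentile(scores, 95)           # AE-style percentile thresholding
y_pred = (scores >= threshold).astype(int)

print("ROC AUC:  ", roc_auc_score(y_true, scores))
print("Precision:", precision_score(y_true, y_pred))
print("Recall:   ", recall_score(y_true, y_pred))
print("F1:       ", f1_score(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))
```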
## Design Notes & Roadmap

- Incremental models:
  - Autoencoder can train online in mini-batches.
  - RF retrains in batch; future: switch to `SGDClassifier` / online ensembles (see the sketch after this list).
- Scalability:
  - Raw logs → 50 partitions = horizontal scaling.
  - Aggregated events = 1 partition (can scale later).
- Monitoring & visualization:
  - Stream anomalies to Grafana or Plotly dashboards.
  - Log MLflow metrics per model version.
- Extensions:
  - Add Transformer-based anomaly detection.
  - Deploy inference as a REST/gRPC microservice.
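A minimal sketch of the `SGDClassifier` option mentioned in the roadmap (requires scikit-learn ≥ 1.1 for `loss="log_loss"`); the `update` helper and the random stand-in data are illustrative:

```python
# Sketch: online updates via partial_fit instead of full batch refits.
import numpy as np
from sklearn.linear_model import SGDClassifier

clf = SGDClassifier(loss="log_loss")  # logistic loss -> predict_proba available
classes = np.array([0, 1])            # must be declared on the first call

def update(clf, X_batch, y_batch):
    """Apply one mini-batch update to the classifier."""
    clf.partial_fit(X_batch, y_batch, classes=classes)
    return clf

# Example mini-batch update with random stand-in data
# (30 features = E1-E29 + UNKNOWN):
rng = np.random.default_rng(0)
clf = update(clf, rng.random((32, 30)), rng.integers(0, 2, 32))
```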
## Project Structure

```
├── HDFS_v1/
│   └── HDFS.log
├── preprocessed/
│   ├── Event_occurrence_matrix_train.csv
│   └── Event_occurrence_matrix_test.csv
├── models/
│   ├── random_forest_hdfs.pkl
│   └── autoencoder_hdfs.pth
├── services/
│   ├── aggregate_consumer.py
│   ├── inference.py
│   ├── labelled_producer.py
│   ├── raw_producer.py
│   ├── retrainer_randomforest.py
│   └── retrainer.py
├── docker/
│   └── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── README.md
```