Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ pub async fn put_stream_hot_tier(

validator::hot_tier(&hottier.size.to_string())?;

stream.set_hot_tier(true);
stream.set_hot_tier(Some(hottier.clone()));
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
};
Expand All @@ -418,6 +418,7 @@ pub async fn put_stream_hot_tier(
.await?,
)?;
stream_metadata.hot_tier_enabled = true;
stream_metadata.hot_tier = Some(hottier.clone());

PARSEABLE
.metastore
Expand Down Expand Up @@ -480,6 +481,7 @@ pub async fn delete_stream_hot_tier(
.await?,
)?;
stream_metadata.hot_tier_enabled = false;
stream_metadata.hot_tier = None;

PARSEABLE
.metastore
Expand Down
50 changes: 50 additions & 0 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
alerts::{ALERTS, get_alert_manager, target::TARGETS},
cli::Options,
correlation::CORRELATIONS,
hottier::{HotTierManager, StreamHotTier},
metastore::metastore_traits::MetastoreObject,
oidc::Claims,
option::Mode,
Expand Down Expand Up @@ -594,6 +595,55 @@ pub type IndexerMetadata = NodeMetadata;
pub type QuerierMetadata = NodeMetadata;
pub type PrismMetadata = NodeMetadata;

/// Initialize hot tier metadata files for streams that have hot tier configuration
/// in their stream metadata but don't have local hot tier metadata files yet.
/// This function is called once during query server startup.
async fn initialize_hot_tier_metadata_on_startup(
hot_tier_manager: &HotTierManager,
) -> anyhow::Result<()> {
// Collect hot tier configurations from streams before doing async operations
let hot_tier_configs: Vec<(String, StreamHotTier)> = {
let streams_guard = PARSEABLE.streams.read().unwrap();
streams_guard
.iter()
.filter_map(|(stream_name, stream)| {
// Skip if hot tier metadata file already exists for this stream
if hot_tier_manager.check_stream_hot_tier_exists(stream_name) {
return None;
}

// Get the hot tier configuration from the in-memory stream metadata
stream
.get_hot_tier()
.map(|config| (stream_name.clone(), config))
})
.collect()
};

for (stream_name, hot_tier_config) in hot_tier_configs {
// Create the hot tier metadata file with the configuration from stream metadata
let mut hot_tier_metadata = hot_tier_config;
hot_tier_metadata.used_size = 0;
hot_tier_metadata.available_size = hot_tier_metadata.size;
hot_tier_metadata.oldest_date_time_entry = None;
if hot_tier_metadata.version.is_none() {
hot_tier_metadata.version = Some(crate::hottier::CURRENT_HOT_TIER_VERSION.to_string());
}

if let Err(e) = hot_tier_manager
.put_hot_tier(&stream_name, &mut hot_tier_metadata)
.await
{
warn!(
"Failed to initialize hot tier metadata for stream {}: {}",
stream_name, e
);
}
}

Ok(())
}

#[cfg(test)]
mod test {
use actix_web::body::MessageBody;
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::thread;
use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup;
use crate::handlers::http::{MAX_EVENT_PAYLOAD_SIZE, logstream};
use crate::handlers::http::{base_path, prism_base_path, resource_check};
use crate::handlers::http::{rbac, role};
Expand Down Expand Up @@ -141,6 +142,11 @@ impl ParseableServer for QueryServer {
});
if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.put_internal_stream_hot_tier().await?;
// Initialize hot tier metadata files for streams that have hot tier configuration
// but don't have local hot tier metadata files yet
if let Err(e) = initialize_hot_tier_metadata_on_startup(hot_tier_manager).await {
tracing::warn!("Failed to initialize hot tier metadata on startup: {}", e);
}
hot_tier_manager.download_from_s3()?;
};

Expand Down
6 changes: 6 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::handlers::http::alerts;
use crate::handlers::http::base_path;
use crate::handlers::http::demo_data::get_demo_data;
use crate::handlers::http::health_check;
use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup;
use crate::handlers::http::prism_base_path;
use crate::handlers::http::query;
use crate::handlers::http::resource_check;
Expand Down Expand Up @@ -143,6 +144,11 @@ impl ParseableServer for Server {
});

if let Some(hot_tier_manager) = HotTierManager::global() {
// Initialize hot tier metadata files for streams that have hot tier configuration
// but don't have local hot tier metadata files yet
if let Err(e) = initialize_hot_tier_metadata_on_startup(hot_tier_manager).await {
tracing::warn!("Failed to initialize hot tier metadata on startup: {}", e);
}
hot_tier_manager.download_from_s3()?;
};

Expand Down
2 changes: 1 addition & 1 deletion src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1);
pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB
pub const CURRENT_HOT_TIER_VERSION: &str = "v2";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct StreamHotTier {
pub version: Option<String>,
#[serde(with = "crate::utils::human_size")]
Expand Down
2 changes: 2 additions & 0 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use crate::catalog::snapshot::ManifestItem;
use crate::event::format::LogSourceEntry;
use crate::handlers::TelemetryType;
use crate::hottier::StreamHotTier;
use crate::metrics::{
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
Expand Down Expand Up @@ -87,6 +88,7 @@ pub struct LogStreamMetadata {
pub custom_partition: Option<String>,
pub static_schema_flag: bool,
pub hot_tier_enabled: bool,
pub hot_tier: Option<StreamHotTier>,
pub stream_type: StreamType,
pub log_source: Vec<LogSourceEntry>,
pub telemetry_type: TelemetryType,
Expand Down
5 changes: 1 addition & 4 deletions src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore {
/// Delete an overview
async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> {
let path = RelativePathBuf::from_iter([stream, "overview"]);
Ok(self
.storage
.delete_object(&path)
.await?)
Ok(self.storage.delete_object(&path).await?)
}

/// This function fetches all the keystones from the underlying object store
Expand Down
2 changes: 2 additions & 0 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ async fn setup_logstream_metadata(
custom_partition,
static_schema_flag,
hot_tier_enabled,
hot_tier,
stream_type,
log_source,
telemetry_type,
Expand Down Expand Up @@ -402,6 +403,7 @@ async fn setup_logstream_metadata(
custom_partition,
static_schema_flag,
hot_tier_enabled,
hot_tier,
stream_type,
log_source,
telemetry_type,
Expand Down
20 changes: 18 additions & 2 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,13 @@ impl Parseable {
.and_then(|limit| limit.parse().ok());
let custom_partition = stream_metadata.custom_partition;
let static_schema_flag = stream_metadata.static_schema_flag;
let hot_tier_enabled = stream_metadata.hot_tier_enabled;
let hot_tier = stream_metadata.hot_tier.clone();
let stream_type = stream_metadata.stream_type;
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
let telemetry_type = stream_metadata.telemetry_type;
let metadata = LogStreamMetadata::new(
let mut metadata = LogStreamMetadata::new(
created_at,
time_partition,
time_partition_limit,
Expand All @@ -375,18 +377,32 @@ impl Parseable {
log_source,
telemetry_type,
);

// Set hot tier fields from the stored metadata
metadata.hot_tier_enabled = hot_tier_enabled;
metadata.hot_tier = hot_tier.clone();

let ingestor_id = INGESTOR_META
.get()
.map(|ingestor_metadata| ingestor_metadata.get_node_id());

// Gets write privileges only for creating the stream when it doesn't already exist.
self.streams.get_or_create(
let stream = self.streams.get_or_create(
self.options.clone(),
stream_name.to_owned(),
metadata,
ingestor_id,
);

// Set hot tier configuration in memory based on stored metadata
if let Some(hot_tier_config) = hot_tier {
stream.set_hot_tier(Some(hot_tier_config));
} else if hot_tier_enabled {
// Backward compatibility: if hot_tier_enabled is true but no hot_tier config exists,
// mark it as enabled in the stream metadata
stream.set_hot_tier(None);
}

//commit schema in memory
commit_schema(stream_name, schema).map_err(|e| StreamError::Anyhow(e.into()))?;

Expand Down
15 changes: 13 additions & 2 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::{
DEFAULT_TIMESTAMP_KEY,
format::{LogSource, LogSourceEntry},
},
hottier::StreamHotTier,
metadata::{LogStreamMetadata, SchemaVersion},
metrics,
option::Mode,
Expand Down Expand Up @@ -921,8 +922,18 @@ impl Stream {
self.metadata.write().expect(LOCK_EXPECT).custom_partition = custom_partition.cloned();
}

pub fn set_hot_tier(&self, enable: bool) {
self.metadata.write().expect(LOCK_EXPECT).hot_tier_enabled = enable;
pub fn set_hot_tier(&self, hot_tier: Option<StreamHotTier>) {
let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
metadata.hot_tier = hot_tier.clone();
metadata.hot_tier_enabled = hot_tier.is_some();
}

pub fn get_hot_tier(&self) -> Option<StreamHotTier> {
self.metadata.read().expect(LOCK_EXPECT).hot_tier.clone()
}

pub fn is_hot_tier_enabled(&self) -> bool {
self.metadata.read().expect(LOCK_EXPECT).hot_tier_enabled
}

pub fn get_stream_type(&self) -> StreamType {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
catalog::snapshot::Snapshot,
event::format::LogSourceEntry,
handlers::TelemetryType,
hottier::StreamHotTier,
metadata::SchemaVersion,
metastore::{MetastoreErrorDetail, metastore_traits::MetastoreObject},
option::StandaloneWithDistributed,
Expand Down Expand Up @@ -122,6 +123,8 @@ pub struct ObjectStoreFormat {
pub static_schema_flag: bool,
#[serde(default)]
pub hot_tier_enabled: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub hot_tier: Option<StreamHotTier>,
#[serde(default)]
pub stream_type: StreamType,
#[serde(default)]
Expand Down Expand Up @@ -242,6 +245,7 @@ impl Default for ObjectStoreFormat {
custom_partition: None,
static_schema_flag: false,
hot_tier_enabled: false,
hot_tier: None,
log_source: vec![LogSourceEntry::default()],
telemetry_type: TelemetryType::Logs,
}
Expand Down
Loading