Skip to content
Merged
2 changes: 1 addition & 1 deletion src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
config.options.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.options.hot_tier_storage_path {
Expand Down
72 changes: 71 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use clap::Parser;
use std::path::PathBuf;
use std::{env, fs, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -385,4 +385,74 @@ impl Options {
/// Reports whether both the configured username and password still equal
/// `DEFAULT_USERNAME` and `DEFAULT_PASSWORD` respectively.
pub fn is_default_creds(&self) -> bool {
    let username_is_default = self.username == DEFAULT_USERNAME;
    let password_is_default = self.password == DEFAULT_PASSWORD;

    username_is_default && password_is_default
}

/// Returns the configured local staging path after making sure the
/// directory (and any missing parent directories) exists on disk.
///
/// # Panics
///
/// Panics if the directory cannot be created.
pub fn staging_dir(&self) -> &PathBuf {
    let dir = &self.local_staging_path;
    fs::create_dir_all(dir).expect("Should be able to create dir if doesn't exist");

    dir
}

/// TODO: refactor and document
Copy link
Contributor Author

@de-sh de-sh Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another PR we can change this option to an optional `Url` instead of a plain `String`, which will significantly simplify the code and make it more readable.

///
/// Resolves the [`Url`] of this server from `address` or `ingestor_endpoint`.
///
/// * When `ingestor_endpoint` is empty, the result is
///   `<scheme>://<address>`, with the scheme taken from `self.get_scheme()`.
/// * Otherwise `ingestor_endpoint` must be `<host>:<port>` without a scheme.
///   Either part may be written as `$VAR_NAME`, in which case its value is
///   read from the environment variable `VAR_NAME`. The result is
///   `<scheme>://<host>:<port>`.
///
/// # Panics
///
/// Panics when:
/// * `address` does not form a parseable URL (only checked when
///   `ingestor_endpoint` is empty),
/// * `ingestor_endpoint` starts with `http` or does not contain exactly one `:`,
/// * a referenced environment variable is unset or empty,
/// * a hostname read from the environment starts with `http`,
/// * the assembled string is not a valid URL.
pub fn get_url(&self) -> Url {
    if self.ingestor_endpoint.is_empty() {
        return format!("{}://{}", self.get_scheme(), self.address)
            .parse::<Url>() // if the value was improperly set, this will panic before hand
            .unwrap_or_else(|err| {
                panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
            });
    }

    let ingestor_endpoint = &self.ingestor_endpoint;
    let addr_from_env: Vec<&str> = ingestor_endpoint.split(':').collect();

    // A scheme prefix and a wrong number of `:`-separated parts are reported
    // with the same message, so both checks share one branch.
    if ingestor_endpoint.starts_with("http") || addr_from_env.len() != 2 {
        panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
    }

    let (raw_hostname, raw_port) = (addr_from_env[0], addr_from_env[1]);

    // if the env var value fits the pattern $VAR_NAME:$VAR_NAME
    // fetch the value from the specified env vars
    let hostname = match raw_hostname.strip_prefix('$') {
        Some(var_hostname) => {
            let value = env::var(var_hostname).unwrap_or_default();

            if value.is_empty() {
                panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
            }
            if value.starts_with("http") {
                panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", value, var_hostname);
            }

            // The scheme is added exactly once, when the final URL is
            // assembled below. Prefixing it here as well would yield
            // `<scheme>://<scheme>://<host>:<port>`.
            value
        }
        None => raw_hostname.to_string(),
    };

    let port = match raw_port.strip_prefix('$') {
        Some(var_port) => {
            let value = env::var(var_port).unwrap_or_default();

            if value.is_empty() {
                panic!(
                    "Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
                    var_port
                );
            }

            value
        }
        None => raw_port.to_string(),
    };

    format!("{}://{}:{}", self.get_scheme(), hostname, port)
        .parse::<Url>()
        .expect("Valid URL")
}
}
2 changes: 1 addition & 1 deletion src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
let staging = if PARSEABLE.options.mode == Mode::Query {
"".to_string()
} else {
PARSEABLE.staging_dir().display().to_string()
PARSEABLE.options.staging_dir().display().to_string()
};
let grpc_port = PARSEABLE.options.grpc_port;

Expand Down
15 changes: 12 additions & 3 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,16 @@ use tracing::warn;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
if !PARSEABLE.streams.contains(&stream_name) {
// Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
if !PARSEABLE.streams.contains(&stream_name)
&& PARSEABLE.options.mode == Mode::Query
&& matches!(
PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await,
Ok(true) | Err(_)
)
{
return Err(StreamNotFound(stream_name).into());
}

Expand Down Expand Up @@ -120,12 +129,12 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
Ok((web::Json(schema), StatusCode::OK))
}

pub async fn schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

// Ensure parseable is aware of stream in distributed mode
if PARSEABLE.options.mode == Mode::Query
&& PARSEABLE
&& !PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await?
{
Expand Down
164 changes: 81 additions & 83 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use std::{path::Path, sync::Arc};
use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use base64::Engine;
use base64::{prelude::BASE64_STANDARD, Engine};
use bytes::Bytes;
use openid::Discovered;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{Map, Value};
use ssl_acceptor::get_ssl_acceptor;
use tokio::sync::oneshot;
use tracing::{error, info, warn};
Expand All @@ -35,8 +35,8 @@ use crate::{
cli::Options,
oidc::Claims,
parseable::PARSEABLE,
storage::PARSEABLE_ROOT_DIRECTORY,
utils::{get_ingestor_id, get_url},
storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
utils::get_ingestor_id,
};

use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
Expand Down Expand Up @@ -213,85 +213,71 @@ impl IngestorMetadata {
}

/// Capture metadata information by either loading it from staging or starting fresh
pub fn load() -> Arc<Self> {
pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
// all the files should be in the staging directory root
let entries = std::fs::read_dir(&PARSEABLE.options.local_staging_path)
let entries = options
.staging_dir()
.read_dir()
.expect("Couldn't read from file");
let url = get_url();
let url = options.get_url();
let port = url.port().unwrap_or(80).to_string();
let url = url.to_string();
let Options {
username, password, ..
} = PARSEABLE.options.as_ref();
let staging_path = PARSEABLE.staging_dir();
let flight_port = PARSEABLE.options.flight_port.to_string();
} = options;
let staging_path = options.staging_dir();
let flight_port = options.flight_port.to_string();

for entry in entries {
// cause the staging directory will have only one file with ingestor in the name
// so the JSON Parse should not error unless the file is corrupted
let path = entry.expect("Should be a directory entry").path();
let flag = path
if !path
.file_name()
.unwrap_or_default()
.to_str()
.unwrap_or_default()
.contains("ingestor");

if flag {
// get the ingestor metadata from staging
let text = std::fs::read(path).expect("File should be present");
let mut meta: Value = serde_json::from_slice(&text).expect("Valid JSON");

// migrate the staging meta
let obj = meta
.as_object_mut()
.expect("Could Not parse Ingestor Metadata Json");

if obj.get("flight_port").is_none() {
obj.insert(
"flight_port".to_owned(),
Value::String(PARSEABLE.options.flight_port.to_string()),
);
}

let mut meta: IngestorMetadata =
serde_json::from_value(meta).expect("Couldn't write to disk");

// compare url endpoint and port
if meta.domain_name != url {
info!(
"Domain Name was Updated. Old: {} New: {}",
meta.domain_name, url
);
meta.domain_name = url;
}

if meta.port != port {
info!("Port was Updated. Old: {} New: {}", meta.port, port);
meta.port = port;
}

let token =
base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));

let token = format!("Basic {}", token);

if meta.token != token {
// TODO: Update the message to be more informative with username and password
info!(
"Credentials were Updated. Old: {} New: {}",
meta.token, token
);
meta.token = token;
}

meta.put_on_disk(staging_path)
.expect("Couldn't write to disk");
return Arc::new(meta);
.and_then(|s| s.to_str())
.is_some_and(|s| s.contains("ingestor"))
{
continue;
}

// get the ingestor metadata from staging
let bytes = std::fs::read(path).expect("File should be present");
let mut meta =
Self::from_bytes(&bytes, options.flight_port).expect("Extracted ingestor metadata");

// compare url endpoint and port, update
if meta.domain_name != url {
info!(
"Domain Name was Updated. Old: {} New: {}",
meta.domain_name, url
);
meta.domain_name = url;
}

if meta.port != port {
info!("Port was Updated. Old: {} New: {}", meta.port, port);
meta.port = port;
}

let token = format!(
"Basic {}",
BASE64_STANDARD.encode(format!("{username}:{password}"))
);
if meta.token != token {
// TODO: Update the message to be more informative with username and password
warn!(
"Credentials were Updated. Tokens updated; Old: {} New: {}",
meta.token, token
);
meta.token = token;
}
meta.put_on_disk(staging_path)
.expect("Couldn't write to disk");

return Arc::new(meta);
}

let storage = PARSEABLE.storage.get_object_store();
let storage = storage.get_object_store();
let meta = Self::new(
port,
url,
Expand Down Expand Up @@ -319,6 +305,15 @@ impl IngestorMetadata {
])
}

/// Deserializes ingestor metadata from raw JSON bytes.
///
/// The bytes must decode to a JSON object. If that object has no
/// `flight_port` key, one is inserted with `flight_port` rendered as a JSON
/// string before the object is converted into `Self`; an existing value is
/// left untouched.
///
/// # Errors
///
/// Returns an error if the bytes are not a JSON object or if the resulting
/// object cannot be deserialized into `Self`.
fn from_bytes(bytes: &[u8], flight_port: u16) -> anyhow::Result<Self> {
    let mut fields = serde_json::from_slice::<Map<String, Value>>(bytes)?;

    if !fields.contains_key("flight_port") {
        fields.insert(
            "flight_port".to_owned(),
            Value::String(flight_port.to_string()),
        );
    }

    let meta: Self = serde_json::from_value(Value::Object(fields))?;
    Ok(meta)
}

pub async fn migrate(&self) -> anyhow::Result<Option<IngestorMetadata>> {
let imp = self.file_path();
let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await {
Expand All @@ -327,22 +322,11 @@ impl IngestorMetadata {
return Ok(None);
}
};
let mut json = serde_json::from_slice::<Value>(&bytes)?;
let meta = json
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("Unable to parse Ingester Metadata"))?;
let fp = meta.get("flight_port");

if fp.is_none() {
meta.insert(
"flight_port".to_owned(),
Value::String(PARSEABLE.options.flight_port.to_string()),
);
}
let bytes = Bytes::from(serde_json::to_vec(&json)?);

let resource: IngestorMetadata = serde_json::from_value(json)?;
resource.put_on_disk(PARSEABLE.staging_dir())?;
let resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?;
let bytes = Bytes::from(serde_json::to_vec(&resource)?);

resource.put_on_disk(PARSEABLE.options.staging_dir())?;

PARSEABLE
.storage
Expand Down Expand Up @@ -394,6 +378,20 @@ mod test {
assert_eq!(rhs, lhs);
}

#[rstest]
fn test_from_bytes_adds_flight_port() {
    // Stored metadata that carries no `flight_port` key.
    let stored = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#;

    let parsed = IngestorMetadata::from_bytes(stored, 8002).unwrap();

    // The missing key is filled in from the port passed to `from_bytes`.
    assert_eq!(parsed.flight_port, "8002");
}

#[rstest]
fn test_from_bytes_preserves_existing_flight_port() {
    // Stored metadata that already has `flight_port` set to "9000".
    let stored = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"9000"}"#;

    let parsed = IngestorMetadata::from_bytes(stored, 8002).unwrap();

    // The stored value wins over the port passed to `from_bytes`.
    assert_eq!(parsed.flight_port, "9000");
}

#[rstest]
fn test_serialize_resource() {
let im = IngestorMetadata::new(
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl QueryServer {
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
web::get()
.to(logstream::schema)
.to(logstream::get_schema)
.authorize_for_stream(Action::GetSchema),
),
)
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl Server {
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
web::resource("/schema").route(
web::get()
.to(logstream::schema)
.to(logstream::get_schema)
.authorize_for_stream(Action::GetSchema),
),
)
Expand Down
4 changes: 2 additions & 2 deletions src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::handlers::http::base_path_without_preceding_slash;
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::modal::IngestorMetadata;
use crate::utils::get_url;
use crate::parseable::PARSEABLE;
use crate::HTTP_CLIENT;
use actix_web::http::header;
use chrono::NaiveDateTime;
Expand Down Expand Up @@ -61,7 +61,7 @@ struct StorageMetrics {

impl Default for Metrics {
fn default() -> Self {
let url = get_url();
let url = PARSEABLE.options.get_url();
let address = format!(
"http://{}:{}",
url.domain()
Expand Down
Loading
Loading