2 changes: 2 additions & 0 deletions src/catalog/mod.rs
@@ -340,6 +340,7 @@ pub async fn remove_manifest_from_snapshot(
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
}
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
Mode::Index => unimplemented!(),
}
}

@@ -350,6 +351,7 @@ pub async fn get_first_event(
) -> Result<Option<String>, ObjectStorageError> {
let mut first_event_at: String = String::default();
match PARSEABLE.options.mode {
Mode::Index => unimplemented!(),
Mode::All | Mode::Ingest => {
// get current snapshot
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
1 change: 1 addition & 0 deletions src/enterprise/mod.rs
@@ -0,0 +1 @@
pub mod utils;
160 changes: 160 additions & 0 deletions src/enterprise/utils.rs
@@ -0,0 +1,160 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};

use datafusion::{common::Column, prelude::Expr};
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::query::stream_schema_provider::extract_primary_filter;
use crate::{
catalog::{
manifest::{File, Manifest},
snapshot, Snapshot,
},
event,
parseable::PARSEABLE,
query::{stream_schema_provider::ManifestExt, PartialTimeFilter},
storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
utils::time::TimeRange,
};

pub fn create_time_filter(
time_range: &TimeRange,
time_partition: Option<String>,
table_name: &str,
) -> Vec<Expr> {
let mut new_filters = vec![];
let start_time = time_range.start.naive_utc();
let end_time = time_range.end.naive_utc();
let mut _start_time_filter: Expr;
let mut _end_time_filter: Expr;

match time_partition {
Some(time_partition) => {
_start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table_name.to_owned()),
time_partition.clone(),
)));
_end_time_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(
Expr::Column(Column::new(Some(table_name.to_owned()), time_partition)),
);
}
None => {
_start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
_end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
.binary_expr(Expr::Column(Column::new(
Some(table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
}
}

new_filters.push(_start_time_filter);
new_filters.push(_end_time_filter);

new_filters
}
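
For orientation, a hedged usage sketch of create_time_filter (assuming TimeRange's start/end are chrono DateTime<Utc> values that can be constructed directly, as the naive_utc() calls above imply; the stream name is illustrative):

use chrono::{TimeZone, Utc};

// Hypothetical call site: filter one day of data on the default
// timestamp column (no custom time partition configured).
let range = TimeRange {
    start: Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap(),
    end: Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap(),
};
let filters = create_time_filter(&range, None, "demo_stream");
// Exactly two exprs come back: an inclusive lower bound and an
// exclusive upper bound on the timestamp column.
assert_eq!(filters.len(), 2);

The asymmetric bounds (Included(start_time), Excluded(end_time)) mean back-to-back ranges never double-count a row.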

pub async fn fetch_parquet_file_paths(
stream: &str,
time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
let glob_storage = PARSEABLE.storage.get_object_store();

let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();

let time_partition = object_store_format.time_partition;

let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();

let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
let obs = glob_storage
.get_objects(
Some(&path),
Box::new(|file_name| file_name.ends_with("stream.json")),
)
.await;
if let Ok(obs) = obs {
for ob in obs {
if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
let snapshot = object_store_format.snapshot;
for manifest in snapshot.manifest_list {
merged_snapshot.manifest_list.push(manifest);
}
}
}
}

let manifest_files = collect_manifest_files(
glob_storage,
merged_snapshot
.manifests(&time_filters)
.into_iter()
.sorted_by_key(|file| file.time_lower_bound)
.map(|item| item.manifest_path)
.collect(),
)
.await
.unwrap();

let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

let mut selected_files = manifest_files
.into_iter()
.flat_map(|file| file.files)
.rev()
.collect_vec();

for filter in time_filter_expr {
selected_files.retain(|file| !file.can_be_pruned(&filter))
}

selected_files
.into_iter()
.map(|file| {
let date = file.file_path.split("/").collect_vec();

let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);
Comment on lines +125 to +127

Review comment (Contributor):

⚠️ Potential issue

Check for potential out-of-bounds slice access.

Splitting on "/" and calling date.as_slice()[1..4] can panic if the path has fewer than four segments. Handle or verify the path structure before slicing to prevent a runtime panic.

Possible fix:

- let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+ if date.len() < 4 {
+     // Decide whether to skip this file or handle the error
+     return Err(
+         ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
+     );
+ }
+ let date = date.as_slice()[1..4].iter().map(|s| s.to_string());


parquet_files.entry(date).or_default().push(file);
})
.for_each(|_| {});

Ok(parquet_files)
}
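
Note the shape assumption in the grouping step: date.as_slice()[1..4] expects paths like stream/date=YYYY-MM-DD/hour=HH/minute=MM/file.parquet and keys the map by the three partition segments. A panic-free sketch of that loop, following the review note above but logging and skipping malformed paths instead of returning an error from inside a closure (names match the function body above):

// Sketch: group files by their date/hour/minute prefix without risking a panic.
for file in selected_files {
    let segments: Vec<&str> = file.file_path.split('/').collect();
    // Expect e.g. ["mystream", "date=2024-01-01", "hour=00", "minute=15", "x.parquet"].
    let Some(parts) = segments.get(1..4) else {
        tracing::warn!("unexpected parquet file path: {}", file.file_path);
        continue;
    };
    let key = RelativePathBuf::from_iter(parts.iter().map(|s| s.to_string()));
    parquet_files.entry(key).or_default().push(file);
}

A plain for loop also avoids the .map(...).for_each(|_| {}) construction above, which builds an iterator purely for its side effects.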

async fn collect_manifest_files(
storage: Arc<dyn ObjectStorage>,
manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, object_store::Error> {
let mut tasks = Vec::new();
manifest_urls.into_iter().for_each(|path| {
let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
let storage = Arc::clone(&storage);
tasks.push(tokio::task::spawn(async move {
storage.get_object(&path).await.unwrap()
}));
});

let mut op = Vec::new();
for task in tasks {
let file = task.await.unwrap();
op.push(file);
}

Ok(op
.into_iter()
.map(|res| serde_json::from_slice(&res).unwrap())
.collect())
}
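
The unwrap()s in collect_manifest_files (and on its call site above) turn a missing or malformed manifest into a panic. A hedged alternative that propagates failures while keeping the concurrent fetch, widening the error type to ObjectStorageError (illustrative only; UnhandledError is the variant this PR already uses for unsupported operations):

// Sketch: fetch manifests concurrently, surfacing failures instead of panicking.
async fn collect_manifest_files_checked(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, ObjectStorageError> {
    let tasks: Vec<_> = manifest_urls
        .into_iter()
        .map(|path| {
            let path = RelativePathBuf::from_path(PathBuf::from(path))
                .expect("manifest paths are valid relative paths");
            let storage = Arc::clone(&storage);
            tokio::task::spawn(async move { storage.get_object(&path).await })
        })
        .collect();

    let mut manifests = Vec::with_capacity(tasks.len());
    for task in tasks {
        // A JoinError (task panic) still aborts; storage errors propagate via `?`.
        let bytes = task.await.expect("manifest fetch task panicked")?;
        let manifest: Manifest = serde_json::from_slice(&bytes)
            .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
        manifests.push(manifest);
    }
    Ok(manifests)
}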
19 changes: 19 additions & 0 deletions src/handlers/http/middleware.rs
@@ -357,6 +357,25 @@ where
Ok(res)
})
}

Mode::Index => {
let accessable_endpoints = ["create", "delete"];
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
if !cond {
Box::pin(async {
Err(actix_web::error::ErrorUnauthorized(
"Only Index API can be accessed in Index Mode",
))
})
} else {
let fut = self.service.call(req);

Box::pin(async move {
let res = fut.await?;
Ok(res)
})
}
}
}
}
}
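
The gate is segment-based: any path segment equal to create or delete is allowed through. A standalone check of the predicate (hypothetical test, mirroring the closure logic rather than driving the real middleware):

#[test]
fn index_mode_path_gate() {
    let allowed = |path: &str| {
        let accessable_endpoints = ["create", "delete"];
        path.split('/').any(|x| accessable_endpoints.contains(&x))
    };
    assert!(allowed("/api/v1/logstream/create"));
    assert!(allowed("/api/v1/logstream/delete"));
    assert!(!allowed("/api/v1/query"));
}

One reviewable consequence: a path such as /api/v1/query/create also passes, since the check matches any segment rather than a route prefix.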
7 changes: 4 additions & 3 deletions src/lib.rs
@@ -21,11 +21,12 @@ pub mod alerts;
pub mod analytics;
pub mod audit;
pub mod banner;
mod catalog;
pub mod catalog;
mod cli;
#[cfg(feature = "kafka")]
pub mod connectors;
pub mod correlation;
pub mod enterprise;
mod event;
pub mod handlers;
pub mod hottier;
@@ -37,15 +38,15 @@ mod oidc;
pub mod option;
pub mod otel;
pub mod parseable;
mod query;
pub mod query;
pub mod rbac;
mod response;
mod static_schema;
mod stats;
pub mod storage;
pub mod sync;
pub mod users;
mod utils;
pub mod utils;
mod validator;

use std::time::Duration;
6 changes: 6 additions & 0 deletions src/main.rs
@@ -1,3 +1,5 @@
use std::process::exit;

/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
@@ -37,6 +39,10 @@ async fn main() -> anyhow::Result<()> {
let server: Box<dyn ParseableServer> = match &PARSEABLE.options.mode {
Mode::Query => Box::new(QueryServer),
Mode::Ingest => Box::new(IngestServer),
Mode::Index => {
println!("Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
exit(0)
}
Mode::All => Box::new(Server),
};

2 changes: 2 additions & 0 deletions src/option.rs
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
pub enum Mode {
Query,
Ingest,
Index,
#[default]
All,
}
@@ -128,6 +129,7 @@ pub mod validation {
"query" => Ok(Mode::Query),
"ingest" => Ok(Mode::Ingest),
"all" => Ok(Mode::All),
"index" => Ok(Mode::Index),
_ => Err("Invalid MODE provided".to_string()),
}
}
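
A quick round-trip of the new arm (hypothetical assertions; the validator's function name is elided in the hunk above and shown here as mode_from_str):

// Hypothetical: "index" now parses; unknown strings still fail.
assert!(matches!(mode_from_str("index"), Ok(Mode::Index)));
assert!(mode_from_str("indexer").is_err());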
1 change: 1 addition & 0 deletions src/parseable/mod.rs
@@ -243,6 +243,7 @@ impl Parseable {
match self.options.mode {
Mode::Query => "Distributed (Query)",
Mode::Ingest => "Distributed (Ingest)",
Mode::Index => "Distributed (Index)",
Mode::All => "Standalone",
}
}
5 changes: 1 addition & 4 deletions src/parseable/streams.rs
@@ -513,10 +513,7 @@ impl Stream {
let file_size = match file.metadata() {
Ok(meta) => meta.len(),
Err(err) => {
warn!(
"File ({}) not found; Error = {err}",
file.display()
);
warn!("File ({}) not found; Error = {err}", file.display());
continue;
}
};
2 changes: 1 addition & 1 deletion src/query/stream_schema_provider.rs
@@ -894,7 +894,7 @@ pub fn extract_primary_filter(
.collect()
}

trait ManifestExt: ManifestFile {
pub trait ManifestExt: ManifestFile {
fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> {
let name = match partial_filter {
Expr::BinaryExpr(binary_expr) => {
23 changes: 22 additions & 1 deletion src/storage/azure_blob.rs
@@ -35,9 +35,10 @@ use datafusion::{
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use object_store::{
azure::{MicrosoftAzure, MicrosoftAzureBuilder},
buffered::BufReader,
limit::LimitStore,
path::Path as StorePath,
BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
};
use relative_path::{RelativePath, RelativePathBuf};
use tracing::{error, info};
@@ -423,6 +424,26 @@ impl BlobStore {

#[async_trait]
impl ObjectStorage for BlobStore {
async fn get_buffered_reader(
&self,
_path: &RelativePath,
) -> Result<BufReader, ObjectStorageError> {
Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Buffered reader not implemented for Blob Storage yet",
),
)))
}
async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Head operation not implemented for Blob Storage yet",
),
)))
}

async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
Ok(self._get_object(path).await?)
}
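
Both new stubs return ErrorKind::Unsupported for now. When Blob support lands, object_store already ships the building block: buffered::BufReader::new(store, &meta) wraps any ObjectStore given the object's metadata. A hedged sketch of the eventual wiring, written free-standing because BlobStore's field layout is not shown in this diff:

// Hypothetical wiring for a buffered reader over any object_store backend.
async fn buffered_reader_for(
    store: Arc<dyn ObjectStore>,
    path: &RelativePath,
) -> Result<BufReader, object_store::Error> {
    let location = StorePath::from(path.as_str());
    let meta = store.head(&location).await?; // the same head() this PR stubs out
    Ok(BufReader::new(store, &meta))
}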
20 changes: 20 additions & 0 deletions src/storage/localfs.rs
@@ -28,6 +28,7 @@ use bytes::Bytes;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use fs_extra::file::CopyOptions;
use futures::{stream::FuturesUnordered, TryStreamExt};
use object_store::{buffered::BufReader, ObjectMeta};
use relative_path::{RelativePath, RelativePathBuf};
use tokio::fs::{self, DirEntry};
use tokio_stream::wrappers::ReadDirStream;
@@ -103,6 +104,25 @@ impl LocalFS {

#[async_trait]
impl ObjectStorage for LocalFS {
async fn get_buffered_reader(
&self,
_path: &RelativePath,
) -> Result<BufReader, ObjectStorageError> {
Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Buffered reader not implemented for LocalFS yet",
),
)))
}
async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Head operation not implemented for LocalFS yet",
),
)))
}
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
let time = Instant::now();
let file_path = self.path_in_root(path);
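
For LocalFS, the head stub could eventually be backed by tokio::fs::metadata. A rough sketch (hypothetical, not shipped code; the ObjectMeta field set and the type of size vary across object_store releases):

// Hypothetical head() for LocalFS, mapping fs metadata into ObjectMeta.
async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
    let file_path = self.path_in_root(path); // same helper get_object uses above
    let meta = tokio::fs::metadata(&file_path)
        .await
        .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
    let modified = meta
        .modified()
        .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
    Ok(ObjectMeta {
        location: object_store::path::Path::from(path.as_str()),
        last_modified: modified.into(), // SystemTime -> DateTime<Utc> via chrono
        size: meta.len() as usize,      // note: u64 in newer object_store versions
        e_tag: None,
        version: None,
    })
}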