1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
target
data
staging
limitcache
examples
cert.pem
key.pem
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions server/Cargo.toml
@@ -96,10 +96,12 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }

[build-dependencies]
cargo_toml = "0.15"
189 changes: 189 additions & 0 deletions server/src/localcache.rs
@@ -0,0 +1,189 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::{io, path::PathBuf, sync::Arc};

use fs_extra::file::CopyOptions;
use futures_util::TryFutureExt;
use hashlru::Cache;
use itertools::{Either, Itertools};
use object_store::ObjectStore;
use once_cell::sync::OnceCell;
use tokio::{fs, sync::Mutex};

use crate::option::CONFIG;

pub const STREAM_CACHE_FILENAME: &str = ".cache.json";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LocalCache {
version: String,
current_size: u64,
capacity: u64,
files: Cache<String, PathBuf>,
}

impl LocalCache {
fn new_with_size(capacity: u64) -> Self {
Self {
version: "v1".to_string(),
current_size: 0,
capacity,
files: Cache::new(100),
Member:
Why only 100 here?

Contributor Author:
The backing hashlru cache is capacity based; the capacity here is arbitrary and is increased when needed.

Member:
Who increases it, the user or the backing cache?

}
}

fn can_push(&self, size: u64) -> bool {
self.capacity >= self.current_size + size
}
}
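
Regarding the thread above: in this PR the entry capacity of the backing hashlru cache is grown by the cache manager itself, not by the user; `move_to_cache` further down doubles it whenever the map is full. A minimal sketch of that behaviour (the helper name `push_growing` is illustrative and not part of the PR):

```rust
use std::path::PathBuf;

use hashlru::Cache;

// Illustrative helper mirroring the `is_full`/`resize` check in
// `LocalCacheManager::move_to_cache`: the entry capacity set by
// `Cache::new(100)` is only a starting point and is doubled on demand,
// so eviction is driven by the byte-size budget, not by the entry count.
fn push_growing(files: &mut Cache<String, PathBuf>, key: String, path: PathBuf) {
    if files.is_full() {
        files.resize(files.capacity() * 2);
    }
    files.push(key, path);
}
```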

pub struct LocalCacheManager {
object_store: Arc<dyn ObjectStore>,
cache_path: PathBuf,
cache_capacity: u64,
copy_options: CopyOptions,
semaphore: Mutex<()>,
}

impl LocalCacheManager {
pub fn global() -> Option<&'static LocalCacheManager> {
static INSTANCE: OnceCell<LocalCacheManager> = OnceCell::new();

let Some(cache_path) = &CONFIG.parseable.local_cache_path else {
return None;
};

Some(INSTANCE.get_or_init(|| {
let cache_path = cache_path.clone();
std::fs::create_dir_all(&cache_path).unwrap();
let object_store = CONFIG.storage().get_store();
LocalCacheManager {
object_store,
cache_path,
cache_capacity: CONFIG.parseable.local_cache_size,
copy_options: CopyOptions {
overwrite: true,
skip_exist: false,
..CopyOptions::new()
},
semaphore: Mutex::new(()),
}
}))
}

pub async fn get_cache(&self, stream: &str) -> Result<LocalCache, CacheError> {
let path = cache_file_path(&self.cache_path, stream).unwrap();
let res = self
.object_store
.get(&path)
.and_then(|resp| resp.bytes())
.await;
let cache = match res {
Ok(bytes) => serde_json::from_slice(&bytes)?,
Err(object_store::Error::NotFound { .. }) => {
LocalCache::new_with_size(self.cache_capacity)
}
Err(err) => return Err(err.into()),
};
Ok(cache)
}

pub async fn put_cache(&self, stream: &str, cache: &LocalCache) -> Result<(), CacheError> {
let path = cache_file_path(&self.cache_path, stream).unwrap();
let bytes = serde_json::to_vec(cache)?.into();
Ok(self.object_store.put(&path, bytes).await?)
}

pub async fn move_to_cache(
&self,
stream: &str,
key: String,
staging_path: PathBuf,
) -> Result<(), CacheError> {
let lock = self.semaphore.lock().await;
let mut cache_path = self.cache_path.join(stream);
fs::create_dir_all(&cache_path).await?;
cache_path.push(staging_path.file_name().unwrap());
fs_extra::file::move_file(staging_path, &cache_path, &self.copy_options)?;
let file_size = std::fs::metadata(&cache_path)?.len();
let mut cache = self.get_cache(stream).await?;

while !cache.can_push(file_size) {
if let Some((_, file_for_removal)) = cache.files.pop_lru() {
let lru_file_size = std::fs::metadata(&file_for_removal)?.len();
cache.current_size = cache.current_size.saturating_sub(lru_file_size);
log::info!("removing cache entry");
tokio::spawn(fs::remove_file(file_for_removal));
} else {
log::error!("Cache size too small");
break;
}
}

if cache.files.is_full() {
cache.files.resize(cache.files.capacity() * 2);
}
cache.files.push(key, cache_path);
cache.current_size += file_size;
self.put_cache(stream, &cache).await?;
drop(lock);
Ok(())
}
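
To make the eviction arithmetic above concrete, here is a test-shaped sketch with purely illustrative numbers; it assumes module-internal access, since `new_with_size`, `can_push` and the struct fields are private:

```rust
#[cfg(test)]
mod size_budget_sketch {
    use super::LocalCache;

    const MIB: u64 = 1024 * 1024;

    #[test]
    fn eviction_frees_room_for_the_new_file() {
        // 10 MiB budget with 8 MiB already cached and a 4 MiB file arriving.
        let mut cache = LocalCache::new_with_size(10 * MIB);
        cache.current_size = 8 * MIB;
        // 8 + 4 MiB exceeds the budget, so move_to_cache would pop LRU entries.
        assert!(!cache.can_push(4 * MIB));
        // Suppose the popped entry was 5 MiB; its size is subtracted from the total.
        cache.current_size = cache.current_size.saturating_sub(5 * MIB);
        // 3 + 4 MiB now fits, so the loop stops and the new file is recorded.
        assert!(cache.can_push(4 * MIB));
    }
}
```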

pub async fn partition_on_cached<T>(
&self,
stream: &str,
collection: Vec<T>,
key: fn(&T) -> &String,
) -> Result<(Vec<(T, PathBuf)>, Vec<T>), CacheError> {
let lock = self.semaphore.lock().await;
let mut cache = self.get_cache(stream).await?;
let (cached, remainder): (Vec<_>, Vec<_>) = collection.into_iter().partition_map(|item| {
let key = key(&item);
match cache.files.get(key).cloned() {
Some(path) => Either::Left((item, path)),
None => Either::Right(item),
}
});
self.put_cache(stream, &cache).await?;
drop(lock);
Ok((cached, remainder))
}
}

fn cache_file_path(
root: impl AsRef<std::path::Path>,
stream: &str,
) -> Result<object_store::path::Path, object_store::path::Error> {
let mut path = root.as_ref().join(stream);
path.set_file_name(STREAM_CACHE_FILENAME);
object_store::path::Path::from_absolute_path(path)
}

#[derive(Debug, thiserror::Error)]
pub enum CacheError {
#[error("{0}")]
Serde(#[from] serde_json::Error),
#[error("{0}")]
IOError(#[from] io::Error),
#[error("{0}")]
MoveError(#[from] fs_extra::error::Error),
#[error("{0}")]
ObjectStoreError(#[from] object_store::Error),
}
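
For orientation, a hedged sketch of how a caller elsewhere in the server might use the manager after uploading a file; the stream name, key and path below are illustrative only and not taken from this PR:

```rust
use std::path::PathBuf;

use crate::localcache::{CacheError, LocalCacheManager};

// Hypothetical caller: once a staged file has been uploaded under `remote_key`,
// move the local copy into the per-stream cache so later queries can read it locally.
async fn cache_uploaded_file(remote_key: String, staged_file: PathBuf) -> Result<(), CacheError> {
    // `global()` returns None when no cache path is configured, so caching stays opt-in.
    if let Some(manager) = LocalCacheManager::global() {
        manager
            .move_to_cache("app-logs", remote_key, staged_file)
            .await?;
    }
    Ok(())
}
```

At query time, `partition_on_cached` can then split a stream's entries into files already present in the cache and files that still need to be fetched from object storage.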
1 change: 1 addition & 0 deletions server/src/main.rs
@@ -33,6 +33,7 @@ mod catalog;
mod event;
mod handlers;
mod livetail;
mod localcache;
mod metadata;
mod metrics;
mod migration;
50 changes: 50 additions & 0 deletions server/src/option.rs
@@ -174,6 +174,12 @@ pub struct Server {
/// for incoming events and local cache
pub local_staging_path: PathBuf,

/// The local cache path is used for speeding up queries on the latest data
pub local_cache_path: Option<PathBuf>,

/// Size for local cache
pub local_cache_size: u64,

/// Interval in seconds after which uncommitted data would be
/// uploaded to the storage platform.
pub upload_interval: u64,
@@ -220,6 +226,7 @@ impl FromArgMatches for Server {
}

fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
@@ -235,6 +242,10 @@ impl FromArgMatches for Server {
.get_one::<PathBuf>(Self::STAGING)
.cloned()
.expect("default value for staging");
self.local_cache_size = m
.get_one::<u64>(Self::CACHE_SIZE)
.cloned()
.expect("default value for cache size");
self.upload_interval = m
.get_one::<u64>(Self::UPLOAD_INTERVAL)
.cloned()
@@ -319,6 +330,8 @@ impl Server {
pub const ADDRESS: &'static str = "address";
pub const DOMAIN_URI: &'static str = "origin";
pub const STAGING: &'static str = "local-staging-path";
pub const CACHE: &'static str = "cache-path";
pub const CACHE_SIZE: &'static str = "cache-size";
pub const UPLOAD_INTERVAL: &'static str = "upload-interval";
pub const USERNAME: &'static str = "username";
pub const PASSWORD: &'static str = "password";
@@ -384,6 +397,25 @@ impl Server {
.value_parser(validation::canonicalize_path)
.help("The local staging path is used as a temporary landing point for incoming events and local cache")
.next_line_help(true),
)
.arg(
Arg::new(Self::CACHE)
.long(Self::CACHE)
.env("P_CACHE_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path to be used for caching latest files")
.next_line_help(true),
)
.arg(
Arg::new(Self::CACHE_SIZE)
.long(Self::CACHE_SIZE)
.env("P_CACHE_SIZE")
.value_name("size")
.default_value("1Gib")
.value_parser(validation::human_size_to_bytes)
.help("Size for cache in human-readable format (e.g. 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
Arg::new(Self::UPLOAD_INTERVAL)
@@ -569,8 +601,11 @@ pub mod validation {
fs::{canonicalize, create_dir_all},
net::ToSocketAddrs,
path::PathBuf,
str::FromStr,
};

use human_size::SpecificSize;

pub fn file_path(s: &str) -> Result<PathBuf, String> {
if s.is_empty() {
return Err("empty path".to_owned());
@@ -606,4 +641,19 @@ pub mod validation {
pub fn url(s: &str) -> Result<url::Url, String> {
url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
}

pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
use human_size::multiples;
fn parse_and_map<T: human_size::Multiple>(
s: &str,
) -> Result<u64, human_size::ParsingError> {
SpecificSize::<T>::from_str(s).map(|x| x.to_bytes())
}

parse_and_map::<multiples::Mebibyte>(s)
.or(parse_and_map::<multiples::Megabyte>(s))
.or(parse_and_map::<multiples::Gigibyte>(s))
.or(parse_and_map::<multiples::Gigabyte>(s))
.map_err(|_| "Could not parse given size".to_string())
}
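
A few illustrative expectations for this validator; exact tolerance of spacing and casing is down to the human_size parser, so treat these as a sketch rather than a spec:

```rust
#[test]
fn parses_common_size_strings() {
    // Binary multiples resolve to powers of 1024, decimal multiples to powers of 1000.
    assert_eq!(human_size_to_bytes("100 MiB"), Ok(100 * 1024 * 1024));
    assert_eq!(human_size_to_bytes("1 GiB"), Ok(1024 * 1024 * 1024));
    assert_eq!(human_size_to_bytes("2 GB"), Ok(2_000_000_000));
    assert!(human_size_to_bytes("two gigabytes").is_err());
}
```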
}
1 change: 0 additions & 1 deletion server/src/query.rs
@@ -19,7 +19,6 @@
mod filter_optimizer;
mod listing_table_builder;
mod stream_schema_provider;
mod table_provider;

use chrono::{DateTime, Utc};
use chrono::{NaiveDateTime, TimeZone};