server/src/error.rs: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ pub enum Error {
     #[error("JSON provided to query api doesn't contain {0}")]
     JsonQuery(&'static str),
     #[error("Storage error: {0}")]
-    Storage(Box<dyn ObjectStorageError>),
+    Storage(ObjectStorageError),
     #[error("Event error: {0}")]
     Event(#[from] EventError),
     #[error("Parquet error: {0}")]
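Replacing the boxed trait object with a concrete `ObjectStorageError` lets callers match on specific storage failure modes instead of only formatting the error. A minimal sketch of what this enables, assuming the `NoSuchBucket` and `ConnectionError` variants that appear later in this diff; the `describe` helper is hypothetical, not part of the PR:

```rust
use crate::{error::Error, storage::ObjectStorageError};

// Sketch: matching on concrete variants, which the old
// Storage(Box<dyn ObjectStorageError>) trait object did not allow.
fn describe(err: &Error) -> String {
    match err {
        Error::Storage(ObjectStorageError::NoSuchBucket(name)) => {
            format!("bucket {name} does not exist")
        }
        Error::Storage(ObjectStorageError::ConnectionError(e)) => {
            format!("object storage unreachable: {e}")
        }
        Error::Storage(other) => format!("storage error: {other}"),
        _ => "not a storage error".to_string(),
    }
}
```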
server/src/handlers/mod.rs: 1 addition & 1 deletion

@@ -37,7 +37,7 @@ pub async fn liveness() -> HttpResponse {
 }
 
 pub async fn readiness() -> HttpResponse {
-    if S3::new().is_available().await {
+    if let Ok(()) = S3::new().check().await {
         return HttpResponse::new(StatusCode::OK);
     }
 
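Since `check()` now returns `Result<(), ObjectStorageError>`, the readiness probe only needs a boolean view of the result. A standalone sketch of the same handler shape; the collapsed tail of the real function is not shown in the diff, so its failure response is an assumption here:

```rust
use actix_web::{http::StatusCode, HttpResponse};

// Sketch: `.is_ok()` is an equivalent, slightly terser spelling of the
// `if let Ok(()) = ...` test used in the diff. Assumes S3 and the
// ObjectStorage trait (for check()) are in scope.
pub async fn readiness_sketch() -> HttpResponse {
    if S3::new().check().await.is_ok() {
        return HttpResponse::new(StatusCode::OK);
    }
    // Assumption: the hidden remainder of readiness() reports failure,
    // e.g. with SERVICE_UNAVAILABLE.
    HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE)
}
```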
server/src/main.rs: 1 addition & 0 deletions

@@ -59,6 +59,7 @@ async fn main() -> anyhow::Result<()> {
     CONFIG.print();
     CONFIG.validate();
     let storage = S3::new();
+    CONFIG.validate_storage(&storage).await;
     if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
         warn!("could not populate local metadata. {:?}", e);
     }
server/src/metadata.rs: 26 additions & 27 deletions

@@ -18,7 +18,6 @@
 
 use bytes::Bytes;
 use lazy_static::lazy_static;
-use log::warn;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::RwLock;
@@ -126,17 +125,30 @@ impl STREAM_INFO {
             // to load the stream metadata based on whatever is available.
             //
             // TODO: ignore failure(s) if any and skip to next stream
-            let alert_config = parse_string(storage.get_alert(&stream.name).await)
-                .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()))?;
-            let schema = parse_string(storage.get_schema(&stream.name).await)
-                .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?;
-            let metadata = LogStreamMetadata {
-                schema,
-                alert_config,
-                ..Default::default()
+            let alert_config = storage
+                .get_alert(&stream.name)
+                .await
+                .map_err(|e| e.into())
+                .and_then(parse_string)
+                .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()));
+
+            let schema = storage
+                .get_schema(&stream.name)
+                .await
+                .map_err(|e| e.into())
+                .and_then(parse_string)
+                .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()));
+
+            let metadata = match (alert_config, schema) {
+                (Ok(alert_config), Ok(schema)) => LogStreamMetadata {
+                    schema,
+                    alert_config,
+                    ..Default::default()
+                },
+                _ => continue,
             };
             let mut map = self.write().unwrap();
-            map.insert(stream.name.to_owned(), metadata);
+            map.insert(stream.name.clone(), metadata);
         }
 
         Ok(())
@@ -159,21 +171,8 @@ impl STREAM_INFO {
     }
 }
 
-fn parse_string(result: Result<Bytes, Error>) -> Result<String, Error> {
-    let mut string = String::new();
-    let bytes = match result {
-        Ok(bytes) => bytes,
-        Err(e) => {
-            warn!("Storage error: {}", e);
-            return Ok(string);
-        }
-    };
-
-    if !bytes.is_empty() {
-        string = String::from_utf8(bytes.to_vec())?;
-    }
-
-    Ok(string)
+fn parse_string(bytes: Bytes) -> Result<String, Error> {
+    String::from_utf8(bytes.to_vec()).map_err(|e| e.into())
 }
 
 #[cfg(test)]
@@ -214,14 +213,14 @@ mod tests {
     #[case::empty_string("")]
     fn test_parse_string(#[case] string: String) {
        let bytes = Bytes::from(string);
-        assert!(parse_string(Ok(bytes)).is_ok())
+        assert!(parse_string(bytes).is_ok())
     }
 
     #[test]
     fn test_bad_parse_string() {
         let bad: Vec<u8> = vec![195, 40];
         let bytes = Bytes::from(bad);
-        assert!(parse_string(Ok(bytes)).is_err());
+        assert!(parse_string(bytes).is_err());
     }
 
     #[rstest]
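One subtlety in the new `load()` chain: `get_alert` now fails with `ObjectStorageError`, while `parse_string` fails with `Error`, so the storage error has to be widened first via `.map_err(|e| e.into())` before `.and_then(parse_string)` can type-check; this assumes `Error: From<ObjectStorageError>`. A self-contained sketch of that pattern using hypothetical stand-in types, not the crate's real ones:

```rust
use bytes::Bytes;

// Hypothetical stand-ins for the crate's ObjectStorageError/Error types,
// only to show the From-based widening that makes and_then() line up.
#[derive(Debug)]
struct StorageError(String);

#[derive(Debug)]
enum AppError {
    Storage(StorageError),
    Utf8(std::string::FromUtf8Error),
}

impl From<StorageError> for AppError {
    fn from(e: StorageError) -> Self {
        AppError::Storage(e)
    }
}

impl From<std::string::FromUtf8Error> for AppError {
    fn from(e: std::string::FromUtf8Error) -> Self {
        AppError::Utf8(e)
    }
}

fn parse_string(bytes: Bytes) -> Result<String, AppError> {
    String::from_utf8(bytes.to_vec()).map_err(|e| e.into())
}

fn load(fetched: Result<Bytes, StorageError>) -> Result<String, AppError> {
    // Widen the storage error first so both steps share AppError,
    // exactly as the diff does with `.map_err(|e| e.into())`.
    fetched.map_err(AppError::from).and_then(parse_string)
}

fn main() {
    let ok = load(Ok(Bytes::from("schema")));
    let err = load(Err(StorageError("timeout".into())));
    println!("{ok:?} {err:?}");
}
```

The old `parse_string` silently swallowed storage errors and returned an empty string; the new shape surfaces them to the caller, which is what lets `load()` skip a stream when either lookup fails instead of aborting the whole load.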
server/src/option.rs: 17 additions & 0 deletions

@@ -23,6 +23,7 @@ use structopt::StructOpt;
 
 use crate::banner;
 use crate::s3::S3Config;
+use crate::storage::{ObjectStorage, ObjectStorageError};
 
 lazy_static::lazy_static! {
     #[derive(Debug)]
@@ -72,6 +73,22 @@ impl Config {
         }
     }
 
+    pub async fn validate_storage(&self, storage: &impl ObjectStorage) {
+        match storage.check().await {
+            Ok(_) => (),
+            Err(ObjectStorageError::NoSuchBucket(name)) => panic!(
+                "Could not start because the bucket named {bucket} doesn't exist. Please make sure a bucket with the name {bucket} exists on {url} and then start parseable again",
+                bucket = name,
+                url = self.storage.endpoint_url()
+            ),
+            Err(ObjectStorageError::ConnectionError(inner)) => panic!(
+                "Failed to connect to the Object Storage Service\nCaused by: {}",
+                inner
+            ),
+            Err(error) => panic!("{error}"),
+        }
+    }
+
     fn status_info(&self, scheme: &str) {
         let url = format!("{}://{}", scheme, CONFIG.parseable.address).underlined();
         eprintln!(
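The `ObjectStorageError` enum itself lives in `server/src/storage.rs` and is not shown in this diff. To make the match above readable in isolation, here is a hypothetical reconstruction inferred purely from how the variants are used in this PR; it is an assumption, not the real definition:

```rust
use std::error::Error as StdError;

// Hypothetical reconstruction; the real enum is in server/src/storage.rs.
// Variants and payloads are inferred from their uses in this diff.
#[derive(Debug, thiserror::Error)]
pub enum ObjectStorageError {
    #[error("Bucket {0} not found")]
    NoSuchBucket(String),
    #[error("Connection error: {0}")]
    ConnectionError(Box<dyn StdError + Send + Sync + 'static>),
    #[error("Unhandled error: {0}")]
    UnhandledError(Box<dyn StdError + Send + Sync + 'static>),
}
```

The boxed payloads are consistent with `ConnectionError(err.into())` for dispatch failures, `ConnectionError(err)` for `SdkError::TimeoutError` (whose payload is already a boxed error), and `UnhandledError(error.into())` in the `From` impls at the end of `s3.rs`.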
server/src/s3.rs: 58 additions & 23 deletions

@@ -1,6 +1,7 @@
 use async_trait::async_trait;
+use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind};
 use aws_sdk_s3::model::{Delete, ObjectIdentifier};
-use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::types::{ByteStream, SdkError};
 use aws_sdk_s3::Error as AwsSdkError;
 use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
 use aws_types::credentials::SharedCredentialsProvider;
@@ -18,7 +19,6 @@ use std::sync::Arc;
 use structopt::StructOpt;
 use tokio_stream::StreamExt;
 
-use crate::error::Error;
 use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
@@ -96,14 +96,6 @@ impl StorageOpt for S3Config {
     }
 }
 
-impl ObjectStorageError for AwsSdkError {}
-
-impl From<AwsSdkError> for Error {
-    fn from(e: AwsSdkError) -> Self {
-        Self::Storage(Box::new(e))
-    }
-}
-
 struct S3Options {
     endpoint: Endpoint,
     region: Region,
@@ -304,70 +296,83 @@ impl S3 {
 
 #[async_trait]
 impl ObjectStorage for S3 {
-    async fn is_available(&self) -> bool {
+    async fn check(&self) -> Result<(), ObjectStorageError> {
         self.client
             .head_bucket()
             .bucket(&S3_CONFIG.s3_bucket_name)
             .send()
             .await
-            .is_ok()
+            .map(|_| ())
+            .map_err(|err| err.into())
     }
 
-    async fn put_schema(&self, stream_name: String, body: String) -> Result<(), Error> {
+    async fn put_schema(
+        &self,
+        stream_name: String,
+        body: String,
+    ) -> Result<(), ObjectStorageError> {
         self._put_schema(stream_name, body).await?;
 
         Ok(())
     }
 
-    async fn create_stream(&self, stream_name: &str) -> Result<(), Error> {
+    async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
         self._create_stream(stream_name).await?;
 
         Ok(())
     }
 
-    async fn delete_stream(&self, stream_name: &str) -> Result<(), Error> {
+    async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
         self._delete_stream(stream_name).await?;
 
         Ok(())
     }
 
-    async fn create_alert(&self, stream_name: &str, body: String) -> Result<(), Error> {
+    async fn create_alert(
+        &self,
+        stream_name: &str,
+        body: String,
+    ) -> Result<(), ObjectStorageError> {
         self._create_alert(stream_name, body).await?;
 
         Ok(())
     }
 
-    async fn get_schema(&self, stream_name: &str) -> Result<Bytes, Error> {
+    async fn get_schema(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError> {
         let body_bytes = self._get_schema(stream_name).await?;
 
         Ok(body_bytes)
     }
 
-    async fn get_alert(&self, stream_name: &str) -> Result<Bytes, Error> {
+    async fn get_alert(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError> {
        let body_bytes = self._alert_exists(stream_name).await?;
 
         Ok(body_bytes)
     }
 
-    async fn get_stats(&self, stream_name: &str) -> Result<Stats, Error> {
+    async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
         let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?;
 
         Ok(stats)
     }
 
-    async fn list_streams(&self) -> Result<Vec<LogStream>, Error> {
+    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
         let streams = self._list_streams().await?;
 
         Ok(streams)
     }
 
-    async fn upload_file(&self, key: &str, path: &str) -> Result<(), Error> {
+    async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError> {
         self._upload_file(key, path).await?;
 
         Ok(())
     }
 
-    async fn query(&self, query: &Query, results: &mut Vec<RecordBatch>) -> Result<(), Error> {
+    async fn query(
+        &self,
+        query: &Query,
+        results: &mut Vec<RecordBatch>,
+    ) -> Result<(), ObjectStorageError> {
         let s3_file_system = Arc::new(
             S3FileSystem::new(
                 Some(SharedCredentialsProvider::new(self.options.creds.clone())),
@@ -397,9 +402,39 @@ impl ObjectStorage for S3 {
 
             // execute the query and collect results
             let df = ctx.sql(query.query.as_str()).await?;
-            results.extend(df.collect().await.map_err(Error::DataFusion)?);
+            results.extend(df.collect().await?);
         }
 
         Ok(())
     }
 }
 
+impl From<AwsSdkError> for ObjectStorageError {
+    fn from(error: AwsSdkError) -> Self {
+        ObjectStorageError::UnhandledError(error.into())
+    }
+}
+
+impl From<SdkError<HeadBucketError>> for ObjectStorageError {
+    fn from(error: SdkError<HeadBucketError>) -> Self {
+        match error {
+            SdkError::ServiceError {
+                err:
+                    HeadBucketError {
+                        kind: HeadBucketErrorKind::NotFound(_),
+                        ..
+                    },
+                ..
+            } => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()),
+            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(err.into()),
+            SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err),
+            err => ObjectStorageError::UnhandledError(err.into()),
+        }
+    }
+}
+
+impl From<serde_json::Error> for ObjectStorageError {
+    fn from(error: serde_json::Error) -> Self {
+        ObjectStorageError::UnhandledError(error.into())
+    }
+}
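One detail worth spelling out: the `From<serde_json::Error>` impl at the end exists so that the trailing `?` in `get_stats` compiles, since `serde_json::from_slice` fails with serde's own error type rather than `ObjectStorageError`. A sketch of that mechanic with a hypothetical `Stats` stand-in (the real one comes from `crate::metadata`):

```rust
use serde::Deserialize;

// Hypothetical stand-in for crate::metadata::Stats; field is illustrative.
#[derive(Deserialize)]
struct Stats {
    events: u64,
}

// Assumes the ObjectStorageError enum sketched earlier in this document.
fn parse_stats(bytes: &[u8]) -> Result<Stats, ObjectStorageError> {
    // The `?` converts serde_json::Error into ObjectStorageError through
    // the From impl added in this diff.
    let stats = serde_json::from_slice::<Stats>(bytes)?;
    Ok(stats)
}
```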