Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ use crate::{
mod staging;
mod streams;

/// File extension for arrow files in staging
const ARROW_FILE_EXTENSION: &str = "arrows";

/// Name of a Stream
/// NOTE: this used to be a struct, flattened out for simplicity
pub type LogStream = String;
Expand Down
20 changes: 14 additions & 6 deletions src/parseable/staging/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
use arrow_schema::Schema;
use byteorder::{LittleEndian, ReadBytesExt};
use itertools::kmerge_by;
use tracing::error;
use tracing::{error, warn};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
Expand Down Expand Up @@ -82,14 +82,22 @@ pub struct MergedReverseRecordReader {
}

impl MergedReverseRecordReader {
pub fn try_new(files: &[PathBuf]) -> Self {
let mut readers = Vec::with_capacity(files.len());
for file in files {
let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
error!("Invalid file detected, ignoring it: {:?}", file);
pub fn try_new(file_paths: &[PathBuf]) -> Self {
let mut readers = Vec::with_capacity(file_paths.len());
for path in file_paths {
let Ok(file) = File::open(path) else {
warn!("Error when trying to read file: {path:?}");
continue;
};

let reader = match get_reverse_reader(file) {
Ok(r) => r,
Err(err) => {
error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
continue;
}
};

readers.push(reader);
}

Expand Down
35 changes: 21 additions & 14 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ use super::{
writer::Writer,
StagingError,
},
LogStream,
LogStream, ARROW_FILE_EXTENSION,
};

#[derive(Debug, thiserror::Error)]
#[error("Stream not found: {0}")]
pub struct StreamNotFound(pub String);

const ARROW_FILE_EXTENSION: &str = "data.arrows";

pub type StreamRef = Arc<Stream>;

/// Gets the unix timestamp for the minute as described by the `SystemTime`
Expand Down Expand Up @@ -165,7 +163,7 @@ impl Stream {
hostname.push_str(id);
}
let filename = format!(
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down Expand Up @@ -495,10 +493,12 @@ impl Stream {
}
writer.close()?;

if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
if part_file.metadata().expect("File was just created").len()
< parquet::file::FOOTER_SIZE as u64
{
error!(
"Invalid parquet file {:?} detected for stream {}, removing it",
&part_path, &self.stream_name
"Invalid parquet file {part_path:?} detected for stream {}, removing it",
&self.stream_name
);
remove_file(part_path).unwrap();
} else {
Expand All @@ -510,15 +510,22 @@ impl Stream {
}

for file in arrow_files {
// warn!("file-\n{file:?}\n");
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();
if remove_file(file.clone()).is_err() {
let file_size = match file.metadata() {
Ok(meta) => meta.len(),
Err(err) => {
warn!(
"File ({}) not found; Error = {err}",
file.display()
);
continue;
}
};
if remove_file(&file).is_err() {
error!("Failed to delete file. Unstable state");
process::abort()
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", &self.stream_name, file_type])
.with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION])
.sub(file_size as i64);
}
}
Expand Down Expand Up @@ -883,7 +890,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
"{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down Expand Up @@ -917,7 +924,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down
14 changes: 5 additions & 9 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,16 +461,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
) -> Result<Option<Manifest>, ObjectStorageError> {
let path = manifest_path(path.as_str());
match self.get_object(&path).await {
Ok(bytes) => Ok(Some(
serde_json::from_slice(&bytes).expect("manifest is valid json"),
)),
Err(err) => {
if matches!(err, ObjectStorageError::NoSuchKey(_)) {
Ok(None)
} else {
Err(err)
}
Ok(bytes) => {
let manifest = serde_json::from_slice(&bytes)?;
Ok(Some(manifest))
}
Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
Err(err) => Err(err),
}
}

Expand Down
Loading