Skip to content

Commit acbb892

Browse files
author
Devdutt Shenoi
committed
fix: handle missing staging directory
1 parent 730d7dd commit acbb892

File tree

8 files changed

+29
-20
lines changed

8 files changed

+29
-20
lines changed

src/banner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
104104
Staging Path: \"{}\"",
105105
"Storage:".to_string().bold(),
106106
config.get_storage_mode_string(),
107-
config.staging_dir().to_string_lossy(),
107+
config.options.staging_dir().to_string_lossy(),
108108
);
109109

110110
if let Some(path) = &config.options.hot_tier_storage_path {

src/cli.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use clap::Parser;
20-
use std::{env, path::PathBuf};
20+
use std::{env, fs, path::PathBuf};
2121

2222
use url::Url;
2323

@@ -386,6 +386,14 @@ impl Options {
386386
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
387387
}
388388

389+
/// Path to staging directory, ensures that it exists or panics
390+
pub fn staging_dir(&self) -> &PathBuf {
391+
fs::create_dir_all(&self.local_staging_path)
392+
.expect("Should be able to create dir if doesn't exist");
393+
394+
&self.local_staging_path
395+
}
396+
389397
/// TODO: refactor and document
390398
pub fn get_url(&self) -> Url {
391399
if self.ingestor_endpoint.is_empty() {

src/handlers/http/about.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
6666
let staging = if PARSEABLE.options.mode == Mode::Query {
6767
"".to_string()
6868
} else {
69-
PARSEABLE.staging_dir().display().to_string()
69+
PARSEABLE.options.staging_dir().display().to_string()
7070
};
7171
let grpc_port = PARSEABLE.options.grpc_port;
7272

src/handlers/http/modal/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ impl IngestorMetadata {
342342
let bytes = Bytes::from(serde_json::to_vec(&json)?);
343343

344344
let resource: IngestorMetadata = serde_json::from_value(json)?;
345-
resource.put_on_disk(PARSEABLE.staging_dir())?;
345+
resource.put_on_disk(PARSEABLE.options.staging_dir())?;
346346

347347
PARSEABLE
348348
.storage

src/migration/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@ pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
325325
}
326326

327327
pub fn get_staging_metadata(config: &Parseable) -> anyhow::Result<Option<serde_json::Value>> {
328-
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.staging_dir());
328+
let path =
329+
RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.options.staging_dir());
329330
let bytes = match std::fs::read(path) {
330331
Ok(bytes) => bytes,
331332
Err(err) => match err.kind() {
@@ -351,7 +352,7 @@ pub fn put_staging_metadata(
351352
config: &Parseable,
352353
metadata: &serde_json::Value,
353354
) -> anyhow::Result<()> {
354-
let path = config.staging_dir().join(".parseable.json");
355+
let path = config.options.staging_dir().join(".parseable.json");
355356
let mut file = OpenOptions::new()
356357
.create(true)
357358
.truncate(true)

src/parseable/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,6 @@ impl Parseable {
217217
self.storage.clone()
218218
}
219219

220-
pub fn staging_dir(&self) -> &PathBuf {
221-
&self.options.local_staging_path
222-
}
223-
224220
pub fn hot_tier_dir(&self) -> &Option<PathBuf> {
225221
&self.options.hot_tier_storage_path
226222
}

src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
645645
}
646646

647647
async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
648-
if !Path::new(&PARSEABLE.staging_dir()).exists() {
648+
if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
649649
return Ok(());
650650
}
651651

src/storage/store_metadata.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl Default for StorageMetadata {
7070
Self {
7171
version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
7272
mode: PARSEABLE.storage.name().to_owned(),
73-
staging: PARSEABLE.staging_dir().to_path_buf(),
73+
staging: PARSEABLE.options.staging_dir().to_path_buf(),
7474
storage: PARSEABLE.storage.get_endpoint(),
7575
deployment_id: uid::gen(),
7676
server_mode: PARSEABLE.options.mode,
@@ -134,8 +134,8 @@ pub async fn resolve_parseable_metadata(
134134
if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest {
135135
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
136136
} else {
137-
create_dir_all(PARSEABLE.staging_dir())?;
138-
metadata.staging = PARSEABLE.staging_dir().canonicalize()?;
137+
create_dir_all(PARSEABLE.options.staging_dir())?;
138+
metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?;
139139
// this flag is set to true so that metadata is copied to staging
140140
overwrite_staging = true;
141141
// overwrite remote in all and query mode
@@ -151,20 +151,20 @@ pub async fn resolve_parseable_metadata(
151151
Mode::Query => {
152152
overwrite_remote = true;
153153
metadata.server_mode = PARSEABLE.options.mode;
154-
metadata.staging = PARSEABLE.staging_dir().to_path_buf();
154+
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
155155
},
156156
Mode::Ingest => {
157157
// if ingest server is started fetch the metadata from remote
158158
// update the server mode for local metadata
159159
metadata.server_mode = PARSEABLE.options.mode;
160-
metadata.staging = PARSEABLE.staging_dir().to_path_buf();
160+
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
161161
},
162162
}
163163
Ok(metadata)
164164
}
165165
}
166166
EnvChange::CreateBoth => {
167-
create_dir_all(PARSEABLE.staging_dir())?;
167+
create_dir_all(PARSEABLE.options.staging_dir())?;
168168
let metadata = StorageMetadata::default();
169169
// new metadata needs to be set
170170
// if mode is query or all then both staging and remote
@@ -237,7 +237,8 @@ pub enum EnvChange {
237237
}
238238

239239
pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
240-
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(PARSEABLE.staging_dir());
240+
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME)
241+
.to_path(PARSEABLE.options.staging_dir());
241242
let bytes = match fs::read(path) {
242243
Ok(bytes) => bytes,
243244
Err(err) => match err.kind() {
@@ -259,8 +260,11 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec
259260
pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> {
260261
let mut staging_metadata = meta.clone();
261262
staging_metadata.server_mode = PARSEABLE.options.mode;
262-
staging_metadata.staging = PARSEABLE.staging_dir().to_path_buf();
263-
let path = PARSEABLE.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
263+
staging_metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
264+
let path = PARSEABLE
265+
.options
266+
.staging_dir()
267+
.join(PARSEABLE_METADATA_FILE_NAME);
264268
let mut file = OpenOptions::new()
265269
.create(true)
266270
.truncate(true)

0 commit comments

Comments
 (0)