Skip to content

Commit cd2861a

Browse files
author
Devdutt Shenoi
committed
refactor: code associated with ingestor metadata
1 parent 11e9a9a commit cd2861a

File tree

1 file changed

+58
-75
lines changed

1 file changed

+58
-75
lines changed

src/handlers/http/modal/mod.rs

Lines changed: 58 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use std::{path::Path, sync::Arc};
2121
use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer};
2222
use actix_web_prometheus::PrometheusMetrics;
2323
use async_trait::async_trait;
24-
use base64::Engine;
24+
use base64::{prelude::BASE64_STANDARD, Engine};
2525
use bytes::Bytes;
2626
use openid::Discovered;
2727
use relative_path::RelativePathBuf;
2828
use serde::{Deserialize, Serialize};
29-
use serde_json::Value;
29+
use serde_json::{Map, Value};
3030
use ssl_acceptor::get_ssl_acceptor;
3131
use tokio::sync::oneshot;
3232
use tracing::{error, info, warn};
@@ -215,8 +215,10 @@ impl IngestorMetadata {
215215
/// Capture metadata information by either loading it from staging or starting fresh
216216
pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
217217
// all the files should be in the staging directory root
218-
let entries =
219-
std::fs::read_dir(options.staging_dir()).expect("Couldn't read from file");
218+
let entries = options
219+
.staging_dir()
220+
.read_dir()
221+
.expect("Couldn't read from file");
220222
let url = options.get_url();
221223
let port = url.port().unwrap_or(80).to_string();
222224
let url = url.to_string();
@@ -230,65 +232,48 @@ impl IngestorMetadata {
230232
// cause the staging directory will have only one file with ingestor in the name
231233
// so the JSON Parse should not error unless the file is corrupted
232234
let path = entry.expect("Should be a directory entry").path();
233-
let flag = path
235+
if !path
234236
.file_name()
235-
.unwrap_or_default()
236-
.to_str()
237-
.unwrap_or_default()
238-
.contains("ingestor");
239-
240-
if flag {
241-
// get the ingestor metadata from staging
242-
let text = std::fs::read(path).expect("File should be present");
243-
let mut meta: Value = serde_json::from_slice(&text).expect("Valid JSON");
244-
245-
// migrate the staging meta
246-
let obj = meta
247-
.as_object_mut()
248-
.expect("Could Not parse Ingestor Metadata Json");
249-
250-
if obj.get("flight_port").is_none() {
251-
obj.insert(
252-
"flight_port".to_owned(),
253-
Value::String(options.flight_port.to_string()),
254-
);
255-
}
256-
257-
let mut meta: IngestorMetadata =
258-
serde_json::from_value(meta).expect("Couldn't write to disk");
259-
260-
// compare url endpoint and port
261-
if meta.domain_name != url {
262-
info!(
263-
"Domain Name was Updated. Old: {} New: {}",
264-
meta.domain_name, url
265-
);
266-
meta.domain_name = url;
267-
}
268-
269-
if meta.port != port {
270-
info!("Port was Updated. Old: {} New: {}", meta.port, port);
271-
meta.port = port;
272-
}
273-
274-
let token =
275-
base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));
276-
277-
let token = format!("Basic {}", token);
278-
279-
if meta.token != token {
280-
// TODO: Update the message to be more informative with username and password
281-
info!(
282-
"Credentials were Updated. Old: {} New: {}",
283-
meta.token, token
284-
);
285-
meta.token = token;
286-
}
287-
288-
meta.put_on_disk(staging_path)
289-
.expect("Couldn't write to disk");
290-
return Arc::new(meta);
237+
.and_then(|s| s.to_str())
238+
.is_some_and(|s| s.contains("ingestor"))
239+
{
240+
continue;
241+
}
242+
243+
// get the ingestor metadata from staging
244+
let bytes = std::fs::read(path).expect("File should be present");
245+
let mut meta = Self::from_bytes(&bytes).expect("Extracted ingestor metadata");
246+
247+
// compare url endpoint and port, update
248+
if meta.domain_name != url {
249+
info!(
250+
"Domain Name was Updated. Old: {} New: {}",
251+
meta.domain_name, url
252+
);
253+
meta.domain_name = url;
254+
}
255+
256+
if meta.port != port {
257+
info!("Port was Updated. Old: {} New: {}", meta.port, port);
258+
meta.port = port;
259+
}
260+
261+
let token = format!(
262+
"Basic {}",
263+
BASE64_STANDARD.encode(format!("{username}:{password}"))
264+
);
265+
if meta.token != token {
266+
// TODO: Update the message to be more informative with username and password
267+
warn!(
268+
"Credentials were Updated. Tokens updated; Old: {} New: {}",
269+
meta.token, token
270+
);
271+
meta.token = token;
291272
}
273+
meta.put_on_disk(staging_path)
274+
.expect("Couldn't write to disk");
275+
276+
return Arc::new(meta);
292277
}
293278

294279
let storage = storage.get_object_store();
@@ -319,6 +304,15 @@ impl IngestorMetadata {
319304
])
320305
}
321306

307+
/// Updates json with `flight_port` field if not already present
308+
fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
309+
let mut json: Map<String, Value> = serde_json::from_slice(bytes)?;
310+
json.entry("flight_port")
311+
.or_insert_with(|| Value::String(PARSEABLE.options.flight_port.to_string()));
312+
313+
Ok(serde_json::from_value(Value::Object(json))?)
314+
}
315+
322316
pub async fn migrate(&self) -> anyhow::Result<Option<IngestorMetadata>> {
323317
let imp = self.file_path();
324318
let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await {
@@ -327,21 +321,10 @@ impl IngestorMetadata {
327321
return Ok(None);
328322
}
329323
};
330-
let mut json = serde_json::from_slice::<Value>(&bytes)?;
331-
let meta = json
332-
.as_object_mut()
333-
.ok_or_else(|| anyhow::anyhow!("Unable to parse Ingester Metadata"))?;
334-
let fp = meta.get("flight_port");
335-
336-
if fp.is_none() {
337-
meta.insert(
338-
"flight_port".to_owned(),
339-
Value::String(PARSEABLE.options.flight_port.to_string()),
340-
);
341-
}
342-
let bytes = Bytes::from(serde_json::to_vec(&json)?);
343324

344-
let resource: IngestorMetadata = serde_json::from_value(json)?;
325+
let resource = Self::from_bytes(&bytes)?;
326+
let bytes = Bytes::from(serde_json::to_vec(&resource)?);
327+
345328
resource.put_on_disk(PARSEABLE.options.staging_dir())?;
346329

347330
PARSEABLE

0 commit comments

Comments
 (0)