Skip to content

Commit e041454

Browse files
committed
Refine catalog
1 parent 6a50097 commit e041454

File tree

7 files changed

+282
-190
lines changed

7 files changed

+282
-190
lines changed

crates/persistence/src/backend/catalog.rs

Lines changed: 144 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -200,12 +200,20 @@ impl ParquetDataCatalog {
200200

201201
/// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
202202
///
203-
/// Supports various URI schemes including local file paths, S3 URIs, and other
204-
/// object store backends supported by the `object_store` crate.
203+
/// Supports various URI schemes including local file paths and multiple cloud storage backends
204+
/// supported by the `object_store` crate.
205+
///
206+
/// # Supported URI Schemes
207+
///
208+
/// - **AWS S3**: `s3://bucket/path`
209+
/// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`
210+
/// - **Azure Blob Storage**: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
211+
/// - **HTTP/WebDAV**: `http://` or `https://`
212+
/// - **Local files**: `file://path` or plain paths
205213
///
206214
/// # Parameters
207215
///
208-
/// - `uri`: The URI for the data storage location (e.g., "<s3://bucket/path>", "/local/path").
216+
/// - `uri`: The URI for the data storage location.
209217
/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
210218
/// - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
211219
/// - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
@@ -239,7 +247,19 @@ impl ParquetDataCatalog {
239247
/// None, None, None, None
240248
/// )?;
241249
///
242-
/// // S3 with custom endpoint
250+
/// // Google Cloud Storage
251+
/// let gcs_catalog = ParquetDataCatalog::from_uri(
252+
/// "gs://my-bucket/nautilus-data",
253+
/// None, None, None, None
254+
/// )?;
255+
///
256+
/// // Azure Blob Storage
257+
/// let azure_catalog = ParquetDataCatalog::from_uri(
258+
/// "azure://account/container/nautilus-data",
259+
/// None, None, None, None
260+
/// )?;
261+
///
262+
/// // S3 with custom endpoint and credentials
243263
/// let mut storage_options = HashMap::new();
244264
/// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
245265
/// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
@@ -720,16 +740,31 @@ impl ParquetDataCatalog {
720740
})
721741
}
722742

723-
/// Helper method to reconstruct full URI for S3 paths
743+
/// Helper method to reconstruct full URI for remote object store paths
724744
fn reconstruct_full_uri(&self, path_str: &str) -> String {
725-
if self.original_uri.starts_with("s3://") {
726-
// Extract bucket from the original URI
727-
let url = url::Url::parse(&self.original_uri).unwrap();
728-
let bucket = url.host_str().unwrap();
729-
format!("s3://{bucket}/{path_str}")
730-
} else {
731-
path_str.to_string()
745+
// Check if this is a remote URI scheme that needs reconstruction
746+
if self.is_remote_uri() {
747+
// Extract the base URL (scheme + host) from the original URI
748+
if let Ok(url) = url::Url::parse(&self.original_uri) {
749+
if let Some(host) = url.host_str() {
750+
return format!("{}://{}/{}", url.scheme(), host, path_str);
751+
}
752+
}
732753
}
754+
755+
// For local paths or if URL parsing fails, return the path as-is
756+
path_str.to_string()
757+
}
758+
759+
/// Helper method to check if the original URI uses a remote object store scheme
760+
fn is_remote_uri(&self) -> bool {
761+
self.original_uri.starts_with("s3://")
762+
|| self.original_uri.starts_with("gs://")
763+
|| self.original_uri.starts_with("gcs://")
764+
|| self.original_uri.starts_with("azure://")
765+
|| self.original_uri.starts_with("abfs://")
766+
|| self.original_uri.starts_with("http://")
767+
|| self.original_uri.starts_with("https://")
733768
}
734769

735770
/// Consolidates all data files in the catalog by merging multiple files into single files per directory.
@@ -1135,12 +1170,12 @@ impl ParquetDataCatalog {
11351170
where
11361171
T: DecodeDataFromRecordBatch + CatalogPathPrefix,
11371172
{
1138-
// Register the object store with the session
1139-
if self.original_uri.starts_with("s3://") {
1173+
// Register the object store with the session for remote URIs
1174+
if self.is_remote_uri() {
11401175
let url = url::Url::parse(&self.original_uri)?;
11411176
let host = url
11421177
.host_str()
1143-
.ok_or_else(|| anyhow::anyhow!("S3 URI missing bucket name"))?;
1178+
.ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
11441179
let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
11451180
self.session
11461181
.register_object_store(&base_url, self.object_store.clone());
@@ -2077,4 +2112,98 @@ mod tests {
20772112
.starts_with(base_dir.to_string_lossy().as_ref())
20782113
);
20792114
}
2115+
2116+
#[test]
2117+
fn test_is_remote_uri() {
2118+
// Test S3 URIs
2119+
let s3_catalog =
2120+
ParquetDataCatalog::from_uri("s3://bucket/path", None, None, None, None).unwrap();
2121+
assert!(s3_catalog.is_remote_uri());
2122+
2123+
// Test GCS URIs
2124+
let gcs_catalog =
2125+
ParquetDataCatalog::from_uri("gs://bucket/path", None, None, None, None).unwrap();
2126+
assert!(gcs_catalog.is_remote_uri());
2127+
2128+
let gcs2_catalog =
2129+
ParquetDataCatalog::from_uri("gcs://bucket/path", None, None, None, None).unwrap();
2130+
assert!(gcs2_catalog.is_remote_uri());
2131+
2132+
// Test Azure URIs
2133+
let azure_catalog =
2134+
ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
2135+
.unwrap();
2136+
assert!(azure_catalog.is_remote_uri());
2137+
2138+
let abfs_catalog = ParquetDataCatalog::from_uri(
2139+
"abfs://container@account.dfs.core.windows.net/path",
2140+
None,
2141+
None,
2142+
None,
2143+
None,
2144+
)
2145+
.unwrap();
2146+
assert!(abfs_catalog.is_remote_uri());
2147+
2148+
// Test HTTP URIs
2149+
let http_catalog =
2150+
ParquetDataCatalog::from_uri("http://example.com/path", None, None, None, None)
2151+
.unwrap();
2152+
assert!(http_catalog.is_remote_uri());
2153+
2154+
let https_catalog =
2155+
ParquetDataCatalog::from_uri("https://example.com/path", None, None, None, None)
2156+
.unwrap();
2157+
assert!(https_catalog.is_remote_uri());
2158+
2159+
// Test local paths (should not be remote)
2160+
let tmp = tempfile::tempdir().unwrap();
2161+
let local_catalog =
2162+
ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
2163+
assert!(!local_catalog.is_remote_uri());
2164+
2165+
let tmp_file = tempfile::tempdir().unwrap();
2166+
let file_uri = format!("file://{}", tmp_file.path().display());
2167+
let file_catalog = ParquetDataCatalog::from_uri(&file_uri, None, None, None, None).unwrap();
2168+
assert!(!file_catalog.is_remote_uri());
2169+
}
2170+
2171+
#[test]
2172+
fn test_reconstruct_full_uri() {
2173+
// Test S3 URI reconstruction
2174+
let s3_catalog =
2175+
ParquetDataCatalog::from_uri("s3://bucket/base/path", None, None, None, None).unwrap();
2176+
let reconstructed = s3_catalog.reconstruct_full_uri("data/quotes/file.parquet");
2177+
assert_eq!(reconstructed, "s3://bucket/data/quotes/file.parquet");
2178+
2179+
// Test GCS URI reconstruction
2180+
let gcs_catalog =
2181+
ParquetDataCatalog::from_uri("gs://bucket/base/path", None, None, None, None).unwrap();
2182+
let reconstructed = gcs_catalog.reconstruct_full_uri("data/trades/file.parquet");
2183+
assert_eq!(reconstructed, "gs://bucket/data/trades/file.parquet");
2184+
2185+
// Test Azure URI reconstruction
2186+
let azure_catalog =
2187+
ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
2188+
.unwrap();
2189+
let reconstructed = azure_catalog.reconstruct_full_uri("data/bars/file.parquet");
2190+
assert_eq!(reconstructed, "azure://account/data/bars/file.parquet");
2191+
2192+
// Test HTTP URI reconstruction
2193+
let http_catalog =
2194+
ParquetDataCatalog::from_uri("https://example.com/base/path", None, None, None, None)
2195+
.unwrap();
2196+
let reconstructed = http_catalog.reconstruct_full_uri("data/quotes/file.parquet");
2197+
assert_eq!(
2198+
reconstructed,
2199+
"https://example.com/data/quotes/file.parquet"
2200+
);
2201+
2202+
// Test local path (should return path as-is)
2203+
let tmp = tempfile::tempdir().unwrap();
2204+
let local_catalog =
2205+
ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
2206+
let reconstructed = local_catalog.reconstruct_full_uri("data/quotes/file.parquet");
2207+
assert_eq!(reconstructed, "data/quotes/file.parquet");
2208+
}
20802209
}

nautilus_trader/data/engine.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ cdef class DataEngine(Component):
225225

226226
cpdef void _handle_response(self, DataResponse response)
227227
cpdef void _handle_instruments(self, list instruments, bint update_catalog = *, bint force_update_catalog = *)
228-
cpdef tuple[datetime, object] _catalog_last_timestamp(self, type data_cls, instrument_id: str | None = *)
228+
cpdef tuple[datetime, object] _catalog_last_timestamp(self, type data_cls, identifier: str | None = *)
229229
cpdef void _new_query_group(self, UUID4 correlation_id, int n_components)
230230
cpdef DataResponse _handle_query_group(self, DataResponse response)
231231
cdef DataResponse _handle_query_group_aux(self, DataResponse response)

0 commit comments

Comments
 (0)