Refine catalog #2720

Merged (1 commit, Jun 18, 2025)

159 changes: 144 additions & 15 deletions crates/persistence/src/backend/catalog.rs
@@ -200,12 +200,20 @@ impl ParquetDataCatalog {

/// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
///
- /// Supports various URI schemes including local file paths, S3 URIs, and other
- /// object store backends supported by the `object_store` crate.
+ /// Supports various URI schemes including local file paths and multiple cloud storage backends
+ /// supported by the `object_store` crate.
///
+ /// # Supported URI Schemes
+ ///
+ /// - **AWS S3**: `s3://bucket/path`
+ /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`
+ /// - **Azure Blob Storage**: `azure://account/container/path` or `abfs://[email protected]/path`
+ /// - **HTTP/WebDAV**: `http://` or `https://`
+ /// - **Local files**: `file://path` or plain paths
+ ///
/// # Parameters
///
- /// - `uri`: The URI for the data storage location (e.g., "<s3://bucket/path>", "/local/path").
+ /// - `uri`: The URI for the data storage location.
/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
/// - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
/// - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
@@ -239,7 +247,19 @@ impl ParquetDataCatalog {
/// None, None, None, None
/// )?;
///
- /// // S3 with custom endpoint
+ /// // Google Cloud Storage
+ /// let gcs_catalog = ParquetDataCatalog::from_uri(
+ /// "gs://my-bucket/nautilus-data",
+ /// None, None, None, None
+ /// )?;
+ ///
+ /// // Azure Blob Storage
+ /// let azure_catalog = ParquetDataCatalog::from_uri(
+ /// "azure://account/container/nautilus-data",
+ /// None, None, None, None
+ /// )?;
+ ///
+ /// // S3 with custom endpoint and credentials
/// let mut storage_options = HashMap::new();
/// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
/// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
@@ -720,16 +740,31 @@ impl ParquetDataCatalog {
})
}

- /// Helper method to reconstruct full URI for S3 paths
+ /// Helper method to reconstruct full URI for remote object store paths
fn reconstruct_full_uri(&self, path_str: &str) -> String {
- if self.original_uri.starts_with("s3://") {
- // Extract bucket from the original URI
- let url = url::Url::parse(&self.original_uri).unwrap();
- let bucket = url.host_str().unwrap();
- format!("s3://{bucket}/{path_str}")
- } else {
- path_str.to_string()
+ // Check if this is a remote URI scheme that needs reconstruction
+ if self.is_remote_uri() {
+ // Extract the base URL (scheme + host) from the original URI
+ if let Ok(url) = url::Url::parse(&self.original_uri) {
+ if let Some(host) = url.host_str() {
+ return format!("{}://{}/{}", url.scheme(), host, path_str);
+ }
+ }
+ }
+
+ // For local paths or if URL parsing fails, return the path as-is
+ path_str.to_string()
}

+ /// Helper method to check if the original URI uses a remote object store scheme
+ fn is_remote_uri(&self) -> bool {
+ self.original_uri.starts_with("s3://")
+ || self.original_uri.starts_with("gs://")
+ || self.original_uri.starts_with("gcs://")
+ || self.original_uri.starts_with("azure://")
+ || self.original_uri.starts_with("abfs://")
+ || self.original_uri.starts_with("http://")
+ || self.original_uri.starts_with("https://")
+ }
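
(A note on the reconstruction rule above, relevant to the Azure expectations in the tests further down: only the scheme and host are re-attached, and for `azure://account/container/path` the parsed host is the account, so the container segment is preserved only if it is already part of `path_str`. A minimal illustrative check using the `url` crate, not part of this PR:)

```rust
#[cfg(test)]
mod reconstruction_sketch {
    // Illustrates the scheme + host rule used by `reconstruct_full_uri`:
    // for `azure://account/container/path` the parsed host is the account,
    // so the container is not re-attached automatically.
    #[test]
    fn azure_host_is_the_account() {
        let url = url::Url::parse("azure://account/container/path").unwrap();
        assert_eq!(url.scheme(), "azure");
        assert_eq!(url.host_str(), Some("account"));

        let rebuilt = format!(
            "{}://{}/{}",
            url.scheme(),
            url.host_str().unwrap(),
            "data/bars/file.parquet"
        );
        assert_eq!(rebuilt, "azure://account/data/bars/file.parquet");
    }
}
```
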

/// Consolidates all data files in the catalog by merging multiple files into single files per directory.
@@ -1135,12 +1170,12 @@ impl ParquetDataCatalog {
where
T: DecodeDataFromRecordBatch + CatalogPathPrefix,
{
- // Register the object store with the session
- if self.original_uri.starts_with("s3://") {
+ // Register the object store with the session for remote URIs
+ if self.is_remote_uri() {
let url = url::Url::parse(&self.original_uri)?;
let host = url
.host_str()
- .ok_or_else(|| anyhow::anyhow!("S3 URI missing bucket name"))?;
+ .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
self.session
.register_object_store(&base_url, self.object_store.clone());
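
(For context on the registration above: an object store is keyed by its base URL, scheme plus host only, and every path beneath that base then resolves through the registered store. The `register_object_store` call on the session mirrors the pattern of DataFusion's `SessionContext`; the sketch below shows that pattern in isolation, with an in-memory store standing in for a real remote backend, so it is illustrative rather than the catalog's actual code path.)

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use object_store::memory::InMemory;
use url::Url;

// Illustrative sketch: stores are registered under scheme + host, and any
// object path beneath that base URL then resolves through the same store.
fn register_store_sketch() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    let base_url = Url::parse("s3://my-bucket")?; // hypothetical bucket
    ctx.register_object_store(&base_url, Arc::new(InMemory::new()));
    Ok(())
}
```
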
@@ -2077,4 +2112,98 @@ mod tests {
.starts_with(base_dir.to_string_lossy().as_ref())
);
}

#[test]
fn test_is_remote_uri() {
// Test S3 URIs
let s3_catalog =
ParquetDataCatalog::from_uri("s3://bucket/path", None, None, None, None).unwrap();
assert!(s3_catalog.is_remote_uri());

// Test GCS URIs
let gcs_catalog =
ParquetDataCatalog::from_uri("gs://bucket/path", None, None, None, None).unwrap();
assert!(gcs_catalog.is_remote_uri());

let gcs2_catalog =
ParquetDataCatalog::from_uri("gcs://bucket/path", None, None, None, None).unwrap();
assert!(gcs2_catalog.is_remote_uri());

// Test Azure URIs
let azure_catalog =
ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
.unwrap();
assert!(azure_catalog.is_remote_uri());

let abfs_catalog = ParquetDataCatalog::from_uri(
"abfs://[email protected]/path",
None,
None,
None,
None,
)
.unwrap();
assert!(abfs_catalog.is_remote_uri());

// Test HTTP URIs
let http_catalog =
ParquetDataCatalog::from_uri("http://example.com/path", None, None, None, None)
.unwrap();
assert!(http_catalog.is_remote_uri());

let https_catalog =
ParquetDataCatalog::from_uri("https://example.com/path", None, None, None, None)
.unwrap();
assert!(https_catalog.is_remote_uri());

// Test local paths (should not be remote)
let tmp = tempfile::tempdir().unwrap();
let local_catalog =
ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
assert!(!local_catalog.is_remote_uri());

let tmp_file = tempfile::tempdir().unwrap();
let file_uri = format!("file://{}", tmp_file.path().display());
let file_catalog = ParquetDataCatalog::from_uri(&file_uri, None, None, None, None).unwrap();
assert!(!file_catalog.is_remote_uri());
}

#[test]
fn test_reconstruct_full_uri() {
// Test S3 URI reconstruction
let s3_catalog =
ParquetDataCatalog::from_uri("s3://bucket/base/path", None, None, None, None).unwrap();
let reconstructed = s3_catalog.reconstruct_full_uri("data/quotes/file.parquet");
assert_eq!(reconstructed, "s3://bucket/data/quotes/file.parquet");

// Test GCS URI reconstruction
let gcs_catalog =
ParquetDataCatalog::from_uri("gs://bucket/base/path", None, None, None, None).unwrap();
let reconstructed = gcs_catalog.reconstruct_full_uri("data/trades/file.parquet");
assert_eq!(reconstructed, "gs://bucket/data/trades/file.parquet");

// Test Azure URI reconstruction
let azure_catalog =
ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
.unwrap();
let reconstructed = azure_catalog.reconstruct_full_uri("data/bars/file.parquet");
assert_eq!(reconstructed, "azure://account/data/bars/file.parquet");

// Test HTTP URI reconstruction
let http_catalog =
ParquetDataCatalog::from_uri("https://example.com/base/path", None, None, None, None)
.unwrap();
let reconstructed = http_catalog.reconstruct_full_uri("data/quotes/file.parquet");
assert_eq!(
reconstructed,
"https://example.com/data/quotes/file.parquet"
);

// Test local path (should return path as-is)
let tmp = tempfile::tempdir().unwrap();
let local_catalog =
ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
let reconstructed = local_catalog.reconstruct_full_uri("data/quotes/file.parquet");
assert_eq!(reconstructed, "data/quotes/file.parquet");
}
}
10 changes: 5 additions & 5 deletions nautilus_trader/backtest/config.py
@@ -294,22 +294,22 @@ def query(self) -> dict[str, Any]: # noqa: C901
else:
filter_expr = self.filter_expr

- used_instrument_ids = None
+ used_identifiers = None

if self.instrument_id is not None:
- used_instrument_ids = [self.instrument_id]
+ used_identifiers = [self.instrument_id]
elif self.instrument_ids is not None:
- used_instrument_ids = self.instrument_ids
+ used_identifiers = self.instrument_ids
elif self.bar_types is not None:
bar_types: list[BarType] = [
BarType.from_str(bar_type) if type(bar_type) is str else bar_type
for bar_type in self.bar_types
]
- used_instrument_ids = [bar_type.instrument_id for bar_type in bar_types]
+ used_identifiers = [bar_type.instrument_id for bar_type in bar_types]

return {
"data_cls": self.data_type,
- "instrument_ids": used_instrument_ids,
+ "identifiers": used_identifiers,
"start": self.start_time,
"end": self.end_time,
"filter_expr": parse_filters_expr(filter_expr),
3 changes: 1 addition & 2 deletions nautilus_trader/backtest/node.py
@@ -548,8 +548,7 @@ def _run_streaming( # noqa: C901

session = catalog.backend_session(
data_cls=config.data_type,
- instrument_ids=used_instrument_ids,
- bar_types=(used_bar_types if len(used_bar_types) > 0 else None),
+ identifiers=(used_bar_types or used_instrument_ids),
start=used_start,
end=used_end,
session=session,
2 changes: 1 addition & 1 deletion nautilus_trader/data/engine.pxd
@@ -225,7 +225,7 @@ cdef class DataEngine(Component):

cpdef void _handle_response(self, DataResponse response)
cpdef void _handle_instruments(self, list instruments, bint update_catalog = *, bint force_update_catalog = *)
- cpdef tuple[datetime, object] _catalog_last_timestamp(self, type data_cls, instrument_id: str | None = *)
+ cpdef tuple[datetime, object] _catalog_last_timestamp(self, type data_cls, identifier: str | None = *)
cpdef void _new_query_group(self, UUID4 correlation_id, int n_components)
cpdef DataResponse _handle_query_group(self, DataResponse response)
cdef DataResponse _handle_query_group_aux(self, DataResponse response)