Skip to content
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

# Development instructions

Many dependencies are behind [cargo feature flags](https://doc.rust-lang.org/cargo/reference/features.html). When developing locally you may want to enable some or all of them; you can do this with cargo's `--all-features` option. If you're using VSCode, you may want to add `"rust-analyzer.cargo.features": "all"` to your `.vscode/settings.json` file to get full IDE support.

Before submitting a PR, run the Clippy linter with a command like `cargo clippy --all-features -- -D warnings`, or `cargo clippy --features azure -- -D warnings` for a specific feature. Clippy is part of the CI suite, so any warnings it reports will be flagged on the PR.

Additionally, remember to run the code formatter with `cargo fmt --all`. The CI suite runs `cargo fmt --all -- --check`, which will fail if it detects any improperly formatted code.

## Running Tests

Tests can be run using `cargo`
Expand Down
28 changes: 28 additions & 0 deletions src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ pub struct MicrosoftAzureBuilder {
fabric_cluster_identifier: Option<String>,
/// The [`HttpConnector`] to use
http_connector: Option<Arc<dyn HttpConnector>>,
/// When listing objects, ignore paths that throw an error when parsing
ignore_unparsable_paths: ConfigValue<bool>,
}

/// Configuration keys for [`MicrosoftAzureBuilder`]
Expand Down Expand Up @@ -380,6 +382,18 @@ pub enum AzureConfigKey {
/// - `fabric_cluster_identifier`
FabricClusterIdentifier,

/// When listing objects, ignore paths that do not conform to the Path class assumptions
///
/// e.g. Azure allows paths like `foo/bar//baz`, which would throw an error
/// when trying to parse them into a Path.
///
/// When set to true, these paths will be ignored.
///
/// Supported keys:
/// - `azure_ignore_unparsable_paths`
/// - `ignore_unparsable_paths`
IgnoreUnparsablePaths,

/// Client options
Client(ClientConfigKey),
}
Expand Down Expand Up @@ -410,6 +424,7 @@ impl AsRef<str> for AzureConfigKey {
Self::FabricWorkloadHost => "azure_fabric_workload_host",
Self::FabricSessionToken => "azure_fabric_session_token",
Self::FabricClusterIdentifier => "azure_fabric_cluster_identifier",
Self::IgnoreUnparsablePaths => "azure_ignore_unparsable_paths",
Self::Client(key) => key.as_ref(),
}
}
Expand Down Expand Up @@ -466,6 +481,9 @@ impl FromStr for AzureConfigKey {
"azure_fabric_cluster_identifier" | "fabric_cluster_identifier" => {
Ok(Self::FabricClusterIdentifier)
}
"azure_ignore_unparsable_paths" | "ignore_unparsable_paths" => {
Ok(Self::IgnoreUnparsablePaths)
}
// Backwards compatibility
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.strip_prefix("azure_").unwrap_or(s).parse() {
Expand Down Expand Up @@ -594,6 +612,7 @@ impl MicrosoftAzureBuilder {
AzureConfigKey::FabricClusterIdentifier => {
self.fabric_cluster_identifier = Some(value.into())
}
AzureConfigKey::IgnoreUnparsablePaths => self.ignore_unparsable_paths.parse(value),
};
self
}
Expand Down Expand Up @@ -635,6 +654,7 @@ impl MicrosoftAzureBuilder {
AzureConfigKey::FabricWorkloadHost => self.fabric_workload_host.clone(),
AzureConfigKey::FabricSessionToken => self.fabric_session_token.clone(),
AzureConfigKey::FabricClusterIdentifier => self.fabric_cluster_identifier.clone(),
AzureConfigKey::IgnoreUnparsablePaths => Some(self.ignore_unparsable_paths.to_string()),
}
}

Expand Down Expand Up @@ -901,6 +921,13 @@ impl MicrosoftAzureBuilder {
self
}

/// If set to `true` list operations will ignore paths that do not conform to the Path class assumptions
/// e.g. foo/bar//baz.txt
pub fn with_ignore_unparsable_paths(mut self, ignore_unparsable_paths: bool) -> Self {
self.ignore_unparsable_paths = ignore_unparsable_paths.into();
self
}

/// Configure a connection to container with given name on Microsoft Azure Blob store.
pub fn build(mut self) -> Result<MicrosoftAzure> {
if let Some(url) = self.url.take() {
Expand Down Expand Up @@ -1044,6 +1071,7 @@ impl MicrosoftAzureBuilder {
client_options: self.client_options,
service: storage_url,
credentials: auth,
ignore_unparsable_paths: self.ignore_unparsable_paths.get()?,
};

let http_client = http.connect(&config.client_options)?;
Expand Down
172 changes: 164 additions & 8 deletions src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ pub(crate) struct AzureConfig {
pub skip_signature: bool,
pub disable_tagging: bool,
pub client_options: ClientOptions,
pub ignore_unparsable_paths: bool,
}

impl AzureConfig {
Expand Down Expand Up @@ -1021,7 +1022,7 @@ impl ListClient for Arc<AzureClient> {
let token = response.next_marker.take().filter(|x| !x.is_empty());

Ok(PaginatedListResult {
result: to_list_result(response, prefix)?,
result: to_list_result(response, prefix, self.config.ignore_unparsable_paths)?,
page_token: token,
})
}
Expand All @@ -1038,7 +1039,11 @@ struct ListResultInternal {
pub blobs: Blobs,
}

fn to_list_result(value: ListResultInternal, prefix: Option<&str>) -> Result<ListResult> {
fn to_list_result(
value: ListResultInternal,
prefix: Option<&str>,
ignore_unparsable_paths: bool,
) -> Result<ListResult> {
let prefix = prefix.unwrap_or_default();
let common_prefixes = value
.blobs
Expand All @@ -1058,6 +1063,9 @@ fn to_list_result(value: ListResultInternal, prefix: Option<&str>) -> Result<Lis
!matches!(blob.properties.resource_type.as_ref(), Some(typ) if typ == "directory")
&& blob.name.len() > prefix.len()
})
.map(BlobInternal::try_from)
.filter(|parsed| !ignore_unparsable_paths || parsed.is_ok())
.map(|parsed| parsed.unwrap())
.map(ObjectMeta::try_from)
.collect::<Result<_>>()?;

Expand Down Expand Up @@ -1096,15 +1104,31 @@ struct Blob {
pub metadata: Option<HashMap<String, String>>,
}

impl TryFrom<Blob> for ObjectMeta {
/// A [`Blob`] paired with its name pre-parsed into a [`Path`].
///
/// Parsing the name up front (via the `TryFrom<Blob>` impl) lets listing code
/// filter out blobs whose names fail path parsing before converting the
/// survivors into `ObjectMeta`.
struct BlobInternal {
    // The raw blob entry as returned by the Azure list response
    pub blob: Blob,
    // `blob.name` parsed into a `Path`
    pub path: Path,
}

impl TryFrom<Blob> for BlobInternal {
type Error = crate::path::Error;

fn try_from(value: Blob) -> Result<Self, crate::path::Error> {
Ok(Self {
path: Path::parse(&value.name)?,
blob: value,
})
}
}

impl TryFrom<BlobInternal> for ObjectMeta {
type Error = crate::Error;

fn try_from(value: Blob) -> Result<Self> {
fn try_from(value: BlobInternal) -> Result<Self> {
Ok(Self {
location: Path::parse(value.name)?,
last_modified: value.properties.last_modified,
size: value.properties.content_length,
e_tag: value.properties.e_tag,
location: value.path,
last_modified: value.blob.properties.last_modified,
size: value.blob.properties.content_length,
e_tag: value.blob.properties.e_tag,
version: None, // For consistency with S3 and GCP which don't include this
})
}
Expand Down Expand Up @@ -1403,6 +1427,7 @@ mod tests {
skip_signature: false,
disable_tagging: false,
client_options: Default::default(),
ignore_unparsable_paths: Default::default(),
};

let client = AzureClient::new(config, HttpClient::new(Client::new()));
Expand Down Expand Up @@ -1540,4 +1565,135 @@ Time:2018-06-14T16:46:54.6040685Z</Message></Error>\r
assert_eq!("404", code);
assert_eq!("The specified blob does not exist.", reason);
}

#[tokio::test]
async fn test_list_blobs() {
    // Properties shared by every synthetic blob in this listing
    let props = BlobProperties {
        last_modified: Utc::now(),
        content_length: 8,
        content_type: "text/plain".to_string(),
        content_encoding: None,
        content_language: None,
        e_tag: Some("etag".to_string()),
        resource_type: Some("resource".to_string()),
    };
    // Builds a blob entry with the given name and the shared properties
    let make_blob = |name: &str| Blob {
        name: name.to_string(),
        version_id: None,
        is_current_version: None,
        deleted: None,
        properties: props.clone(),
        metadata: None,
    };
    let listing = ListResultInternal {
        prefix: None,
        max_results: None,
        delimiter: None,
        next_marker: None,
        blobs: Blobs {
            blob_prefix: vec![],
            blobs: vec![make_blob("blob0.txt"), make_blob("blob1.txt")],
        },
    };

    // Both names parse cleanly, so both should survive the conversion
    let result = to_list_result(listing, None, false).unwrap();
    assert_eq!(result.common_prefixes.len(), 0);
    assert_eq!(result.objects.len(), 2);
    assert_eq!(result.objects[0].location, Path::from("blob0.txt"));
    assert_eq!(result.objects[1].location, Path::from("blob1.txt"));
}

#[tokio::test]
#[should_panic]
async fn test_list_blobs_invalid_paths() {
    // Properties shared by every synthetic blob in this listing
    let props = BlobProperties {
        last_modified: Utc::now(),
        content_length: 8,
        content_type: "text/plain".to_string(),
        content_encoding: None,
        content_language: None,
        e_tag: Some("etag".to_string()),
        resource_type: Some("resource".to_string()),
    };
    // Builds a blob entry with the given name and the shared properties
    let make_blob = |name: &str| Blob {
        name: name.to_string(),
        version_id: None,
        is_current_version: None,
        deleted: None,
        properties: props.clone(),
        metadata: None,
    };
    let listing = ListResultInternal {
        prefix: None,
        max_results: None,
        delimiter: None,
        next_marker: None,
        blobs: Blobs {
            blob_prefix: vec![],
            // `foo//blob1.txt` does not parse into a valid `Path`
            blobs: vec![make_blob("foo/blob0.txt"), make_blob("foo//blob1.txt")],
        },
    };

    // With ignore_unparsable_paths disabled, the failed parse panics
    to_list_result(listing, None, false).unwrap();
}

#[tokio::test]
async fn test_list_blobs_ignore_invalid_paths() {
    // Properties shared by every synthetic blob in this listing
    let props = BlobProperties {
        last_modified: Utc::now(),
        content_length: 8,
        content_type: "text/plain".to_string(),
        content_encoding: None,
        content_language: None,
        e_tag: Some("etag".to_string()),
        resource_type: Some("resource".to_string()),
    };
    // Builds a blob entry with the given name and the shared properties
    let make_blob = |name: &str| Blob {
        name: name.to_string(),
        version_id: None,
        is_current_version: None,
        deleted: None,
        properties: props.clone(),
        metadata: None,
    };
    let listing = ListResultInternal {
        prefix: None,
        max_results: None,
        delimiter: None,
        next_marker: None,
        blobs: Blobs {
            blob_prefix: vec![],
            // `foo//blob1.txt` does not parse into a valid `Path`
            blobs: vec![make_blob("foo/blob0.txt"), make_blob("foo//blob1.txt")],
        },
    };

    // With ignore_unparsable_paths enabled, only the parsable name remains
    let result = to_list_result(listing, None, true).unwrap();
    assert_eq!(result.common_prefixes.len(), 0);
    assert_eq!(result.objects.len(), 1);
    assert_eq!(result.objects[0].location, Path::from("foo/blob0.txt"));
}
}