211 changes: 208 additions & 3 deletions datafusion/core/tests/datasource/object_store_access.rs
@@ -28,6 +28,9 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource_csv::CsvFormat;
use futures::stream::BoxStream;
use insta::assert_snapshot;
use object_store::memory::InMemory;
@@ -123,6 +126,163 @@ async fn query_multi_csv_file() {
);
}

#[tokio::test]
async fn query_partitioned_csv_file() {
Contributor:

Could you also please add a test with a query that applies predicates to the three partition columns?

Something like

select * from csv_table_partitioned WHERE a = 2;
-- apply predicate to last in directory
select * from csv_table_partitioned WHERE c = 200;
-- apply predicate to both
select * from csv_table_partitioned WHERE a = 2 AND b = 20;

Contributor Author:

Absolutely! I'll go ahead and add those onto this PR rather than a follow-on. It should be pretty quick and we can reduce some PR noise by combining them.

let test = Test::new().with_partitioned_csv().await;
assert_snapshot!(
test.query("select * from csv_table_partitioned").await,
@r"
------- Query Output (6 rows) -------
+---------+-------+-------+---+----+-----+
| d1 | d2 | d3 | a | b | c |
+---------+-------+-------+---+----+-----+
| 0.00001 | 1e-12 | true | 1 | 10 | 100 |
| 0.00003 | 5e-12 | false | 1 | 10 | 100 |
| 0.00002 | 2e-12 | true | 2 | 20 | 200 |
| 0.00003 | 5e-12 | false | 2 | 20 | 200 |
| 0.00003 | 3e-12 | true | 3 | 30 | 300 |
| 0.00003 | 5e-12 | false | 3 | 30 | 300 |
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 13
- LIST (with delimiter) prefix=data
Contributor:

This makes it super clear what is going on. It is a terrifying number of LIST commands.

Contributor Author:

Yes, one for each directory! I had initially used 3 files in each directory, but I thought this test produced an even more interesting result because there are more list requests than there are data files.

I will say one thing we can't easily see here is the sequencing and parallelism of the list requests. The current implementation does a pretty good job of hiding the latency behind concurrency.
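(A rough model of this per-directory walk, `count_lists`, appears as a sketch after `with_partitioned_csv` below.)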

- LIST (with delimiter) prefix=data/a=1
- LIST (with delimiter) prefix=data/a=2
- LIST (with delimiter) prefix=data/a=3
- LIST (with delimiter) prefix=data/a=1/b=10
- LIST (with delimiter) prefix=data/a=2/b=20
- LIST (with delimiter) prefix=data/a=3/b=30
- LIST (with delimiter) prefix=data/a=1/b=10/c=100
- LIST (with delimiter) prefix=data/a=2/b=20/c=200
- LIST (with delimiter) prefix=data/a=3/b=30/c=300
- GET (opts) path=data/a=1/b=10/c=100/file_1.csv
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
- GET (opts) path=data/a=3/b=30/c=300/file_3.csv
"
);

assert_snapshot!(
test.query("select * from csv_table_partitioned WHERE a=2").await,
@r"
------- Query Output (2 rows) -------
+---------+-------+-------+---+----+-----+
| d1 | d2 | d3 | a | b | c |
+---------+-------+-------+---+----+-----+
| 0.00002 | 2e-12 | true | 2 | 20 | 200 |
| 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 4
- LIST (with delimiter) prefix=data/a=2
- LIST (with delimiter) prefix=data/a=2/b=20
- LIST (with delimiter) prefix=data/a=2/b=20/c=200
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);

assert_snapshot!(
test.query("select * from csv_table_partitioned WHERE b=20").await,
@r"
------- Query Output (2 rows) -------
+---------+-------+-------+---+----+-----+
| d1 | d2 | d3 | a | b | c |
+---------+-------+-------+---+----+-----+
| 0.00002 | 2e-12 | true | 2 | 20 | 200 |
| 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 11
- LIST (with delimiter) prefix=data
- LIST (with delimiter) prefix=data/a=1
- LIST (with delimiter) prefix=data/a=2
- LIST (with delimiter) prefix=data/a=3
- LIST (with delimiter) prefix=data/a=1/b=10
- LIST (with delimiter) prefix=data/a=2/b=20
- LIST (with delimiter) prefix=data/a=3/b=30
- LIST (with delimiter) prefix=data/a=1/b=10/c=100
- LIST (with delimiter) prefix=data/a=2/b=20/c=200
- LIST (with delimiter) prefix=data/a=3/b=30/c=300
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);

assert_snapshot!(
test.query("select * from csv_table_partitioned WHERE c=200").await,
@r"
------- Query Output (2 rows) -------
+---------+-------+-------+---+----+-----+
| d1 | d2 | d3 | a | b | c |
+---------+-------+-------+---+----+-----+
| 0.00002 | 2e-12 | true | 2 | 20 | 200 |
| 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 11
- LIST (with delimiter) prefix=data
- LIST (with delimiter) prefix=data/a=1
- LIST (with delimiter) prefix=data/a=2
- LIST (with delimiter) prefix=data/a=3
- LIST (with delimiter) prefix=data/a=1/b=10
- LIST (with delimiter) prefix=data/a=2/b=20
- LIST (with delimiter) prefix=data/a=3/b=30
- LIST (with delimiter) prefix=data/a=1/b=10/c=100
- LIST (with delimiter) prefix=data/a=2/b=20/c=200
- LIST (with delimiter) prefix=data/a=3/b=30/c=300
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);

assert_snapshot!(
test.query("select * from csv_table_partitioned WHERE a=2 AND b=20").await,
@r"
------- Query Output (2 rows) -------
+---------+-------+-------+---+----+-----+
| d1 | d2 | d3 | a | b | c |
+---------+-------+-------+---+----+-----+
| 0.00002 | 2e-12 | true | 2 | 20 | 200 |
| 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 3
- LIST (with delimiter) prefix=data/a=2/b=20
- LIST (with delimiter) prefix=data/a=2/b=20/c=200
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);

assert_snapshot!(
test.query("select * from csv_table_partitioned WHERE a<2 AND b=10 AND c=100").await,
@r"
------- Query Output (2 rows) -------
+---------+-------+-------+---+----+-----+
| d1 | d2 | d3 | a | b | c |
+---------+-------+-------+---+----+-----+
| 0.00001 | 1e-12 | true | 1 | 10 | 100 |
| 0.00003 | 5e-12 | false | 1 | 10 | 100 |
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 11
- LIST (with delimiter) prefix=data
- LIST (with delimiter) prefix=data/a=1
- LIST (with delimiter) prefix=data/a=2
- LIST (with delimiter) prefix=data/a=3
- LIST (with delimiter) prefix=data/a=1/b=10
- LIST (with delimiter) prefix=data/a=2/b=20
- LIST (with delimiter) prefix=data/a=3/b=30
- LIST (with delimiter) prefix=data/a=1/b=10/c=100
- LIST (with delimiter) prefix=data/a=2/b=20/c=200
- LIST (with delimiter) prefix=data/a=3/b=30/c=300
- GET (opts) path=data/a=1/b=10/c=100/file_1.csv
"
);
}
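The snapshots above suggest a simple prefix rule: an equality predicate on a leading run of partition columns shortens the prefix the listing walk starts from (`a=2` needs only 4 requests, `a=2 AND b=20` only 3), while a predicate that skips the leading column (`b=20`, `c=200`) still forces a full walk from `data`, with non-matching files filtered out only after listing. The `a<2` snapshot further suggests that only equality predicates shorten the prefix here; the range predicate triggers a full walk even though just one file is fetched. Below is a minimal sketch of that rule, not DataFusion's actual pruning code; `prune_prefix` and the `(column, Option<value>)` filter shape are hypothetical, for illustration only.

// Hedged sketch of the prefix rule implied by the snapshots above.
use object_store::path::Path;

/// Partition columns in directory order, each paired with an optional
/// equality value from the query (None = unconstrained).
fn prune_prefix(root: &str, filters: &[(&str, Option<&str>)]) -> Path {
    let mut prefix = root.to_string();
    for (col, value) in filters {
        match value {
            // A constrained leading column extends the LIST prefix...
            Some(v) => prefix = format!("{prefix}/{col}={v}"),
            // ...but the first unconstrained column ends pruning: everything
            // below it must still be listed and filtered after the fact.
            None => break,
        }
    }
    Path::from(prefix)
}

#[test]
fn pruning_examples() {
    // WHERE a=2: walk starts at data/a=2 (3 LISTs + 1 GET = 4 requests)
    assert_eq!(
        prune_prefix("data", &[("a", Some("2")), ("b", None), ("c", None)]),
        Path::from("data/a=2")
    );
    // WHERE b=20: leading column unconstrained, so a full walk from data
    // (10 LISTs + 1 GET = 11 requests)
    assert_eq!(
        prune_prefix("data", &[("a", None), ("b", Some("20")), ("c", None)]),
        Path::from("data")
    );
    // WHERE a=2 AND b=20: walk starts at data/a=2/b=20 (2 LISTs + 1 GET = 3)
    assert_eq!(
        prune_prefix("data", &[("a", Some("2")), ("b", Some("20")), ("c", None)]),
        Path::from("data/a=2/b=20")
    );
}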

#[tokio::test]
async fn create_single_parquet_file_default() {
// The default metadata size hint is 512KB
Expand Down Expand Up @@ -363,7 +523,7 @@ impl Test {
self
}

/// Register a CSV file at the given path relative to the [`datafusion_test_data`] directory
/// Register a CSV file at the given path
async fn register_csv(self, table_name: &str, path: &str) -> Self {
let mut options = CsvReadOptions::new();
options.has_header = true;
@@ -375,8 +535,30 @@
self
}

/// Register a Parquet file at the given path relative to the
/// [`datafusion_test_data`] directory
/// Register a partitioned CSV table at the given path
async fn register_partitioned_csv(self, table_name: &str, path: &str) -> Self {
let file_format = Arc::new(CsvFormat::default().with_has_header(true));
let options = ListingOptions::new(file_format);

let url = format!("mem://{path}").parse().unwrap();
let table_url = ListingTableUrl::try_new(url, None).unwrap();

let session_state = self.session_context.state();
let mut config = ListingTableConfig::new(table_url).with_listing_options(options);
config = config
.infer_partitions_from_path(&session_state)
.await
.unwrap();
config = config.infer_schema(&session_state).await.unwrap();

let table = Arc::new(ListingTable::try_new(config).unwrap());
self.session_context
.register_table(table_name, table)
.unwrap();
self
}

/// Register a Parquet file at the given path
async fn register_parquet(self, table_name: &str, path: &str) -> Self {
let path = format!("mem://{path}");
let mut options: ParquetReadOptions<'_> = ParquetReadOptions::new();
Expand Down Expand Up @@ -425,6 +607,29 @@ impl Test {
self.register_csv("csv_table", "/data/").await
}

/// Register three CSV files in a partitioned directory structure as a
/// table named `csv_table_partitioned`
async fn with_partitioned_csv(mut self) -> Test {
for i in 1..4 {
// upload CSV data to object store
let csv_data1 = format!(
r#"d1,d2,d3
0.0000{i},{i}e-12,true
0.00003,5e-12,false
"#
);
self = self
.with_bytes(
&format!("/data/a={i}/b={}/c={}/file_{i}.csv", i * 10, i * 100,),
csv_data1,
)
.await;
}
// register table
self.register_partitioned_csv("csv_table_partitioned", "/data/")
.await
}
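For a rough model of where the 13 requests in the unfiltered snapshot come from: a delimiter-based walk pays one LIST per directory level, so the 3×1×1 tree built above costs 1 (`data`) + 3 (`a=*`) + 3 (`b=*`) + 3 (`c=*`) = 10 LISTs, plus one GET per data file. Below is a minimal sketch of such a walk, assuming the real `object_store::ObjectStore::list_with_delimiter` API; `count_lists` itself is a hypothetical helper, not DataFusion's listing code.

// Hedged sketch: count the LIST requests a recursive delimiter walk issues.
use object_store::{path::Path, ObjectStore};

async fn count_lists(store: &dyn ObjectStore, prefix: Path) -> object_store::Result<usize> {
    // One LIST request for this directory level; `common_prefixes` holds the
    // child "directories" (e.g. data/a=1, data/a=2, data/a=3 under data).
    let listing = store.list_with_delimiter(Some(&prefix)).await?;
    let mut total = 1;
    for child in listing.common_prefixes {
        // Recurse into each partition directory (boxed for async recursion).
        total += Box::pin(count_lists(store, child)).await?;
    }
    Ok(total)
}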

/// Add a single parquet file, registered as `parquet_table`, that has two
/// columns and two row groups
///
/// Column "a": Int32 with values 0-100 in row group 1