Skip to content

Commit fe332cc

Browse files
Table provider factory for Arrow Flight and Flight SQL (#76)
Co-authored-by: Phillip LeBlanc <[email protected]>
1 parent e75565b commit fe332cc

File tree

11 files changed

+1260
-2
lines changed

11 files changed

+1260
-2
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ Cargo.lock
1717

1818
.vscode/settings.json
1919

20-
.DS_Store
20+
.DS_Store
21+
22+
.idea

Cargo.toml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,25 @@ edition = "2021"
66
repository = "https://github.com/datafusion-contrib/datafusion-table-providers"
77

88
[dependencies]
9-
arrow = "52.0.0"
9+
arrow = "52.2.0"
10+
arrow-array = { version = "52.2.0", optional = true }
11+
arrow-flight = { version = "52.2.0", optional = true, features = ["flight-sql-experimental", "tls"] }
12+
arrow-schema = { version = "52.2.0", optional = true, features = ["serde"] }
1013
arrow-json = "52.2.0"
1114
async-stream = { version = "0.3.5", optional = true }
1215
async-trait = "0.1.80"
1316
num-bigint = "0.4.4"
17+
base64 = { version = "0.22.1", optional = true }
18+
bytes = { version = "1.7.1", optional = true }
1419
bigdecimal = "0.4.5"
1520
bigdecimal_0_3_0 = { package = "bigdecimal", version = "0.3.0" }
1621
byteorder = "1.5.0"
1722
chrono = "0.4.38"
1823
datafusion = "41.0.0"
24+
datafusion-expr = { version = "41.0.0", optional = true }
25+
datafusion-physical-expr = { version = "41.0.0", optional = true }
26+
datafusion-physical-plan = { version = "41.0.0", optional = true }
27+
datafusion-proto = { version = "41.0.0", optional = true }
1928
duckdb = { version = "1", features = [
2029
"bundled",
2130
"r2d2",
@@ -26,10 +35,12 @@ duckdb = { version = "1", features = [
2635
fallible-iterator = "0.3.0"
2736
futures = "0.3.30"
2837
mysql_async = { version = "0.34.1", features = ["native-tls-tls", "chrono"], optional = true }
38+
prost = { version = "0.12" , optional = true } # pinned for arrow-flight compat
2939
r2d2 = { version = "0.8.10", optional = true }
3040
rusqlite = { version = "0.31.0", optional = true }
3141
sea-query = { version = "0.31.0", features = ["backend-sqlite", "backend-postgres", "postgres-array", "with-rust_decimal", "with-bigdecimal", "with-time", "with-chrono"] }
3242
secrecy = "0.8.0"
43+
serde = { version = "1.0.209", optional = true }
3344
serde_json = "1.0.124"
3445
snafu = "0.8.3"
3546
time = "0.3.36"
@@ -45,6 +56,7 @@ trust-dns-resolver = "0.23.2"
4556
url = "2.5.1"
4657
pem = { version = "3.0.4", optional = true }
4758
tokio-rusqlite = { version = "0.5.1", optional = true }
59+
tonic = { version = "0.11", optional = true } # pinned for arrow-flight compat
4860
datafusion-federation = "0.1"
4961
datafusion-federation-sql = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "21f07bec7284bcbff2bf4e570008290b66e3dc6f" }
5062
itertools = "0.13.0"
@@ -60,12 +72,27 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
6072
test-log = { version = "0.2.16", features = ["trace"] }
6173
rstest = "0.22.0"
6274
geozero = { version = "0.13.0", features = ["with-wkb"] }
75+
tokio-stream = { version = "0.1.15", features = ["net"] }
6376

6477
[features]
6578
mysql = ["dep:mysql_async", "dep:async-stream"]
6679
postgres = ["dep:tokio-postgres", "dep:uuid", "dep:postgres-native-tls", "dep:bb8", "dep:bb8-postgres", "dep:native-tls", "dep:pem", "dep:async-stream"]
6780
sqlite = ["dep:rusqlite", "dep:tokio-rusqlite"]
6881
duckdb = ["dep:duckdb", "dep:r2d2", "dep:uuid"]
82+
flight = [
83+
"dep:arrow-array",
84+
"dep:arrow-flight",
85+
"dep:arrow-schema",
86+
"dep:base64",
87+
"dep:bytes",
88+
"dep:datafusion-expr",
89+
"dep:datafusion-physical-expr",
90+
"dep:datafusion-physical-plan",
91+
"dep:datafusion-proto",
92+
"dep:prost",
93+
"dep:serde",
94+
"dep:tonic",
95+
]
6996
duckdb-federation = ["duckdb"]
7097
sqlite-federation = ["sqlite"]
7198

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Many of the table providers in this repo are for querying data from other databa
1212
- MySQL
1313
- SQLite
1414
- DuckDB
15+
- Flight SQL
1516

1617
## Examples
1718

@@ -79,3 +80,13 @@ EOF
7980
```bash
8081
cargo run --example mysql --features mysql
8182
```
83+
84+
### Flight SQL
85+
```bash
86+
brew install roapi
87+
# or
88+
#cargo install --locked --git https://github.com/roapi/roapi --branch main --bins roapi
89+
roapi -t taxi=https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet &
90+
91+
cargo run --example flight-sql --features flight
92+
```

examples/flight-sql.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion::prelude::SessionContext;
19+
use datafusion_table_providers::flight::sql::{FlightSqlDriver, QUERY};
20+
use datafusion_table_providers::flight::FlightTableFactory;
21+
use std::collections::HashMap;
22+
use std::sync::Arc;
23+
24+
/// Prerequisites:
25+
/// ```
26+
/// $ brew install roapi
27+
/// $ roapi -t taxi=https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
28+
/// ```
29+
#[tokio::main]
30+
async fn main() -> datafusion::common::Result<()> {
31+
let ctx = SessionContext::new();
32+
let flight_sql = FlightTableFactory::new(Arc::new(FlightSqlDriver::default()));
33+
let table = flight_sql
34+
.open_table(
35+
"http://localhost:32010",
36+
HashMap::from([(QUERY.into(), "SELECT * FROM taxi".into())]),
37+
)
38+
.await?;
39+
ctx.register_table("trip_data", Arc::new(table))?;
40+
ctx.sql("select * from trip_data limit 10")
41+
.await?
42+
.show()
43+
.await?;
44+
45+
// The same table created using DDL
46+
ctx.state_ref()
47+
.write()
48+
.table_factories_mut()
49+
.insert("FLIGHT_SQL".into(), Arc::new(flight_sql));
50+
let _ = ctx
51+
.sql(
52+
r#"
53+
CREATE EXTERNAL TABLE trip_data2 STORED AS FLIGHT_SQL
54+
LOCATION 'http://localhost:32010'
55+
OPTIONS (
56+
'flight.sql.query' 'SELECT * FROM taxi'
57+
)
58+
"#,
59+
)
60+
.await?;
61+
62+
let df = ctx
63+
.sql(
64+
r#"
65+
SELECT "VendorID", COUNT(*), SUM(passenger_count), SUM(total_amount)
66+
FROM trip_data2
67+
GROUP BY "VendorID"
68+
ORDER BY COUNT(*) DESC
69+
"#,
70+
)
71+
.await?;
72+
df.clone().explain(false, false)?.show().await?;
73+
df.show().await
74+
}

0 commit comments

Comments
 (0)