From ccb214d69344e174ada83b9c01863c3a2b8479eb Mon Sep 17 00:00:00 2001 From: "Ning Sun (aider)" Date: Mon, 16 Jun 2025 17:48:35 -0700 Subject: [PATCH 1/3] feat: add datafusion feature to arrow-pg --- arrow-pg/Cargo.toml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/arrow-pg/Cargo.toml b/arrow-pg/Cargo.toml index 4d563e0..2599d4b 100644 --- a/arrow-pg/Cargo.toml +++ b/arrow-pg/Cargo.toml @@ -12,10 +12,15 @@ documentation.workspace = true readme = "../README.md" rust-version.workspace = true +[features] +default = [] +datafusion = ["dep:datafusion-core"] + [dependencies] arrow.workspace = true bytes.workspace = true -chrono.workspace = true +chrono.workspace = true +datafusion-core = { workspace = true, optional = true } futures.workspace = true pgwire = { workspace = true, features = ["server-api"] } postgres-types.workspace = true From 5d08e8e44cd532884080e85f4712702762ebbc10 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 16 Jun 2025 22:04:15 -0700 Subject: [PATCH 2/3] refactor: move datafusion encoding logic into arrow-pg --- .gitignore | 3 +- Cargo.lock | 2 +- arrow-pg/Cargo.toml | 11 +++---- arrow-pg/src/datatypes.rs | 10 +++++-- .../src/datatypes/df.rs | 6 ++-- arrow-pg/src/encoder.rs | 6 ++-- arrow-pg/src/lib.rs | 5 ++++ arrow-pg/src/list_encoder.rs | 29 +++++++++++++++---- arrow-pg/src/row_encoder.rs | 4 +++ arrow-pg/src/struct_encoder.rs | 4 +++ datafusion-postgres/Cargo.toml | 2 +- datafusion-postgres/src/handlers.rs | 9 +++--- datafusion-postgres/src/lib.rs | 1 - 13 files changed, 65 insertions(+), 27 deletions(-) rename datafusion-postgres/src/datatypes.rs => arrow-pg/src/datatypes/df.rs (97%) diff --git a/.gitignore b/.gitignore index a7bba04..cabdeda 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target .direnv .envrc -.vscode \ No newline at end of file +.vscode +.aider* diff --git a/Cargo.lock b/Cargo.lock index b06096a..f6f814d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -358,6 +358,7 @@ dependencies = [ "arrow", "bytes", "chrono", + "datafusion", "futures", "pgwire", "postgres-types", @@ -1502,7 +1503,6 @@ dependencies = [ "datafusion", "datafusion-postgres", "env_logger", - "pgwire", "structopt", "tokio", ] diff --git a/arrow-pg/Cargo.toml b/arrow-pg/Cargo.toml index 2599d4b..030477a 100644 --- a/arrow-pg/Cargo.toml +++ b/arrow-pg/Cargo.toml @@ -13,14 +13,15 @@ readme = "../README.md" rust-version.workspace = true [features] -default = [] -datafusion = ["dep:datafusion-core"] +default = ["arrow"] +arrow = ["dep:arrow"] +datafusion = ["dep:datafusion"] [dependencies] -arrow.workspace = true +arrow = { workspace = true, optional = true } bytes.workspace = true -chrono.workspace = true -datafusion-core = { workspace = true, optional = true } +chrono.workspace = true +datafusion = { workspace = true, optional = true } futures.workspace = true pgwire = { workspace = true, features = ["server-api"] } postgres-types.workspace = true diff --git a/arrow-pg/src/datatypes.rs b/arrow-pg/src/datatypes.rs index 06dafe2..6ec61e5 100644 --- a/arrow-pg/src/datatypes.rs +++ b/arrow-pg/src/datatypes.rs @@ -1,7 +1,10 @@ use std::sync::Arc; -use arrow::datatypes::*; -use arrow::record_batch::RecordBatch; +#[cfg(not(feature = "datafusion"))] +use arrow::{datatypes::*, record_batch::RecordBatch}; +#[cfg(feature = "datafusion")] +use datafusion::arrow::{datatypes::*, record_batch::RecordBatch}; + use pgwire::api::portal::Format; use pgwire::api::results::FieldInfo; use pgwire::api::Type; @@ -11,6 +14,9 @@ use postgres_types::Kind; use crate::row_encoder::RowEncoder; +#[cfg(feature = "datafusion")] +pub mod df; + pub fn into_pg_type(arrow_type: &DataType) -> PgWireResult { Ok(match arrow_type { DataType::Null => Type::UNKNOWN, diff --git a/datafusion-postgres/src/datatypes.rs b/arrow-pg/src/datatypes/df.rs similarity index 97% rename from datafusion-postgres/src/datatypes.rs rename to arrow-pg/src/datatypes/df.rs index cbae22b..ba6ce73 100644 --- a/datafusion-postgres/src/datatypes.rs +++ b/arrow-pg/src/datatypes/df.rs @@ -17,9 +17,9 @@ use pgwire::messages::data::DataRow; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; -use arrow_pg::datatypes::{arrow_schema_to_pg_fields, encode_recordbatch, into_pg_type}; +use super::{arrow_schema_to_pg_fields, encode_recordbatch, into_pg_type}; -pub(crate) async fn encode_dataframe<'a>( +pub async fn encode_dataframe<'a>( df: DataFrame, format: &Format, ) -> PgWireResult> { @@ -51,7 +51,7 @@ pub(crate) async fn encode_dataframe<'a>( /// If the type is empty or unknown, we fallback to datafusion inferenced type /// from `inferenced_types`. /// An error will be raised when neither sources can provide type information. -pub(crate) fn deserialize_parameters( +pub fn deserialize_parameters( portal: &Portal, inferenced_types: &[Option<&DataType>], ) -> PgWireResult diff --git a/arrow-pg/src/encoder.rs b/arrow-pg/src/encoder.rs index cda5ba7..9b3b930 100644 --- a/arrow-pg/src/encoder.rs +++ b/arrow-pg/src/encoder.rs @@ -3,11 +3,13 @@ use std::io::Write; use std::str::FromStr; use std::sync::Arc; -use arrow::array::*; -use arrow::datatypes::*; +#[cfg(not(feature = "datafusion"))] +use arrow::{array::*, datatypes::*}; use bytes::BufMut; use bytes::BytesMut; use chrono::{NaiveDate, NaiveDateTime}; +#[cfg(feature = "datafusion")] +use datafusion::arrow::{array::*, datatypes::*}; use pgwire::api::results::DataRowEncoder; use pgwire::api::results::FieldFormat; use pgwire::error::PgWireError; diff --git a/arrow-pg/src/lib.rs b/arrow-pg/src/lib.rs index dd77bce..e33a375 100644 --- a/arrow-pg/src/lib.rs +++ b/arrow-pg/src/lib.rs @@ -1,3 +1,8 @@ +//! Arrow data encoding and type mapping for Postgres(pgwire). + +// #[cfg(all(feature = "arrow", feature = "datafusion"))] +// compile_error!("Feature arrow and datafusion cannot be enabled at same time. Use no-default-features when activating datafusion"); + pub mod datatypes; pub mod encoder; mod error; diff --git a/arrow-pg/src/list_encoder.rs b/arrow-pg/src/list_encoder.rs index 766da3f..157bd71 100644 --- a/arrow-pg/src/list_encoder.rs +++ b/arrow-pg/src/list_encoder.rs @@ -1,12 +1,13 @@ use std::{str::FromStr, sync::Arc}; -use arrow::array::{ - timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, - LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, -}; +#[cfg(not(feature = "datafusion"))] use arrow::{ + array::{ + timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, + LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + }, datatypes::{ DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, @@ -14,6 +15,22 @@ use arrow::{ }, temporal_conversions::{as_date, as_time}, }; +#[cfg(feature = "datafusion")] +use datafusion::arrow::{ + array::{ + timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, + LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + }, + datatypes::{ + DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + Time64NanosecondType, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + }, + temporal_conversions::{as_date, as_time}, +}; + use bytes::{BufMut, BytesMut}; use chrono::{DateTime, TimeZone, Utc}; use pgwire::api::results::FieldFormat; diff --git a/arrow-pg/src/row_encoder.rs b/arrow-pg/src/row_encoder.rs index 3eab8c7..145c9ab 100644 --- a/arrow-pg/src/row_encoder.rs +++ b/arrow-pg/src/row_encoder.rs @@ -1,6 +1,10 @@ use std::sync::Arc; +#[cfg(not(feature = "datafusion"))] use arrow::array::RecordBatch; +#[cfg(feature = "datafusion")] +use datafusion::arrow::array::RecordBatch; + use pgwire::{ api::results::{DataRowEncoder, FieldInfo}, error::PgWireResult, diff --git a/arrow-pg/src/struct_encoder.rs b/arrow-pg/src/struct_encoder.rs index 96c9467..49fce1b 100644 --- a/arrow-pg/src/struct_encoder.rs +++ b/arrow-pg/src/struct_encoder.rs @@ -1,6 +1,10 @@ use std::sync::Arc; +#[cfg(not(feature = "datafusion"))] use arrow::array::{Array, StructArray}; +#[cfg(feature = "datafusion")] +use datafusion::arrow::array::{Array, StructArray}; + use bytes::{BufMut, BytesMut}; use pgwire::api::results::FieldFormat; use pgwire::error::PgWireResult; diff --git a/datafusion-postgres/Cargo.toml b/datafusion-postgres/Cargo.toml index 1a1bb2e..d4e79af 100644 --- a/datafusion-postgres/Cargo.toml +++ b/datafusion-postgres/Cargo.toml @@ -13,7 +13,7 @@ readme = "../README.md" rust-version.workspace = true [dependencies] -arrow-pg = { path = "../arrow-pg", version = "0.1.1" } +arrow-pg = { path = "../arrow-pg", version = "0.1.1", default-features = false, features = ["datafusion"] } bytes.workspace = true async-trait = "0.1" chrono.workspace = true diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 1b122c1..5cb94f7 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -19,7 +19,7 @@ use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireServerHandlers, Type}; use pgwire::error::{PgWireError, PgWireResult}; use tokio::sync::Mutex; -use crate::datatypes; +use arrow_pg::datatypes::df; use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type}; pub struct HandlerFactory(pub Arc); @@ -213,7 +213,7 @@ impl SimpleQueryHandler for DfSessionService { Ok(vec![Response::Execution(tag)]) } else { // For non-INSERT queries, return a regular Query response - let resp = datatypes::encode_dataframe(df, &Format::UnifiedText).await?; + let resp = df::encode_dataframe(df, &Format::UnifiedText).await?; Ok(vec![Response::Query(resp)]) } } @@ -304,8 +304,7 @@ impl ExtendedQueryHandler for DfSessionService { let param_types = plan .get_parameter_types() .map_err(|e| PgWireError::ApiError(Box::new(e)))?; - let param_values = - datatypes::deserialize_parameters(portal, &ordered_param_types(¶m_types))?; // Fixed: Use ¶m_types + let param_values = df::deserialize_parameters(portal, &ordered_param_types(¶m_types))?; // Fixed: Use ¶m_types let plan = plan .clone() .replace_params_with_values(¶m_values) @@ -315,7 +314,7 @@ impl ExtendedQueryHandler for DfSessionService { .execute_logical_plan(plan) .await .map_err(|e| PgWireError::ApiError(Box::new(e)))?; - let resp = datatypes::encode_dataframe(dataframe, &portal.result_column_format).await?; + let resp = df::encode_dataframe(dataframe, &portal.result_column_format).await?; Ok(Response::Query(resp)) } } diff --git a/datafusion-postgres/src/lib.rs b/datafusion-postgres/src/lib.rs index db957c5..a771695 100644 --- a/datafusion-postgres/src/lib.rs +++ b/datafusion-postgres/src/lib.rs @@ -1,4 +1,3 @@ -mod datatypes; mod handlers; pub mod pg_catalog; From 9a79ca865fd1f9fd03cff3343c06976e3b88ee50 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 16 Jun 2025 22:09:14 -0700 Subject: [PATCH 3/3] ci: add test for cargo-pg default feature Signed-off-by: Ning Sun --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a39451c..3f063b1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,6 +53,9 @@ jobs: override: true - name: Build and run tests run: cargo test --all-features + - name: Test arrow-pg default features + working-directory: arrow-pg + run: cargo test integration: name: Integration tests