diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index d18dd38b9b44..b5f535b38d3c 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -394,9 +394,24 @@ mod tests { use std::path::Path; use super::*; - use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; + use datafusion_proto::bytes::{ + logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes, + physical_plan_to_bytes, + }; + + fn get_tpch_data_path() -> Result { + let path = + std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); + if !Path::new(&path).exists() { + return Err(DataFusionError::Execution(format!( + "Benchmark data not found (set TPCH_DATA env var to override): {}", + path + ))); + } + Ok(path) + } - async fn serde_round_trip(query: usize) -> Result<()> { + async fn round_trip_logical_plan(query: usize) -> Result<()> { let ctx = SessionContext::default(); let path = get_tpch_data_path()?; let opt = DataFusionBenchmarkOpt { @@ -425,125 +440,99 @@ mod tests { Ok(()) } - #[tokio::test] - async fn serde_q1() -> Result<()> { - serde_round_trip(1).await - } - - #[tokio::test] - async fn serde_q2() -> Result<()> { - serde_round_trip(2).await - } - - #[tokio::test] - async fn serde_q3() -> Result<()> { - serde_round_trip(3).await - } - - #[tokio::test] - async fn serde_q4() -> Result<()> { - serde_round_trip(4).await - } - - #[tokio::test] - async fn serde_q5() -> Result<()> { - serde_round_trip(5).await - } - - #[tokio::test] - async fn serde_q6() -> Result<()> { - serde_round_trip(6).await - } - - #[tokio::test] - async fn serde_q7() -> Result<()> { - serde_round_trip(7).await - } - - #[tokio::test] - async fn serde_q8() -> Result<()> { - serde_round_trip(8).await - } - - #[tokio::test] - async fn serde_q9() -> Result<()> { - serde_round_trip(9).await - } - - #[tokio::test] - async fn serde_q10() -> Result<()> { - serde_round_trip(10).await - } - - #[tokio::test] - async fn serde_q11() -> Result<()> { - serde_round_trip(11).await - } - - #[tokio::test] - async fn serde_q12() -> Result<()> { - serde_round_trip(12).await - } - - #[tokio::test] - async fn serde_q13() -> Result<()> { - serde_round_trip(13).await - } - - #[tokio::test] - async fn serde_q14() -> Result<()> { - serde_round_trip(14).await - } - - #[tokio::test] - async fn serde_q15() -> Result<()> { - serde_round_trip(15).await - } - - #[tokio::test] - async fn serde_q16() -> Result<()> { - serde_round_trip(16).await - } - - #[tokio::test] - async fn serde_q17() -> Result<()> { - serde_round_trip(17).await - } - - #[tokio::test] - async fn serde_q18() -> Result<()> { - serde_round_trip(18).await - } - - #[tokio::test] - async fn serde_q19() -> Result<()> { - serde_round_trip(19).await - } - - #[tokio::test] - async fn serde_q20() -> Result<()> { - serde_round_trip(20).await + async fn round_trip_physical_plan(query: usize) -> Result<()> { + let ctx = SessionContext::default(); + let path = get_tpch_data_path()?; + let opt = DataFusionBenchmarkOpt { + query: Some(query), + debug: false, + iterations: 1, + partitions: 2, + batch_size: 8192, + path: PathBuf::from(path.to_string()), + file_format: "tbl".to_string(), + mem_table: false, + output_path: None, + disable_statistics: false, + }; + register_tables(&opt, &ctx).await?; + let queries = get_query_sql(query)?; + for query in queries { + let plan = ctx.sql(&query).await?; + let plan = plan.create_physical_plan().await?; + let bytes = physical_plan_to_bytes(plan.clone())?; + let plan2 = physical_plan_from_bytes(&bytes, &ctx)?; + let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false)); + let plan2_formatted = + format!("{}", displayable(plan2.as_ref()).indent(false)); + assert_eq!(plan_formatted, plan2_formatted); + } + Ok(()) } - #[tokio::test] - async fn serde_q21() -> Result<()> { - serde_round_trip(21).await + macro_rules! test_round_trip_logical { + ($tn:ident, $query:expr) => { + #[tokio::test] + async fn $tn() -> Result<()> { + round_trip_logical_plan($query).await + } + }; } - #[tokio::test] - async fn serde_q22() -> Result<()> { - serde_round_trip(22).await + macro_rules! test_round_trip_physical { + ($tn:ident, $query:expr) => { + #[tokio::test] + async fn $tn() -> Result<()> { + round_trip_physical_plan($query).await + } + }; } - fn get_tpch_data_path() -> Result { - let path = - std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); - if !Path::new(&path).exists() { - return Err(DataFusionError::Execution(format!( - "Benchmark data not found (set TPCH_DATA env var to override): {}", - path - ))); - } - Ok(path) - } + // logical plan tests + test_round_trip_logical!(round_trip_logical_plan_q1, 1); + test_round_trip_logical!(round_trip_logical_plan_q2, 2); + test_round_trip_logical!(round_trip_logical_plan_q3, 3); + test_round_trip_logical!(round_trip_logical_plan_q4, 4); + test_round_trip_logical!(round_trip_logical_plan_q5, 5); + test_round_trip_logical!(round_trip_logical_plan_q6, 6); + test_round_trip_logical!(round_trip_logical_plan_q7, 7); + test_round_trip_logical!(round_trip_logical_plan_q8, 8); + test_round_trip_logical!(round_trip_logical_plan_q9, 9); + test_round_trip_logical!(round_trip_logical_plan_q10, 10); + test_round_trip_logical!(round_trip_logical_plan_q11, 11); + test_round_trip_logical!(round_trip_logical_plan_q12, 12); + test_round_trip_logical!(round_trip_logical_plan_q13, 13); + test_round_trip_logical!(round_trip_logical_plan_q14, 14); + test_round_trip_logical!(round_trip_logical_plan_q15, 15); + test_round_trip_logical!(round_trip_logical_plan_q16, 16); + test_round_trip_logical!(round_trip_logical_plan_q17, 17); + test_round_trip_logical!(round_trip_logical_plan_q18, 18); + test_round_trip_logical!(round_trip_logical_plan_q19, 19); + test_round_trip_logical!(round_trip_logical_plan_q20, 20); + test_round_trip_logical!(round_trip_logical_plan_q21, 21); + test_round_trip_logical!(round_trip_logical_plan_q22, 22); + + // physical plan tests + test_round_trip_physical!(round_trip_physical_plan_q1, 1); + test_round_trip_physical!(round_trip_physical_plan_q2, 2); + test_round_trip_physical!(round_trip_physical_plan_q3, 3); + test_round_trip_physical!(round_trip_physical_plan_q4, 4); + test_round_trip_physical!(round_trip_physical_plan_q5, 5); + test_round_trip_physical!(round_trip_physical_plan_q6, 6); + test_round_trip_physical!(round_trip_physical_plan_q7, 7); + test_round_trip_physical!(round_trip_physical_plan_q8, 8); + test_round_trip_physical!(round_trip_physical_plan_q9, 9); + test_round_trip_physical!(round_trip_physical_plan_q10, 10); + test_round_trip_physical!(round_trip_physical_plan_q11, 11); + test_round_trip_physical!(round_trip_physical_plan_q12, 12); + test_round_trip_physical!(round_trip_physical_plan_q13, 13); + test_round_trip_physical!(round_trip_physical_plan_q14, 14); + test_round_trip_physical!(round_trip_physical_plan_q15, 15); + test_round_trip_physical!(round_trip_physical_plan_q16, 16); + test_round_trip_physical!(round_trip_physical_plan_q17, 17); + test_round_trip_physical!(round_trip_physical_plan_q18, 18); + test_round_trip_physical!(round_trip_physical_plan_q19, 19); + test_round_trip_physical!(round_trip_physical_plan_q20, 20); + test_round_trip_physical!(round_trip_physical_plan_q21, 21); + test_round_trip_physical!(round_trip_physical_plan_q22, 22); }