From 69e5793d8e3491f164006b8af1c57650ba9273a3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 17:24:08 -0400 Subject: [PATCH 1/2] Prototype new DFSchema implementation --- datafusion/common/src/dfschema.rs | 561 ++++++++++++++---------------- datafusion/common/src/lib.rs | 2 +- 2 files changed, 258 insertions(+), 305 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index e16acbfedc81..ce1cd09f047a 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -33,17 +33,36 @@ use crate::{ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; +use sqlparser::ast::Table; /// A reference-counted reference to a `DFSchema`. pub type DFSchemaRef = Arc; /// DFSchema wraps an Arrow schema and adds relation names +/// +/// # Example +/// ``` +/// Creating a DF schema from an arrow schema +/// ``` +/// +/// ``` +/// Converting from DF schema to arrow schema +/// ``` +/// +/// ``` +/// Iterating over qualified fields +/// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { - /// Fields - fields: Vec, - /// Additional metadata in form of key value pairs - metadata: HashMap, + /// Inner arrow schema + inner: SchemaRef, + + /// Optional qualifiers for each column in this schema. In the same order as + /// the `self.inner.fields()` + /// + /// TODO avoid redundant copies when possible (encapsulate into a new structure) + field_qualifiers: Vec>, + /// Stores functional dependencies in the schema. functional_dependencies: FunctionalDependencies, } @@ -52,27 +71,93 @@ impl DFSchema { /// Creates an empty `DFSchema` pub fn empty() -> Self { Self { - fields: vec![], - metadata: HashMap::new(), + inner: Arc::new(Schema::new([])), + field_qualifiers: vec![], functional_dependencies: FunctionalDependencies::empty(), } } - #[deprecated(since = "7.0.0", note = "please use `new_with_metadata` instead")] - /// Create a new `DFSchema` - pub fn new(fields: Vec) -> Result { - Self::new_with_metadata(fields, HashMap::new()) + /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier + /// TODO rename to `new_from_...`?? with deprecation notice? + pub fn from_qualified_schema<'a>( + qualifier: impl Into>, + schema: &SchemaRef, + ) -> Result { + let qualifier = qualifier.into(); + let new_self = Self { + inner: schema.clone(), + field_qualifiers: vec![Some(qualifier.clone()); schema.fields().len()], + functional_dependencies: FunctionalDependencies::empty(), + }; + new_self.check_names()?; + Ok(new_self) } - /// Create a new `DFSchema` - pub fn new_with_metadata( - fields: Vec, - metadata: HashMap, - ) -> Result { + /// Create a `DFSchema` from an Arrow schema where all the fields have no qualifier + /// /// TODO rename to `new_from_...`?? with deprecation notice? + pub fn from_unqualified_schema(schema: &SchemaRef) -> Result { + let new_self = Self { + inner: schema.clone(), + field_qualifiers: vec![None; schema.fields().len()], + functional_dependencies: FunctionalDependencies::empty(), + }; + new_self.check_names()?; + Ok(new_self) + } + + // /// Create a new `DFSchema` + // pub fn new_with_metadata( + // fields: Vec, + // metadata: HashMap, + // ) -> Result { + // let mut qualified_names = HashSet::new(); + // let mut unqualified_names = HashSet::new(); + // + // for field in &fields { + // if let Some(qualifier) = field.qualifier() { + // qualified_names.insert((qualifier, field.name())); + // } else if !unqualified_names.insert(field.name()) { + // return Err(DataFusionError::SchemaError( + // SchemaError::DuplicateUnqualifiedField { + // name: field.name().to_string(), + // }, + // )); + // } + // } + // + // // check for mix of qualified and unqualified field with same unqualified name + // // note that we need to sort the contents of the HashSet first so that errors are + // // deterministic + // let mut qualified_names = qualified_names + // .iter() + // .map(|(l, r)| (l.to_owned(), r.to_owned())) + // .collect::>(); + // qualified_names.sort(); + // for (qualifier, name) in &qualified_names { + // if unqualified_names.contains(name) { + // return Err(DataFusionError::SchemaError( + // SchemaError::AmbiguousReference { + // field: Column { + // relation: Some((*qualifier).clone()), + // name: name.to_string(), + // }, + // }, + // )); + // } + // } + // Ok(Self { + // fields, + // metadata, + // functional_dependencies: FunctionalDependencies::empty(), + // }) + // } + + /// Validates that the fields in self have no duplicate names + fn check_names(&self) -> Result<()> { let mut qualified_names = HashSet::new(); let mut unqualified_names = HashSet::new(); - for field in &fields { + for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) { if let Some(qualifier) = field.qualifier() { qualified_names.insert((qualifier, field.name())); } else if !unqualified_names.insert(field.name()) { @@ -104,27 +189,7 @@ impl DFSchema { )); } } - Ok(Self { - fields, - metadata, - functional_dependencies: FunctionalDependencies::empty(), - }) - } - - /// Create a `DFSchema` from an Arrow schema and a given qualifier - pub fn try_from_qualified_schema<'a>( - qualifier: impl Into>, - schema: &Schema, - ) -> Result { - let qualifier = qualifier.into(); - Self::new_with_metadata( - schema - .fields() - .iter() - .map(|f| DFField::from_qualified(qualifier.clone(), f.clone())) - .collect(), - schema.metadata().clone(), - ) + Ok(()) } /// Assigns functional dependencies. @@ -138,21 +203,40 @@ impl DFSchema { /// Create a new schema that contains the fields from this schema followed by the fields /// from the supplied schema. An error will be returned if there are duplicate field names. - pub fn join(&self, schema: &DFSchema) -> Result { - let mut fields = self.fields.clone(); - let mut metadata = self.metadata.clone(); - fields.extend_from_slice(schema.fields().as_slice()); - metadata.extend(schema.metadata.clone()); - Self::new_with_metadata(fields, metadata) + pub fn join(&self, other: &DFSchema) -> Result { + let (new_field_qualifiers, new_fields) = self + .iter() + .chain(other.iter()) + .map(|qualifier, field| (qualifier.as_ref().clone(), field.clone())) + .unzip(); + + let mut new_metadata = self.inner.metadata.clone(); + new_metadata.extend(other.inner.metadata.clone()); + + // TODO functional dependencies?? + + let new_self = Self { + inner: Arc::new(Schema::new_with_metadata(new_fields, new_metadata)), + field_qualifiers: new_field_qualifiers, + }; + new_self.check_names()?; + Ok(new_self) } /// Modify this schema by appending the fields from the supplied schema, ignoring any /// duplicate fields. - pub fn merge(&mut self, other_schema: &DFSchema) { - if other_schema.fields.is_empty() { + pub fn merge(&mut self, other: &DFSchema) { + if other.inner.fields().is_empty() { return; } - for field in other_schema.fields() { + + let (new_field_qualifiers, new_fields) = self + .iter() + .chain(other.qualified_field_iter()) + .map(|qualifier, field| (qualifier.as_ref().clone(), field.clone())) + .unzip(); + + for field in other.fields() { // skip duplicate columns let duplicated_field = match field.qualifier() { Some(q) => self.field_with_name(Some(q), field.name()).is_ok(), @@ -163,44 +247,55 @@ impl DFSchema { self.fields.push(field.clone()); } } - self.metadata.extend(other_schema.metadata.clone()) - } - - /// Get a list of fields - pub fn fields(&self) -> &Vec { - &self.fields + self.metadata.extend(other.metadata.clone()) } /// Returns an immutable reference of a specific `Field` instance selected using an /// offset within the internal `fields` vector - pub fn field(&self, i: usize) -> &DFField { - &self.fields[i] - } - - #[deprecated(since = "8.0.0", note = "please use `index_of_column_by_name` instead")] - /// Find the index of the column with the given unqualified name - pub fn index_of(&self, name: &str) -> Result { - for i in 0..self.fields.len() { - if self.fields[i].name() == name { - return Ok(i); - } else { - // Now that `index_of` is deprecated an error is thrown if - // a fully qualified field name is provided. - match &self.fields[i].qualifier { - Some(qualifier) => { - if (qualifier.to_string() + "." + self.fields[i].name()) == name { - return _plan_err!( - "Fully qualified field name '{name}' was supplied to `index_of` \ - which is deprecated. Please use `index_of_column_by_name` instead" - ); - } - } - None => (), - } - } - } + pub fn field(&self, i: usize) -> &Field { + self.inner.field(i) + } - Err(unqualified_field_not_found(name, self)) + /// returns an iterator for each field and its qualified name + fn iter<'a>( + &'a self, + ) -> impl Iterator, &'a FieldRef)> { + self.field_qualifiers + .iter() + .zip(self.inner.fields().iter()) + .map(|qualifier, field| (qualifier.as_ref(), field)) + } + + /// return true if the qualified field is equal to an unqualified field + fn check_unqualified(qualifier1: &TableReference, name1: &str, name2: &str) -> bool { + // the original field may be aliased with a name that matches the + // original qualified name + let column = Column::from_qualified_name(name2); + if let Column { + relation: Some(r), + name: column_name, + } = column + { + // check qualifier and name matches + &r == qualifier1 && name1 == name2 + } else { + // no qualifier in name2, so can't match + false + } + } + /// returns true if the two qualified fields refer to the same field + fn field_eq( + qualifier1: Option<&TableReference>, + name1: &str, + qualifier2: Option<&TableReference>, + name2: &str, + ) -> bool { + match (qualifier1, qualifier2) { + (Some(q1), Some(q2)) => q1.resolved_eq(q2) && name1 == name2, + (Some(q1), None) => Self::check_unqualified(q1, name1, name2), + (None, Some(q2)) => Self::check_unqualified(q2, name2, name1), + (None, None) => name1 == name2, + } } pub fn index_of_column_by_name( @@ -209,31 +304,10 @@ impl DFSchema { name: &str, ) -> Result> { let mut matches = self - .fields .iter() .enumerate() - .filter(|(_, field)| match (qualifier, &field.qualifier) { - // field to lookup is qualified. - // current field is qualified and not shared between relations, compare both - // qualifier and name. - (Some(q), Some(field_q)) => { - q.resolved_eq(field_q) && field.name() == name - } - // field to lookup is qualified but current field is unqualified. - (Some(qq), None) => { - // the original field may now be aliased with a name that matches the - // original qualified name - let column = Column::from_qualified_name(field.name()); - match column { - Column { - relation: Some(r), - name: column_name, - } => &r == qq && column_name == name, - _ => false, - } - } - // field to lookup is unqualified, no need to compare qualifier - (None, Some(_)) | (None, None) => field.name() == name, + .filter(|(_, (field_qualifier, field))| { + Self::field_eq(qualifier, name, *field_qualifier, field.name()) }) .map(|(idx, _)| idx); Ok(matches.next()) @@ -256,7 +330,7 @@ impl DFSchema { &self, qualifier: Option<&TableReference>, name: &str, - ) -> Result<&DFField> { + ) -> Result<&Field> { if let Some(qualifier) = qualifier { self.field_with_qualified_name(qualifier, name) } else { @@ -265,24 +339,43 @@ impl DFSchema { } /// Find all fields having the given qualifier - pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&DFField> { - self.fields - .iter() - .filter(|field| field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)) + pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> { + self.iter() + .filter_map(|(field_qualifier, field)| { + field_qualifier.and_then(|field_qualifier| { + if field_qualifier.eq(qualifier) { + Some(field) + } else { + None + } + }) + }) .collect() } /// Find all fields match the given name - pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { - self.fields - .iter() - .filter(|field| field.name() == name) + pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { + self.iter() + .filter_map(|(field_qualifier, field)| { + if field.name() == name { + Some(field) + } else { + None + } + }) .collect() } - /// Find the field with the given name - pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> { - let matches = self.fields_with_unqualified_name(name); + /// Find the field with the given name. + /// + /// Throws an error if the name is ambiguous (e.g. looking for `x` and the + /// schema has `A.x` and `B.x`. + pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> { + let matches = self + .iter() + .filter(|(field_qualifier, field)| field.name() == name) + .collect(); + match matches.len() { 0 => Err(unqualified_field_not_found(name, self)), 1 => Ok(matches[0]), @@ -296,7 +389,7 @@ impl DFSchema { // one field without qualifier, we should return it. let fields_without_qualifier = matches .iter() - .filter(|f| f.qualifier.is_none()) + .filter(|(_, f)| f.qualifier.is_none()) .collect::>(); if fields_without_qualifier.len() == 1 { Ok(fields_without_qualifier[0]) @@ -314,21 +407,23 @@ impl DFSchema { } } - /// Find the field with the given qualified name + /// Find the field with the given qualified name, returning an error if not found pub fn field_with_qualified_name( &self, qualifier: &TableReference, name: &str, - ) -> Result<&DFField> { - let idx = self - .index_of_column_by_name(Some(qualifier), name)? - .ok_or_else(|| field_not_found(Some(qualifier.to_string()), name, self))?; + ) -> Result<&Field> { + let item = self + .iter() + .find(|(q, f)| q == qualifier && f.name() == name) + .map(|(q, f)| f) + .next(); - Ok(self.field(idx)) + item.ok_or_else(|| field_not_found(Some(qualifier.to_string()), name, self)) } /// Find the field with the given qualified column - pub fn field_from_column(&self, column: &Column) -> Result<&DFField> { + pub fn field_from_column(&self, column: &Column) -> Result<&Field> { match &column.relation { Some(r) => self.field_with_qualified_name(r, &column.name), None => self.field_with_unqualified_name(&column.name), @@ -337,7 +432,7 @@ impl DFSchema { /// Find if the field exists with the given name pub fn has_column_with_unqualified_name(&self, name: &str) -> bool { - self.fields().iter().any(|field| field.name() == name) + self.inner.fields().iter().any(|field| field.name() == name) } /// Find if the field exists with the given qualified name @@ -362,10 +457,11 @@ impl DFSchema { /// Check to see if unqualified field names matches field names in Arrow schema pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool { - self.fields + self.inner + .fields() .iter() .zip(arrow_schema.fields().iter()) - .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name()) + .all(|(f1, f2)| f1.name() == f2.name()) } /// Check to see if fields in 2 Arrow schemas are compatible @@ -373,8 +469,7 @@ impl DFSchema { &self, arrow_schema: &Schema, ) -> Result<()> { - let self_arrow_schema: Schema = self.into(); - self_arrow_schema + self.inner .fields() .iter() .zip(arrow_schema.fields().iter()) @@ -397,14 +492,14 @@ impl DFSchema { /// This is a specialized version of Eq that ignores differences /// in nullability and metadata. pub fn equivalent_names_and_types(&self, other: &Self) -> bool { - if self.fields().len() != other.fields().len() { + if self.inner.fields().len() != other.inner.fields().len() { return false; } - let self_fields = self.fields().iter(); - let other_fields = other.fields().iter(); - self_fields.zip(other_fields).all(|(f1, f2)| { - f1.qualifier() == f2.qualifier() - && f1.name() == f2.name() + let self_fields = self.iter(); + let other_fields = other.iter(); + self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| { + q1 == q2 + && &&f1.name() == f2.name() && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type()) }) } @@ -462,35 +557,25 @@ impl DFSchema { } /// Strip all field qualifier in schema - pub fn strip_qualifiers(self) -> Self { - DFSchema { - fields: self - .fields - .into_iter() - .map(|f| f.strip_qualifier()) - .collect(), - ..self - } + pub fn strip_qualifiers(mut self) -> Self { + self.field_qualifiers = vec![None; self.inner.fields().len()]; + self } /// Replace all field qualifier with new value in schema - pub fn replace_qualifier(self, qualifier: impl Into) -> Self { + pub fn replace_qualifier( + mut self, + qualifier: impl Into, + ) -> Self { let qualifier = qualifier.into(); - DFSchema { - fields: self - .fields - .into_iter() - .map(|f| DFField::from_qualified(qualifier.clone(), f.field)) - .collect(), - ..self - } + self.field_qualifiers = vec![qualifier; self.inner.fields().len()]; + self } /// Get list of fully-qualified field names in this schema pub fn field_names(&self) -> Vec { - self.fields - .iter() - .map(|f| f.qualified_name()) + self.iter() + .map(|(q, f)| qualified_name(q, f)) .collect::>() } @@ -505,19 +590,27 @@ impl DFSchema { } } +fn qualified_name(qualifier: &Option, name: &str) -> String { + match qualifier { + Some(q) => format!("{}.{}", q, name), + None => name.to_string(), + } +} + impl From for Schema { /// Convert DFSchema into a Schema fn from(df_schema: DFSchema) -> Self { - let fields: Fields = df_schema.fields.into_iter().map(|f| f.field).collect(); - Schema::new_with_metadata(fields, df_schema.metadata) + match Arc::try_unwrap(df_schema.inner) { + Ok(schema) => schema, + Err(arc_schema) => arc_schema.as_ref().clone(), + } } } impl From<&DFSchema> for Schema { /// Convert DFSchema reference into a Schema fn from(df_schema: &DFSchema) -> Self { - let fields: Fields = df_schema.fields.iter().map(|f| f.field.clone()).collect(); - Schema::new_with_metadata(fields, df_schema.metadata.clone()) + df_schema.inner.as_ref().clone() } } @@ -525,14 +618,7 @@ impl From<&DFSchema> for Schema { impl TryFrom for DFSchema { type Error = DataFusionError; fn try_from(schema: Schema) -> Result { - Self::new_with_metadata( - schema - .fields() - .iter() - .map(|f| DFField::from(f.clone())) - .collect(), - schema.metadata().clone(), - ) + Self::from_unqualified_schema(&Arc::new(schema)) } } @@ -545,8 +631,8 @@ impl From for SchemaRef { // Hashing refers to a subset of fields considered in PartialEq. impl Hash for DFSchema { fn hash(&self, state: &mut H) { - self.fields.hash(state); - self.metadata.len().hash(state); // HashMap is not hashable + self.inner.fields().hash(state); + self.inner.metadata().len().hash(state); // HashMap is not hashable } } @@ -592,12 +678,11 @@ impl Display for DFSchema { write!( f, "fields:[{}], metadata:{:?}", - self.fields - .iter() - .map(|field| field.qualified_name()) + self.iter() + .map(|(q, f)| qualified_name(q, f)) .collect::>() .join(", "), - self.metadata + self.inner.metadata(), ) } } @@ -647,138 +732,6 @@ impl ExprSchema for DFSchema { } } -/// DFField wraps an Arrow field and adds an optional qualifier -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DFField { - /// Optional qualifier (usually a table or relation name) - qualifier: Option, - /// Arrow field definition - field: FieldRef, -} - -impl DFField { - /// Creates a new `DFField` - pub fn new>( - qualifier: Option, - name: &str, - data_type: DataType, - nullable: bool, - ) -> Self { - DFField { - qualifier: qualifier.map(|s| s.into()), - field: Arc::new(Field::new(name, data_type, nullable)), - } - } - - /// Convenience method for creating new `DFField` without a qualifier - pub fn new_unqualified(name: &str, data_type: DataType, nullable: bool) -> Self { - DFField { - qualifier: None, - field: Arc::new(Field::new(name, data_type, nullable)), - } - } - - /// Create a qualified field from an existing Arrow field - pub fn from_qualified<'a>( - qualifier: impl Into>, - field: impl Into, - ) -> Self { - Self { - qualifier: Some(qualifier.into().to_owned_reference()), - field: field.into(), - } - } - - /// Returns an immutable reference to the `DFField`'s unqualified name - pub fn name(&self) -> &String { - self.field.name() - } - - /// Returns an immutable reference to the `DFField`'s data-type - pub fn data_type(&self) -> &DataType { - self.field.data_type() - } - - /// Indicates whether this `DFField` supports null values - pub fn is_nullable(&self) -> bool { - self.field.is_nullable() - } - - pub fn metadata(&self) -> &HashMap { - self.field.metadata() - } - - /// Returns a string to the `DFField`'s qualified name - pub fn qualified_name(&self) -> String { - if let Some(qualifier) = &self.qualifier { - format!("{}.{}", qualifier, self.field.name()) - } else { - self.field.name().to_owned() - } - } - - /// Builds a qualified column based on self - pub fn qualified_column(&self) -> Column { - Column { - relation: self.qualifier.clone(), - name: self.field.name().to_string(), - } - } - - /// Builds an unqualified column based on self - pub fn unqualified_column(&self) -> Column { - Column { - relation: None, - name: self.field.name().to_string(), - } - } - - /// Get the optional qualifier - pub fn qualifier(&self) -> Option<&OwnedTableReference> { - self.qualifier.as_ref() - } - - /// Get the arrow field - pub fn field(&self) -> &FieldRef { - &self.field - } - - /// Return field with qualifier stripped - pub fn strip_qualifier(mut self) -> Self { - self.qualifier = None; - self - } - - /// Return field with nullable specified - pub fn with_nullable(mut self, nullable: bool) -> Self { - let f = self.field().as_ref().clone().with_nullable(nullable); - self.field = f.into(); - self - } - - /// Return field with new metadata - pub fn with_metadata(mut self, metadata: HashMap) -> Self { - let f = self.field().as_ref().clone().with_metadata(metadata); - self.field = f.into(); - self - } -} - -impl From for DFField { - fn from(value: FieldRef) -> Self { - Self { - qualifier: None, - field: value, - } - } -} - -impl From for DFField { - fn from(value: Field) -> Self { - Self::from(Arc::new(value)) - } -} - /// DataFusion-specific extensions to [`Schema`]. pub trait SchemaExt { /// This is a specialized version of Eq that ignores differences diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 53c3cfddff8d..a566c86816da 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -43,7 +43,7 @@ pub mod utils; /// Reexport arrow crate pub use arrow; pub use column::Column; -pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema}; +pub use dfschema::{DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema}; pub use error::{ field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError, SharedResult, From 51cd34e82fe3cf57e7e8f9df3c2af5e06e648929 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 17:25:29 -0400 Subject: [PATCH 2/2] hack --- datafusion/common/src/dfschema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index ce1cd09f047a..6db847ef1671 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -233,7 +233,7 @@ impl DFSchema { let (new_field_qualifiers, new_fields) = self .iter() .chain(other.qualified_field_iter()) - .map(|qualifier, field| (qualifier.as_ref().clone(), field.clone())) + .map(|q, f| (q.as_ref().clone(), f.clone())) .unzip(); for field in other.fields() {