diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 195f1949c9..f56a2fc414 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -52,7 +52,7 @@ serde_derive = "^1.0"
 serde_json = "^1.0"
 serde_repr = "0.1.16"
 url = "2"
-uuid = "1.4.1"
+uuid = { version = "1.4.1", features = ["v4"] }
 
 [dev-dependencies]
 pretty_assertions = "1.4.0"
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index e4ae576d82..4d822e5a15 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -21,6 +21,8 @@
 use std::fmt::Debug;
 use std::fmt::Display;
 use std::fmt::Formatter;
 
+use derive_builder::UninitializedFieldError;
+
 /// Result that is a wrapper of `Result<T, Error>`
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -192,6 +194,14 @@ impl std::error::Error for Error {
     }
 }
 
+// Conversion from the builder's UninitializedFieldError into an Iceberg Error
+impl From<UninitializedFieldError> for Error {
+    fn from(ufe: UninitializedFieldError) -> Error {
+        Error::new(ErrorKind::DataInvalid, "Some fields of table metadata are not initialized")
+            .with_source(ufe)
+    }
+}
+
 impl Error {
     /// Create a new Error with error kind and message.
     pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index f40b63e2ec..dd97ab2027 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -20,13 +20,13 @@
 Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
 The main struct here is [TableMetadataV2] which defines the data for a table.
 */
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH};
 
 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use uuid::Uuid;
 
-use crate::{Error, ErrorKind};
+use crate::{Error, ErrorKind, TableCreation};
 
 use super::{
     partition::PartitionSpec,
@@ -41,43 +41,64 @@ static MAIN_BRANCH: &str = "main";
 static DEFAULT_SPEC_ID: i32 = 0;
 static DEFAULT_SORT_ORDER_ID: i64 = 0;
 
-#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
+#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone, Builder)]
 #[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
+#[builder(
+    setter(prefix = "with"),
+    build_fn(validate = "Self::validate", error = "Error")
+)]
 /// Fields for the version 2 of the table metadata.
 pub struct TableMetadata {
     /// Integer Version for the format.
+    #[builder(default = "FormatVersion::V2")]
     format_version: FormatVersion,
     /// A UUID that identifies the table
+    #[builder(default = "Uuid::new_v4()")]
     table_uuid: Uuid,
     /// Location tables base location
+    #[builder(setter(into))]
     location: String,
     /// The tables highest sequence number
+    #[builder(default, setter(custom))]
     last_sequence_number: i64,
     /// Timestamp in milliseconds from the unix epoch when the table was last updated.
+    #[builder(default = "Self::current_time_ms()", setter(custom))]
     last_updated_ms: i64,
     /// An integer; the highest assigned column ID for the table.
+    #[builder(setter(custom))]
     last_column_id: i32,
     /// A list of schemas, stored as objects with schema-id.
+    #[builder(setter(custom))]
     schemas: HashMap<i32, Arc<Schema>>,
     /// ID of the table’s current schema.
+    #[builder(setter(custom))]
     current_schema_id: i32,
     /// A list of partition specs, stored as full partition spec objects.
+    #[builder(
+        default = "HashMap::from([(DEFAULT_SPEC_ID, PartitionSpec::builder().build().unwrap())])",
+        setter(custom)
+    )]
     partition_specs: HashMap<i32, PartitionSpec>,
     /// ID of the “current” spec that writers should use by default.
+    #[builder(default = "DEFAULT_SPEC_ID", setter(custom))]
     default_spec_id: i32,
     /// An integer; the highest assigned partition field ID across all partition specs for the table.
+    #[builder(default = "-1", setter(custom))]
     last_partition_id: i32,
     ///A string to string map of table properties. This is used to control settings that
     /// affect reading and writing and is not intended to be used for arbitrary metadata.
     /// For example, commit.retry.num-retries is used to control the number of commit retries.
+    #[builder(default)]
     properties: HashMap<String, String>,
     /// long ID of the current table snapshot; must be the same as the current
     /// ID of the main branch in refs.
+    #[builder(default = "None", setter(custom))]
     current_snapshot_id: Option<i64>,
     ///A list of valid snapshots. Valid snapshots are snapshots for which all
     /// data files exist in the file system. A data file must not be deleted
     /// from the file system until the last snapshot in which it was listed is
     /// garbage collected.
+    #[builder(default = "None", setter(custom))]
     snapshots: Option<HashMap<i64, Arc<Snapshot>>>,
     /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
     /// to the current snapshot for the table. Each time the current-snapshot-id
@@ -85,6 +106,7 @@
     /// and the new current-snapshot-id. When snapshots are expired from
     /// the list of valid snapshots, all entries before a snapshot that has
     /// expired should be removed.
+    #[builder(default, setter(custom))]
     snapshot_log: Vec<SnapshotLog>,
 
     /// A list (optional) of timestamp and metadata file location pairs
@@ -93,22 +115,271 @@
     /// previous metadata file location should be added to the list.
     /// Tables can be configured to remove oldest metadata log entries and
     /// keep a fixed-size log of the most recent entries after a commit.
+    #[builder(default, setter(custom))]
     metadata_log: Vec<MetadataLog>,
 
     /// A list of sort orders, stored as full sort order objects.
+    #[builder(default, setter(custom))]
     sort_orders: HashMap<i64, SortOrder>,
     /// Default sort order id of the table. Note that this could be used by
     /// writers, but is not used when reading because reads use the specs
     /// stored in manifest files.
+    #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))]
     default_sort_order_id: i64,
     ///A map of snapshot references. The map keys are the unique snapshot reference
     /// names in the table, and the map values are snapshot reference objects.
     /// There is always a main branch reference pointing to the current-snapshot-id
     /// even if the refs map is null.
+ #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap, } +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { + if let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + } + self + } + + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) + } + + /// Add or replace a partition_spec and update the last_partition_id accordingly + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + let max_id = partition_spec + .fields + .iter() + .map(|field| field.field_id) + .max(); + if max_id > self.last_partition_id { + self.last_partition_id = max_id; + } + if let Some(map) = self.partition_specs.as_mut() { + map.insert(partition_spec.spec_id, partition_spec); + } else { + self.partition_specs = Some(HashMap::from([(partition_spec.spec_id, partition_spec)])); + } + self + } + + /// Add or replace a partition_spec, update the last_partition_id accordingly + /// and set the default spec id to the partition_spec id + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_default_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + self.default_spec_id = Some(partition_spec.spec_id); + self.with_partition_spec(partition_spec) + } + + /// Add 
+    /// Add or replace a snapshot on the given branch and update last_sequence_number
+    /// snapshot : Snapshot to be added or replaced
+    pub fn with_branch_snapshot(&mut self, branch: String, snapshot: Snapshot) -> &mut Self {
+        if Some(snapshot.sequence_number()) > self.last_sequence_number {
+            self.last_sequence_number = Some(snapshot.sequence_number());
+        }
+        if self.last_updated_ms < Some(snapshot.timestamp()) {
+            self.last_updated_ms = Some(snapshot.timestamp());
+        }
+        self.with_ref(
+            branch,
+            SnapshotReference::new(
+                snapshot.snapshot_id(),
+                SnapshotRetention::Branch {
+                    min_snapshots_to_keep: None,
+                    max_snapshot_age_ms: None,
+                    max_ref_age_ms: None,
+                },
+            ),
+        );
+        if let Some(Some(map)) = self.snapshots.as_mut() {
+            map.insert(snapshot.snapshot_id(), Arc::new(snapshot));
+        } else {
+            self.snapshots = Some(Some(HashMap::from([(
+                snapshot.snapshot_id(),
+                Arc::new(snapshot),
+            )])));
+        }
+        self
+    }
+
+    /// Add or replace a snapshot on the main branch and update last_sequence_number
+    /// snapshot : Snapshot to be added or replaced
+    pub fn with_snapshot(&mut self, snapshot: Snapshot) -> &mut Self {
+        self.with_branch_snapshot("main".to_string(), snapshot)
+    }
+
+    /// Add or replace a sort order
+    /// sort_order : SortOrder to be added or replaced
+    pub fn with_sort_order(&mut self, sort_order: SortOrder) -> &mut Self {
+        if let Some(map) = self.sort_orders.as_mut() {
+            map.insert(sort_order.order_id, sort_order);
+        } else {
+            self.sort_orders = Some(HashMap::from([(sort_order.order_id, sort_order)]));
+        };
+        self
+    }
+
+    /// Add or replace a sort order and set it as the default sort order
+    /// sort_order : SortOrder to be added or replaced
+    pub fn with_default_sort_order(&mut self, sort_order: SortOrder) -> &mut Self {
+        self.default_sort_order_id = Some(sort_order.order_id);
+        self.with_sort_order(sort_order)
+    }
+
+    /// Check that a default key is coherent with its corresponding map.
+    /// Returns an Error if:
+    /// - the key is set but the map is not defined
+    /// - the key is set but the map has no entry for it
+    /// - the map is defined but no key is set
+    /// Params :
+    /// - key : the key to look for in the map
+    /// - map : the map to scan
+    /// - field : map name, used to build a more helpful error message
+    fn check_id_in_map<K: std::hash::Hash + Eq + std::fmt::Display, V>(
+        key: Option<K>,
+        map: &Option<HashMap<K, V>>,
+        field: &str,
+    ) -> Result<(), Error> {
+        if let Some(k) = key {
+            if let Some(m) = map {
+                // Key and map exist; check that an entry for the given key exists in the map
+                let entry = m.get(&k);
+                if entry.is_none() {
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Default {} id {} is provided but there is no corresponding entry in {}",
+                            field, k, field
+                        ),
+                    ))
+                } else {
+                    Ok(())
+                }
+            } else {
+                // Key exists but map does not
+                Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Default {} id {} is provided but there are no {} defined",
+                        field, k, field
+                    ),
+                ))
+            }
+        } else if map.is_some() {
+            // Map exists but key does not
+            Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Default id is not provided for the field {}", field),
+            ))
+        } else {
+            // Neither key nor map exists; the builder will handle required fields
+            Ok(())
+        }
+    }
+
+    /// Validate the content of the TableMetadata struct
+    fn validate(&self) -> Result<(), Error> {
+        // check that the default ids and their maps are coherent
+        Self::check_id_in_map(self.default_spec_id, &self.partition_specs, "partitions")
+            .and(Self::check_id_in_map(
+                self.current_schema_id,
+                &self.schemas,
+                "schemas",
+            ))
+            .and(Self::check_id_in_map(
+                self.default_sort_order_id,
+                &self.sort_orders,
+                "sort_order",
+            ))
+            .and(Self::check_id_in_map(
+                self.current_snapshot_id.unwrap_or(None),
+                self.snapshots.as_ref().unwrap_or(&None),
+                "snapshots",
+            ))
+    }
+}
+
 impl TableMetadata {
+    /// Create a table metadata builder
+    pub fn builder() -> TableMetadataBuilder {
+        TableMetadataBuilder::default()
+    }
+
     /// Get current schema
     #[inline]
     pub fn current_schema(&self) -> Result<Arc<Schema>, Error> {
@@ -392,7 +663,10 @@ pub(super) mod _serde {
             snapshot_log: value.snapshot_log.unwrap_or_default(),
             metadata_log: value.metadata_log.unwrap_or_default(),
             sort_orders: HashMap::from_iter(
-                value.sort_orders.into_iter().map(|x| (x.order_id, x)),
+                value
+                    .sort_orders
+                    .into_iter()
+                    .map(|x: SortOrder| (x.order_id, x)),
             ),
             default_sort_order_id: value.default_sort_order_id,
             refs: value.refs.unwrap_or_else(|| {
@@ -677,10 +951,13 @@ mod tests {
 
     use pretty_assertions::assert_eq;
 
-    use crate::spec::{
-        table_metadata::TableMetadata, ManifestList, NestedField, NullOrder, Operation,
-        PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference,
-        SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type,
+    use crate::{
+        spec::{
+            table_metadata::TableMetadata, ManifestList, NestedField, NullOrder, Operation,
+            PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference,
+            SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type,
+        },
+        TableCreation,
     };
 
     use super::{FormatVersion, MetadataLog, SnapshotLog};
@@ -995,48 +1272,10 @@
         let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();
 
-        let schema1 = Schema::builder()
-            .with_schema_id(0)
-            .with_fields(vec![Arc::new(NestedField::required(
-                1,
-                "x",
-                Type::Primitive(PrimitiveType::Long),
-            ))])
-            .build()
-            .unwrap();
+        let schema1 = generate_schema(0, 1, None);
+        let schema2 = generate_schema(1, 3, Some(vec![1, 2]));
 
-        let schema2 = Schema::builder()
-            .with_schema_id(1)
-            .with_fields(vec![
-                Arc::new(NestedField::required(
-                    1,
-                    "x",
-                    Type::Primitive(PrimitiveType::Long),
-                )),
-                Arc::new(
-                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
-                        .with_doc("comment"),
-                ),
-                Arc::new(NestedField::required(
-                    3,
-                    "z",
-                    Type::Primitive(PrimitiveType::Long),
-                )),
-            ])
-            .with_identifier_field_ids(vec![1, 2])
-            .build()
-            .unwrap();
-
-        let partition_spec = PartitionSpec::builder()
-            .with_spec_id(0)
-            .with_partition_field(PartitionField {
-                name: "x".to_string(),
-                transform: Transform::Identity,
-                source_id: 1,
-                field_id: 1000,
-            })
-            .build()
-            .unwrap();
+        let partition_spec = generate_partition_spec(0, 1);
 
         let sort_order = SortOrder::builder()
             .with_order_id(3)
@@ -1137,37 +1376,9 @@
         let metadata =
             fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();
 
-        let schema = Schema::builder()
-            .with_schema_id(0)
-            .with_fields(vec![
-                Arc::new(NestedField::required(
-                    1,
-                    "x",
-                    Type::Primitive(PrimitiveType::Long),
-                )),
-                Arc::new(
-                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
-                        .with_doc("comment"),
-                ),
-                Arc::new(NestedField::required(
-                    3,
-                    "z",
-                    Type::Primitive(PrimitiveType::Long),
-                )),
-            ])
-            .build()
-            .unwrap();
+        let schema = generate_schema(0, 3, None);
 
-        let partition_spec = PartitionSpec::builder()
-            .with_spec_id(0)
-            .with_partition_field(PartitionField {
-                name: "x".to_string(),
-                transform: Transform::Identity,
-                source_id: 1,
-                field_id: 1000,
-            })
-            .build()
-            .unwrap();
+        let partition_spec = generate_partition_spec(0, 1);
 
         let sort_order = SortOrder::builder()
             .with_order_id(3)
@@ -1216,37 +1427,8 @@
         let metadata =
             fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();
 
-        let schema = Schema::builder()
-            .with_schema_id(0)
-            .with_fields(vec![
-                Arc::new(NestedField::required(
-                    1,
-                    "x",
-                    Type::Primitive(PrimitiveType::Long),
-                )),
-                Arc::new(
-                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
-                        .with_doc("comment"),
-                ),
-                Arc::new(NestedField::required(
-                    3,
-                    "z",
-                    Type::Primitive(PrimitiveType::Long),
-                )),
-            ])
-            .build()
-            .unwrap();
-
-        let partition_spec = PartitionSpec::builder()
-            .with_spec_id(0)
-            .with_partition_field(PartitionField {
-                name: "x".to_string(),
-                transform: Transform::Identity,
-                source_id: 1,
-                field_id: 1000,
-            })
-            .build()
-            .unwrap();
+        let schema = generate_schema(0, 3, None);
+        let partition_spec = generate_partition_spec(0, 1);
 
         let expected = TableMetadata {
             format_version: FormatVersion::V1,
@@ -1367,4 +1549,101 @@
             "data did not match any variant of untagged enum TableMetadataEnum"
         )
     }
+
+    fn generate_schema(id: i32, length: usize, identifier_fields_id: Option<Vec<i32>>) -> Schema {
+        let mut test_data: Vec<(i32, &str, PrimitiveType, Option<&str>)> = vec![
+            (1, "x", PrimitiveType::Long, None),
+            (2, "y", PrimitiveType::Long, Some("comment")),
+            (3, "z", PrimitiveType::Long, None),
+        ];
+        test_data.truncate(length);
+        let data: Vec<Arc<NestedField>> = test_data
+            .iter()
+            .map(|x| {
+                (
+                    NestedField::required(x.0, x.1, Type::Primitive(x.2.clone())),
+                    x.3,
+                )
+            })
+            .map(|x| {
+                if let Some(doc) = x.1 {
+                    x.0.with_doc(doc)
+                } else {
+                    x.0
+                }
+            })
+            .map(Arc::new)
+            .collect();
+        Schema::builder()
+            .with_schema_id(id)
+            .with_fields(data)
+            .with_identifier_field_ids(identifier_fields_id.unwrap_or_default())
+            .build()
+            .unwrap()
+    }
+
+    fn generate_partition_spec(id: i32, length: usize) -> PartitionSpec {
+        let mut test_data = vec![
+            ("x", Transform::Identity, 1, 1000),
+            ("y", Transform::Identity, 2, 1001),
+            ("z", Transform::Identity, 3, 1002),
+        ];
+        test_data.truncate(length);
+        let data: Vec<PartitionField> = test_data
+            .iter()
+            .map(|x| PartitionField {
+                name: x.0.to_string(),
+                transform: x.1,
+                source_id: x.2,
+                field_id: x.3,
+            })
+            .collect();
+        PartitionSpec::builder()
+            .with_spec_id(id)
+            .with_fields(data)
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_metadata_builder() {
+        let schema = generate_schema(0, 3, None);
+        let partition_spec = generate_partition_spec(0, 1);
+
+        let built_table_metadata = TableMetadata::builder()
+            .with_location("s3://bucket/test/location")
+            .with_current_schema(schema)
+            .with_default_partition_spec(partition_spec)
+            .build()
+            .unwrap();
+
+        assert_eq!(built_table_metadata.format_version, FormatVersion::V2);
+        assert_eq!(
+            built_table_metadata.location,
+            "s3://bucket/test/location".to_string()
+        );
+        assert_eq!(built_table_metadata.last_column_id, 3);
+        assert_eq!(built_table_metadata.current_schema_id, 0);
+        assert_eq!(built_table_metadata.default_spec_id, 0);
+        assert_eq!(built_table_metadata.last_partition_id, 1000);
+        assert_eq!(
+            built_table_metadata.refs.get("main").unwrap().snapshot_id,
+            -1
+        );
+        assert_eq!(built_table_metadata.snapshots, None);
+
+        let table_creation = TableCreation {
+            name: "test".to_string(),
+            location: "s3://bucket/test/location".to_string(),
+            schema: generate_schema(0, 3, None),
+            partition_spec: Some(generate_partition_spec(0, 1)),
+            sort_order: SortOrder::builder().with_order_id(0).build().unwrap(),
+            properties: HashMap::new(),
+        };
+        let built_table_metadata = TableMetadata::builder()
+            .from_table_creation(table_creation)
+            .build();
+
+        assert!(built_table_metadata.is_ok())
+    }
 }
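
For reference, a minimal usage sketch of the new builder API, mirroring test_metadata_builder above. The iceberg::spec import paths and the bucket name are illustrative assumptions, not part of this patch:

use std::sync::Arc;

use iceberg::spec::{NestedField, PrimitiveType, Schema, TableMetadata, Type};

fn main() {
    // Build a one-column schema with the existing Schema builder.
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(
            1,
            "x",
            Type::Primitive(PrimitiveType::Long),
        ))])
        .build()
        .unwrap();

    // Unset fields fall back to their #[builder(default = "...")] expressions:
    // FormatVersion::V2, a fresh v4 table_uuid, an unpartitioned default spec,
    // and a "main" ref pointing at snapshot id -1.
    let _metadata = TableMetadata::builder()
        .with_location("s3://bucket/test/location") // illustrative location
        .with_current_schema(schema)
        .build()
        .unwrap();

    // Leaving a required field (location, schema) unset surfaces at build()
    // time: derive_builder's UninitializedFieldError is converted into an
    // Iceberg Error of kind DataInvalid by the new From impl in error.rs.
    assert!(TableMetadata::builder().build().is_err());
}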