diff --git a/Cargo.toml b/Cargo.toml
index dab988441c..5aaef032ee 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,4 +55,5 @@ tokio = { version = "1", features = ["macros"] }
typed-builder = "^0.18"
url = "2"
urlencoding = "2"
-uuid = "1.5.0"
+# We pin uuid's version to the 1.5.x series because of this bug: https://github.com/uuid-rs/uuid/issues/720
+uuid = "~1.5.0"
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 653cfd4f5e..59ccca0541 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -26,8 +26,10 @@ use serde::de::DeserializeOwned;
use typed_builder::TypedBuilder;
use urlencoding::encode;
-use crate::catalog::_serde::LoadTableResponse;
-use iceberg::io::{FileIO, FileIOBuilder};
+use crate::catalog::_serde::{
+ CommitTableRequest, CommitTableResponse, CreateTableRequest, LoadTableResponse,
+};
+use iceberg::io::FileIO;
use iceberg::table::Table;
use iceberg::Result;
use iceberg::{
@@ -308,13 +310,51 @@ impl Catalog for RestCatalog {
/// Create a new table inside the namespace.
async fn create_table(
&self,
- _namespace: &NamespaceIdent,
- _creation: TableCreation,
+ namespace: &NamespaceIdent,
+ creation: TableCreation,
) -> Result<Table>
{
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Creating table not supported yet!",
- ))
+ let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
+
+ let request = self
+ .client
+ .0
+ .post(self.config.tables_endpoint(namespace))
+ .json(&CreateTableRequest {
+ name: creation.name,
+ location: creation.location,
+ schema: creation.schema,
+ partition_spec: creation.partition_spec,
+ write_order: creation.sort_order,
+ // We don't support stage create yet.
+ stage_create: None,
+ properties: if creation.properties.is_empty() {
+ None
+ } else {
+ Some(creation.properties)
+ },
+ })
+ .build()?;
+
+ let resp = self
+ .client
+ .query::<LoadTableResponse, ErrorResponse, OK>(request)
+ .await?;
+
+ let file_io = self.load_file_io(resp.metadata_location.as_deref(), resp.config)?;
+
+ let table = Table::builder()
+ .identifier(table_ident)
+ .file_io(file_io)
+ .metadata(resp.metadata)
+ .metadata_location(resp.metadata_location.ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Metadata location missing in create table response!",
+ )
+ })?)
+ .build();
+
+ Ok(table)
}
/// Load table from the catalog.
@@ -330,20 +370,7 @@ impl Catalog for RestCatalog {
.query::<LoadTableResponse, ErrorResponse, OK>(request)
.await?;
- let mut props = self.config.props.clone();
- if let Some(config) = resp.config {
- props.extend(config);
- }
-
- let file_io = match self
- .config
- .warehouse
- .as_ref()
- .or_else(|| resp.metadata_location.as_ref())
- {
- Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
- None => FileIOBuilder::new("s3").with_props(props).build()?,
- };
+ let file_io = self.load_file_io(resp.metadata_location.as_deref(), resp.config)?;
let table_builder = Table::builder()
.identifier(table.clone())
@@ -401,14 +428,31 @@ impl Catalog for RestCatalog {
.await
}
- /// Update a table to the catalog.
- async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result {
- todo!()
- }
+ /// Update table.
+ async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
+ let request = self
+ .client
+ .0
+ .post(self.config.table_endpoint(commit.identifier()))
+ .json(&CommitTableRequest {
+ identifier: commit.identifier().clone(),
+ requirements: commit.take_requirements(),
+ updates: commit.take_updates(),
+ })
+ .build()?;
+
+ let resp = self
+ .client
+ .query::<CommitTableResponse, ErrorResponse, OK>(request)
+ .await?;
- /// Update multiple tables to the catalog as an atomic operation.
- async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> {
- todo!()
+ let file_io = self.load_file_io(Some(&resp.metadata_location), None)?;
+ Ok(Table::builder()
+ .identifier(commit.identifier().clone())
+ .file_io(file_io)
+ .metadata(resp.metadata)
+ .metadata_location(resp.metadata_location)
+ .build())
}
}
@@ -446,6 +490,29 @@ impl RestCatalog {
Ok(())
}
+
+ fn load_file_io(
+ &self,
+ metadata_location: Option<&str>,
+ extra_config: Option<HashMap<String, String>>,
+ ) -> Result<FileIO> {
+ let mut props = self.config.props.clone();
+ if let Some(config) = extra_config {
+ props.extend(config);
+ }
+
+ let file_io = match self.config.warehouse.as_deref().or(metadata_location) {
+ Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
+ None => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Unable to load file io, neither warehouse nor metadata location is set!",
+ ))?
+ }
+ };
+
+ Ok(file_io)
+ }
}
/// Requests and responses for rest api.
@@ -454,8 +521,8 @@ mod _serde {
use serde_derive::{Deserialize, Serialize};
- use iceberg::spec::TableMetadata;
- use iceberg::{Error, ErrorKind, Namespace, TableIdent};
+ use iceberg::spec::{PartitionSpec, Schema, SortOrder, TableMetadata};
+ use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate};
pub(super) const OK: u16 = 200u16;
pub(super) const NO_CONTENT: u16 = 204u16;
@@ -586,16 +653,46 @@ mod _serde {
pub(super) metadata: TableMetadata,
pub(super) config: Option<HashMap<String, String>>,
}
+
+ #[derive(Debug, Serialize, Deserialize)]
+ #[serde(rename_all = "kebab-case")]
+ pub(super) struct CreateTableRequest {
+ pub(super) name: String,
+ pub(super) location: Option<String>,
+ pub(super) schema: Schema,
+ pub(super) partition_spec: Option<PartitionSpec>,
+ pub(super) write_order: Option<SortOrder>,
+ pub(super) stage_create: Option<bool>,
+ pub(super) properties: Option<HashMap<String, String>>,
+ }
+
+ #[derive(Debug, Serialize, Deserialize)]
+ pub(super) struct CommitTableRequest {
+ pub(super) identifier: TableIdent,
+ pub(super) requirements: Vec<TableRequirement>,
+ pub(super) updates: Vec<TableUpdate>,
+ }
+
+ #[derive(Debug, Serialize, Deserialize)]
+ #[serde(rename_all = "kebab-case")]
+ pub(super) struct CommitTableResponse {
+ pub(super) metadata_location: String,
+ pub(super) metadata: TableMetadata,
+ }
}
#[cfg(test)]
mod tests {
use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
- FormatVersion, NestedField, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog,
- SortOrder, Summary, Type,
+ FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
+ PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary,
+ Transform, Type,
};
+ use iceberg::transaction::Transaction;
use mockito::{Mock, Server, ServerGuard};
+ use std::fs::File;
+ use std::io::BufReader;
use std::sync::Arc;
use uuid::uuid;
@@ -1017,31 +1114,31 @@ mod tests {
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter([
- ("spark.app.id", "local-1646787004168"),
- ("added-data-files", "1"),
- ("added-records", "1"),
- ("added-files-size", "697"),
- ("changed-partition-count", "1"),
- ("total-records", "1"),
- ("total-files-size", "697"),
- ("total-data-files", "1"),
- ("total-delete-files", "0"),
- ("total-position-deletes", "0"),
- ("total-equality-deletes", "0")
- ].iter().map(|p|(p.0.to_string(), p.1.to_string())))
+ ("spark.app.id", "local-1646787004168"),
+ ("added-data-files", "1"),
+ ("added-records", "1"),
+ ("added-files-size", "697"),
+ ("changed-partition-count", "1"),
+ ("total-records", "1"),
+ ("total-files-size", "697"),
+ ("total-data-files", "1"),
+ ("total-delete-files", "0"),
+ ("total-position-deletes", "0"),
+ ("total-equality-deletes", "0")
+ ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
}).build().unwrap()
)], table.metadata().snapshots().collect::<Vec<_>>());
assert_eq!(
&[SnapshotLog {
timestamp_ms: 1646787054459,
- snapshot_id: 3497810964824022504
+ snapshot_id: 3497810964824022504,
}],
table.metadata().history()
);
assert_eq!(
vec![&Arc::new(SortOrder {
order_id: 0,
- fields: vec![]
+ fields: vec![],
})],
table.metadata().sort_orders_iter().collect::<Vec<_>>()
);
@@ -1092,4 +1189,375 @@ mod tests {
config_mock.assert_async().await;
rename_table_mock.assert_async().await;
}
+
+ #[tokio::test]
+ async fn test_create_table() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let create_table_mock = server
+ .mock("POST", "/v1/namespaces/ns1/tables")
+ .with_status(200)
+ .with_body_from_file(format!(
+ "{}/testdata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "create_table_response.json"
+ ))
+ .create_async()
+ .await;
+
+ let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+ .await
+ .unwrap();
+
+ let table_creation = TableCreation::builder()
+ .name("test1".to_string())
+ .schema(
+ Schema::builder()
+ .with_fields(vec![
+ NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
+ .into(),
+ ])
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![2])
+ .build()
+ .unwrap(),
+ )
+ .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
+ .partition_spec(
+ PartitionSpec::builder()
+ .with_fields(vec![PartitionField::builder()
+ .source_id(1)
+ .field_id(1000)
+ .transform(Transform::Truncate(3))
+ .name("id".to_string())
+ .build()])
+ .with_spec_id(1)
+ .build()
+ .unwrap(),
+ )
+ .sort_order(
+ SortOrder::builder()
+ .with_sort_field(
+ SortField::builder()
+ .source_id(2)
+ .transform(Transform::Identity)
+ .direction(SortDirection::Ascending)
+ .null_order(NullOrder::First)
+ .build(),
+ )
+ .build()
+ .unwrap(),
+ )
+ .build();
+
+ let table = catalog
+ .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
+ table.identifier()
+ );
+ assert_eq!(
+ "s3://warehouse/database/table/metadata.json",
+ table.metadata_location().unwrap()
+ );
+ assert_eq!(FormatVersion::V1, table.metadata().format_version());
+ assert_eq!("s3://warehouse/database/table", table.metadata().location());
+ assert_eq!(
+ uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
+ table.metadata().uuid()
+ );
+ assert_eq!(1657810967051, table.metadata().last_updated_ms());
+ assert_eq!(
+ vec![&Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
+ .into(),
+ ])
+ .with_schema_id(0)
+ .with_identifier_field_ids(vec![2])
+ .build()
+ .unwrap()
+ )],
+ table.metadata().schemas_iter().collect::<Vec<_>>()
+ );
+ assert_eq!(
+ &HashMap::from([
+ (
+ "write.delete.parquet.compression-codec".to_string(),
+ "zstd".to_string()
+ ),
+ (
+ "write.metadata.compression-codec".to_string(),
+ "gzip".to_string()
+ ),
+ (
+ "write.summary.partition-limit".to_string(),
+ "100".to_string()
+ ),
+ (
+ "write.parquet.compression-codec".to_string(),
+ "zstd".to_string()
+ ),
+ ]),
+ table.metadata().properties()
+ );
+ assert!(table.metadata().current_snapshot().is_none());
+ assert!(table.metadata().history().is_empty());
+ assert_eq!(
+ vec![&Arc::new(SortOrder {
+ order_id: 0,
+ fields: vec![],
+ })],
+ table.metadata().sort_orders_iter().collect::<Vec<_>>()
+ );
+
+ config_mock.assert_async().await;
+ create_table_mock.assert_async().await;
+ }
+
+ #[tokio::test]
+ async fn test_create_table_409() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let create_table_mock = server
+ .mock("POST", "/v1/namespaces/ns1/tables")
+ .with_status(409)
+ .with_body(r#"
+{
+ "error": {
+ "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
+ "type": "AlreadyExistsException",
+ "code": 409
+ }
+}
+ "#)
+ .create_async()
+ .await;
+
+ let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+ .await
+ .unwrap();
+
+ let table_creation = TableCreation::builder()
+ .name("test1".to_string())
+ .schema(
+ Schema::builder()
+ .with_fields(vec![
+ NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
+ .into(),
+ ])
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![2])
+ .build()
+ .unwrap(),
+ )
+ .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
+ .build();
+
+ let table_result = catalog
+ .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
+ .await;
+
+ assert!(table_result.is_err());
+ assert!(table_result
+ .err()
+ .unwrap()
+ .message()
+ .contains("Table already exists"));
+
+ config_mock.assert_async().await;
+ create_table_mock.assert_async().await;
+ }
+
+ #[tokio::test]
+ async fn test_update_table() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let update_table_mock = server
+ .mock("POST", "/v1/namespaces/ns1/tables/test1")
+ .with_status(200)
+ .with_body_from_file(format!(
+ "{}/testdata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "update_table_response.json"
+ ))
+ .create_async()
+ .await;
+
+ let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+ .await
+ .unwrap();
+
+ let table1 = {
+ let file = File::open(format!(
+ "{}/testdata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "create_table_response.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap();
+
+ Table::builder()
+ .metadata(resp.metadata)
+ .metadata_location(resp.metadata_location.unwrap())
+ .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .build()
+ };
+
+ let table = Transaction::new(&table1)
+ .upgrade_table_version(FormatVersion::V2)
+ .unwrap()
+ .commit(&catalog)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
+ table.identifier()
+ );
+ assert_eq!(
+ "s3://warehouse/database/table/metadata.json",
+ table.metadata_location().unwrap()
+ );
+ assert_eq!(FormatVersion::V2, table.metadata().format_version());
+ assert_eq!("s3://warehouse/database/table", table.metadata().location());
+ assert_eq!(
+ uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
+ table.metadata().uuid()
+ );
+ assert_eq!(1657810967051, table.metadata().last_updated_ms());
+ assert_eq!(
+ vec![&Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
+ .into(),
+ ])
+ .with_schema_id(0)
+ .with_identifier_field_ids(vec![2])
+ .build()
+ .unwrap()
+ )],
+ table.metadata().schemas_iter().collect::<Vec<_>>()
+ );
+ assert_eq!(
+ &HashMap::from([
+ (
+ "write.delete.parquet.compression-codec".to_string(),
+ "zstd".to_string()
+ ),
+ (
+ "write.metadata.compression-codec".to_string(),
+ "gzip".to_string()
+ ),
+ (
+ "write.summary.partition-limit".to_string(),
+ "100".to_string()
+ ),
+ (
+ "write.parquet.compression-codec".to_string(),
+ "zstd".to_string()
+ ),
+ ]),
+ table.metadata().properties()
+ );
+ assert!(table.metadata().current_snapshot().is_none());
+ assert!(table.metadata().history().is_empty());
+ assert_eq!(
+ vec![&Arc::new(SortOrder {
+ order_id: 0,
+ fields: vec![],
+ })],
+ table.metadata().sort_orders_iter().collect::<Vec<_>>()
+ );
+
+ config_mock.assert_async().await;
+ update_table_mock.assert_async().await;
+ }
+
+ #[tokio::test]
+ async fn test_update_table_404() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let update_table_mock = server
+ .mock("POST", "/v1/namespaces/ns1/tables/test1")
+ .with_status(404)
+ .with_body(
+ r#"
+{
+ "error": {
+ "message": "The given table does not exist",
+ "type": "NoSuchTableException",
+ "code": 404
+ }
+}
+ "#,
+ )
+ .create_async()
+ .await;
+
+ let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+ .await
+ .unwrap();
+
+ let table1 = {
+ let file = File::open(format!(
+ "{}/testdata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "create_table_response.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap();
+
+ Table::builder()
+ .metadata(resp.metadata)
+ .metadata_location(resp.metadata_location.unwrap())
+ .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .build()
+ };
+
+ let table_result = Transaction::new(&table1)
+ .upgrade_table_version(FormatVersion::V2)
+ .unwrap()
+ .commit(&catalog)
+ .await;
+
+ assert!(table_result.is_err());
+ assert!(table_result
+ .err()
+ .unwrap()
+ .message()
+ .contains("The given table does not exist"));
+
+ config_mock.assert_async().await;
+ update_table_mock.assert_async().await;
+ }
}
diff --git a/crates/catalog/rest/testdata/create_table_response.json b/crates/catalog/rest/testdata/create_table_response.json
new file mode 100644
index 0000000000..e01a52fdce
--- /dev/null
+++ b/crates/catalog/rest/testdata/create_table_response.json
@@ -0,0 +1,53 @@
+{
+ "metadata-location": "s3://warehouse/database/table/metadata.json",
+ "metadata": {
+ "format-version": 1,
+ "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
+ "location": "s3://warehouse/database/table",
+ "last-updated-ms": 1657810967051,
+ "last-column-id": 3,
+ "schema": {
+ "type": "struct",
+ "schema-id": 0,
+ "identifier-field-ids": [2],
+ "fields": [
+ {"id": 1, "name": "foo", "required": false, "type": "string"},
+ {"id": 2, "name": "bar", "required": true, "type": "int"},
+ {"id": 3, "name": "baz", "required": false, "type": "boolean"}
+ ]
+ },
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "identifier-field-ids": [2],
+ "fields": [
+ {"id": 1, "name": "foo", "required": false, "type": "string"},
+ {"id": 2, "name": "bar", "required": true, "type": "int"},
+ {"id": 3, "name": "baz", "required": false, "type": "boolean"}
+ ]
+ }
+ ],
+ "partition-spec": [],
+ "default-spec-id": 0,
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {
+ "write.delete.parquet.compression-codec": "zstd",
+ "write.metadata.compression-codec": "gzip",
+ "write.summary.partition-limit": "100",
+ "write.parquet.compression-codec": "zstd"
+ },
+ "current-snapshot-id": -1,
+ "refs": {},
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": []
+ },
+ "config": {
+ "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory",
+ "region": "us-west-2"
+ }
+}
\ No newline at end of file
diff --git a/crates/catalog/rest/testdata/update_table_response.json b/crates/catalog/rest/testdata/update_table_response.json
new file mode 100644
index 0000000000..80ec269a16
--- /dev/null
+++ b/crates/catalog/rest/testdata/update_table_response.json
@@ -0,0 +1,40 @@
+{
+ "metadata-location": "s3://warehouse/database/table/metadata.json",
+ "metadata": {
+ "format-version": 2,
+ "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
+ "location": "s3://warehouse/database/table",
+ "last-sequence-number" : 1,
+ "last-updated-ms": 1657810967051,
+ "last-column-id": 3,
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "identifier-field-ids": [2],
+ "fields": [
+ {"id": 1, "name": "foo", "required": false, "type": "string"},
+ {"id": 2, "name": "bar", "required": true, "type": "int"},
+ {"id": 3, "name": "baz", "required": false, "type": "boolean"}
+ ]
+ }
+ ],
+ "partition-specs": [],
+ "default-spec-id": 0,
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {
+ "write.delete.parquet.compression-codec": "zstd",
+ "write.metadata.compression-codec": "gzip",
+ "write.summary.partition-limit": "100",
+ "write.parquet.compression-codec": "zstd"
+ },
+ "current-snapshot-id": -1,
+ "refs": {},
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": []
+ }
+}
\ No newline at end of file
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index b2d8d8ebde..ce3c5e7a0e 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -20,12 +20,15 @@
use serde_derive::{Deserialize, Serialize};
use urlencoding::encode;
-use crate::spec::{PartitionSpec, Schema, SortOrder};
+use crate::spec::{FormatVersion, PartitionSpec, Schema, Snapshot, SnapshotReference, SortOrder};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
use async_trait::async_trait;
use std::collections::HashMap;
+use std::mem::take;
use std::ops::Deref;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
/// The catalog API for Iceberg Rust.
#[async_trait]
@@ -84,10 +87,7 @@ pub trait Catalog: std::fmt::Debug {
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;
/// Update a table to the catalog.
- async fn update_table(&self, table: &TableIdent, commit: TableCommit) -> Result<Table>;
-
- /// Update multiple tables to the catalog as an atomic operation.
- async fn update_tables(&self, tables: &[(TableIdent, TableCommit)]) -> Result<()>;
+ async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
/// NamespaceIdent represents the identifier of a namespace in the catalog.
@@ -215,73 +215,230 @@ impl TableIdent {
}
/// TableCreation represents the creation of a table in the catalog.
-#[derive(Debug)]
+#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
/// The name of the table.
pub name: String,
/// The location of the table.
- pub location: String,
+ #[builder(default, setter(strip_option))]
+ pub location: Option<String>,
/// The schema of the table.
pub schema: Schema,
/// The partition spec of the table, could be None.
+ #[builder(default, setter(strip_option))]
pub partition_spec: Option<PartitionSpec>,
/// The sort order of the table.
- pub sort_order: SortOrder,
+ #[builder(default, setter(strip_option))]
+ pub sort_order: Option<SortOrder>,
/// The properties of the table.
+ #[builder(default)]
pub properties: HashMap<String, String>,
}
/// TableCommit represents the commit of a table in the catalog.
-#[derive(Debug)]
+#[derive(Debug, TypedBuilder)]
+#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
/// The table ident.
- pub ident: TableIdent,
+ ident: TableIdent,
/// The requirements of the table.
///
/// Commit will fail if the requirements are not met.
- pub requirements: Vec<TableRequirement>,
+ requirements: Vec<TableRequirement>,
/// The updates of the table.
- pub updates: Vec<TableUpdate>,
+ updates: Vec<TableUpdate>,
+}
+
+impl TableCommit {
+ /// Return the table identifier.
+ pub fn identifier(&self) -> &TableIdent {
+ &self.ident
+ }
+
+ /// Take all requirements.
+ pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
+ take(&mut self.requirements)
+ }
+
+ /// Take all updates.
+ pub fn take_updates(&mut self) -> Vec<TableUpdate> {
+ take(&mut self.updates)
+ }
}
/// TableRequirement represents a requirement for a table in the catalog.
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[serde(tag = "type")]
pub enum TableRequirement {
/// The table must not already exist; used for create transactions
+ #[serde(rename = "assert-create")]
NotExist,
/// The table UUID must match the requirement.
- UuidMatch(String),
+ #[serde(rename = "assert-table-uuid")]
+ UuidMatch {
+ /// Uuid of original table.
+ uuid: Uuid,
+ },
/// The table branch or tag identified by the requirement's `reference` must
/// reference the requirement's `snapshot-id`.
+ #[serde(rename = "assert-ref-snapshot-id")]
RefSnapshotIdMatch {
/// The reference of the table to assert.
- reference: String,
+ r#ref: String,
/// The snapshot id of the table to assert.
/// If the id is `None`, the ref must not already exist.
+ #[serde(rename = "snapshot-id")]
snapshot_id: Option<i64>,
},
/// The table's last assigned column id must match the requirement.
- LastAssignedFieldIdMatch(i64),
+ #[serde(rename = "assert-last-assigned-field-id")]
+ LastAssignedFieldIdMatch {
+ /// The last assigned field id of the table to assert.
+ #[serde(rename = "last-assigned-field-id")]
+ last_assigned_field_id: i64,
+ },
/// The table's current schema id must match the requirement.
- CurrentSchemaIdMatch(i64),
+ #[serde(rename = "assert-current-schema-id")]
+ CurrentSchemaIdMatch {
+ /// Current schema id of the table to assert.
+ #[serde(rename = "current-schema-id")]
+ current_schema_id: i64,
+ },
/// The table's last assigned partition id must match the
/// requirement.
- LastAssignedPartitionIdMatch(i64),
+ #[serde(rename = "assert-last-assigned-partition-id")]
+ LastAssignedPartitionIdMatch {
+ /// Last assigned partition id of the table to assert.
+ #[serde(rename = "last-assigned-partition-id")]
+ last_assigned_partition_id: i64,
+ },
/// The table's default spec id must match the requirement.
- DefaultSpecIdMatch(i64),
+ #[serde(rename = "assert-default-spec-id")]
+ DefaultSpecIdMatch {
+ /// Default spec id of the table to assert.
+ #[serde(rename = "default-spec-id")]
+ default_spec_id: i64,
+ },
/// The table's default sort order id must match the requirement.
- DefaultSortOrderIdMatch(i64),
+ #[serde(rename = "assert-default-sort-order-id")]
+ DefaultSortOrderIdMatch {
+ /// Default sort order id of the table to assert.
+ #[serde(rename = "default-sort-order-id")]
+ default_sort_order_id: i64,
+ },
}
/// TableUpdate represents an update to a table in the catalog.
-///
-/// TODO: we should fill with UpgradeFormatVersionUpdate, AddSchemaUpdate and so on.
-#[derive(Debug)]
-pub enum TableUpdate {}
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[serde(tag = "action", rename_all = "kebab-case")]
+pub enum TableUpdate {
+ /// Upgrade table's format version
+ #[serde(rename_all = "kebab-case")]
+ UpgradeFormatVersion {
+ /// Target format upgrade to.
+ format_version: FormatVersion,
+ },
+ /// Assign a new UUID to the table
+ #[serde(rename_all = "kebab-case")]
+ AssignUuid {
+ /// The new UUID to assign.
+ uuid: Uuid,
+ },
+ /// Add a new schema to the table
+ #[serde(rename_all = "kebab-case")]
+ AddSchema {
+ /// The schema to add.
+ schema: Schema,
+ /// The last column id of the table.
+ last_column_id: Option<i32>,
+ },
+ /// Set table's current schema
+ #[serde(rename_all = "kebab-case")]
+ SetCurrentSchema {
+ /// Schema ID to set as current, or -1 to set last added schema
+ schema_id: i32,
+ },
+ /// Add a new partition spec to the table
+ AddSpec {
+ /// The partition spec to add.
+ spec: PartitionSpec,
+ },
+ /// Set table's default spec
+ #[serde(rename_all = "kebab-case")]
+ SetDefaultSpec {
+ /// Partition spec ID to set as the default, or -1 to set last added spec
+ spec_id: i32,
+ },
+ /// Add sort order to table.
+ #[serde(rename_all = "kebab-case")]
+ AddSortOrder {
+ /// Sort order to add.
+ sort_order: SortOrder,
+ },
+ /// Set table's default sort order
+ #[serde(rename_all = "kebab-case")]
+ SetDefaultSortOrder {
+ /// Sort order ID to set as the default, or -1 to set last added sort order
+ sort_order_id: i32,
+ },
+ /// Add snapshot to table.
+ #[serde(rename_all = "kebab-case")]
+ AddSnapshot {
+ /// Snapshot to add.
+ snapshot: Snapshot,
+ },
+ /// Set table's snapshot ref.
+ #[serde(rename_all = "kebab-case")]
+ SetSnapshotRef {
+ /// Name of snapshot reference to set.
+ ref_name: String,
+ /// Snapshot reference to set.
+ #[serde(flatten)]
+ reference: SnapshotReference,
+ },
+ /// Remove table's snapshots
+ #[serde(rename_all = "kebab-case")]
+ RemoveSnapshots {
+ /// Snapshot ids to remove.
+ snapshot_ids: Vec<i64>,
+ },
+ /// Remove snapshot reference
+ #[serde(rename_all = "kebab-case")]
+ RemoveSnapshotRef {
+ /// Name of snapshot reference to remove.
+ ref_name: String,
+ },
+ /// Update table's location
+ SetLocation {
+ /// New location for table.
+ location: String,
+ },
+ /// Update table's properties
+ SetProperties {
+ /// Properties to update for table.
+ updates: HashMap<String, String>,
+ },
+ /// Remove table's properties
+ RemoveProperties {
+ /// Properties to remove
+ removals: Vec<String>,
+ },
+}
#[cfg(test)]
mod tests {
- use crate::{NamespaceIdent, TableIdent};
+ use crate::spec::ManifestListLocation::ManifestListFile;
+ use crate::spec::{
+ FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
+ PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
+ SortField, SortOrder, Summary, Transform, Type,
+ };
+ use crate::{NamespaceIdent, TableIdent, TableRequirement, TableUpdate};
+ use serde::de::DeserializeOwned;
+ use serde::Serialize;
+ use std::collections::HashMap;
+ use std::fmt::Debug;
+ use uuid::uuid;
#[test]
fn test_create_table_id() {
@@ -292,4 +449,628 @@ mod tests {
assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
}
+
+ fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq<T> + Debug>(
+ json: impl ToString,
+ expected: T,
+ ) {
+ let json_str = json.to_string();
+ let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
+ assert_eq!(actual, expected, "Parsed value is not equal to expected");
+
+ let restored: T = serde_json::from_str(
+ &serde_json::to_string(&actual).expect("Failed to serialize to json"),
+ )
+ .expect("Failed to parse from serialized json");
+
+ assert_eq!(
+ restored, expected,
+ "Parsed restored value is not equal to expected"
+ );
+ }
+
+ #[test]
+ fn test_table_uuid() {
+ test_serde_json(
+ r#"
+{
+ "type": "assert-table-uuid",
+ "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
+}
+ "#,
+ TableRequirement::UuidMatch {
+ uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
+ },
+ );
+ }
+
+ #[test]
+ fn test_assert_table_not_exists() {
+ test_serde_json(
+ r#"
+{
+ "type": "assert-create"
+}
+ "#,
+ TableRequirement::NotExist,
+ );
+ }
+
+ #[test]
+ fn test_assert_ref_snapshot_id() {
+ test_serde_json(
+ r#"
+{
+ "type": "assert-ref-snapshot-id",
+ "ref": "snapshot-name",
+ "snapshot-id": null
+}
+ "#,
+ TableRequirement::RefSnapshotIdMatch {
+ r#ref: "snapshot-name".to_string(),
+ snapshot_id: None,
+ },
+ );
+
+ test_serde_json(
+ r#"
+{
+ "type": "assert-ref-snapshot-id",
+ "ref": "snapshot-name",
+ "snapshot-id": 1
+}
+ "#,
+ TableRequirement::RefSnapshotIdMatch {
+ r#ref: "snapshot-name".to_string(),
+ snapshot_id: Some(1),
+ },
+ );
+ }
+
+ #[test]
+ fn test_assert_last_assigned_field_id() {
+ test_serde_json(
+ r#"
+{
+ "type": "assert-last-assigned-field-id",
+ "last-assigned-field-id": 12
+}
+ "#,
+ TableRequirement::LastAssignedFieldIdMatch {
+ last_assigned_field_id: 12,
+ },
+ );
+ }
+
+ #[test]
+ fn test_assert_current_schema_id() {
+ test_serde_json(
+ r#"
+{
+ "type": "assert-current-schema-id",
+ "current-schema-id": 4
+}
+ "#,
+ TableRequirement::CurrentSchemaIdMatch {
+ current_schema_id: 4,
+ },
+ );
+ }
+
+ #[test]
+ fn test_assert_last_assigned_partition_id() {
+ test_serde_json(
+ r#"
+{
+ "type": "assert-last-assigned-partition-id",
+ "last-assigned-partition-id": 1004
+}
+ "#,
+ TableRequirement::LastAssignedPartitionIdMatch {
+ last_assigned_partition_id: 1004,
+ },
+ );
+ }
+
+ #[test]
+ fn test_assert_default_spec_id() {
+ test_serde_json(
+ r#"
+{
+ "type": "assert-default-spec-id",
+ "default-spec-id": 5
+}
+ "#,
+ TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
+ );
+ }
+
+ #[test]
+ fn test_assert_default_sort_order() {
+ let json = r#"
+{
+ "type": "assert-default-sort-order-id",
+ "default-sort-order-id": 10
+}
+ "#;
+
+ let update = TableRequirement::DefaultSortOrderIdMatch {
+ default_sort_order_id: 10,
+ };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_parse_assert_invalid() {
+ assert!(
+ serde_json::from_str::<TableRequirement>(
+ r#"
+{
+ "default-sort-order-id": 10
+}
+"#
+ )
+ .is_err(),
+ "Table requirements should not be parsed without type."
+ );
+ }
+
+ #[test]
+ fn test_assign_uuid() {
+ test_serde_json(
+ r#"
+{
+ "action": "assign-uuid",
+ "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
+}
+ "#,
+ TableUpdate::AssignUuid {
+ uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
+ },
+ );
+ }
+
+ #[test]
+ fn test_upgrade_format_version() {
+ test_serde_json(
+ r#"
+{
+ "action": "upgrade-format-version",
+ "format-version": 2
+}
+ "#,
+ TableUpdate::UpgradeFormatVersion {
+ format_version: FormatVersion::V2,
+ },
+ );
+ }
+
+ #[test]
+ fn test_add_schema() {
+ let test_schema = Schema::builder()
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![2])
+ .with_fields(vec![
+ NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+ ])
+ .build()
+ .unwrap();
+ test_serde_json(
+ r#"
+{
+ "action": "add-schema",
+ "schema": {
+ "type": "struct",
+ "schema-id": 1,
+ "fields": [
+ {
+ "id": 1,
+ "name": "foo",
+ "required": false,
+ "type": "string"
+ },
+ {
+ "id": 2,
+ "name": "bar",
+ "required": true,
+ "type": "int"
+ },
+ {
+ "id": 3,
+ "name": "baz",
+ "required": false,
+ "type": "boolean"
+ }
+ ],
+ "identifier-field-ids": [
+ 2
+ ]
+ },
+ "last-column-id": 3
+}
+ "#,
+ TableUpdate::AddSchema {
+ schema: test_schema.clone(),
+ last_column_id: Some(3),
+ },
+ );
+
+ test_serde_json(
+ r#"
+{
+ "action": "add-schema",
+ "schema": {
+ "type": "struct",
+ "schema-id": 1,
+ "fields": [
+ {
+ "id": 1,
+ "name": "foo",
+ "required": false,
+ "type": "string"
+ },
+ {
+ "id": 2,
+ "name": "bar",
+ "required": true,
+ "type": "int"
+ },
+ {
+ "id": 3,
+ "name": "baz",
+ "required": false,
+ "type": "boolean"
+ }
+ ],
+ "identifier-field-ids": [
+ 2
+ ]
+ }
+}
+ "#,
+ TableUpdate::AddSchema {
+ schema: test_schema.clone(),
+ last_column_id: None,
+ },
+ );
+ }
+
+ #[test]
+ fn test_set_current_schema() {
+ test_serde_json(
+ r#"
+{
+ "action": "set-current-schema",
+ "schema-id": 23
+}
+ "#,
+ TableUpdate::SetCurrentSchema { schema_id: 23 },
+ );
+ }
+
+ #[test]
+ fn test_add_spec() {
+ test_serde_json(
+ r#"
+{
+ "action": "add-spec",
+ "spec": {
+ "spec-id": 1,
+ "fields": [
+ {
+ "source-id": 4,
+ "field-id": 1000,
+ "name": "ts_day",
+ "transform": "day"
+ },
+ {
+ "source-id": 1,
+ "field-id": 1001,
+ "name": "id_bucket",
+ "transform": "bucket[16]"
+ },
+ {
+ "source-id": 2,
+ "field-id": 1002,
+ "name": "id_truncate",
+ "transform": "truncate[4]"
+ }
+ ]
+ }
+}
+ "#,
+ TableUpdate::AddSpec {
+ spec: PartitionSpec::builder()
+ .with_spec_id(1)
+ .with_partition_field(
+ PartitionField::builder()
+ .source_id(4)
+ .field_id(1000)
+ .name("ts_day".to_string())
+ .transform(Transform::Day)
+ .build(),
+ )
+ .with_partition_field(
+ PartitionField::builder()
+ .source_id(1)
+ .field_id(1001)
+ .name("id_bucket".to_string())
+ .transform(Transform::Bucket(16))
+ .build(),
+ )
+ .with_partition_field(
+ PartitionField::builder()
+ .source_id(2)
+ .field_id(1002)
+ .name("id_truncate".to_string())
+ .transform(Transform::Truncate(4))
+ .build(),
+ )
+ .build()
+ .unwrap(),
+ },
+ );
+ }
+
+ #[test]
+ fn test_set_default_spec() {
+ test_serde_json(
+ r#"
+{
+ "action": "set-default-spec",
+ "spec-id": 1
+}
+ "#,
+ TableUpdate::SetDefaultSpec { spec_id: 1 },
+ )
+ }
+
+ #[test]
+ fn test_add_sort_order() {
+ let json = r#"
+{
+ "action": "add-sort-order",
+ "sort-order": {
+ "order-id": 1,
+ "fields": [
+ {
+ "transform": "identity",
+ "source-id": 2,
+ "direction": "asc",
+ "null-order": "nulls-first"
+ },
+ {
+ "transform": "bucket[4]",
+ "source-id": 3,
+ "direction": "desc",
+ "null-order": "nulls-last"
+ }
+ ]
+ }
+}
+ "#;
+
+ let update = TableUpdate::AddSortOrder {
+ sort_order: SortOrder::builder()
+ .with_order_id(1)
+ .with_sort_field(
+ SortField::builder()
+ .source_id(2)
+ .direction(SortDirection::Ascending)
+ .null_order(NullOrder::First)
+ .transform(Transform::Identity)
+ .build(),
+ )
+ .with_sort_field(
+ SortField::builder()
+ .source_id(3)
+ .direction(SortDirection::Descending)
+ .null_order(NullOrder::Last)
+ .transform(Transform::Bucket(4))
+ .build(),
+ )
+ .build()
+ .unwrap(),
+ };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_set_default_order() {
+ let json = r#"
+{
+ "action": "set-default-sort-order",
+ "sort-order-id": 2
+}
+ "#;
+ let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_add_snapshot() {
+ let json = r#"
+{
+ "action": "add-snapshot",
+ "snapshot": {
+ "snapshot-id": 3055729675574597000,
+ "parent-snapshot-id": 3051729675574597000,
+ "timestamp-ms": 1555100955770,
+ "sequence-number": 1,
+ "summary": {
+ "operation": "append"
+ },
+ "manifest-list": "s3://a/b/2.avro",
+ "schema-id": 1
+ }
+}
+ "#;
+
+ let update = TableUpdate::AddSnapshot {
+ snapshot: Snapshot::builder()
+ .with_snapshot_id(3055729675574597000)
+ .with_parent_snapshot_id(Some(3051729675574597000))
+ .with_timestamp_ms(1555100955770)
+ .with_sequence_number(1)
+ .with_manifest_list(ManifestListFile("s3://a/b/2.avro".to_string()))
+ .with_schema_id(1)
+ .with_summary(Summary {
+ operation: Operation::Append,
+ other: HashMap::default(),
+ })
+ .build()
+ .unwrap(),
+ };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_remove_snapshots() {
+ let json = r#"
+{
+ "action": "remove-snapshots",
+ "snapshot-ids": [
+ 1,
+ 2
+ ]
+}
+ "#;
+
+ let update = TableUpdate::RemoveSnapshots {
+ snapshot_ids: vec![1, 2],
+ };
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_remove_snapshot_ref() {
+ let json = r#"
+{
+ "action": "remove-snapshot-ref",
+ "ref-name": "snapshot-ref"
+}
+ "#;
+
+ let update = TableUpdate::RemoveSnapshotRef {
+ ref_name: "snapshot-ref".to_string(),
+ };
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_set_snapshot_ref_tag() {
+ let json = r#"
+{
+ "action": "set-snapshot-ref",
+ "type": "tag",
+ "ref-name": "hank",
+ "snapshot-id": 1,
+ "max-ref-age-ms": 1
+}
+ "#;
+
+ let update = TableUpdate::SetSnapshotRef {
+ ref_name: "hank".to_string(),
+ reference: SnapshotReference {
+ snapshot_id: 1,
+ retention: SnapshotRetention::Tag { max_ref_age_ms: 1 },
+ },
+ };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_set_snapshot_ref_branch() {
+ let json = r#"
+{
+ "action": "set-snapshot-ref",
+ "type": "branch",
+ "ref-name": "hank",
+ "snapshot-id": 1,
+ "min-snapshots-to-keep": 2,
+ "max-snapshot-age-ms": 3,
+ "max-ref-age-ms": 4
+}
+ "#;
+
+ let update = TableUpdate::SetSnapshotRef {
+ ref_name: "hank".to_string(),
+ reference: SnapshotReference {
+ snapshot_id: 1,
+ retention: SnapshotRetention::Branch {
+ min_snapshots_to_keep: Some(2),
+ max_snapshot_age_ms: Some(3),
+ max_ref_age_ms: Some(4),
+ },
+ },
+ };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_set_properties() {
+ let json = r#"
+{
+ "action": "set-properties",
+ "updates": {
+ "prop1": "v1",
+ "prop2": "v2"
+ }
+}
+ "#;
+
+ let update = TableUpdate::SetProperties {
+ updates: vec![
+ ("prop1".to_string(), "v1".to_string()),
+ ("prop2".to_string(), "v2".to_string()),
+ ]
+ .into_iter()
+ .collect(),
+ };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_remove_properties() {
+ let json = r#"
+{
+ "action": "remove-properties",
+ "removals": [
+ "prop1",
+ "prop2"
+ ]
+}
+ "#;
+
+ let update = TableUpdate::RemoveProperties {
+ removals: vec!["prop1".to_string(), "prop2".to_string()],
+ };
+
+ test_serde_json(json, update);
+ }
+
+ #[test]
+ fn test_set_location() {
+ let json = r#"
+{
+ "action": "set-location",
+ "location": "s3://bucket/warehouse/tbl_location"
+}
+ "#;
+
+ let update = TableUpdate::SetLocation {
+ location: "s3://bucket/warehouse/tbl_location".to_string(),
+ };
+
+ test_serde_json(json, update);
+ }
}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 710eb960a9..378164838e 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -44,4 +44,5 @@ mod avro;
pub mod io;
pub mod spec;
+pub mod transaction;
pub mod transform;
diff --git a/crates/iceberg/src/rest.rs b/crates/iceberg/src/rest.rs
deleted file mode 100644
index 31d4da8b67..0000000000
--- a/crates/iceberg/src/rest.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This module provide rest catalog implementation.
-
-#[derive(Debug)]
-pub struct RestCatalog {
- url: String,
-}
diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
index c5ea8f319a..cfdbb6f17a 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -20,12 +20,13 @@
*/
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use typed_builder::TypedBuilder;
use super::transform::Transform;
/// Reference to [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
/// Partition fields capture the transform from table data to partition values.
pub struct PartitionField {
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index 3679445e62..3aa1f8b5b4 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -28,7 +28,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
-use std::sync::{Arc, OnceLock};
+use std::sync::Arc;
use _serde::SchemaEnum;
@@ -37,7 +37,7 @@ pub type SchemaRef = Arc<Schema>;
const DEFAULT_SCHEMA_ID: i32 = 0;
/// Defines schema in iceberg.
-#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(try_from = "SchemaEnum", into = "SchemaEnum")]
pub struct Schema {
r#struct: StructType,
@@ -50,9 +50,18 @@ pub struct Schema {
 name_to_id: HashMap<String, i32>,
 id_to_name: HashMap<i32, String>,
- lower_case_name_to_id: OnceLock<HashMap<String, i32>>,
}
+impl PartialEq for Schema {
+ fn eq(&self, other: &Self) -> bool {
+ self.r#struct == other.r#struct
+ && self.schema_id == other.schema_id
+ && self.identifier_field_ids == other.identifier_field_ids
+ }
+}
+
+impl Eq for Schema {}
+
/// Schema builder.
#[derive(Debug)]
pub struct SchemaBuilder {
@@ -117,7 +126,6 @@ impl SchemaBuilder {
name_to_id,
id_to_name,
- lower_case_name_to_id: OnceLock::default(),
})
}
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index a04bb99980..ca9db4156a 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -23,6 +23,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use super::table_metadata::SnapshotLog;
+use _serde::SnapshotV2;
/// Reference to [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
@@ -57,7 +58,8 @@ impl Default for Operation {
}
}
-#[derive(Debug, PartialEq, Eq, Clone, Builder)]
+#[derive(Debug, PartialEq, Eq, Clone, Builder, Serialize, Deserialize)]
+#[serde(from = "SnapshotV2", into = "SnapshotV2")]
#[builder(setter(prefix = "with"))]
/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
pub struct Snapshot {
diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs
index dcb19d3594..2150421b7e 100644
--- a/crates/iceberg/src/spec/sort.rs
+++ b/crates/iceberg/src/spec/sort.rs
@@ -20,6 +20,7 @@
*/
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use typed_builder::TypedBuilder;
use super::transform::Transform;
@@ -47,12 +48,12 @@ pub enum NullOrder {
Last,
}
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
/// Entry for every column that is to be sorted
pub struct SortField {
/// A source column id from the table’s schema
- pub source_id: i64,
+ pub source_id: i32,
/// A transform that is used to produce values to be sorted on from the source column.
pub transform: Transform,
/// A sort direction, that can only be either asc or desc
@@ -61,13 +62,14 @@ pub struct SortField {
pub null_order: NullOrder,
}
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder, Default)]
#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
/// A sort order is defined by a sort order id and a list of sort fields.
/// The order of the sort fields within the list defines the order in which the sort is applied to the data.
pub struct SortOrder {
/// Identifier for SortOrder, order_id `0` is no sort order.
+ #[builder(default)]
pub order_id: i64,
/// Details of the sort
#[builder(setter(each(name = "with_sort_field")), default)]
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 6abe959646..a28a4f117b 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-/*!
-Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
-The main struct here is [TableMetadataV2] which defines the data for a table.
-*/
+//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
+//! The main struct here is [TableMetadataV2] which defines the data for a table.
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
+use std::cmp::Ordering;
+use std::fmt::{Display, Formatter};
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;
@@ -746,9 +746,30 @@ pub(super) mod _serde {
/// Iceberg format version
pub enum FormatVersion {
/// Iceberg spec version 1
- V1 = b'1',
+ V1 = 1u8,
/// Iceberg spec version 2
- V2 = b'2',
+ V2 = 2u8,
+}
+
+impl PartialOrd for FormatVersion {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for FormatVersion {
+ fn cmp(&self, other: &Self) -> Ordering {
+ (*self as u8).cmp(&(*other as u8))
+ }
+}
+
+impl Display for FormatVersion {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ FormatVersion::V1 => write!(f, "v1"),
+ FormatVersion::V2 => write!(f, "v2"),
+ }
+ }
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
@@ -773,7 +794,6 @@ pub struct SnapshotLog {
#[cfg(test)]
mod tests {
-
use std::{collections::HashMap, fs, sync::Arc};
use anyhow::Result;
@@ -1041,7 +1061,7 @@ mod tests {
.with_sequence_number(0)
.with_schema_id(0)
.with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()))
- .with_summary(Summary{operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(),"local-1662532784305".to_string()),("added-data-files".to_string(),"4".to_string()),("added-records".to_string(),"4".to_string()),("added-files-size".to_string(),"6001".to_string())])})
+ .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.build().unwrap();
let expected = TableMetadata {
@@ -1060,13 +1080,13 @@ mod tests {
snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
current_snapshot_id: Some(638933773299822130),
last_sequence_number: 0,
- properties: HashMap::from_iter(vec![("owner".to_string(),"root".to_string())]),
+ properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
snapshot_log: vec![SnapshotLog {
snapshot_id: 638933773299822130,
timestamp_ms: 1662532818843,
}],
- metadata_log: vec![MetadataLog{metadata_file:"/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245}],
- refs: HashMap::from_iter(vec![("main".to_string(),SnapshotReference{snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None }})])
+ metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
+ refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
};
check_table_metadata_serde(data, expected);
@@ -1083,6 +1103,7 @@ mod tests {
 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
Ok(())
}
+
#[test]
fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
let data = r#"
@@ -1471,4 +1492,11 @@ mod tests {
"data did not match any variant of untagged enum TableMetadataEnum"
)
}
+
+ #[test]
+ fn order_of_format_version() {
+ assert!(FormatVersion::V1 < FormatVersion::V2);
+ assert_eq!(FormatVersion::V1, FormatVersion::V1);
+ assert_eq!(FormatVersion::V2, FormatVersion::V2);
+ }
}
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
new file mode 100644
index 0000000000..4ea89a2977
--- /dev/null
+++ b/crates/iceberg/src/transaction.rs
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains transaction api.
+
+use crate::error::Result;
+use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform};
+use crate::table::Table;
+use crate::TableUpdate::UpgradeFormatVersion;
+use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::mem::discriminant;
+
+/// Table transaction.
+pub struct Transaction<'a> {
+ table: &'a Table,
+ updates: Vec<TableUpdate>,
+ requirements: Vec<TableRequirement>,
+}
+
+impl<'a> Transaction<'a> {
+ /// Creates a new transaction.
+ pub fn new(table: &'a Table) -> Self {
+ Self {
+ table,
+ updates: vec![],
+ requirements: vec![],
+ }
+ }
+
+ fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
+ for update in &updates {
+ for up in &self.updates {
+ if discriminant(up) == discriminant(update) {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot apply update with same type at same time: {:?}",
+ update
+ ),
+ ));
+ }
+ }
+ }
+ self.updates.extend(updates);
+ Ok(())
+ }
+
+ fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
+ self.requirements.extend(requirements);
+ Ok(())
+ }
+
+ /// Sets table to a new version.
+ pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
+ let current_version = self.table.metadata().format_version();
+ match current_version.cmp(&format_version) {
+ Ordering::Greater => {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot downgrade table version from {} to {}",
+ current_version, format_version
+ ),
+ ));
+ }
+ Ordering::Less => {
+ self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
+ }
+ Ordering::Equal => {
+ // Do nothing.
+ }
+ }
+ Ok(self)
+ }
+
+ /// Update table's property.
+ pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
+ self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
+ Ok(self)
+ }
+
+ /// Creates replace sort order action.
+ pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
+ ReplaceSortOrderAction {
+ tx: self,
+ sort_fields: vec![],
+ }
+ }
+
+ /// Remove properties in table.
+ pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
+ self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
+ Ok(self)
+ }
+
+ /// Commit transaction.
+ pub async fn commit(self, catalog: &impl Catalog) -> Result<Table> {
+ let table_commit = TableCommit::builder()
+ .ident(self.table.identifier().clone())
+ .updates(self.updates)
+ .requirements(self.requirements)
+ .build();
+
+ catalog.update_table(table_commit).await
+ }
+}
+
+/// Transaction action for replacing sort order.
+pub struct ReplaceSortOrderAction<'a> {
+ tx: Transaction<'a>,
+ sort_fields: Vec<SortField>,
+}
+
+impl<'a> ReplaceSortOrderAction<'a> {
+ /// Adds a field for sorting in ascending order.
+ pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
+ self.add_sort_field(name, SortDirection::Ascending, null_order)
+ }
+
+ /// Adds a field for sorting in descending order.
+ pub fn desc(self, name: &str, null_order: NullOrder) -> Result<Self> {
+ self.add_sort_field(name, SortDirection::Descending, null_order)
+ }
+
+ /// Finished building the action and apply it to the transaction.
+ pub fn apply(mut self) -> Result<Transaction<'a>> {
+ let updates = vec![
+ TableUpdate::AddSortOrder {
+ sort_order: SortOrder {
+ fields: self.sort_fields,
+ ..SortOrder::default()
+ },
+ },
+ TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
+ ];
+
+ let requirements = vec![
+ TableRequirement::CurrentSchemaIdMatch {
+ current_schema_id: self.tx.table.metadata().current_schema().schema_id() as i64,
+ },
+ TableRequirement::DefaultSortOrderIdMatch {
+ default_sort_order_id: self
+ .tx
+ .table
+ .metadata()
+ .default_sort_order()
+ .unwrap()
+ .order_id,
+ },
+ ];
+
+ self.tx.append_requirements(requirements)?;
+ self.tx.append_updates(updates)?;
+ Ok(self.tx)
+ }
+
+ fn add_sort_field(
+ mut self,
+ name: &str,
+ sort_direction: SortDirection,
+ null_order: NullOrder,
+ ) -> Result<Self> {
+ let field_id = self
+ .tx
+ .table
+ .metadata()
+ .current_schema()
+ .field_id_by_name(name)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Cannot find field {} in table schema", name),
+ )
+ })?;
+
+ let sort_field = SortField::builder()
+ .source_id(field_id)
+ .transform(Transform::Identity)
+ .direction(sort_direction)
+ .null_order(null_order)
+ .build();
+
+ self.sort_fields.push(sort_field);
+ Ok(self)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::io::FileIO;
+ use crate::spec::{FormatVersion, TableMetadata};
+ use crate::table::Table;
+ use crate::transaction::Transaction;
+ use crate::{TableIdent, TableRequirement, TableUpdate};
+ use std::collections::HashMap;
+ use std::fs::File;
+ use std::io::BufReader;
+
+ fn make_v1_table() -> Table {
+ let file = File::open(format!(
+ "{}/testdata/table_metadata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "TableMetadataV1Valid.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+ Table::builder()
+ .metadata(resp)
+ .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+ .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .build()
+ }
+
+ fn make_v2_table() -> Table {
+ let file = File::open(format!(
+ "{}/testdata/table_metadata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "TableMetadataV2Valid.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+ Table::builder()
+ .metadata(resp)
+ .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+ .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .build()
+ }
+
+ #[test]
+ fn test_upgrade_table_version_v1_to_v2() {
+ let table = make_v1_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
+
+ assert_eq!(
+ vec![TableUpdate::UpgradeFormatVersion {
+ format_version: FormatVersion::V2
+ }],
+ tx.updates
+ );
+ }
+
+ #[test]
+ fn test_upgrade_table_version_v2_to_v2() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
+
+ assert!(
+ tx.updates.is_empty(),
+ "Upgrade table to same version should not generate any updates"
+ );
+ assert!(
+ tx.requirements.is_empty(),
+ "Upgrade table to same version should not generate any requirements"
+ );
+ }
+
+ #[test]
+ fn test_downgrade_table_version() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.upgrade_table_version(FormatVersion::V1);
+
+ assert!(tx.is_err(), "Downgrade table version should fail!");
+ }
+
+ #[test]
+ fn test_set_table_property() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx
+ .set_properties(HashMap::from([("a".to_string(), "b".to_string())]))
+ .unwrap();
+
+ assert_eq!(
+ vec![TableUpdate::SetProperties {
+ updates: HashMap::from([("a".to_string(), "b".to_string())])
+ }],
+ tx.updates
+ );
+ }
+
+ #[test]
+ fn test_remove_property() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx
+ .remove_properties(vec!["a".to_string(), "b".to_string()])
+ .unwrap();
+
+ assert_eq!(
+ vec![TableUpdate::RemoveProperties {
+ removals: vec!["a".to_string(), "b".to_string()]
+ }],
+ tx.updates
+ );
+ }
+
+ #[test]
+ fn test_replace_sort_order() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.replace_sort_order().apply().unwrap();
+
+ assert_eq!(
+ vec![
+ TableUpdate::AddSortOrder {
+ sort_order: Default::default()
+ },
+ TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }
+ ],
+ tx.updates
+ );
+
+ assert_eq!(
+ vec![
+ TableRequirement::CurrentSchemaIdMatch {
+ current_schema_id: 1
+ },
+ TableRequirement::DefaultSortOrderIdMatch {
+ default_sort_order_id: 3
+ }
+ ],
+ tx.requirements
+ );
+ }
+
+ #[test]
+ fn test_do_same_update_in_same_transaction() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx
+ .remove_properties(vec!["a".to_string(), "b".to_string()])
+ .unwrap();
+
+ let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
+
+ assert!(
+ tx.is_err(),
+ "Should not allow to do same kinds update in same transaction"
+ );
+ }
+}