diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 73197e612483..6e915e5a305c 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -54,30 +54,54 @@ fn int_size(v: usize) -> u8 { } /// Write little-endian integer to buffer -fn write_offset(buf: &mut [u8], value: usize, nbytes: u8) { - for i in 0..nbytes { - buf[i as usize] = (value >> (i * 8)) as u8; - } +fn write_offset(buf: &mut Vec, value: usize, nbytes: u8) { + let bytes = value.to_le_bytes(); + buf.extend_from_slice(&bytes[..nbytes as usize]); } -/// Helper to make room for header by moving data -fn make_room_for_header(buffer: &mut Vec, start_pos: usize, header_size: usize) { - let current_len = buffer.len(); - buffer.resize(current_len + header_size, 0); - - let src_start = start_pos; - let src_end = current_len; - let dst_start = start_pos + header_size; +fn write_header(buf: &mut Vec, header_byte: u8, is_large: bool, num_items: usize) { + buf.push(header_byte); - buffer.copy_within(src_start..src_end, dst_start); + if is_large { + let num_items = num_items as u32; + buf.extend_from_slice(&num_items.to_le_bytes()); + } else { + let num_items = num_items as u8; + buf.push(num_items); + }; } - #[derive(Default)] struct ValueBuffer(Vec); impl ValueBuffer { + fn append_u8(&mut self, term: u8) { + self.0.push(term); + } + + fn append_slice(&mut self, other: &[u8]) { + self.0.extend_from_slice(other); + } + + fn append_primitive_header(&mut self, primitive_type: VariantPrimitiveType) { + self.0.push(primitive_header(primitive_type)); + } + + fn inner(&self) -> &[u8] { + &self.0 + } + + fn into_inner(self) -> Vec { + self.0 + } + + fn inner_mut(&mut self) -> &mut Vec { + &mut self.0 + } + + // Variant types below + fn append_null(&mut self) { - self.0.push(primitive_header(VariantPrimitiveType::Null)); + self.append_primitive_header(VariantPrimitiveType::Null); } fn append_bool(&mut self, value: bool) { @@ -86,98 +110,91 @@ impl ValueBuffer { } else { VariantPrimitiveType::BooleanFalse }; - self.0.push(primitive_header(primitive_type)); + self.append_primitive_header(primitive_type); } fn append_int8(&mut self, value: i8) { - self.0.push(primitive_header(VariantPrimitiveType::Int8)); - self.0.push(value as u8); + self.append_primitive_header(VariantPrimitiveType::Int8); + self.append_u8(value as u8); } fn append_int16(&mut self, value: i16) { - self.0.push(primitive_header(VariantPrimitiveType::Int16)); - self.0.extend_from_slice(&value.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Int16); + self.append_slice(&value.to_le_bytes()); } fn append_int32(&mut self, value: i32) { - self.0.push(primitive_header(VariantPrimitiveType::Int32)); - self.0.extend_from_slice(&value.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Int32); + self.append_slice(&value.to_le_bytes()); } fn append_int64(&mut self, value: i64) { - self.0.push(primitive_header(VariantPrimitiveType::Int64)); - self.0.extend_from_slice(&value.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Int64); + self.append_slice(&value.to_le_bytes()); } fn append_float(&mut self, value: f32) { - self.0.push(primitive_header(VariantPrimitiveType::Float)); - self.0.extend_from_slice(&value.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Float); + self.append_slice(&value.to_le_bytes()); } fn append_double(&mut self, value: f64) { - self.0.push(primitive_header(VariantPrimitiveType::Double)); - self.0.extend_from_slice(&value.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Double); + self.append_slice(&value.to_le_bytes()); } fn append_date(&mut self, value: chrono::NaiveDate) { - self.0.push(primitive_header(VariantPrimitiveType::Date)); + self.append_primitive_header(VariantPrimitiveType::Date); let days_since_epoch = value.signed_duration_since(UNIX_EPOCH_DATE).num_days() as i32; - self.0.extend_from_slice(&days_since_epoch.to_le_bytes()); + self.append_slice(&days_since_epoch.to_le_bytes()); } fn append_timestamp_micros(&mut self, value: chrono::DateTime) { - self.0 - .push(primitive_header(VariantPrimitiveType::TimestampMicros)); + self.append_primitive_header(VariantPrimitiveType::TimestampMicros); let micros = value.timestamp_micros(); - self.0.extend_from_slice(µs.to_le_bytes()); + self.append_slice(µs.to_le_bytes()); } fn append_timestamp_ntz_micros(&mut self, value: chrono::NaiveDateTime) { - self.0 - .push(primitive_header(VariantPrimitiveType::TimestampNtzMicros)); + self.append_primitive_header(VariantPrimitiveType::TimestampNtzMicros); let micros = value.and_utc().timestamp_micros(); - self.0.extend_from_slice(µs.to_le_bytes()); + self.append_slice(µs.to_le_bytes()); } fn append_decimal4(&mut self, integer: i32, scale: u8) { - self.0 - .push(primitive_header(VariantPrimitiveType::Decimal4)); - self.0.push(scale); - self.0.extend_from_slice(&integer.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Decimal4); + self.append_u8(scale); + self.append_slice(&integer.to_le_bytes()); } fn append_decimal8(&mut self, integer: i64, scale: u8) { - self.0 - .push(primitive_header(VariantPrimitiveType::Decimal8)); - self.0.push(scale); - self.0.extend_from_slice(&integer.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Decimal8); + self.append_u8(scale); + self.append_slice(&integer.to_le_bytes()); } fn append_decimal16(&mut self, integer: i128, scale: u8) { - self.0 - .push(primitive_header(VariantPrimitiveType::Decimal16)); - self.0.push(scale); - self.0.extend_from_slice(&integer.to_le_bytes()); + self.append_primitive_header(VariantPrimitiveType::Decimal16); + self.append_u8(scale); + self.append_slice(&integer.to_le_bytes()); } fn append_binary(&mut self, value: &[u8]) { - self.0.push(primitive_header(VariantPrimitiveType::Binary)); - self.0 - .extend_from_slice(&(value.len() as u32).to_le_bytes()); - self.0.extend_from_slice(value); + self.append_primitive_header(VariantPrimitiveType::Binary); + self.append_slice(&(value.len() as u32).to_le_bytes()); + self.append_slice(value); } fn append_short_string(&mut self, value: ShortString) { let inner = value.0; - self.0.push(short_string_header(inner.len())); - self.0.extend_from_slice(inner.as_bytes()); + self.append_u8(short_string_header(inner.len())); + self.append_slice(inner.as_bytes()); } fn append_string(&mut self, value: &str) { - self.0.push(primitive_header(VariantPrimitiveType::String)); - self.0 - .extend_from_slice(&(value.len() as u32).to_le_bytes()); - self.0.extend_from_slice(value.as_bytes()); + self.append_primitive_header(VariantPrimitiveType::String); + self.append_slice(&(value.len() as u32).to_le_bytes()); + self.append_slice(value.as_bytes()); } fn offset(&self) -> usize { @@ -227,8 +244,8 @@ struct MetadataBuilder { } impl MetadataBuilder { - /// Add field name to dictionary, return its ID - fn add_field_name(&mut self, field_name: &str) -> u32 { + /// Upsert field name to dictionary, return its ID + fn upsert_field_name(&mut self, field_name: &str) -> u32 { use std::collections::btree_map::Entry; match self.field_name_to_id.entry(field_name.to_string()) { Entry::Occupied(entry) => *entry.get(), @@ -248,6 +265,45 @@ impl MetadataBuilder { fn metadata_size(&self) -> usize { self.field_names.iter().map(|k| k.len()).sum() } + + fn finish(self) -> Vec { + let nkeys = self.num_field_names(); + + // Calculate metadata size + let total_dict_size: usize = self.metadata_size(); + + // Determine appropriate offset size based on the larger of dict size or total string size + let max_offset = std::cmp::max(total_dict_size, nkeys); + let offset_size = int_size(max_offset); + + let offset_start = 1 + offset_size as usize; + let string_start = offset_start + (nkeys + 1) * offset_size as usize; + let metadata_size = string_start + total_dict_size; + + let mut metadata = Vec::with_capacity(metadata_size); + + // Write header: version=1, not sorted, with calculated offset_size + metadata.push(0x01 | ((offset_size - 1) << 6)); + + // Write dictionary size + write_offset(&mut metadata, nkeys, offset_size); + + // Write offsets + let mut cur_offset = 0; + for key in self.field_names.iter() { + write_offset(&mut metadata, cur_offset, offset_size); + cur_offset += key.len(); + } + // Write final offset + write_offset(&mut metadata, cur_offset, offset_size); + + // Write string data + for key in self.field_names.iter() { + metadata.extend_from_slice(key.as_bytes()); + } + + metadata + } } /// Top level builder for [`Variant`] values @@ -388,6 +444,7 @@ impl MetadataBuilder { /// ); /// /// ``` +#[derive(Default)] pub struct VariantBuilder { buffer: ValueBuffer, metadata_builder: MetadataBuilder, @@ -420,54 +477,7 @@ impl VariantBuilder { } pub fn finish(self) -> (Vec, Vec) { - let nkeys = self.metadata_builder.num_field_names(); - - // Calculate metadata size - let total_dict_size: usize = self.metadata_builder.metadata_size(); - - // Determine appropriate offset size based on the larger of dict size or total string size - let max_offset = std::cmp::max(total_dict_size, nkeys); - let offset_size = int_size(max_offset); - - let offset_start = 1 + offset_size as usize; - let string_start = offset_start + (nkeys + 1) * offset_size as usize; - let metadata_size = string_start + total_dict_size; - - // Pre-allocate exact size to avoid reallocations - let mut metadata = vec![0u8; metadata_size]; - - // Write header: version=1, not sorted, with calculated offset_size - metadata[0] = 0x01 | ((offset_size - 1) << 6); - - // Write dictionary size - write_offset(&mut metadata[1..], nkeys, offset_size); - - // Write offsets and string data - let mut cur_offset = 0; - for (i, key) in self.metadata_builder.field_names.iter().enumerate() { - write_offset( - &mut metadata[offset_start + i * offset_size as usize..], - cur_offset, - offset_size, - ); - let start = string_start + cur_offset; - metadata[start..start + key.len()].copy_from_slice(key.as_bytes()); - cur_offset += key.len(); - } - // Write final offset - write_offset( - &mut metadata[offset_start + nkeys * offset_size as usize..], - cur_offset, - offset_size, - ); - - (metadata, self.buffer.0) - } -} - -impl Default for VariantBuilder { - fn default() -> Self { - Self::new() + (self.metadata_builder.finish(), self.buffer.into_inner()) } } @@ -537,40 +547,23 @@ impl<'a> ListBuilder<'a> { let data_size = self.buffer.offset(); let num_elements = self.offsets.len() - 1; let is_large = num_elements > u8::MAX as usize; - let size_bytes = if is_large { 4 } else { 1 }; let offset_size = int_size(data_size); - let header_size = 1 + size_bytes + (num_elements + 1) * offset_size as usize; - - let parent_start_pos = self.parent_buffer.offset(); - - make_room_for_header(&mut self.parent_buffer.0, parent_start_pos, header_size); // Write header - let mut pos = parent_start_pos; - self.parent_buffer.0[pos] = array_header(is_large, offset_size); - pos += 1; - - if is_large { - self.parent_buffer.0[pos..pos + 4] - .copy_from_slice(&(num_elements as u32).to_le_bytes()); - pos += 4; - } else { - self.parent_buffer.0[pos] = num_elements as u8; - pos += 1; - } + write_header( + self.parent_buffer.inner_mut(), + array_header(is_large, offset_size), + is_large, + num_elements, + ); // Write offsets for offset in &self.offsets { - write_offset( - &mut self.parent_buffer.0[pos..pos + offset_size as usize], - *offset, - offset_size, - ); - pos += offset_size as usize; + write_offset(self.parent_buffer.inner_mut(), *offset, offset_size); } // Append values - self.parent_buffer.0.extend_from_slice(&self.buffer.0); + self.parent_buffer.append_slice(self.buffer.inner()); } } @@ -602,7 +595,7 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> { return; }; - let field_id = self.metadata_builder.add_field_name(field_name); + let field_id = self.metadata_builder.upsert_field_name(field_name); self.fields.insert(field_id, *field_start); self.pending = None; @@ -615,7 +608,7 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> { pub fn insert<'m, 'd, T: Into>>(&mut self, key: &str, value: T) { self.check_pending_field(); - let field_id = self.metadata_builder.add_field_name(key); + let field_id = self.metadata_builder.upsert_field_name(key); let field_start = self.buffer.offset(); self.fields.insert(field_id, field_start); @@ -655,7 +648,6 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> { let data_size = self.buffer.offset(); let num_fields = self.fields.len(); let is_large = num_fields > u8::MAX as usize; - let size_bytes = if is_large { 4 } else { 1 }; let field_ids_by_sorted_field_name = self .metadata_builder @@ -669,55 +661,28 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> { let id_size = int_size(max_id); let offset_size = int_size(data_size); - let header_size = 1 - + size_bytes - + num_fields * id_size as usize - + (num_fields + 1) * offset_size as usize; - - let parent_start_pos = self.parent_buffer.offset(); - - make_room_for_header(&mut self.parent_buffer.0, parent_start_pos, header_size); - // Write header - let mut pos = parent_start_pos; - self.parent_buffer.0[pos] = object_header(is_large, id_size, offset_size); - pos += 1; - - if is_large { - self.parent_buffer.0[pos..pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); - pos += 4; - } else { - self.parent_buffer.0[pos] = num_fields as u8; - pos += 1; - } + write_header( + self.parent_buffer.inner_mut(), + object_header(is_large, id_size, offset_size), + is_large, + num_fields, + ); // Write field IDs (sorted order) for id in &field_ids_by_sorted_field_name { - write_offset( - &mut self.parent_buffer.0[pos..pos + id_size as usize], - *id as usize, - id_size, - ); - pos += id_size as usize; + write_offset(self.parent_buffer.inner_mut(), *id as usize, id_size); } // Write field offsets for id in &field_ids_by_sorted_field_name { let &offset = self.fields.get(id).unwrap(); - write_offset( - &mut self.parent_buffer.0[pos..pos + offset_size as usize], - offset, - offset_size, - ); - pos += offset_size as usize; + write_offset(self.parent_buffer.inner_mut(), offset, offset_size); } - write_offset( - &mut self.parent_buffer.0[pos..pos + offset_size as usize], - data_size, - offset_size, - ); - self.parent_buffer.0.extend_from_slice(&self.buffer.0); + write_offset(self.parent_buffer.inner_mut(), data_size, offset_size); + + self.parent_buffer.append_slice(self.buffer.inner()); } }