1818
1919use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
2020use crate :: utils:: arrow:: get_field;
21- use anyhow:: { anyhow, Error as AnyError } ;
2221use serde:: { Deserialize , Serialize } ;
2322use std:: str;
2423
@@ -63,7 +62,7 @@ pub fn convert_static_schema_to_arrow_schema(
6362 static_schema : StaticSchema ,
6463 time_partition : & str ,
6564 custom_partition : Option < & String > ,
66- ) -> Result < Arc < Schema > , AnyError > {
65+ ) -> Result < Arc < Schema > , StaticSchemaError > {
6766 let mut parsed_schema = ParsedSchema {
6867 fields : Vec :: new ( ) ,
6968 metadata : HashMap :: new ( ) ,
@@ -86,7 +85,7 @@ pub fn convert_static_schema_to_arrow_schema(
8685
8786 for partition in & custom_partition_list {
8887 if !custom_partition_exists. contains_key ( * partition) {
89- return Err ( anyhow ! ( "custom partition field {partition} does not exist in the schema for the static schema logstream" ) ) ;
88+ return Err ( StaticSchemaError :: MissingCustomPartition ( partition. to_string ( ) ) ) ;
9089 }
9190 }
9291 }
@@ -134,29 +133,23 @@ pub fn convert_static_schema_to_arrow_schema(
134133 parsed_schema. fields . push ( parsed_field) ;
135134 }
136135 if !time_partition. is_empty ( ) && !time_partition_exists {
137- return Err ( anyhow ! {
138- format!(
139- "time partition field {time_partition} does not exist in the schema for the static schema logstream"
140- ) ,
141- } ) ;
136+ return Err ( StaticSchemaError :: MissingTimePartition ( time_partition. to_string ( ) ) ) ;
142137 }
143138 add_parseable_fields_to_static_schema ( parsed_schema)
144139}
145140
146141fn add_parseable_fields_to_static_schema (
147142 parsed_schema : ParsedSchema ,
148- ) -> Result < Arc < Schema > , AnyError > {
143+ ) -> Result < Arc < Schema > , StaticSchemaError > {
144+
149145 let mut schema: Vec < Arc < Field > > = Vec :: new ( ) ;
150146 for field in parsed_schema. fields . iter ( ) {
151147 let field = Field :: new ( field. name . clone ( ) , field. data_type . clone ( ) , field. nullable ) ;
152148 schema. push ( Arc :: new ( field) ) ;
153149 }
154150
155151 if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
156- return Err ( anyhow ! (
157- "field {} is a reserved field" ,
158- DEFAULT_TIMESTAMP_KEY
159- ) ) ;
152+ return Err ( StaticSchemaError :: DefaultTime ) ;
160153 } ;
161154
162155 // add the p_timestamp field to the event schema to the 0th index
@@ -184,21 +177,43 @@ fn default_dict_is_ordered() -> bool {
184177 false
185178}
186179
187- fn validate_field_names (
188- field_name : & str ,
189- existing_fields : & mut HashSet < String > ,
190- ) -> Result < ( ) , AnyError > {
180+ fn validate_field_names ( field_name : & str , existing_fields : & mut HashSet < String > ) -> Result < ( ) , StaticSchemaError > {
181+
191182 if field_name. is_empty ( ) {
192- return Err ( anyhow ! ( "field names should not be empty" ) ) ;
183+ return Err ( StaticSchemaError :: EmptyFieldName ) ;
193184 }
194185
195186 if !existing_fields. insert ( field_name. to_string ( ) ) {
196- return Err ( anyhow ! ( "duplicate field name: {}" , field_name) ) ;
187+ return Err ( StaticSchemaError :: DuplicateField ( field_name. to_string ( ) ) ) ;
197188 }
198189
199190 Ok ( ( ) )
200191}
201192
193+
194+ #[ derive( Debug , thiserror:: Error ) ]
195+ pub enum StaticSchemaError {
196+
197+ #[ error(
198+ "custom partition field {0} does not exist in the schema for the static schema logstream"
199+ ) ]
200+ MissingCustomPartition ( String ) ,
201+
202+ #[ error(
203+ "time partition field {0} does not exist in the schema for the static schema logstream"
204+ ) ]
205+ MissingTimePartition ( String ) ,
206+
207+ #[ error( "field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field" ) ]
208+ DefaultTime ,
209+
210+ #[ error( "field name cannot be empty" ) ]
211+ EmptyFieldName ,
212+
213+ #[ error( "duplicate field name: {0}" ) ]
214+ DuplicateField ( String ) ,
215+ }
216+
202217#[ cfg( test) ]
203218mod tests {
204219 use super :: * ;
0 commit comments