1717 */
1818
1919use actix_web:: http:: header:: ContentType ;
20+ use arrow_schema:: { DataType , Schema } ;
2021use async_trait:: async_trait;
2122use chrono:: Utc ;
2223use datafusion:: logical_expr:: { LogicalPlan , Projection } ;
@@ -41,6 +42,7 @@ pub mod alerts_utils;
4142pub mod target;
4243
4344use crate :: alerts:: target:: TARGETS ;
45+ use crate :: handlers:: http:: fetch_schema;
4446use crate :: handlers:: http:: query:: create_streams_for_distributed;
4547use crate :: parseable:: { PARSEABLE , StreamNotFound } ;
4648use crate :: query:: { QUERY_SESSION , resolve_stream_names} ;
@@ -575,7 +577,7 @@ impl AlertConfig {
575577 store : & dyn crate :: storage:: ObjectStorage ,
576578 ) -> Result < AlertConfig , AlertError > {
577579 let basic_fields = Self :: parse_basic_fields ( alert_json) ?;
578- let query = Self :: build_query_from_v1 ( alert_json) ?;
580+ let query = Self :: build_query_from_v1 ( alert_json) . await ?;
579581 let threshold_config = Self :: extract_threshold_config ( alert_json) ?;
580582 let eval_config = Self :: extract_eval_config ( alert_json) ?;
581583 let targets = Self :: extract_targets ( alert_json) ?;
@@ -634,7 +636,7 @@ impl AlertConfig {
634636 }
635637
636638 /// Build SQL query from v1 alert structure
637- fn build_query_from_v1 ( alert_json : & JsonValue ) -> Result < String , AlertError > {
639+ async fn build_query_from_v1 ( alert_json : & JsonValue ) -> Result < String , AlertError > {
638640 let stream = alert_json[ "stream" ]
639641 . as_str ( )
640642 . ok_or_else ( || AlertError :: CustomError ( "Missing stream in v1 alert" . to_string ( ) ) ) ?;
@@ -644,7 +646,7 @@ impl AlertConfig {
644646
645647 let aggregate_function = Self :: parse_aggregate_function ( aggregate_config) ?;
646648 let base_query = Self :: build_base_query ( & aggregate_function, aggregate_config, stream) ?;
647- let final_query = Self :: add_where_conditions ( base_query, aggregate_config) ?;
649+ let final_query = Self :: add_where_conditions ( base_query, aggregate_config, stream ) . await ?;
648650
649651 Ok ( final_query)
650652 }
@@ -709,10 +711,11 @@ impl AlertConfig {
709711 Ok ( query)
710712 }
711713
712- /// Add WHERE conditions to the base query
713- fn add_where_conditions (
714+ /// Add WHERE conditions to the base query with data type conversion
715+ async fn add_where_conditions (
714716 base_query : String ,
715717 aggregate_config : & JsonValue ,
718+ stream : & str ,
716719 ) -> Result < String , AlertError > {
717720 let Some ( conditions) = aggregate_config[ "conditions" ] . as_object ( ) else {
718721 return Ok ( base_query) ;
@@ -726,6 +729,16 @@ impl AlertConfig {
726729 return Ok ( base_query) ;
727730 }
728731
732+ // Fetch the stream schema for data type conversion
733+ let schema = match fetch_schema ( stream) . await {
734+ Ok ( schema) => schema,
735+ Err ( e) => {
736+ return Err ( AlertError :: CustomError ( format ! (
737+ "Failed to fetch schema for stream '{stream}' during migration: {e}. Migration cannot proceed without schema information." ,
738+ ) ) ) ;
739+ }
740+ } ;
741+
729742 let mut where_clauses = Vec :: new ( ) ;
730743 for condition in condition_config {
731744 let column = condition[ "column" ] . as_str ( ) . unwrap_or ( "" ) ;
@@ -737,7 +750,8 @@ impl AlertConfig {
737750 let value = condition[ "value" ] . as_str ( ) . unwrap_or ( "" ) ;
738751
739752 let operator = Self :: parse_where_operator ( operator_str) ;
740- let where_clause = Self :: format_where_clause ( column, & operator, value) ;
753+ let where_clause =
754+ Self :: format_where_clause_with_types ( column, & operator, value, & schema) ?;
741755 where_clauses. push ( where_clause) ;
742756 }
743757
@@ -769,41 +783,129 @@ impl AlertConfig {
769783 }
770784 }
771785
772- /// Format a single WHERE clause
773- fn format_where_clause ( column : & str , operator : & WhereConfigOperator , value : & str ) -> String {
786+ /// Format a single WHERE clause with proper data type conversion
787+ fn format_where_clause_with_types (
788+ column : & str ,
789+ operator : & WhereConfigOperator ,
790+ value : & str ,
791+ schema : & Schema ,
792+ ) -> Result < String , AlertError > {
774793 match operator {
775794 WhereConfigOperator :: IsNull | WhereConfigOperator :: IsNotNull => {
776- format ! ( "\" {}\" {}" , column, operator. as_str( ) )
777- }
778- WhereConfigOperator :: Contains => {
779- format ! ( "\" {}\" LIKE '%{}%'" , column, value. replace( '\'' , "''" ) )
795+ Ok ( format ! ( "\" {column}\" {}" , operator. as_str( ) ) )
780796 }
781- WhereConfigOperator :: BeginsWith => {
782- format ! ( "\" {}\" LIKE '{}%'" , column, value. replace( '\'' , "''" ) )
797+ WhereConfigOperator :: Contains => Ok ( format ! (
798+ "\" {column}\" LIKE '%{}%'" ,
799+ value. replace( '\'' , "''" )
800+ ) ) ,
801+ WhereConfigOperator :: BeginsWith => Ok ( format ! (
802+ "\" {column}\" LIKE '{}%'" ,
803+ value. replace( '\'' , "''" )
804+ ) ) ,
805+ WhereConfigOperator :: EndsWith => Ok ( format ! (
806+ "\" {column}\" LIKE '%{}'" ,
807+ value. replace( '\'' , "''" )
808+ ) ) ,
809+ WhereConfigOperator :: DoesNotContain => Ok ( format ! (
810+ "\" {column}\" NOT LIKE '%{}%'" ,
811+ value. replace( '\'' , "''" )
812+ ) ) ,
813+ WhereConfigOperator :: DoesNotBeginWith => Ok ( format ! (
814+ "\" {column}\" NOT LIKE '{}%'" ,
815+ value. replace( '\'' , "''" )
816+ ) ) ,
817+ WhereConfigOperator :: DoesNotEndWith => Ok ( format ! (
818+ "\" {column}\" NOT LIKE '%{}'" ,
819+ value. replace( '\'' , "''" )
820+ ) ) ,
821+ WhereConfigOperator :: ILike => Ok ( format ! (
822+ "\" {column}\" ILIKE '{}'" ,
823+ value. replace( '\'' , "''" )
824+ ) ) ,
825+ _ => {
826+ // Standard operators: =, !=, <, >, <=, >=
827+ let formatted_value = Self :: convert_value_by_data_type ( column, value, schema) ?;
828+ Ok ( format ! (
829+ "\" {column}\" {} {formatted_value}" ,
830+ operator. as_str( )
831+ ) )
783832 }
784- WhereConfigOperator :: EndsWith => {
785- format ! ( "\" {}\" LIKE '%{}'" , column, value. replace( '\'' , "''" ) )
833+ }
834+ }
835+
836+ /// Convert string value to appropriate data type based on schema
837+ fn convert_value_by_data_type (
838+ column : & str ,
839+ value : & str ,
840+ schema : & Schema ,
841+ ) -> Result < String , AlertError > {
842+ // Find the field in the schema
843+ let field = schema. fields ( ) . iter ( ) . find ( |f| f. name ( ) == column) ;
844+ let Some ( field) = field else {
845+ // Column not found in schema, fail migration
846+ return Err ( AlertError :: CustomError ( format ! (
847+ "Column '{column}' not found in stream schema during migration. Available columns: [{}]" ,
848+ schema
849+ . fields( )
850+ . iter( )
851+ . map( |f| f. name( ) . clone( ) )
852+ . collect:: <Vec <_>>( )
853+ . join( ", " )
854+ ) ) ) ;
855+ } ;
856+
857+ match field. data_type ( ) {
858+ DataType :: Float64 => {
859+ match value. parse :: < f64 > ( ) {
860+ Ok ( float_val) => Ok ( float_val. to_string ( ) ) , // Raw number without quotes
861+ Err ( _) => Err ( AlertError :: CustomError ( format ! (
862+ "Failed to parse value '{value}' as float64 for column '{column}' during migration" ,
863+ ) ) ) ,
864+ }
786865 }
787- WhereConfigOperator :: DoesNotContain => {
788- format ! ( "\" {}\" NOT LIKE '%{}%'" , column, value. replace( '\'' , "''" ) )
866+ DataType :: Int64 => {
867+ match value. parse :: < i64 > ( ) {
868+ Ok ( int_val) => Ok ( int_val. to_string ( ) ) , // Raw number without quotes
869+ Err ( _) => Err ( AlertError :: CustomError ( format ! (
870+ "Failed to parse value '{value}' as int64 for column '{column}' during migration" ,
871+ ) ) ) ,
872+ }
789873 }
790- WhereConfigOperator :: DoesNotBeginWith => {
791- format ! ( "\" {}\" NOT LIKE '{}%'" , column, value. replace( '\'' , "''" ) )
874+ DataType :: Boolean => {
875+ match value. to_lowercase ( ) . parse :: < bool > ( ) {
876+ Ok ( bool_val) => Ok ( bool_val. to_string ( ) ) , // Raw boolean without quotes
877+ Err ( _) => Err ( AlertError :: CustomError ( format ! (
878+ "Failed to parse value '{value}' as boolean for column '{column}' during migration" ,
879+ ) ) ) ,
880+ }
792881 }
793- WhereConfigOperator :: DoesNotEndWith => {
794- format ! ( "\" {}\" NOT LIKE '%{}'" , column, value. replace( '\'' , "''" ) )
882+ DataType :: Date32 | DataType :: Date64 => {
883+ // For date types, try to validate the format but keep as quoted string in SQL
884+ match chrono:: NaiveDate :: parse_from_str ( value, "%Y-%m-%d" ) {
885+ Ok ( _) => Ok ( format ! ( "'{}'" , value. replace( '\'' , "''" ) ) ) ,
886+ Err ( _) => {
887+ // Try ISO format
888+ match value. parse :: < chrono:: DateTime < chrono:: Utc > > ( ) {
889+ Ok ( _) => Ok ( format ! ( "'{}'" , value. replace( '\'' , "''" ) ) ) ,
890+ Err ( _) => Err ( AlertError :: CustomError ( format ! (
891+ "Failed to parse value '{value}' as date for column '{column}' during migration" ,
892+ ) ) ) ,
893+ }
894+ }
895+ }
795896 }
796- WhereConfigOperator :: ILike => {
797- format ! ( "\" {}\" ILIKE '{}'" , column, value. replace( '\'' , "''" ) )
897+ DataType :: Timestamp ( ..) => {
898+ // For timestamp types, try to validate but keep as quoted string in SQL
899+ match value. parse :: < chrono:: DateTime < chrono:: Utc > > ( ) {
900+ Ok ( _) => Ok ( format ! ( "'{}'" , value. replace( '\'' , "''" ) ) ) ,
901+ Err ( _) => Err ( AlertError :: CustomError ( format ! (
902+ "Failed to parse value '{value}' as timestamp for column '{column}' during migration" ,
903+ ) ) ) ,
904+ }
798905 }
799906 _ => {
800- // Standard operators: =, !=, <, >, <=, >=
801- format ! (
802- "\" {}\" {} '{}'" ,
803- column,
804- operator. as_str( ) ,
805- value. replace( '\'' , "''" )
806- )
907+ // For all other data types (string, binary, etc.), use string with quotes
908+ Ok ( format ! ( "'{}'" , value. replace( '\'' , "''" ) ) )
807909 }
808910 }
809911 }
0 commit comments