1818*/
1919
2020pub mod format;
21- mod writer;
2221
2322use arrow_array:: RecordBatch ;
2423use arrow_schema:: { Field , Fields , Schema } ;
2524use itertools:: Itertools ;
2625use std:: sync:: Arc ;
2726
2827use self :: error:: EventError ;
29- pub use self :: writer:: STREAM_WRITERS ;
30- use crate :: { metadata, storage:: StreamType } ;
28+ use crate :: { metadata, staging:: STAGING , storage:: StreamType } ;
3129use chrono:: NaiveDateTime ;
3230use std:: collections:: HashMap ;
3331
@@ -52,36 +50,32 @@ impl Event {
5250 let mut key = get_schema_key ( & self . rb . schema ( ) . fields ) ;
5351 if self . time_partition . is_some ( ) {
5452 let parsed_timestamp_to_min = self . parsed_timestamp . format ( "%Y%m%dT%H%M" ) . to_string ( ) ;
55- key = format ! ( "{key}{ parsed_timestamp_to_min}" ) ;
53+ key. push_str ( & parsed_timestamp_to_min) ;
5654 }
5755
5856 if !self . custom_partition_values . is_empty ( ) {
59- let mut custom_partition_key = String :: default ( ) ;
6057 for ( k, v) in self . custom_partition_values . iter ( ) . sorted_by_key ( |v| v. 0 ) {
61- custom_partition_key = format ! ( "{custom_partition_key} &{k}={v}" ) ;
58+ key . push_str ( & format ! ( "&{k}={v}" ) ) ;
6259 }
63- key = format ! ( "{key}{custom_partition_key}" ) ;
6460 }
6561
66- let num_rows = self . rb . num_rows ( ) as u64 ;
6762 if self . is_first_event {
6863 commit_schema ( & self . stream_name , self . rb . schema ( ) ) ?;
6964 }
7065
71- STREAM_WRITERS . append_to_local (
72- & self . stream_name ,
66+ STAGING . get_or_create_stream ( & self . stream_name ) . push (
7367 & key,
7468 & self . rb ,
7569 self . parsed_timestamp ,
7670 & self . custom_partition_values ,
77- & self . stream_type ,
71+ self . stream_type ,
7872 ) ?;
7973
8074 metadata:: STREAM_INFO . update_stats (
8175 & self . stream_name ,
8276 self . origin_format ,
8377 self . origin_size ,
84- num_rows,
78+ self . rb . num_rows ( ) ,
8579 self . parsed_timestamp ,
8680 ) ?;
8781
@@ -93,21 +87,16 @@ impl Event {
9387 pub fn process_unchecked ( & self ) -> Result < ( ) , EventError > {
9488 let key = get_schema_key ( & self . rb . schema ( ) . fields ) ;
9589
96- STREAM_WRITERS . append_to_local (
97- & self . stream_name ,
90+ STAGING . get_or_create_stream ( & self . stream_name ) . push (
9891 & key,
9992 & self . rb ,
10093 self . parsed_timestamp ,
10194 & self . custom_partition_values ,
102- & self . stream_type ,
95+ self . stream_type ,
10396 ) ?;
10497
10598 Ok ( ( ) )
10699 }
107-
108- pub fn clear ( & self , stream_name : & str ) {
109- STREAM_WRITERS . clear ( stream_name) ;
110- }
111100}
112101
113102pub fn get_schema_key ( fields : & [ Arc < Field > ] ) -> String {
@@ -138,14 +127,13 @@ pub mod error {
138127 use arrow_schema:: ArrowError ;
139128
140129 use crate :: metadata:: error:: stream_info:: MetadataError ;
130+ use crate :: staging:: StagingError ;
141131 use crate :: storage:: ObjectStorageError ;
142132
143- use super :: writer:: errors:: StreamWriterError ;
144-
145133 #[ derive( Debug , thiserror:: Error ) ]
146134 pub enum EventError {
147135 #[ error( "Stream Writer Failed: {0}" ) ]
148- StreamWriter ( #[ from] StreamWriterError ) ,
136+ StreamWriter ( #[ from] StagingError ) ,
149137 #[ error( "Metadata Error: {0}" ) ]
150138 Metadata ( #[ from] MetadataError ) ,
151139 #[ error( "Stream Writer Failed: {0}" ) ]
0 commit comments