@@ -110,98 +110,110 @@ pub async fn resolve_parseable_metadata(
110110 . as_ref ( )
111111 . map ( |meta| serde_json:: from_slice ( meta) . expect ( "parseable config is valid json" ) ) ;
112112
113- // Env Change needs to be updated
114- let check = determine_environment ( staging_metadata, remote_metadata) ;
115- // flags for if metadata needs to be synced
116- let mut overwrite_staging = false ;
117- let mut overwrite_remote = false ;
113+ let env_change = determine_environment ( staging_metadata, remote_metadata) ;
118114
119- let res: Result < StorageMetadata , & ' static str > = match check {
120- EnvChange :: None ( mut metadata) => {
121- // overwrite staging anyways so that it matches remote in case of any divergence
122- overwrite_staging = true ;
123- match PARSEABLE . options . mode {
124- Mode :: All => {
125- metadata. server_mode . standalone_after_distributed ( ) ?;
126- overwrite_remote = true ;
127- update_metadata_mode_and_staging ( & mut metadata) ;
128- }
129- Mode :: Query => {
130- overwrite_remote = true ;
131- update_metadata_mode_and_staging ( & mut metadata) ;
132- }
133- _=> { }
134- }
135- if PARSEABLE . options . mode == Mode :: All {
136- metadata. server_mode . standalone_after_distributed ( ) ?;
137- }
138- Ok ( metadata)
139- } ,
140- EnvChange :: NewRemote => {
141- Err ( "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server" )
142- }
143- EnvChange :: NewStaging ( mut metadata) => {
144- // If server is started in ingest mode, ensure query mode has been started
145- if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
146- return Err ( ObjectStorageError :: UnhandledError ( format ! (
147- "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}" ,
148- JOIN_COMMUNITY
149- ) . into ( ) ) ) ;
150- }
151- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
152- metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
153- // this flag is set to true so that metadata is copied to staging
154- overwrite_staging = true ;
155- // overwrite remote in all and query mode
156- // because staging dir has changed.
157- match PARSEABLE . options . mode {
158- Mode :: All => {
159- metadata. server_mode . standalone_after_distributed ( )
160- . map_err ( |err| ObjectStorageError :: Custom ( err. to_string ( ) ) ) ?;
161- overwrite_remote = true ;
162- }
163- Mode :: Query | Mode :: Prism | Mode :: Ingest | Mode :: Index => {
164- update_metadata_mode_and_staging ( & mut metadata) ;
165- if matches ! ( PARSEABLE . options. mode, Mode :: Query | Mode :: Prism ) {
166- overwrite_remote = true ;
167- }
168- }
169- }
170- Ok ( metadata)
171- }
172- EnvChange :: CreateBoth => {
173- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
174- let metadata = StorageMetadata :: default ( ) ;
175- // new metadata needs to be set
176- // if mode is query or all then both staging and remote
177- match PARSEABLE . options . mode {
178- Mode :: All | Mode :: Query | Mode :: Prism => overwrite_remote = true ,
179- _ => ( ) ,
180- }
181- // else only staging
182- overwrite_staging = true ;
183- Ok ( metadata)
184- }
185- } ;
186-
187- let mut metadata = res. map_err ( |err| {
188- let err = format ! ( "{}. {}" , err, JOIN_COMMUNITY ) ;
189- let err: Box < dyn std:: error:: Error + Send + Sync + ' static > = err. into ( ) ;
190- ObjectStorageError :: UnhandledError ( err)
191- } ) ?;
115+ let ( mut metadata, overwrite_staging, overwrite_remote) = process_env_change ( env_change) ?;
192116
193117 metadata. server_mode = PARSEABLE . options . mode ;
118+
194119 if overwrite_remote {
195120 put_remote_metadata ( & metadata) . await ?;
196121 }
197-
198122 if overwrite_staging {
199123 put_staging_metadata ( & metadata) ?;
200124 }
201125
202126 Ok ( metadata)
203127}
204128
129+ fn process_env_change (
130+ env_change : EnvChange ,
131+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
132+ match env_change {
133+ EnvChange :: None ( mut metadata) => handle_none_env ( & mut metadata) ,
134+ EnvChange :: NewRemote => handle_new_remote_env ( ) ,
135+ EnvChange :: NewStaging ( mut metadata) => handle_new_staging_env ( & mut metadata) ,
136+ EnvChange :: CreateBoth => handle_create_both_env ( ) ,
137+ }
138+ }
139+
140+ fn handle_none_env (
141+ metadata : & mut StorageMetadata ,
142+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
143+ let overwrite_staging = true ;
144+ let mut overwrite_remote = false ;
145+
146+ match PARSEABLE . options . mode {
147+ Mode :: All => {
148+ metadata. server_mode . standalone_after_distributed ( ) ?;
149+ overwrite_remote = true ;
150+ update_metadata_mode_and_staging ( metadata) ;
151+ }
152+ Mode :: Query => {
153+ overwrite_remote = true ;
154+ update_metadata_mode_and_staging ( metadata) ;
155+ }
156+ _ => { }
157+ }
158+ if PARSEABLE . options . mode == Mode :: All {
159+ metadata. server_mode . standalone_after_distributed ( ) ?;
160+ }
161+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
162+ }
163+
164+ fn handle_new_remote_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
165+ Err ( ObjectStorageError :: UnhandledError ( format ! (
166+ "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}" ,
167+ JOIN_COMMUNITY
168+ ) . into ( ) ) )
169+ }
170+
171+ fn handle_new_staging_env (
172+ metadata : & mut StorageMetadata ,
173+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
174+ if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
175+ return Err ( ObjectStorageError :: UnhandledError (
176+ format ! (
177+ "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}" ,
178+ JOIN_COMMUNITY
179+ )
180+ . into ( ) ,
181+ ) ) ;
182+ }
183+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
184+ metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
185+ let overwrite_staging = true ;
186+ let mut overwrite_remote = false ;
187+
188+ match PARSEABLE . options . mode {
189+ Mode :: All => {
190+ metadata
191+ . server_mode
192+ . standalone_after_distributed ( )
193+ . map_err ( |err| ObjectStorageError :: Custom ( err. to_string ( ) ) ) ?;
194+ overwrite_remote = true ;
195+ }
196+ Mode :: Query | Mode :: Prism | Mode :: Ingest | Mode :: Index => {
197+ update_metadata_mode_and_staging ( metadata) ;
198+ if matches ! ( PARSEABLE . options. mode, Mode :: Query | Mode :: Prism ) {
199+ overwrite_remote = true ;
200+ }
201+ }
202+ }
203+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
204+ }
205+
206+ fn handle_create_both_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
207+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
208+ let metadata = StorageMetadata :: default ( ) ;
209+ let overwrite_remote = matches ! (
210+ PARSEABLE . options. mode,
211+ Mode :: All | Mode :: Query | Mode :: Prism
212+ ) ;
213+ let overwrite_staging = true ;
214+ Ok ( ( metadata, overwrite_staging, overwrite_remote) )
215+ }
216+
205217fn update_metadata_mode_and_staging ( metadata : & mut StorageMetadata ) {
206218 metadata. server_mode = PARSEABLE . options . mode ;
207219 metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
0 commit comments