@@ -173,14 +173,12 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
173173 // For query mode, if the stream not found in memory map,
174174 //check if it exists in the storage
175175 //create stream and schema from storage
176- if PARSEABLE . options . mode == Mode :: Query {
177- match PARSEABLE
176+ if PARSEABLE . options . mode == Mode :: Query
177+ && ! PARSEABLE
178178 . create_stream_and_schema_from_storage ( & stream_name)
179- . await
180- {
181- Ok ( true ) => { }
182- Ok ( false ) | Err ( _) => return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ,
183- }
179+ . await ?
180+ {
181+ return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ;
184182 }
185183
186184 let retention = PARSEABLE
@@ -199,16 +197,13 @@ pub async fn put_retention(
199197 // For query mode, if the stream not found in memory map,
200198 //check if it exists in the storage
201199 //create stream and schema from storage
202- if PARSEABLE . options . mode == Mode :: Query {
203- match PARSEABLE
200+ if PARSEABLE . options . mode == Mode :: Query
201+ && ! PARSEABLE
204202 . create_stream_and_schema_from_storage ( & stream_name)
205- . await
206- {
207- Ok ( true ) => { }
208- Ok ( false ) | Err ( _) => return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ,
209- }
203+ . await ?
204+ {
205+ return Err ( StreamNotFound ( stream_name) . into ( ) ) ;
210206 }
211- let stream = PARSEABLE . get_stream ( & stream_name) ?;
212207
213208 let retention: Retention = match serde_json:: from_value ( json) {
214209 Ok ( retention) => retention,
@@ -221,7 +216,7 @@ pub async fn put_retention(
221216 . put_retention ( & stream_name, & retention)
222217 . await ?;
223218
224- stream . set_retention ( retention) ;
219+ PARSEABLE . get_stream ( & stream_name ) ? . set_retention ( retention) ;
225220
226221 Ok ( (
227222 format ! ( "set retention configuration for log stream {stream_name}" ) ,
@@ -259,21 +254,16 @@ pub async fn get_stats(
259254) -> Result < impl Responder , StreamError > {
260255 let stream_name = stream_name. into_inner ( ) ;
261256
262- if ! PARSEABLE . streams . contains ( & stream_name ) {
263- // For query mode, if the stream not found in memory map,
264- //check if it exists in the storage
265- //create stream and schema from storage
266- if PARSEABLE . options . mode == Mode :: Query {
267- match PARSEABLE
257+ // For query mode, if the stream not found in memory map,
258+ //check if it exists in the storage
259+ //create stream and schema from storage
260+ if ! PARSEABLE . streams . contains ( & stream_name )
261+ && ( PARSEABLE . options . mode != Mode :: Query
262+ || ! PARSEABLE
268263 . create_stream_and_schema_from_storage ( & stream_name)
269- . await
270- {
271- Ok ( true ) => { }
272- Ok ( false ) | Err ( _) => return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ,
273- }
274- } else {
275- return Err ( StreamNotFound ( stream_name) . into ( ) ) ;
276- }
264+ . await ?)
265+ {
266+ return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ;
277267 }
278268
279269 let query_string = req. query_string ( ) ;
@@ -365,19 +355,18 @@ pub async fn get_stats(
365355
366356pub async fn get_stream_info ( stream_name : Path < String > ) -> Result < impl Responder , StreamError > {
367357 let stream_name = stream_name. into_inner ( ) ;
368- if !PARSEABLE . streams . contains ( & stream_name) {
369- if PARSEABLE . options . mode == Mode :: Query {
370- match PARSEABLE
358+ // For query mode, if the stream not found in memory map,
359+ //check if it exists in the storage
360+ //create stream and schema from storage
361+ if !PARSEABLE . streams . contains ( & stream_name)
362+ && ( PARSEABLE . options . mode != Mode :: Query
363+ || !PARSEABLE
371364 . create_stream_and_schema_from_storage ( & stream_name)
372- . await
373- {
374- Ok ( true ) => { }
375- Ok ( false ) | Err ( _) => return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ,
376- }
377- } else {
378- return Err ( StreamNotFound ( stream_name) . into ( ) ) ;
379- }
365+ . await ?)
366+ {
367+ return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ;
380368 }
369+
381370 let storage = PARSEABLE . storage . get_object_store ( ) ;
382371 // if first_event_at is not found in memory map, check if it exists in the storage
383372 // if it exists in the storage, update the first_event_at in memory map
@@ -426,14 +415,12 @@ pub async fn put_stream_hot_tier(
426415 // For query mode, if the stream not found in memory map,
427416 //check if it exists in the storage
428417 //create stream and schema from storage
429- if PARSEABLE . options . mode == Mode :: Query {
430- match PARSEABLE
418+ if PARSEABLE . options . mode == Mode :: Query
419+ && ! PARSEABLE
431420 . create_stream_and_schema_from_storage ( & stream_name)
432- . await
433- {
434- Ok ( true ) => { }
435- Ok ( false ) | Err ( _) => return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ,
436- }
421+ . await ?
422+ {
423+ return Err ( StreamNotFound ( stream_name) . into ( ) ) ;
437424 }
438425
439426 let stream = PARSEABLE . get_stream ( & stream_name) ?;
@@ -476,21 +463,16 @@ pub async fn put_stream_hot_tier(
476463pub async fn get_stream_hot_tier ( stream_name : Path < String > ) -> Result < impl Responder , StreamError > {
477464 let stream_name = stream_name. into_inner ( ) ;
478465
479- if ! PARSEABLE . streams . contains ( & stream_name ) {
480- // For query mode, if the stream not found in memory map,
481- //check if it exists in the storage
482- //create stream and schema from storage
483- if PARSEABLE . options . mode == Mode :: Query {
484- match PARSEABLE
466+ // For query mode, if the stream not found in memory map,
467+ //check if it exists in the storage
468+ //create stream and schema from storage
469+ if ! PARSEABLE . streams . contains ( & stream_name )
470+ && ( PARSEABLE . options . mode != Mode :: Query
471+ || ! PARSEABLE
485472 . create_stream_and_schema_from_storage ( & stream_name)
486- . await
487- {
488- Ok ( true ) => { }
489- Ok ( false ) | Err ( _) => return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ,
490- }
491- } else {
492- return Err ( StreamNotFound ( stream_name) . into ( ) ) ;
493- }
473+ . await ?)
474+ {
475+ return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ;
494476 }
495477
496478 let Some ( hot_tier_manager) = HotTierManager :: global ( ) else {
@@ -509,14 +491,12 @@ pub async fn delete_stream_hot_tier(
509491 // For query mode, if the stream not found in memory map,
510492 //check if it exists in the storage
511493 //create stream and schema from storage
512- if PARSEABLE . options . mode == Mode :: Query {
513- match PARSEABLE
494+ if PARSEABLE . options . mode == Mode :: Query
495+ && ! PARSEABLE
514496 . create_stream_and_schema_from_storage ( & stream_name)
515- . await
516- {
517- Ok ( true ) => { }
518- Ok ( false ) | Err ( _) => return Err ( StreamNotFound ( stream_name. clone ( ) ) . into ( ) ) ,
519- }
497+ . await ?
498+ {
499+ return Err ( StreamNotFound ( stream_name) . into ( ) ) ;
520500 }
521501
522502 if PARSEABLE . get_stream ( & stream_name) ?. get_stream_type ( ) == StreamType :: Internal {
0 commit comments