@@ -5,6 +5,9 @@ import pkg/chronicles
55import pkg/ datastore/ typedds
66
77import ../ utils/ asynciter
8+ import ../ utils/ safeasynciter
9+
10+ {.push raises : [].}
811
912type KeyVal * [T] = tuple [key: Key , value: T]
1013
@@ -42,6 +45,40 @@ proc toAsyncIter*[T](
4245
4346 AsyncIter [?! QueryResponse [T]].new (genNext, isFinished).success
4447
48+ proc toSafeAsyncIter * [T](
49+ queryIter: QueryIter [T], finishOnErr: bool = true
50+ ): Future [?! SafeAsyncIter [QueryResponse [T]]] {.async : (raises: [CancelledError ]).} =
51+ # # Converts `QueryIter[T]` to `SafeAsyncIter[QueryResponse[T]]` and automatically
52+ # # runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
53+ # # if the flag finishOnErr is set to true)
54+ # #
55+
56+ if queryIter.finished:
57+ trace " Disposing iterator"
58+ if error =? (await queryIter.dispose ()).errorOption:
59+ return failure (error)
60+ return success (SafeAsyncIter [QueryResponse [T]].empty ())
61+
62+ var errOccurred = false
63+
64+ proc genNext (): Future [?! QueryResponse [T]] {.async : (raises: [CancelledError ]).} =
65+ let queryResOrErr = await queryIter.next ()
66+
67+ if queryResOrErr.isErr:
68+ errOccurred = true
69+
70+ if queryIter.finished or (errOccurred and finishOnErr):
71+ trace " Disposing iterator"
72+ if error =? (await queryIter.dispose ()).errorOption:
73+ return failure (error)
74+
75+ return queryResOrErr
76+
77+ proc isFinished (): bool =
78+ queryIter.finished
79+
80+ SafeAsyncIter [QueryResponse [T]].new (genNext, isFinished).success
81+
4582proc filterSuccess * [T](
4683 iter: AsyncIter [?! QueryResponse [T]]
4784): Future [AsyncIter [tuple [key: Key , value: T]]] {.async : (raises: [CancelledError ]).} =
@@ -62,7 +99,30 @@ proc filterSuccess*[T](
6299
63100 (key: key, value: value).some
64101
65- try :
66- await mapFilter [?! QueryResponse [T], KeyVal [T]](iter, mapping)
67- except CatchableError as err:
68- raiseAssert err.msg
102+ await mapFilter [?! QueryResponse [T], KeyVal [T]](iter, mapping)
103+
104+ proc filterSuccess * [T](
105+ iter: SafeAsyncIter [QueryResponse [T]]
106+ ): Future [SafeAsyncIter [tuple [key: Key , value: T]]] {.
107+ async : (raises: [CancelledError ])
108+ .} =
109+ # # Filters out any items that are not success
110+
111+ proc mapping (
112+ resOrErr: ?! QueryResponse [T]
113+ ): Future [Option [?! KeyVal [T]]] {.async : (raises: [CancelledError ]).} =
114+ without res =? resOrErr, error:
115+ error " Error occurred when getting QueryResponse" , msg = error.msg
116+ return Result [KeyVal [T], ref CatchableError ].none
117+
118+ without key =? res.key:
119+ warn " No key for a QueryResponse"
120+ return Result [KeyVal [T], ref CatchableError ].none
121+
122+ without value =? res.value, error:
123+ error " Error occurred when getting a value from QueryResponse" , msg = error.msg
124+ return Result [KeyVal [T], ref CatchableError ].none
125+
126+ some (success ((key: key, value: value)))
127+
128+ await mapFilter [QueryResponse [T], KeyVal [T]](iter, mapping)
0 commit comments