15
15
from __future__ import annotations
16
16
17
17
import datetime
18
- from functools import singledispatch
19
18
import typing
20
19
21
20
import bigframes_vendored .pandas .core .window .rolling as vendored_pandas_rolling
24
23
25
24
from bigframes import dtypes
26
25
from bigframes .core import expression as ex
27
- from bigframes .core import log_adapter , nodes , ordering , window_spec
26
+ from bigframes .core import log_adapter , ordering , window_spec
28
27
import bigframes .core .blocks as blocks
28
+ from bigframes .core .window import ordering as window_ordering
29
29
import bigframes .operations .aggregations as agg_ops
30
30
31
31
@@ -140,7 +140,9 @@ def create_range_window(
140
140
if index_dtypes [0 ] != dtypes .TIMESTAMP_DTYPE :
141
141
raise ValueError ("Index type should be timestamps with timezones" )
142
142
143
- order_direction = _find_order_direction (block .expr .node , block .index_columns [0 ])
143
+ order_direction = window_ordering .find_order_direction (
144
+ block .expr .node , block .index_columns [0 ]
145
+ )
144
146
if order_direction is None :
145
147
raise ValueError (
146
148
"The index might not be in a monotonic order. Please sort the index before rolling."
@@ -157,48 +159,3 @@ def create_range_window(
157
159
),
158
160
)
159
161
return Window (block , spec , block .value_columns , is_series = is_series )
160
-
161
-
162
- @singledispatch
163
- def _find_order_direction (
164
- root : nodes .BigFrameNode , column_id : str
165
- ) -> ordering .OrderingDirection | None :
166
- """Returns the order of the given column with tree traversal. If the column cannot be found,
167
- or the ordering information is not available, return None.
168
- """
169
- return None
170
-
171
-
172
- @_find_order_direction .register
173
- def _ (root : nodes .OrderByNode , column_id : str ):
174
- if len (root .by ) == 0 :
175
- return None
176
-
177
- # Make sure the window key is the prefix of sorting keys.
178
- order_expr = root .by [0 ]
179
- scalar_expr = order_expr .scalar_expression
180
- if isinstance (scalar_expr , ex .DerefOp ) and scalar_expr .id .name == column_id :
181
- return order_expr .direction
182
-
183
- return None
184
-
185
-
186
- @_find_order_direction .register
187
- def _ (root : nodes .ReversedNode , column_id : str ):
188
- direction = _find_order_direction (root .child , column_id )
189
-
190
- if direction is None :
191
- return None
192
- return direction .reverse ()
193
-
194
-
195
- @_find_order_direction .register
196
- def _ (root : nodes .SelectionNode , column_id : str ):
197
- for alias_ref in root .input_output_pairs :
198
- if alias_ref .id .name == column_id :
199
- return _find_order_direction (root .child , alias_ref .ref .id .name )
200
-
201
-
202
- @_find_order_direction .register
203
- def _ (root : nodes .FilterNode , column_id : str ):
204
- return _find_order_direction (root .child , column_id )
0 commit comments