 from __future__ import annotations
 
 import ast
+import copy
 import dataclasses
 import datetime
 import functools
 import textwrap
 import typing
 from typing import (
+    Any,
     Iterable,
     List,
     Literal,
 import pyarrow as pa
 
 from bigframes import session
-import bigframes._config.sampling_options as sampling_options
+from bigframes._config import sampling_options
 import bigframes.constants
 import bigframes.core as core
 import bigframes.core.compile.googlesql as googlesql
@@ -535,19 +537,9 @@ def to_pandas(
         Returns:
             pandas.DataFrame, QueryJob
         """
-        if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
-            raise NotImplementedError(
-                f"The downsampling method {sampling_method} is not implemented, "
-                f"please choose from {','.join(_SAMPLING_METHODS)}."
-            )
-
-        sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
-        if sampling_method is not None:
-            sampling = sampling.with_method(sampling_method).with_random_state(  # type: ignore
-                random_state
-            )
-        else:
-            sampling = sampling.with_disabled()
+        sampling = self._get_sampling_option(
+            max_download_size, sampling_method, random_state
+        )
 
         df, query_job = self._materialize_local(
             materialize_options=MaterializationOptions(
@@ -559,6 +551,27 @@ def to_pandas(
         df.set_axis(self.column_labels, axis=1, copy=False)
         return df, query_job
 
+    def _get_sampling_option(
+        self,
+        max_download_size: Optional[int] = None,
+        sampling_method: Optional[str] = None,
+        random_state: Optional[int] = None,
+    ) -> sampling_options.SamplingOptions:
+
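+        # Resolve the user-facing sampling arguments into a SamplingOptions value,
+        # rejecting unsupported sampling methods up front.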
+        if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
+            raise NotImplementedError(
+                f"The downsampling method {sampling_method} is not implemented, "
+                f"please choose from {','.join(_SAMPLING_METHODS)}."
+            )
+
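+        # Start from the session-wide sampling options; downsampling stays disabled
+        # unless a sampling method was explicitly requested.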
+        sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
+        if sampling_method is None:
+            return sampling.with_disabled()
+
+        return sampling.with_method(sampling_method).with_random_state(  # type: ignore
+            random_state
+        )
+
     def try_peek(
         self, n: int = 20, force: bool = False, allow_large_results=None
     ) -> typing.Optional[pd.DataFrame]:
@@ -798,11 +811,73 @@ def split(
         return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]
 
     def _compute_dry_run(
-        self, value_keys: Optional[Iterable[str]] = None
-    ) -> bigquery.QueryJob:
+        self,
+        value_keys: Optional[Iterable[str]] = None,
+        *,
+        ordered: bool = True,
+        max_download_size: Optional[int] = None,
+        sampling_method: Optional[str] = None,
+        random_state: Optional[int] = None,
+    ) -> typing.Tuple[pd.Series, bigquery.QueryJob]:
+        sampling = self._get_sampling_option(
+            max_download_size, sampling_method, random_state
+        )
+        if sampling.enable_downsampling:
+            raise NotImplementedError("Dry run with sampling is not supported")
+
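+        # Collect dry-run statistics as parallel index/value lists that are
+        # returned as a pandas Series at the end of this method.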
+        index: List[Any] = []
+        values: List[Any] = []
+
+        index.append("columnCount")
+        values.append(len(self.value_columns))
+        index.append("columnDtypes")
+        values.append(
+            {
+                col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
+                for col in self.column_labels
+            }
+        )
+
+        index.append("indexLevel")
+        values.append(self.index.nlevels)
+        index.append("indexDtypes")
+        values.append(self.index.dtypes)
+
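+        # Issue the dry-run query and read job metadata from a copy of the
+        # job's raw API representation.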
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)
-        query_job = self.session._executor.dry_run(expr)
-        return query_job
+        query_job = self.session._executor.dry_run(expr, ordered)
+        job_api_repr = copy.deepcopy(query_job._properties)
+
+        job_ref = job_api_repr["jobReference"]
+        for key, val in job_ref.items():
+            index.append(key)
+            values.append(val)
+
+        index.append("jobType")
+        values.append(job_api_repr["configuration"]["jobType"])
+
+        query_config = job_api_repr["configuration"]["query"]
+        for key in ("destinationTable", "useLegacySql"):
+            index.append(key)
+            values.append(query_config.get(key))
+
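+        # Surface key query statistics such as referenced tables, bytes
+        # processed, cache hits, and the statement type.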
+        query_stats = job_api_repr["statistics"]["query"]
+        for key in (
+            "referencedTables",
+            "totalBytesProcessed",
+            "cacheHit",
+            "statementType",
+        ):
+            index.append(key)
+            values.append(query_stats.get(key))
+
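+        # The job's creation time is reported in milliseconds since the epoch;
+        # convert it to a timezone-aware pandas Timestamp.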
+        index.append("creationTime")
+        values.append(
+            pd.Timestamp(
+                job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"
+            )
+        )
+
+        return pd.Series(values, index=index), query_job
 
     def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
         expr = self._expr
@@ -2703,11 +2778,18 @@ def to_pandas(
                 "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
             )
         ordered = ordered if ordered is not None else True
+
         df, query_job = self._block.select_columns([]).to_pandas(
-            ordered=ordered, allow_large_results=allow_large_results
+            ordered=ordered,
+            allow_large_results=allow_large_results,
         )
         return df.index, query_job
 
+    def _compute_dry_run(
+        self, *, ordered: bool = True
+    ) -> Tuple[pd.Series, bigquery.QueryJob]:
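+        # Delegate to the underlying Block with no value columns selected, so the
+        # dry-run statistics describe only the index. Illustrative usage (names are
+        # hypothetical): `stats, job = idx._compute_dry_run()`, then inspect
+        # e.g. `stats["totalBytesProcessed"]`.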
+        return self._block.select_columns([])._compute_dry_run(ordered=ordered)
+
     def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
         if utils.is_list_like(level):
             levels = list(level)