diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index f9bb44ca61..6c8dc50056 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -590,6 +590,7 @@ def to_pandas_batches(
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
         allow_large_results: Optional[bool] = None,
+        squeeze: Optional[bool] = False,
     ):
         """Download results one message at a time.
 
@@ -605,7 +606,10 @@ def to_pandas_batches(
         for record_batch in execute_result.arrow_batches():
             df = io_pandas.arrow_to_pandas(record_batch, self.expr.schema)
             self._copy_index_to_pandas(df)
-            yield df
+            if squeeze:
+                yield df.squeeze(axis=1)
+            else:
+                yield df
 
     def _copy_index_to_pandas(self, df: pd.DataFrame):
         """Set the index on pandas DataFrame to match this block.
diff --git a/bigframes/series.py b/bigframes/series.py
index 817aef0c2a..305bc93a09 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -23,7 +23,18 @@
 import numbers
 import textwrap
 import typing
-from typing import Any, cast, List, Literal, Mapping, Optional, Sequence, Tuple, Union
+from typing import (
+    Any,
+    cast,
+    Iterable,
+    List,
+    Literal,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
 
 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.core.series as vendored_pandas_series
@@ -478,6 +489,70 @@ def to_pandas(
         series.name = self._name
         return series
 
+    def to_pandas_batches(
+        self,
+        page_size: Optional[int] = None,
+        max_results: Optional[int] = None,
+        *,
+        allow_large_results: Optional[bool] = None,
+    ) -> Iterable[pandas.Series]:
+        """Stream Series results to an iterable of pandas Series.
+
+        page_size and max_results determine the size and number of batches,
+        see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result
+
+        **Examples:**
+
+            >>> import bigframes.pandas as bpd
+            >>> bpd.options.display.progress_bar = None
+            >>> s = bpd.Series([4, 3, 2, 2, 3])
+
+        Iterate through the results in batches, limiting the total rows yielded
+        across all batches via `max_results`:
+
+            >>> for s_batch in s.to_pandas_batches(max_results=3):
+            ...     print(s_batch)
+            0    4
+            1    3
+            2    2
+            dtype: Int64
+
+        Alternatively, control the approximate size of each batch using `page_size`
+        and fetch batches manually using `next()`:
+
+            >>> it = s.to_pandas_batches(page_size=2)
+            >>> next(it)
+            0    4
+            1    3
+            dtype: Int64
+            >>> next(it)
+            2    2
+            3    2
+            dtype: Int64
+
+        Args:
+            page_size (int, default None):
+                The maximum number of rows of each batch. Non-positive values are ignored.
+            max_results (int, default None):
+                The maximum total number of rows of all batches.
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.
+
+        Returns:
+            Iterable[pandas.Series]:
+                An iterable of smaller Series which combine to
+                form the original Series. Results stream from bigquery,
+                see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
+        """
+        df = self._block.to_pandas_batches(
+            page_size=page_size,
+            max_results=max_results,
+            allow_large_results=allow_large_results,
+            squeeze=True,
+        )
+        return df
+
     def _compute_dry_run(self) -> bigquery.QueryJob:
         _, query_job = self._block._compute_dry_run((self._value_column,))
         return query_job
diff --git a/tests/system/small/test_series_io.py b/tests/system/small/test_series_io.py
index edc32f824f..8a699aed73 100644
--- a/tests/system/small/test_series_io.py
+++ b/tests/system/small/test_series_io.py
@@ -11,6 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pandas as pd
+import pytest
+
 import bigframes
 
 
@@ -32,3 +35,51 @@ def test_to_pandas_override_global_option(scalars_df_index):
     bf_series.to_pandas(allow_large_results=False)
     assert bf_series._query_job.destination.table_id == table_id
     assert session._metrics.execution_count - execution_count == 1
+
+
+@pytest.mark.parametrize(
+    ("page_size", "max_results", "allow_large_results"),
+    [
+        pytest.param(None, None, True),
+        pytest.param(2, None, False),
+        pytest.param(None, 1, True),
+        pytest.param(2, 5, False),
+        pytest.param(3, 6, True),
+        pytest.param(3, 100, False),
+        pytest.param(100, 100, True),
+    ],
+)
+def test_to_pandas_batches(scalars_dfs, page_size, max_results, allow_large_results):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    bf_series = scalars_df["int64_col"]
+    pd_series = scalars_pandas_df["int64_col"]
+
+    total_rows = 0
+    expected_total_rows = (
+        min(max_results, len(pd_series)) if max_results else len(pd_series)
+    )
+
+    hit_last_page = False
+    for s in bf_series.to_pandas_batches(
+        page_size=page_size,
+        max_results=max_results,
+        allow_large_results=allow_large_results,
+    ):
+        assert not hit_last_page
+
+        actual_rows = s.shape[0]
+        expected_rows = (
+            min(page_size, expected_total_rows) if page_size else expected_total_rows
+        )
+
+        assert actual_rows <= expected_rows
+        if actual_rows < expected_rows:
+            assert page_size
+            hit_last_page = True
+
+        pd.testing.assert_series_equal(
+            s, pd_series[total_rows : total_rows + actual_rows]
+        )
+        total_rows += actual_rows
+
+    assert total_rows == expected_total_rows
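
Usage sketch (not part of the patch): a minimal illustration of consuming the Series.to_pandas_batches method added above, reusing the small Series from its docstring example. The running-total aggregation is an assumed example, not taken from the diff.

    import bigframes.pandas as bpd

    s = bpd.Series([4, 3, 2, 2, 3])

    # Each yielded batch is a pandas.Series (the block-level squeeze(axis=1)
    # collapses the single-column DataFrame batches), so per-batch pandas
    # operations work without materializing the full result at once.
    running_total = 0
    for batch in s.to_pandas_batches(page_size=2):
        running_total += int(batch.sum())

    assert running_total == 4 + 3 + 2 + 2 + 3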