diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py
index c7eaafe3de..071365199c 100644
--- a/bigframes/core/array_value.py
+++ b/bigframes/core/array_value.py
@@ -58,18 +58,19 @@ class ArrayValue:
 
     @classmethod
     def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
-        adapted_table = local_data.adapt_pa_table(arrow_table)
-        schema = local_data.arrow_schema_to_bigframes(adapted_table.schema)
+        data_source = local_data.ManagedArrowTable.from_pyarrow(arrow_table)
+        return cls.from_managed(source=data_source, session=session)
 
+    @classmethod
+    def from_managed(cls, source: local_data.ManagedArrowTable, session: Session):
         scan_list = nodes.ScanList(
             tuple(
                 nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
-                for item in schema.items
+                for item in source.schema.items
             )
         )
-        data_source = local_data.ManagedArrowTable(adapted_table, schema)
         node = nodes.ReadLocalNode(
-            data_source,
+            source,
             session=session,
             scan_list=scan_list,
         )
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index f9bb44ca61..d56147cd47 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -53,6 +53,7 @@
 from bigframes import session
 from bigframes._config import sampling_options
 import bigframes.constants
+from bigframes.core import local_data
 import bigframes.core as core
 import bigframes.core.compile.googlesql as googlesql
 import bigframes.core.expression as ex
@@ -187,8 +188,8 @@ def from_local(
         pd_data = pd_data.set_axis(column_ids, axis=1)
         pd_data = pd_data.reset_index(names=index_ids)
-        as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
-        array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
+        managed_data = local_data.ManagedArrowTable.from_pandas(pd_data)
+        array_value = core.ArrayValue.from_managed(managed_data, session=session)
         block = cls(
             array_value,
             column_labels=column_labels,
diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 3a4b7ac0d9..b21f12ff94 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -18,8 +18,12 @@
 
 import dataclasses
 import functools
+from typing import cast, Union
 import uuid
 
+import geopandas  # type: ignore
+import numpy as np
+import pandas
 import pyarrow as pa
 
 import bigframes.core.schema as schemata
@@ -32,51 +36,113 @@ class LocalTableMetadata:
     row_count: int
 
     @classmethod
-    def from_arrow(cls, table: pa.Table):
+    def from_arrow(cls, table: pa.Table) -> LocalTableMetadata:
        return cls(total_bytes=table.nbytes, row_count=table.num_rows)
 
 
+_MANAGED_STORAGE_TYPES_OVERRIDES: dict[bigframes.dtypes.Dtype, pa.DataType] = {
+    # wkt to be precise
+    bigframes.dtypes.GEO_DTYPE: pa.string()
+}
+
+
 @dataclasses.dataclass(frozen=True)
 class ManagedArrowTable:
     data: pa.Table = dataclasses.field(hash=False)
     schema: schemata.ArraySchema = dataclasses.field(hash=False)
     id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)
 
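+    # Validate immediately on construction so that a mismatch between the arrow
+    # data and the bigframes schema fails fast.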
+    def __post_init__(self):
+        self.validate()
+
     @functools.cached_property
-    def metadata(self):
+    def metadata(self) -> LocalTableMetadata:
         return LocalTableMetadata.from_arrow(self.data)
 
-
-def arrow_schema_to_bigframes(arrow_schema: pa.Schema) -> schemata.ArraySchema:
-    """Infer the corresponding bigframes schema given a pyarrow schema."""
-    schema_items = tuple(
-        schemata.SchemaItem(
-            field.name,
-            bigframes_type_for_arrow_type(field.type),
+    @classmethod
+    def from_pandas(cls, dataframe: pandas.DataFrame) -> ManagedArrowTable:
+        """Creates a managed table from a pandas DataFrame. Ignores the index; column names must be unique strings."""
+        columns: list[pa.ChunkedArray] = []
+        fields: list[schemata.SchemaItem] = []
+        column_names = list(dataframe.columns)
+        assert len(column_names) == len(set(column_names))
+
+        for name, col in dataframe.items():
+            new_arr, bf_type = _adapt_pandas_series(col)
+            columns.append(new_arr)
+            fields.append(schemata.SchemaItem(str(name), bf_type))
+
+        return ManagedArrowTable(
+            pa.table(columns, names=column_names), schemata.ArraySchema(tuple(fields))
         )
-        for field in arrow_schema
-    )
-    return schemata.ArraySchema(schema_items)
 
+    @classmethod
+    def from_pyarrow(cls, table: pa.Table) -> ManagedArrowTable:
+        columns: list[pa.ChunkedArray] = []
+        fields: list[schemata.SchemaItem] = []
+        for name, arr in zip(table.column_names, table.columns):
+            new_arr, bf_type = _adapt_arrow_array(arr)
+            columns.append(new_arr)
+            fields.append(schemata.SchemaItem(name, bf_type))
+
+        return ManagedArrowTable(
+            pa.table(columns, names=table.column_names),
+            schemata.ArraySchema(tuple(fields)),
+        )
 
-def adapt_pa_table(arrow_table: pa.Table) -> pa.Table:
-    """Adapt a pyarrow table to one that can be handled by bigframes. Converts tz to UTC and unit to us for temporal types."""
-    new_schema = pa.schema(
-        [
-            pa.field(field.name, arrow_type_replacements(field.type))
-            for field in arrow_table.schema
-        ]
-    )
-    return arrow_table.cast(new_schema)
+    def validate(self):
+        # TODO: Content-based validation for some datatypes (e.g. json, wkt, list) where logical domain is smaller than pyarrow type
+        for bf_field, arrow_field in zip(self.schema.items, self.data.schema):
+            expected_arrow_type = _get_managed_storage_type(bf_field.dtype)
+            arrow_type = arrow_field.type
+            if expected_arrow_type != arrow_type:
+                raise TypeError(
+                    f"Field {bf_field} has arrow array type: {arrow_type}, expected type: {expected_arrow_type}"
+                )
 
 
-def bigframes_type_for_arrow_type(pa_type: pa.DataType) -> bigframes.dtypes.Dtype:
-    return bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
-        arrow_type_replacements(pa_type)
-    )
+def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
+    if dtype in _MANAGED_STORAGE_TYPES_OVERRIDES.keys():
+        return _MANAGED_STORAGE_TYPES_OVERRIDES[dtype]
+    else:
+        return bigframes.dtypes.bigframes_dtype_to_arrow_dtype(dtype)
+
+
+def _adapt_pandas_series(
+    series: pandas.Series,
+) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
+    # Mostly rely on pyarrow conversions, but have to convert geo without its help.
+    if series.dtype == bigframes.dtypes.GEO_DTYPE:
+        series = geopandas.GeoSeries(series).to_wkt(rounding_precision=-1)
+        return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
+    try:
+        return _adapt_arrow_array(pa.array(series))
+    except Exception as e:
+        if series.dtype == np.dtype("O"):
+            try:
+                series = series.astype(bigframes.dtypes.GEO_DTYPE)
+            except TypeError:
+                pass
+            else:
+                # Object column actually held geometries; retry with the geo dtype.
+                return _adapt_pandas_series(series)
+        raise e
+
+
+def _adapt_arrow_array(
+    array: Union[pa.ChunkedArray, pa.Array]
+) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
+    target_type = _arrow_type_replacements(array.type)
+    if target_type != array.type:
+        # TODO: Maybe warn if lossy conversion?
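+        # Known lossy cases: nanosecond timestamps/durations are truncated to
+        # microseconds, decimals are coerced to (38, 9) / (76, 38), large_string
+        # becomes string, and null columns default to float64.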
+        array = array.cast(target_type)
+    bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
+    storage_type = _get_managed_storage_type(bf_type)
+    if storage_type != array.type:
+        raise TypeError(
+            f"Expected {bf_type} to use arrow {storage_type}, instead got {array.type}"
+        )
+    return array, bf_type
 
 
-def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
+def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_timestamp(type):
         # This is potentially lossy, but BigFrames doesn't support ns
         new_tz = "UTC" if (type.tz is not None) else None
@@ -91,8 +157,6 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
         return pa.decimal128(38, 9)
     if pa.types.is_decimal256(type):
         return pa.decimal256(76, 38)
-    if pa.types.is_dictionary(type):
-        return arrow_type_replacements(type.value_type)
     if pa.types.is_large_string(type):
         # simple string type can handle the largest strings needed
         return pa.string()
@@ -100,9 +164,17 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
         # null as a type not allowed, default type is float64 for bigframes
         return pa.float64()
     if pa.types.is_list(type):
-        new_field_t = arrow_type_replacements(type.value_type)
+        new_field_t = _arrow_type_replacements(type.value_type)
         if new_field_t != type.value_type:
             return pa.list_(new_field_t)
         return type
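+    # Structs are rewritten field by field so nested timestamps, decimals, etc.
+    # get the same replacements as top-level columns.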
" - f"{constants.FEEDBACK_LINK}" - ) - return inline_df def _read_pandas_load_job( diff --git a/tests/unit/session/test_io_pandas.py b/tests/unit/session/test_io_pandas.py index 24155437fe..2fa07aed35 100644 --- a/tests/unit/session/test_io_pandas.py +++ b/tests/unit/session/test_io_pandas.py @@ -24,7 +24,6 @@ import pandas.testing import pyarrow # type: ignore import pytest -import shapely # type: ignore import bigframes.core.schema import bigframes.features @@ -504,17 +503,3 @@ def test_read_pandas_with_bigframes_dataframe(): ValueError, match=re.escape("read_pandas() expects a pandas.DataFrame") ): session.read_pandas(df) - - -def test_read_pandas_inline_w_noninlineable_type_raises_error(): - session = resources.create_bigquery_session() - data = [ - shapely.Point(1, 1), - shapely.Point(2, 1), - shapely.Point(1, 2), - ] - s = pandas.Series(data, dtype=geopandas.array.GeometryDtype()) - with pytest.raises( - ValueError, match="Could not (convert|inline) with a BigQuery type:" - ): - session.read_pandas(s, write_engine="bigquery_inline") diff --git a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py index 305314edf0..6e98d6a9e1 100644 --- a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py +++ b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py @@ -813,10 +813,8 @@ def visit_DefaultLiteral(self, op, *, value, dtype): elif dtype.is_json(): return sge.ParseJSON(this=sge.convert(str(value))) elif dtype.is_geospatial(): - args = [value.wkt] - if (srid := dtype.srid) is not None: - args.append(srid) - return self.f.st_geomfromtext(*args) + wkt = value if isinstance(value, str) else value.wkt + return self.f.st_geogfromtext(wkt) raise NotImplementedError(f"Unsupported type: {dtype!r}")