feat: Improve local data validation #1598

Merged: 7 commits, Apr 8, 2025

11 changes: 6 additions & 5 deletions bigframes/core/array_value.py
@@ -58,18 +58,19 @@ class ArrayValue:

@classmethod
def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
adapted_table = local_data.adapt_pa_table(arrow_table)
schema = local_data.arrow_schema_to_bigframes(adapted_table.schema)
data_source = local_data.ManagedArrowTable.from_pyarrow(arrow_table)
return cls.from_managed(source=data_source, session=session)

@classmethod
def from_managed(cls, source: local_data.ManagedArrowTable, session: Session):
scan_list = nodes.ScanList(
tuple(
nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
for item in schema.items
for item in source.schema.items
)
)
data_source = local_data.ManagedArrowTable(adapted_table, schema)
node = nodes.ReadLocalNode(
data_source,
source,
session=session,
scan_list=scan_list,
)
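A minimal sketch of how the two constructors relate after this change; the helper name build_array_value is hypothetical, and a real bigframes Session must be supplied by the caller.

import pyarrow as pa

from bigframes.core import local_data
from bigframes.core.array_value import ArrayValue


def build_array_value(table: pa.Table, session) -> ArrayValue:
    """Explicit form of what ArrayValue.from_pyarrow now does internally."""
    # Normalization and validation happen when the managed table is created.
    managed = local_data.ManagedArrowTable.from_pyarrow(table)
    return ArrayValue.from_managed(source=managed, session=session)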
5 changes: 3 additions & 2 deletions bigframes/core/blocks.py
@@ -53,6 +53,7 @@
from bigframes import session
from bigframes._config import sampling_options
import bigframes.constants
from bigframes.core import local_data
import bigframes.core as core
import bigframes.core.compile.googlesql as googlesql
import bigframes.core.expression as ex
@@ -187,8 +188,8 @@ def from_local(

pd_data = pd_data.set_axis(column_ids, axis=1)
pd_data = pd_data.reset_index(names=index_ids)
as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
managed_data = local_data.ManagedArrowTable.from_pandas(pd_data)
array_value = core.ArrayValue.from_managed(managed_data, session=session)
block = cls(
array_value,
column_labels=column_labels,
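For reference, a small sketch of the ingestion step Block.from_local now relies on, assuming the module paths shown in the diff; the commented outputs are what I would expect from the adapter logic, not captured output.

import pandas as pd

from bigframes.core import local_data

# from_pandas ignores the index and requires unique, string-convertible column names.
df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", None]})

managed = local_data.ManagedArrowTable.from_pandas(df)
print(managed.data.schema)         # expected: a: int64, b: string
print(managed.metadata.row_count)  # expected: 3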
130 changes: 101 additions & 29 deletions bigframes/core/local_data.py
@@ -18,8 +18,12 @@

import dataclasses
import functools
from typing import cast, Union
import uuid

import geopandas # type: ignore
import numpy as np
import pandas
import pyarrow as pa

import bigframes.core.schema as schemata
@@ -32,51 +36,113 @@ class LocalTableMetadata:
row_count: int

@classmethod
def from_arrow(cls, table: pa.Table):
def from_arrow(cls, table: pa.Table) -> LocalTableMetadata:
return cls(total_bytes=table.nbytes, row_count=table.num_rows)


_MANAGED_STORAGE_TYPES_OVERRIDES: dict[bigframes.dtypes.Dtype, pa.DataType] = {
Reviewer (Contributor): Now that we use ManagedArrowTable (leveraging PyArrow) for local data, should we centralize our conversion logic around it? Specifically, DataFrameAndLabels also handles conversions (Pandas <-> BigQuery); unifying these under a single class seems beneficial. If we standardize on PyArrow as the intermediate "transfer station," the Pandas-to-BigQuery conversion could simply become a Pandas -> Arrow -> BigQuery process, simplifying the overall system.

Author (Contributor): Yes, my plan is that everything goes through local managed storage, to ensure consistent normalization and validation.

# wkt to be precise
bigframes.dtypes.GEO_DTYPE: pa.string()
}
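Because geography has no dedicated managed Arrow type, it is stored as WKT strings per the override above. A hedged sketch of that behavior, assuming geopandas and shapely are installed; the commented results are expectations, not verified output.

import geopandas
import pandas as pd
import shapely

from bigframes.core import local_data

df = pd.DataFrame(
    {"geom": geopandas.GeoSeries([shapely.Point(1, 1), shapely.Point(2, 2)])}
)

managed = local_data.ManagedArrowTable.from_pandas(df)
print(managed.data.column("geom").type)  # expected: string (WKT values)
print(managed.schema.items[0].dtype)     # expected: the bigframes geography dtype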


@dataclasses.dataclass(frozen=True)
class ManagedArrowTable:
data: pa.Table = dataclasses.field(hash=False)
schema: schemata.ArraySchema = dataclasses.field(hash=False)
id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)

def __post_init__(self):
self.validate()

@functools.cached_property
def metadata(self):
def metadata(self) -> LocalTableMetadata:
return LocalTableMetadata.from_arrow(self.data)


def arrow_schema_to_bigframes(arrow_schema: pa.Schema) -> schemata.ArraySchema:
"""Infer the corresponding bigframes schema given a pyarrow schema."""
schema_items = tuple(
schemata.SchemaItem(
field.name,
bigframes_type_for_arrow_type(field.type),
@classmethod
def from_pandas(cls, dataframe: pandas.DataFrame) -> ManagedArrowTable:
"""Creates managed table from pandas. Ignores index, col names must be unique strings"""
columns: list[pa.ChunkedArray] = []
fields: list[schemata.SchemaItem] = []
column_names = list(dataframe.columns)
assert len(column_names) == len(set(column_names))

for name, col in dataframe.items():
new_arr, bf_type = _adapt_pandas_series(col)
columns.append(new_arr)
fields.append(schemata.SchemaItem(str(name), bf_type))

return ManagedArrowTable(
pa.table(columns, names=column_names), schemata.ArraySchema(tuple(fields))
)
for field in arrow_schema
)
return schemata.ArraySchema(schema_items)

@classmethod
def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable:
columns: list[pa.ChunkedArray] = []
fields: list[schemata.SchemaItem] = []
for name, arr in zip(table.column_names, table.columns):
new_arr, bf_type = _adapt_arrow_array(arr)
columns.append(new_arr)
fields.append(schemata.SchemaItem(name, bf_type))

return ManagedArrowTable(
pa.table(columns, names=table.column_names),
schemata.ArraySchema(tuple(fields)),
)

def adapt_pa_table(arrow_table: pa.Table) -> pa.Table:
"""Adapt a pyarrow table to one that can be handled by bigframes. Converts tz to UTC and unit to us for temporal types."""
new_schema = pa.schema(
[
pa.field(field.name, arrow_type_replacements(field.type))
for field in arrow_table.schema
]
)
return arrow_table.cast(new_schema)
def validate(self):
# TODO: Content-based validation for some datatypes (eg json, wkt, list) where logical domain is smaller than pyarrow type
for bf_field, arrow_field in zip(self.schema.items, self.data.schema):
expected_arrow_type = _get_managed_storage_type(bf_field.dtype)
arrow_type = arrow_field.type
if expected_arrow_type != arrow_type:
raise TypeError(
f"Field {bf_field} has arrow array type: {arrow_type}, expected type: {expected_arrow_type}"
)


def bigframes_type_for_arrow_type(pa_type: pa.DataType) -> bigframes.dtypes.Dtype:
return bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
arrow_type_replacements(pa_type)
)
def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
if dtype in _MANAGED_STORAGE_TYPES_OVERRIDES.keys():
return _MANAGED_STORAGE_TYPES_OVERRIDES[dtype]
else:
return bigframes.dtypes.bigframes_dtype_to_arrow_dtype(dtype)


def _adapt_pandas_series(
series: pandas.Series,
) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
# Mostly rely on pyarrow conversions, but have to convert geo without its help.
if series.dtype == bigframes.dtypes.GEO_DTYPE:
series = geopandas.GeoSeries(series).to_wkt(rounding_precision=-1)
return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
try:
return _adapt_arrow_array(pa.array(series))
except Exception as e:
Reviewer (Contributor): Instead of catching a generic Exception, what specific exception types should I expect when performing geographic data type conversions? Could TypeError be one of them?

Author (Contributor): There are a couple of TypeErrors, but also ArrowInvalid. I don't really trust it not to change, though; the docs don't specify which error types they will raise.

if series.dtype == np.dtype("O"):
try:
series = series.astype(bigframes.dtypes.GEO_DTYPE)
except TypeError:
pass
raise e
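On the exception question above: pa.array() typically raises pyarrow.lib.ArrowInvalid (not TypeError) when it cannot infer a type for raw shapely geometries in an object column, which is the case the fallback targets; since pyarrow does not document which error types it raises, the broad except is kept. A quick illustrative check, assuming shapely is installed:

import pyarrow as pa
import shapely

try:
    # pyarrow cannot infer an Arrow type for a raw shapely geometry.
    pa.array([shapely.Point(1, 1)])
except pa.lib.ArrowInvalid as exc:
    print(type(exc).__name__, exc)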


def _adapt_arrow_array(
array: Union[pa.ChunkedArray, pa.Array]
) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
target_type = _arrow_type_replacements(array.type)
if target_type != array.type:
# TODO: Maybe warn if lossy conversion?
array = array.cast(target_type)
bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
storage_type = _get_managed_storage_type(bf_type)
if storage_type != array.type:
raise TypeError(
f"Expected {bf_type} to use arrow {storage_type}, instead got {array.type}"
)
return array, bf_type


def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
if pa.types.is_timestamp(type):
# This is potentially lossy, but BigFrames doesn't support ns
new_tz = "UTC" if (type.tz is not None) else None
@@ -91,18 +157,24 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
return pa.decimal128(38, 9)
if pa.types.is_decimal256(type):
return pa.decimal256(76, 38)
if pa.types.is_dictionary(type):
return arrow_type_replacements(type.value_type)
if pa.types.is_large_string(type):
# simple string type can handle the largest strings needed
return pa.string()
if pa.types.is_null(type):
# null as a type not allowed, default type is float64 for bigframes
return pa.float64()
if pa.types.is_list(type):
new_field_t = arrow_type_replacements(type.value_type)
new_field_t = _arrow_type_replacements(type.value_type)
if new_field_t != type.value_type:
return pa.list_(new_field_t)
return type
if pa.types.is_struct(type):
struct_type = cast(pa.StructType, type)
new_fields: list[pa.Field] = []
for i in range(struct_type.num_fields):
field = struct_type.field(i)
field.with_type(_arrow_type_replacements(field.type))
new_fields.append(field.with_type(_arrow_type_replacements(field.type)))
return pa.struct(new_fields)
else:
return type
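To make the replacement rules concrete, a short sketch of from_pyarrow normalizing storage types on ingest; the commented schema is my expectation from _arrow_type_replacements, not verified output.

import pyarrow as pa

from bigframes.core import local_data

raw = pa.table(
    {
        # large_string collapses to string; an all-null column defaults to float64.
        "name": pa.array(["a", "b"], type=pa.large_string()),
        "empty": pa.array([None, None], type=pa.null()),
    }
)

managed = local_data.ManagedArrowTable.from_pyarrow(raw)
print(managed.data.schema)  # expected: name: string, empty: double
print(managed.metadata.total_bytes, managed.metadata.row_count)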
2 changes: 2 additions & 0 deletions bigframes/dtypes.py
@@ -456,6 +456,8 @@ def bigframes_dtype_to_arrow_dtype(
if bigframes_dtype in _BIGFRAMES_TO_ARROW:
return _BIGFRAMES_TO_ARROW[bigframes_dtype]
if isinstance(bigframes_dtype, pd.ArrowDtype):
if pa.types.is_duration(bigframes_dtype.pyarrow_dtype):
return bigframes_dtype.pyarrow_dtype
if pa.types.is_list(bigframes_dtype.pyarrow_dtype):
return bigframes_dtype.pyarrow_dtype
if pa.types.is_struct(bigframes_dtype.pyarrow_dtype):
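The added branch lets ArrowDtype-wrapped durations pass through; a minimal check, assuming bigframes.dtypes is importable as in the diff.

import pandas as pd
import pyarrow as pa

import bigframes.dtypes

duration_dtype = pd.ArrowDtype(pa.duration("us"))
# With the new is_duration branch this should return pa.duration("us") unchanged.
print(bigframes.dtypes.bigframes_dtype_to_arrow_dtype(duration_dtype))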
16 changes: 0 additions & 16 deletions bigframes/session/__init__.py
@@ -108,11 +108,6 @@

logger = logging.getLogger(__name__)

NON_INLINABLE_DTYPES: Sequence[bigframes.dtypes.Dtype] = (
# Currently excluded as doesn't have arrow type
bigframes.dtypes.GEO_DTYPE,
)


class Session(
third_party_pandas_gbq.GBQIOMixin,
@@ -838,17 +833,6 @@ def _read_pandas_inline(
f"Could not convert with a BigQuery type: `{exc}`. "
) from exc

# Make sure all types are inlinable to avoid escaping errors.
inline_types = inline_df._block.expr.schema.dtypes
noninlinable_types = [
dtype for dtype in inline_types if dtype in NON_INLINABLE_DTYPES
]
if len(noninlinable_types) != 0:
raise ValueError(
f"Could not inline with a BigQuery type: `{noninlinable_types}`. "
f"{constants.FEEDBACK_LINK}"
)

return inline_df

def _read_pandas_load_job(
15 changes: 0 additions & 15 deletions tests/unit/session/test_io_pandas.py
@@ -24,7 +24,6 @@
import pandas.testing
import pyarrow # type: ignore
import pytest
import shapely # type: ignore

import bigframes.core.schema
import bigframes.features
@@ -504,17 +503,3 @@ def test_read_pandas_with_bigframes_dataframe():
ValueError, match=re.escape("read_pandas() expects a pandas.DataFrame")
):
session.read_pandas(df)


def test_read_pandas_inline_w_noninlineable_type_raises_error():
Reviewer (Contributor): Maybe you don't want to remove this test?

Author (Contributor): Everything is inlinable now; there is no way to get this error anymore.

session = resources.create_bigquery_session()
data = [
shapely.Point(1, 1),
shapely.Point(2, 1),
shapely.Point(1, 2),
]
s = pandas.Series(data, dtype=geopandas.array.GeometryDtype())
with pytest.raises(
ValueError, match="Could not (convert|inline) with a BigQuery type:"
):
session.read_pandas(s, write_engine="bigquery_inline")
@@ -813,10 +813,8 @@ def visit_DefaultLiteral(self, op, *, value, dtype):
elif dtype.is_json():
return sge.ParseJSON(this=sge.convert(str(value)))
elif dtype.is_geospatial():
args = [value.wkt]
if (srid := dtype.srid) is not None:
args.append(srid)
return self.f.st_geomfromtext(*args)
wkt = value if isinstance(value, str) else value.wkt
return self.f.st_geogfromtext(wkt)

raise NotImplementedError(f"Unsupported type: {dtype!r}")

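The literal handling now accepts either a WKT string or a geometry object exposing .wkt and always emits ST_GEOGFROMTEXT without an SRID argument. A tiny sketch of just the value-normalization step, assuming shapely; to_wkt here is a standalone illustration, not a function from the codebase.

import shapely


def to_wkt(value) -> str:
    """Pass WKT strings through unchanged, otherwise take the geometry's .wkt."""
    return value if isinstance(value, str) else value.wkt


print(to_wkt(shapely.Point(1, 1)))  # POINT (1 1)
print(to_wkt("POINT (2 2)"))        # POINT (2 2)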