diff --git a/bigframes/ml/core.py b/bigframes/ml/core.py index 01917fd6d8..81637333b0 100644 --- a/bigframes/ml/core.py +++ b/bigframes/ml/core.py @@ -117,6 +117,12 @@ def model(self) -> bigquery.Model: """Get the BQML model associated with this wrapper""" return self._model + def recommend(self, input_data: bpd.DataFrame) -> bpd.DataFrame: + return self._apply_ml_tvf( + input_data, + self._model_manipulation_sql_generator.ml_recommend, + ) + def predict(self, input_data: bpd.DataFrame) -> bpd.DataFrame: return self._apply_ml_tvf( input_data, diff --git a/bigframes/ml/decomposition.py b/bigframes/ml/decomposition.py index c98e18322a..ece950a5a2 100644 --- a/bigframes/ml/decomposition.py +++ b/bigframes/ml/decomposition.py @@ -19,6 +19,7 @@ from typing import List, Literal, Optional, Union +import bigframes_vendored.sklearn.decomposition._mf import bigframes_vendored.sklearn.decomposition._pca from google.cloud import bigquery @@ -27,7 +28,15 @@ import bigframes.pandas as bpd import bigframes.session -_BQML_PARAMS_MAPPING = {"svd_solver": "pcaSolver"} +_BQML_PARAMS_MAPPING = { + "svd_solver": "pcaSolver", + "feedback_type": "feedbackType", + "num_factors": "numFactors", + "user_col": "userColumn", + "item_col": "itemColumn", + "_input_label_columns": "inputLabelColumns", + "l2_reg": "l2Regularization", +} @log_adapter.class_logger @@ -197,3 +206,159 @@ def score( # TODO(b/291973741): X param is ignored. Update BQML supports input in ML.EVALUATE. return self._bqml_model.evaluate() + + +@log_adapter.class_logger +class MatrixFactorization( + base.UnsupervisedTrainablePredictor, + bigframes_vendored.sklearn.decomposition._mf.MatrixFactorization, +): + __doc__ = bigframes_vendored.sklearn.decomposition._mf.MatrixFactorization.__doc__ + + def __init__( + self, + *, + feedback_type: Literal["explicit", "implicit"] = "explicit", + num_factors: int, + user_col: str, + item_col: str, + rating_col: str = "rating", + # TODO: Add support for hyperparameter tuning. 
+ l2_reg: float = 1.0, + ): + + feedback_type = feedback_type.lower() # type: ignore + if feedback_type not in ("explicit", "implicit"): + raise ValueError("Expected feedback_type to be `explicit` or `implicit`.") + + self.feedback_type = feedback_type + + if not isinstance(num_factors, int): + raise TypeError( + f"Expected num_factors to be an int, but got {type(num_factors)}." + ) + + if num_factors < 0: + raise ValueError( + f"Expected num_factors to be a positive integer, but got {num_factors}." + ) + + self.num_factors = num_factors + + if not isinstance(user_col, str): + raise TypeError(f"Expected user_col to be a str, but got {type(user_col)}.") + + self.user_col = user_col + + if not isinstance(item_col, str): + raise TypeError(f"Expected item_col to be STR, but got {type(item_col)}.") + + self.item_col = item_col + + if not isinstance(rating_col, str): + raise TypeError( + f"Expected rating_col to be a str, but got {type(rating_col)}." + ) + + self._input_label_columns = [rating_col] + + if not isinstance(l2_reg, (float, int)): + raise TypeError( + f"Expected l2_reg to be a float or int, but got {type(l2_reg)}." + ) + + self.l2_reg = l2_reg + self._bqml_model: Optional[core.BqmlModel] = None + self._bqml_model_factory = globals.bqml_model_factory() + + @property + def rating_col(self) -> str: + """str: The rating column name. 
Defaults to 'rating'.""" + return self._input_label_columns[0] + + @classmethod + def _from_bq( + cls, session: bigframes.session.Session, bq_model: bigquery.Model + ) -> MatrixFactorization: + assert bq_model.model_type == "MATRIX_FACTORIZATION" + + kwargs = utils.retrieve_params_from_bq_model( + cls, bq_model, _BQML_PARAMS_MAPPING + ) + + model = cls(**kwargs) + model._bqml_model = core.BqmlModel(session, bq_model) + return model + + @property + def _bqml_options(self) -> dict: + """The model options as they will be set for BQML""" + options: dict = { + "model_type": "matrix_factorization", + "feedback_type": self.feedback_type, + "user_col": self.user_col, + "item_col": self.item_col, + "rating_col": self.rating_col, + "l2_reg": self.l2_reg, + } + + if self.num_factors is not None: + options["num_factors"] = self.num_factors + + return options + + def _fit( + self, + X: utils.ArrayType, + y=None, + transforms: Optional[List[str]] = None, + ) -> MatrixFactorization: + if y is not None: + raise ValueError( + "Label column not supported for Matrix Factorization model but y was not `None`" + ) + + (X,) = utils.batch_convert_to_dataframe(X) + + self._bqml_model = self._bqml_model_factory.create_model( + X_train=X, + transforms=transforms, + options=self._bqml_options, + ) + return self + + def predict(self, X: utils.ArrayType) -> bpd.DataFrame: + if not self._bqml_model: + raise RuntimeError("A model must be fitted before recommend") + + (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session) + + return self._bqml_model.recommend(X) + + def to_gbq(self, model_name: str, replace: bool = False) -> MatrixFactorization: + """Save the model to BigQuery. + + Args: + model_name (str): + The name of the model. + replace (bool, default False): + Determine whether to replace if the model already exists. Default to False. 
+ + Returns: + MatrixFactorization: Saved model.""" + if not self._bqml_model: + raise RuntimeError("A model must be fitted before it can be saved") + + new_model = self._bqml_model.copy(model_name, replace) + return new_model.session.read_gbq_model(model_name) + + def score( + self, + X=None, + y=None, + ) -> bpd.DataFrame: + if not self._bqml_model: + raise RuntimeError("A model must be fitted before score") + + # TODO(b/291973741): X param is ignored. Update BQML supports input in ML.EVALUATE. + return self._bqml_model.evaluate() diff --git a/bigframes/ml/loader.py b/bigframes/ml/loader.py index 7ee558ad39..83c665a50b 100644 --- a/bigframes/ml/loader.py +++ b/bigframes/ml/loader.py @@ -42,6 +42,7 @@ "LINEAR_REGRESSION": linear_model.LinearRegression, "LOGISTIC_REGRESSION": linear_model.LogisticRegression, "KMEANS": cluster.KMeans, + "MATRIX_FACTORIZATION": decomposition.MatrixFactorization, "PCA": decomposition.PCA, "BOOSTED_TREE_REGRESSOR": ensemble.XGBRegressor, "BOOSTED_TREE_CLASSIFIER": ensemble.XGBClassifier, @@ -80,6 +81,7 @@ def from_bq( session: bigframes.session.Session, bq_model: bigquery.Model ) -> Union[ + decomposition.MatrixFactorization, decomposition.PCA, cluster.KMeans, linear_model.LinearRegression, diff --git a/bigframes/ml/sql.py b/bigframes/ml/sql.py index e89f17bcaa..a756fac3b9 100644 --- a/bigframes/ml/sql.py +++ b/bigframes/ml/sql.py @@ -299,6 +299,11 @@ def alter_model( return "\n".join(parts) # ML prediction TVFs + def ml_recommend(self, source_sql: str) -> str: + """Encode ML.RECOMMEND for BQML""" + return f"""SELECT * FROM ML.RECOMMEND(MODEL {self._model_ref_sql()}, + ({source_sql}))""" + def ml_predict(self, source_sql: str) -> str: """Encode ML.PREDICT for BQML""" return f"""SELECT * FROM ML.PREDICT(MODEL {self._model_ref_sql()}, diff --git a/tests/data/ratings.jsonl b/tests/data/ratings.jsonl new file mode 100644 index 0000000000..b7cd350d08 --- /dev/null +++ b/tests/data/ratings.jsonl @@ -0,0 +1,20 @@ +{"user_id": 1, "item_id": 2, 
"rating": 4.0} +{"user_id": 1, "item_id": 5, "rating": 3.0} +{"user_id": 2, "item_id": 1, "rating": 5.0} +{"user_id": 2, "item_id": 3, "rating": 2.0} +{"user_id": 3, "item_id": 4, "rating": 4.5} +{"user_id": 3, "item_id": 7, "rating": 3.5} +{"user_id": 4, "item_id": 2, "rating": 1.0} +{"user_id": 4, "item_id": 8, "rating": 5.0} +{"user_id": 5, "item_id": 3, "rating": 4.0} +{"user_id": 5, "item_id": 9, "rating": 2.5} +{"user_id": 6, "item_id": 1, "rating": 3.0} +{"user_id": 6, "item_id": 6, "rating": 4.5} +{"user_id": 7, "item_id": 5, "rating": 5.0} +{"user_id": 7, "item_id": 10, "rating": 1.5} +{"user_id": 8, "item_id": 4, "rating": 2.0} +{"user_id": 8, "item_id": 7, "rating": 4.0} +{"user_id": 9, "item_id": 2, "rating": 3.5} +{"user_id": 9, "item_id": 9, "rating": 5.0} +{"user_id": 10, "item_id": 3, "rating": 4.5} +{"user_id": 10, "item_id": 8, "rating": 2.5} diff --git a/tests/data/ratings_schema.json b/tests/data/ratings_schema.json new file mode 100644 index 0000000000..9fd0101ec8 --- /dev/null +++ b/tests/data/ratings_schema.json @@ -0,0 +1,17 @@ +[ + { + "mode": "NULLABLE", + "name": "user_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "item_id", + "type": "INT64" + }, + { + "mode": "NULLABLE", + "name": "rating", + "type": "FLOAT" + } +] diff --git a/tests/system/conftest.py b/tests/system/conftest.py index a466b558b2..856a702fbf 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -320,6 +320,7 @@ def load_test_data_tables( ("repeated", "repeated_schema.json", "repeated.jsonl"), ("json", "json_schema.json", "json.jsonl"), ("penguins", "penguins_schema.json", "penguins.jsonl"), + ("ratings", "ratings_schema.json", "ratings.jsonl"), ("time_series", "time_series_schema.json", "time_series.jsonl"), ("hockey_players", "hockey_players.json", "hockey_players.jsonl"), ("matrix_2by3", "matrix_2by3.json", "matrix_2by3.jsonl"), @@ -416,6 +417,11 @@ def penguins_table_id(test_data_tables) -> str: return test_data_tables["penguins"] 
+@pytest.fixture(scope="session") +def ratings_table_id(test_data_tables) -> str: + return test_data_tables["ratings"] + + @pytest.fixture(scope="session") def urban_areas_table_id(test_data_tables) -> str: return test_data_tables["urban_areas"] @@ -769,6 +775,14 @@ def penguins_df_null_index( return unordered_session.read_gbq(penguins_table_id) +@pytest.fixture(scope="session") +def ratings_df_default_index( + ratings_table_id: str, session: bigframes.Session +) -> bigframes.dataframe.DataFrame: + """DataFrame pointing at test data.""" + return session.read_gbq(ratings_table_id) + + @pytest.fixture(scope="session") def time_series_df_default_index( time_series_table_id: str, session: bigframes.Session diff --git a/tests/system/large/ml/test_decomposition.py b/tests/system/large/ml/test_decomposition.py index 49aa985189..d1a5f9f2aa 100644 --- a/tests/system/large/ml/test_decomposition.py +++ b/tests/system/large/ml/test_decomposition.py @@ -163,3 +163,49 @@ def test_decomposition_configure_fit_load_none_component( in reloaded_model._bqml_model.model_name ) assert reloaded_model.n_components == 7 + + +def test_decomposition_mf_configure_fit_load( + session, ratings_df_default_index, dataset_id +): + model = decomposition.MatrixFactorization( + num_factors=6, + feedback_type="explicit", + user_col="user_id", + item_col="item_id", + rating_col="rating", + l2_reg=9.83, + ) + + model.fit(ratings_df_default_index) + + reloaded_model = model.to_gbq( + f"{dataset_id}.temp_configured_mf_model", replace=True + ) + + new_ratings = session.read_pandas( + pd.DataFrame( + { + "user_id": ["11", "12", "13"], + "item_id": [1, 2, 3], + "rating": [1.0, 2.0, 3.0], + } + ) + ) + + reloaded_model.score(new_ratings) + + result = reloaded_model.predict(new_ratings).to_pandas() + + assert reloaded_model._bqml_model is not None + assert ( + f"{dataset_id}.temp_configured_mf_model" + in reloaded_model._bqml_model.model_name + ) + assert result is not None + assert 
reloaded_model.feedback_type == "explicit" + assert reloaded_model.num_factors == 6 + assert reloaded_model.user_col == "user_id" + assert reloaded_model.item_col == "item_id" + assert reloaded_model.rating_col == "rating" + assert reloaded_model.l2_reg == 9.83 diff --git a/tests/unit/ml/test_golden_sql.py b/tests/unit/ml/test_golden_sql.py index c9d147e18f..03695a20e4 100644 --- a/tests/unit/ml/test_golden_sql.py +++ b/tests/unit/ml/test_golden_sql.py @@ -20,7 +20,7 @@ import pytest_mock import bigframes -from bigframes.ml import core, linear_model +from bigframes.ml import core, decomposition, linear_model import bigframes.pandas as bpd TEMP_MODEL_ID = bigquery.ModelReference.from_string( @@ -80,6 +80,7 @@ def mock_X(mock_y, mock_session): ["index_column_id"], ["index_column_label"], ) + mock_X.reset_index(drop=True).cache().sql = "input_X_no_index_sql" mock_X.join(mock_y).sql = "input_X_y_sql" mock_X.join(mock_y).cache.return_value = mock_X.join(mock_y) mock_X.join(mock_y)._to_sql_query.return_value = ( @@ -209,3 +210,55 @@ def test_logistic_regression_score(mock_session, bqml_model, mock_X, mock_y): mock_session.read_gbq.assert_called_once_with( "SELECT * FROM ML.EVALUATE(MODEL `model_project`.`model_dataset`.`model_id`,\n (input_X_y_sql))" ) + + +def test_decomposition_mf_default_fit(bqml_model_factory, mock_session, mock_X): + model = decomposition.MatrixFactorization( + num_factors=34, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + model._bqml_model_factory = bqml_model_factory + model.fit(mock_X) + + mock_session._start_query_ml_ddl.assert_called_once_with( + "CREATE OR REPLACE MODEL `test-project`.`_anon123`.`temp_model_id`\nOPTIONS(\n model_type='matrix_factorization',\n feedback_type='explicit',\n user_col='user_id',\n item_col='item_col',\n rating_col='rating_col',\n l2_reg=9.83,\n num_factors=34)\nAS input_X_no_index_sql" + ) + + +def test_decomposition_mf_predict(mock_session, 
bqml_model, mock_X): + model = decomposition.MatrixFactorization( + num_factors=34, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + model._bqml_model = bqml_model + model.predict(mock_X) + + mock_session.read_gbq.assert_called_once_with( + "SELECT * FROM ML.RECOMMEND(MODEL `model_project`.`model_dataset`.`model_id`,\n (input_X_sql))", + index_col=["index_column_id"], + ) + + +def test_decomposition_mf_score(mock_session, bqml_model, mock_X): + model = decomposition.MatrixFactorization( + num_factors=34, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + model._bqml_model = bqml_model + model.score(mock_X) + + mock_session.read_gbq.assert_called_once_with( + "SELECT * FROM ML.EVALUATE(MODEL `model_project`.`model_dataset`.`model_id`)" + ) diff --git a/tests/unit/ml/test_matrix_factorization.py b/tests/unit/ml/test_matrix_factorization.py new file mode 100644 index 0000000000..92691ba9d4 --- /dev/null +++ b/tests/unit/ml/test_matrix_factorization.py @@ -0,0 +1,182 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import pytest + +from bigframes.ml import decomposition + + +def test_decomposition_mf_model(): + model = decomposition.MatrixFactorization( + num_factors=16, + feedback_type="implicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9, + ) + assert model.num_factors == 16 + assert model.feedback_type == "implicit" + assert model.user_col == "user_id" + assert model.item_col == "item_col" + assert model.rating_col == "rating_col" + + +def test_decomposition_mf_feedback_type_explicit(): + model = decomposition.MatrixFactorization( + num_factors=16, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + assert model.feedback_type == "explicit" + + +def test_decomposition_mf_invalid_feedback_type_raises(): + feedback_type = "explimp" + with pytest.raises( + ValueError, + match="Expected feedback_type to be `explicit` or `implicit`.", + ): + decomposition.MatrixFactorization( + # Intentionally pass in the wrong type. This will fail if the user is using + # a type checker, but we can't assume that everyone is doing so, especially + # not in notebook environments. 
+ num_factors=16, + feedback_type=feedback_type, # type: ignore + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + + +def test_decomposition_mf_num_factors_low(): + model = decomposition.MatrixFactorization( + num_factors=0, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + assert model.num_factors == 0 + + +def test_decomposition_mf_negative_num_factors_raises(): + num_factors = -2 + with pytest.raises( + ValueError, + match=f"Expected num_factors to be a positive integer, but got {num_factors}.", + ): + decomposition.MatrixFactorization( + num_factors=num_factors, # type: ignore + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + + +def test_decomposition_mf_invalid_num_factors_raises(): + num_factors = 0.5 + with pytest.raises( + TypeError, + match=f"Expected num_factors to be an int, but got {type(num_factors)}.", + ): + decomposition.MatrixFactorization( + num_factors=num_factors, # type: ignore + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + + +def test_decomposition_mf_invalid_user_col_raises(): + user_col = 123 + with pytest.raises( + TypeError, match=f"Expected user_col to be a str, but got {type(user_col)}." + ): + decomposition.MatrixFactorization( + num_factors=16, + feedback_type="explicit", + user_col=user_col, # type: ignore + item_col="item_col", + rating_col="rating_col", + l2_reg=9.83, + ) + + +def test_decomposition_mf_invalid_item_col_raises(): + item_col = 123 + with pytest.raises( + TypeError, match=f"Expected item_col to be STR, but got {type(item_col)}." 
+ ): + decomposition.MatrixFactorization( + num_factors=16, + feedback_type="explicit", + user_col="user_id", + item_col=item_col, # type: ignore + rating_col="rating_col", + l2_reg=9.83, + ) + + +def test_decomposition_mf_invalid_rating_col_raises(): + rating_col = 4 + with pytest.raises( + TypeError, match=f"Expected rating_col to be a str, but got {type(rating_col)}." + ): + decomposition.MatrixFactorization( + num_factors=16, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col=rating_col, # type: ignore + l2_reg=9.83, + ) + + +def test_decomposition_mf_l2_reg(): + model = decomposition.MatrixFactorization( + num_factors=16, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=6.02, # type: ignore + ) + assert model.l2_reg == 6.02 + + +def test_decomposition_mf_invalid_l2_reg_raises(): + l2_reg = "6.02" + with pytest.raises( + TypeError, + match=f"Expected l2_reg to be a float or int, but got {type(l2_reg)}.", + ): + decomposition.MatrixFactorization( + num_factors=16, + feedback_type="explicit", + user_col="user_id", + item_col="item_col", + rating_col="rating_col", + l2_reg=l2_reg, # type: ignore + ) diff --git a/third_party/bigframes_vendored/sklearn/decomposition/_mf.py b/third_party/bigframes_vendored/sklearn/decomposition/_mf.py new file mode 100644 index 0000000000..fb29cc8984 --- /dev/null +++ b/third_party/bigframes_vendored/sklearn/decomposition/_mf.py @@ -0,0 +1,95 @@ +""" Matrix Factorization. +""" + +# Author: Alexandre Gramfort +# Olivier Grisel +# Mathieu Blondel +# Denis A. Engemann +# Michael Eickenberg +# Giorgio Patrini +# +# License: BSD 3 clause + +from abc import ABCMeta + +from bigframes_vendored.sklearn.base import BaseEstimator + +from bigframes import constants + + +class MatrixFactorization(BaseEstimator, metaclass=ABCMeta): + """Matrix Factorization (MF). 
+ + **Examples:** + + >>> import bigframes.pandas as bpd + >>> from bigframes.ml.decomposition import MatrixFactorization + >>> bpd.options.display.progress_bar = None + >>> X = bpd.DataFrame({ + ... "row": [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6], + ... "column": [0,1] * 7, + ... "value": [1, 1, 2, 1, 3, 1.2, 4, 1, 5, 0.8, 6, 1, 2, 3], + ... }) + >>> model = MatrixFactorization(feedback_type='explicit', num_factors=6, user_col='row', item_col='column', rating_col='value', l2_reg=2.06) + >>> W = model.fit(X) + + Args: + feedback_type ('explicit' | 'implicit', default 'explicit'): + Specifies the feedback type for the model. The feedback type determines the algorithm that is used during training. + num_factors (int): + Specifies the number of latent factors to use. Required; must be a non-negative integer. + user_col (str): + The user column name. + item_col (str): + The item column name. + l2_reg (float, default 1.0): + A floating point value for L2 regularization. The default value is 1.0. + """ + + def fit(self, X, y=None): + """Fit the model according to the given training data. + + Args: + X (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + Series or DataFrame of shape (n_samples, n_features). Training vector, + where `n_samples` is the number of samples and `n_features` is + the number of features. + + y (default None): + Ignored. + + Returns: + bigframes.ml.decomposition.MatrixFactorization: Fitted estimator. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def score(self, X=None, y=None): + """Calculate evaluation metrics of the model. + + .. note:: + + Output matches that of the BigQuery ML.EVALUATE function. + See: https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-evaluate#matrix_factorization_models + for the outputs relevant to this model type. + + Args: + X (default None): + Ignored. + + y (default None): + Ignored.
+ Returns: + bigframes.dataframe.DataFrame: DataFrame that represents model metrics. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def predict(self, X): + """Generate a predicted rating for every user-item row combination for a matrix factorization model. + + Args: + X (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + Series or a DataFrame to predict. + + Returns: + bigframes.dataframe.DataFrame: Predicted DataFrames.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)