Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions RecommenderSystems/xdeepfm/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# xDeepFM

[xDeepFM](https://arxiv.org/abs/1803.05170) is based on the embedding&mlp, which is an upgraded version of DCN. It solves the main problem of DCN: the unaware of feature field. Its model structure is as follows. Based on this structure, this project uses OneFlow distributed deep learning framework to realize training the model in graph mode on the Criteo data set.

<p align='center'>
<img width="539" alt="Screen Shot 2022-04-01 at 4 45 22 PM" src="https://user-images.githubusercontent.com/63446546/172111504-d2db0fb3-a85b-4ee0-a644-3ccb0f87cbfb.png">
</p>


## Directory description

```txt
.
├── xdeepfm_train_eval.py # OneFlow xDeepFM train/val/test scripts with OneEmbedding module
├── README.md # Documentation
├── tools
│ ├── xdeepfm_parquet.scala # Read Criteo Kaggle data and export it as parquet data format
│ └── launch_spark.sh # Spark launching shell script
│ └── split_criteo_kaggle.py # Split criteo kaggle dataset to train\val\test set
├── train_xdeepfm.sh # xDeepFM training shell script
```

## Arguments description

We use exactly the same default values as [the xDeepFM_Criteo_x4_001 experiment](https://github.com/openbenchmark/BARS/tree/master/ctr_prediction/benchmarks/xDeepFM/xDeepFM_criteo_x4_001) in FuxiCTR.

| Argument Name | Argument Explanation | Default Value |
| -------------------------- | ------------------------------------------------------------ | ------------------------ |
| data_dir | the data file directory | *Required Argument* |
| num_train_samples | the number of train samples | *Required Argument* |
| num_val_samples | the number of validation samples | *Required Argument* |
| num_test_samples | the number of test samples | *Required Argument* |
| model_load_dir | model loading directory | None |
| model_save_dir | model saving directory | None |
| save_best_model | save best model or not | False |
| save_initial_model | save initial model parameters or not | False |
| save_model_after_each_eval | save model after each eval or not | False |
| embedding_vec_size | embedding vector size | 16 |
| dnn_hidden_units | dnn hidden units number | 1000,1000,1000,1000,1000 |
| cin_layer_units | cin hidden units number | 16,16,16 |
| net_dropout | number of minibatch training interations | 0.2 |
| embedding_vec_size | embedding vector size | 16 |
| learning_rate | initial learning rate | 0.001 |
| batch_size | training/evaluation batch size | 10000 |
| train_batches | the maximum number of training batches | 75000 |
| loss_print_interval | interval of printing loss | 100 |
| patience | Number of epochs with no improvement after which learning rate will be reduced | 2 |
| min_delta | threshold for measuring the new optimum, to only focus on significant changes | 1.0e-6 |
| table_size_array | embedding table size array for sparse fields | *Required Argument* |
| persistent_path | path for persistent kv store of embedding | *Required Argument* |
| store_type | OneEmbeddig persistent kv store type: `device_mem`, `cached_host_mem` or `cached_ssd` | `cached_host_mem` |
| cache_memory_budget_mb | size of cache memory budget on each device in megabytes when `store_type` is `cached_host_mem` or `cached_ssd` | 1024 |
| amp | enable Automatic Mixed Precision(AMP) training or not | False |
| loss_scale_policy | loss scale policy for AMP training: `static` or `dynamic` | `static` |
| disable_early_stop | disable early stop or not | False |

#### Early Stop Schema

The model is evaluated at the end of every epoch. At the end of each epoch, if the early stopping criterion is met, the training process will be stopped.

The monitor used for the early stop is `val_auc - val_log_loss`. The mode of the early stop is `max`. You could tune `patience` and `min_delta` as needed.

If you want to disable early stopping, simply add `--disable_early_stop` in the [train_xdeepfm.sh](https://github.com/Oneflow-Inc/models/blob/dev_xdeepfm_pr/RecommenderSystems/xdeepfm/train_xdeepfm.sh).

## Getting Started

A hands-on guide to train a xDeepFM model.

### Environment

1. Install OneFlow by following the steps in [OneFlow Installation Guide](https://github.com/Oneflow-Inc/oneflow#install-oneflow) or use the command line below.

```shell
python3 -m pip install --pre oneflow -f https://staging.oneflow.info/branch/master/cu102
```

2. Install all other dependencies listed below.

```json
psutil
petastorm
```

### Dataset


1. Download the [Criteo Kaggle dataset](https://www.kaggle.com/c/criteo-display-ad-challenge) and then split it using [split_criteo_kaggle.py](https://github.com/Oneflow-Inc/models/blob/dev_xdeepfm_pr/RecommenderSystems/xdeepfm/tools/split_criteo_kaggle.py).

Note: Same as [the xDeepFM_Criteo_x4_001 experiment](https://github.com/openbenchmark/BARS/tree/master/ctr_prediction/benchmarks/xDeepFM/xDeepFM_criteo_x4_001) in FuxiCTR, only train.txt is used. Also, the dataset is randomly spllitted into 8:1:1 as training set, validation set and test set. The dataset is splitted using StratifiedKFold in sklearn.

```shell
python3 split_criteo_kaggle.py --input_dir=/path/to/your/criteo_kaggle --output_dir=/path/to/your/output/dir
```

2. Download spark from https://spark.apache.org/downloads.html and then uncompress the tar file into the directory where you want to install Spark. Ensure the `SPARK_HOME` environment variable points to the directory where the spark is.

3. launch a spark shell using [launch_spark.sh](https://github.com/Oneflow-Inc/models/blob/dev_xdeepfm_pr/RecommenderSystems/xdeepfm/tools/launch_spark.sh).

- Modify the SPARK_LOCAL_DIRS as needed

```shell
export SPARK_LOCAL_DIRS=/path/to/your/spark/
```

- Run `bash launch_spark.sh`

4. load [xdeepfm_parquet.scala](https://github.com/Oneflow-Inc/models/blob/dev_xdeepfm_pr/RecommenderSystems/xdeepfm/tools/xdeepfm_parquet.scala) to your spark shell by `:load deepfm_parquet.scala`.

5. call the `makexDeepfmDataset(srcDir: String, dstDir:String)` function to generate the dataset.

```shell
makexDeepfmDataset("/path/to/your/src_dir", "/path/to/your/dst_dir")
```

After generating parquet dataset, dataset information will also be printed. It contains the information about the number of samples and table size array, which is needed when training.

```txt
train samples = 36672493
validation samples = 4584062
test samples = 4584062
table size array:
649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376
1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572
```

### Start Training by Oneflow

1. Modify the [train_xdeepfm.sh](https://github.com/Oneflow-Inc/models/blob/dev_xdeepfm_pr/RecommenderSystems/xdeepfm/train_xdeepfm.sh) as needed.

```shell
#!/bin/bash
DEVICE_NUM_PER_NODE=1
DATA_DIR=/path/to/xdeepfm_parquet
PERSISTENT_PATH=/path/to/persistent
MODEL_SAVE_DIR=/path/to/model/save/dir

python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes 1 \
--node_rank 0 \
--master_addr 127.0.0.1 \
xdeepfm_train_eval.py \
--data_dir $DATA_DIR \
--persistent_path $PERSISTENT_PATH \
--table_size_array "649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376,1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572" \
--store_type 'cached_host_mem' \
--cache_memory_budget_mb 1024 \
--batch_size 10000 \
--train_batches 75000 \
--loss_print_interval 100 \
--dnn "1000,1000,1000,1000,1000" \
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
--num_train_samples 36672493 \
--num_val_samples 4584062 \
--num_test_samples 4584062 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model
```

2. train a xDeepFM model by `bash train_xdeepfm.sh`.
5 changes: 5 additions & 0 deletions RecommenderSystems/xdeepfm/tools/launch_spark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export SPARK_LOCAL_DIRS=/tmp/tmp_spark
spark-shell \
--master "local[*]" \
--conf spark.driver.maxResultSize=0 \
--driver-memory 360G
55 changes: 55 additions & 0 deletions RecommenderSystems/xdeepfm/tools/split_criteo_kaggle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import numpy as np
import pandas as pd
import argparse
from sklearn.model_selection import StratifiedKFold

RANDOM_SEED = 2018 # Fix seed for reproduction


def split_train_val_test(input_dir, output_dir):
num_dense_fields = 13
num_sparse_fields = 26

fields = ["Label"]
fields += [f"I{i+1}" for i in range(num_dense_fields)]
fields += [f"C{i+1}" for i in range(num_sparse_fields)]

ddf = pd.read_csv(
f"{input_dir}/train.txt",
sep="\t",
header=None,
names=fields,
encoding="utf-8",
dtype=object,
)
X = ddf.values
y = ddf["Label"].map(lambda x: float(x)).values
print(f"{len(X)} samples in total")

folds = StratifiedKFold(n_splits=10, shuffle=True, random_state=RANDOM_SEED)

fold_indexes = [valid_idx for _, valid_idx in folds.split(X, y)]
test_index = fold_indexes[0]
valid_index = fold_indexes[1]
train_index = np.concatenate(fold_indexes[2:])

ddf.loc[test_index, :].to_csv(f"{output_dir}/test.csv", index=False, encoding="utf-8")
ddf.loc[valid_index, :].to_csv(f"{output_dir}/valid.csv", index=False, encoding="utf-8")
ddf.loc[train_index, :].to_csv(f"{output_dir}/train.csv", index=False, encoding="utf-8")

print("Train lines:", len(train_index))
print("Validation lines:", len(valid_index))
print("Test lines:", len(test_index))
print("Postive ratio:", np.sum(y) / len(y))


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_dir", type=str, required=True, help="Path to downloaded criteo kaggle dataset",
)
parser.add_argument(
"--output_dir", type=str, required=True, help="Path to splitted criteo kaggle dataset",
)
args = parser.parse_args()
split_train_val_test(args.input_dir, args.output_dir)
35 changes: 35 additions & 0 deletions RecommenderSystems/xdeepfm/tools/xdeepfm_parquet.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import org.apache.spark.sql.functions.udf

def makexDeepfmDataset(srcDir: String, dstDir:String) = {
val train_csv = s"${srcDir}/train.csv"
val test_csv = s"${srcDir}/test.csv"
val val_csv = s"${srcDir}/valid.csv"

val make_label = udf((str:String) => str.toFloat)
val label_cols = Seq(make_label($"Label").as("Label"))

val dense_cols = 1.to(13).map{i=>xxhash64(lit(i), col(s"I$i")).as(s"I${i}")}

var sparse_cols = 1.to(26).map{i=>xxhash64(lit(i), col(s"C$i")).as(s"C${i}")}

val cols = label_cols ++ dense_cols ++ sparse_cols

spark.read.option("header","true").csv(test_csv).select(cols:_*).repartition(32).write.parquet(s"${dstDir}/test")
spark.read.option("header","true").csv(val_csv).select(cols:_*).repartition(32).write.parquet(s"${dstDir}/val")

spark.read.option("header","true").csv(train_csv).select(cols:_*).orderBy(rand()).repartition(256).write.parquet(s"${dstDir}/train")

// print the number of samples
val train_samples = spark.read.parquet(s"${dstDir}/train").count()
println(s"train samples = $train_samples")
val val_samples = spark.read.parquet(s"${dstDir}/val").count()
println(s"validation samples = $val_samples")
val test_samples = spark.read.parquet(s"${dstDir}/test").count()
println(s"test samples = $test_samples")

// print table size array
val df = spark.read.parquet(s"${dstDir}/train", s"${dstDir}/val", s"${dstDir}/test")
println("table size array: ")
println(1.to(13).map{i=>df.select(s"I$i").as[Long].distinct.count}.mkString(","))
println(1.to(26).map{i=>df.select(s"C$i").as[Long].distinct.count}.mkString(","))
}
33 changes: 33 additions & 0 deletions RecommenderSystems/xdeepfm/train_xdeepfm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/bash
DEVICE_NUM_PER_NODE=1
NUM_NODES=1
NODE_RANK=0
MASTER_ADDR=127.0.0.1

DATA_DIR=/path/to/xdeepfm_parquet
PERSISTENT_PATH=/path/to/persistent
MODEL_SAVE_DIR=/path/to/model/save/dir

python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
xdeepfm_train_eval.py \
--data_dir $DATA_DIR \
--persistent_path $PERSISTENT_PATH \
--table_size_array "649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376,1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572" \
--store_type 'cached_host_mem' \
--cache_memory_budget_mb 1024 \
--batch_size 10000 \
--train_batches 80000 \
--loss_print_interval 100 \
--dnn_hidden_units "1000,1000,1000,1000,1000" \
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
--num_train_samples 36672493 \
--num_val_samples 4584062 \
--num_test_samples 4584062 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model
Loading