185 changes: 114 additions & 71 deletions RecommenderSystems/dcn/dcn_train_eval.py
@@ -25,25 +25,27 @@ def str_list(x):
parser = argparse.ArgumentParser()
parser.add_argument("--data_dir", type=str, required=True)
parser.add_argument(
"--num_train_samples", type=int, default=36672493, help="the number of training samples"
"--num_train_samples", type=int, default=36672493, help="the number of training samples",
)
parser.add_argument(
"--num_valid_samples", type=int, default=4584062, help="the number of validation samples"
"--num_valid_samples", type=int, default=4584062, help="the number of validation samples",
)
parser.add_argument(
"--num_test_samples", type=int, default=4584062, help="the number of test samples"
"--num_test_samples", type=int, default=4584062, help="the number of test samples",
)

parser.add_argument("--shard_seed", type=int, default=2022)
parser.add_argument("--model_load_dir", type=str, default=None)
parser.add_argument("--model_save_dir", type=str, default=None)
parser.add_argument("--save_best_model", action="store_true", help="save best model or not")
parser.add_argument(
"--save_initial_model", action="store_true", help="save initial model parameters or not."
"--save_initial_model", action="store_true", help="save initial model parameters or not.",
)
parser.add_argument(
"--save_model_after_each_eval", action="store_true", help="save model after each eval."
"--save_model_after_each_eval", action="store_true", help="save model after each eval.",
)

parser.add_argument("--disable_fusedmlp", action="store_true", help="disable fused MLP or not")
parser.add_argument("--embedding_vec_size", type=int, default=16)
parser.add_argument("--batch_norm", type=bool, default=False)
parser.add_argument("--dnn_hidden_units", type=int_list, default="1000,1000,1000,1000,1000")
@@ -77,7 +79,7 @@ def str_list(x):
required=True,
)
parser.add_argument(
"--persistent_path", type=str, required=True, help="path for persistent kv store"
"--persistent_path", type=str, required=True, help="path for persistent kv store",
)
parser.add_argument("--store_type", type=str, default="cached_host_mem")
parser.add_argument("--cache_memory_budget_mb", type=int, default=8192)
@@ -126,7 +128,7 @@ def __init__(
self.shard_count = shard_count
self.cur_shard = cur_shard

fields = ["Label"]
fields = ["label"]
fields += [f"I{i+1}" for i in range(num_dense_fields)]
fields += [f"C{i+1}" for i in range(num_sparse_fields)]
self.fields = fields
@@ -262,53 +264,76 @@ def forward(self, ids):
return self.one_embedding.forward(ids)


class CrossInteractionLayer(nn.Module):
'''
Follow the same CrossInteractionLayer implementation of FuxiCTR
'''
def __init__(self, input_dim):
super(CrossInteractionLayer, self).__init__()
self.weight = nn.Linear(input_dim, 1, bias=False)
self.bias = nn.Parameter(flow.zeros(input_dim))

def forward(self, X_0, X_i):
interaction_out = self.weight(X_i) * X_0 + self.bias
return interaction_out


class CrossNet(nn.Module):
'''
Follow the same CrossNet implementation of FuxiCTR
'''
def __init__(self, input_dim, num_layers):
super(CrossNet, self).__init__()
self.num_layers = num_layers
self.cross_net = nn.ModuleList(
CrossInteractionLayer(input_dim) for _ in range(self.num_layers)
)
self.input_dim = input_dim
self.add_parameters()
self.reset_parameters()

def add_parameters(self) -> None:
for idx in range(self.num_layers):
self.register_parameter(
f"weight_{idx}", flow.nn.Parameter(flow.Tensor(1, self.input_dim,)),
)
self.register_parameter(
f"bias_{idx}", flow.nn.Parameter(flow.zeros(self.input_dim)),
)

def weight(self, i):
return getattr(self, f"weight_{i}")

def bias(self, i):
return getattr(self, f"bias_{i}")

def reset_parameters(self) -> None:
for i in range(self.num_layers):
flow.nn.init.kaiming_uniform_(self.weight(i), a=math.sqrt(5))

def forward(self, X_0):
X_i = X_0 # b x dim
for i in range(self.num_layers):
X_i = X_i + self.cross_net[i](X_0, X_i)
X_i = flow._C.fused_cross_feature_interaction(
X_i, self.weight(i), X_0, self.bias(i), "vector"
)
return X_i
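
For reference, a minimal eager-mode sketch of what the fused "vector" interaction above is assumed to compute (the classic DCN cross layer, x_{l+1} = x_0 * (w_l . x_l) + b_l + x_l). The helper name and shapes below are illustrative, not part of the patch:

import oneflow as flow

def cross_layer_reference(x_0, x_i, weight, bias):
    # Assumed equivalent of flow._C.fused_cross_feature_interaction(..., "vector"):
    # project x_i onto the layer weight (b x 1), scale x_0 by that scalar,
    # then add the bias and the residual term x_i.
    proj = (x_i * weight).sum(dim=-1, keepdim=True)  # b x 1
    return x_0 * proj + bias + x_i                   # b x dim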


class DNN(nn.Module):
def __init__(
self, input_dim, hidden_units=[], dropout_rates=0, batch_norm=False, use_bias=True,
self,
input_dim,
hidden_units=[],
dropout_rates=0,
use_fusedmlp=True,
batch_norm=False,
use_bias=True,
):
super(DNN, self).__init__()
dense_layers = []
hidden_units = [input_dim] + hidden_units
for idx in range(len(hidden_units) - 1):
dense_layers.append(nn.Linear(hidden_units[idx], hidden_units[idx + 1], bias=use_bias))
dense_layers.append(nn.ReLU())
if batch_norm:
dense_layers.append(nn.BatchNorm1d(hidden_units[idx + 1]))
if dropout_rates > 0:
dense_layers.append(nn.Dropout(p=dropout_rates))
self.dnn = nn.Sequential(*dense_layers) # * used to unpack list
if use_fusedmlp and not batch_norm:
hidden_dropout_rates_list = [dropout_rates] * (len(hidden_units) - 1)
self.dnn = nn.FusedMLP(
input_dim,
hidden_units[:-1],
hidden_units[-1],
hidden_dropout_rates_list,
dropout_rates,
False,
)
else:
hidden_units = [input_dim] + hidden_units
for idx in range(len(hidden_units) - 1):
dense_layers.append(
nn.Linear(hidden_units[idx], hidden_units[idx + 1], bias=use_bias)
)
dense_layers.append(nn.ReLU())
if batch_norm:
dense_layers.append(nn.BatchNorm1d(hidden_units[idx + 1]))
if dropout_rates > 0:
dense_layers.append(nn.Dropout(p=dropout_rates))
self.dnn = nn.Sequential(*dense_layers) # * used to unpack list

def forward(self, inputs):
return self.dnn(inputs)
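
A hypothetical construction of the DNN above, showing both code paths; the input_dim value and the variable names are made up for illustration:

# Fused path: a single nn.FusedMLP replaces the Linear/ReLU/Dropout stack
# (only taken when batch_norm is disabled).
dnn = DNN(input_dim=624, hidden_units=[1000] * 5,
          dropout_rates=0.05, use_fusedmlp=True, batch_norm=False)

# Fallback path: a plain nn.Sequential stack, required whenever batch_norm=True.
dnn_bn = DNN(input_dim=624, hidden_units=[1000] * 5,
             dropout_rates=0.05, use_fusedmlp=False, batch_norm=True)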
@@ -324,6 +349,7 @@ def __init__(
cache_memory_budget_mb,
size_factor,
dnn_hidden_units=[128, 128],
use_fusedmlp=True,
crossing_layers=3,
net_dropout=0.2,
batch_norm=False,
@@ -347,6 +373,7 @@ def __init__(
input_dim=input_dim,
hidden_units=dnn_hidden_units,
dropout_rates=net_dropout,
use_fusedmlp=use_fusedmlp,
batch_norm=batch_norm,
use_bias=True,
)
@@ -374,7 +401,7 @@ def forward(self, X):
else:
final_out = cross_out
y_pred = self.fc(final_out)
return y_pred.sigmoid()
return y_pred

def reset_parameters(self):
def reset_param(m):
@@ -394,6 +421,7 @@ def make_dcn_module(args):
one_embedding_store_type=args.store_type,
cache_memory_budget_mb=args.cache_memory_budget_mb,
dnn_hidden_units=args.dnn_hidden_units,
use_fusedmlp=not args.disable_fusedmlp,
crossing_layers=args.crossing_layers,
net_dropout=args.net_dropout,
batch_norm=args.batch_norm,
@@ -411,7 +439,7 @@ def __init__(self, dcn_module, amp=False):

def build(self, features):
predicts = self.module(features.to("cuda"))
return predicts
return predicts.sigmoid()


class DCNTrainGraph(flow.nn.Graph):
@@ -432,19 +460,35 @@ def __init__(
def build(self, labels, features):

logits = self.module(features.to("cuda")).squeeze()
loss = self.loss(logits, labels.squeeze().to("cuda"))
reduce_loss = flow.mean(loss)
reduce_loss.backward()
# loss = self.loss(logits, labels.squeeze().to("cuda"))
# reduce_loss = flow.mean(loss)

reduce_loss = self.loss(logits, labels.squeeze().to("cuda"))
reduce_loss.backward()
return reduce_loss.to("cpu")
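
The training graph now returns the raw logits and leaves the sigmoid to the loss (see the BCEWithLogitsLoss change further down), which is the numerically stabler equivalent of applying sigmoid and then BCELoss. A quick sanity check, assuming the usual OneFlow tensor API; the values are arbitrary:

logits = flow.randn(8)
labels = (flow.rand(8) > 0.5).float()
a = flow.nn.BCEWithLogitsLoss(reduction="mean")(logits, labels)
b = flow.nn.BCELoss(reduction="mean")(flow.sigmoid(logits), labels)
# a and b should agree up to floating-point rounding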


# def make_lr_scheduler(args, optimizer):
# batches_per_epoch = math.ceil(args.num_train_samples / args.train_batch_size)
# multistep_lr = flow.optim.lr_scheduler.MultiStepLR(
# optimizer, milestones=[3 * batches_per_epoch], gamma=args.lr_factor
# )
# return multistep_lr

def make_lr_scheduler(args, optimizer):
batches_per_epoch = math.ceil(args.num_train_samples / args.train_batch_size)
multistep_lr = flow.optim.lr_scheduler.MultiStepLR(
optimizer, milestones=[3 * batches_per_epoch], gamma=args.lr_factor
warmup_lr = flow.optim.lr_scheduler.LinearLR(
optimizer, start_factor=0, total_iters=2750,
)
return multistep_lr
poly_decay_lr = flow.optim.lr_scheduler.PolynomialLR(
optimizer, decay_batch=40000, end_learning_rate=1e-6, power=2.0, cycle=False,
)
sequential_lr = flow.optim.lr_scheduler.SequentialLR(
optimizer=optimizer,
schedulers=[warmup_lr, poly_decay_lr],
milestones=[40000],
interval_rescaling=True,
)
return sequential_lr
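
A throwaway probe of the schedule this builds (linear warmup over the first 2750 iterations, the base rate up to the 40000-step milestone, then the polynomial-decay segment). The dummy parameter and the param_groups access are assumptions borrowed from the PyTorch-style optimizer API:

probe_params = [flow.nn.Parameter(flow.zeros(1))]
probe_opt = flow.optim.SGD(probe_params, lr=0.0025)
probe_sched = make_lr_scheduler(args, probe_opt)
for step in range(80000):
    probe_opt.step()
    probe_sched.step()
    if step % 10000 == 0:
        # print the effective learning rate every 10k steps
        print(step, probe_opt.param_groups[0]["lr"])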


def train(args):
@@ -506,7 +550,8 @@ def early_stop(

opt = flow.optim.Adam(dcn_module.parameters(), lr=args.learning_rate)
lr_scheduler = None
loss_func = flow.nn.BCELoss(reduction="none").to("cuda")
# loss_func = flow.nn.BCELoss(reduction="none").to("cuda")
loss_func = flow.nn.BCEWithLogitsLoss(reduction="mean").to("cuda")

if args.loss_scale_policy == "static":
grad_scaler = flow.amp.StaticGradScaler(1024)
@@ -550,37 +595,37 @@ def early_stop(
+ f"Latency {(latency * 1000):0.3f} ms, Throughput {throughput:0.1f}, {strtime}"
)

if step % batches_per_epoch == 0:
if step % 10000 == 0:
epoch += 1
val_auc, val_logloss = eval(
args,
eval_graph,
tag="val",
cur_step=step,
epoch=epoch,
cached_eval_batches=cached_valid_batches,
)
if args.save_model_after_each_eval:
save_model(f"step_{step}_val_auc_{val_auc:0.5f}")

monitor_value = get_metrics(logs={"auc": val_auc, "logloss": val_logloss})

stop_training, best_metric, stopping_steps, save_best = early_stop(
epoch,
monitor_value,
best_metric=best_metric,
stopping_steps=stopping_steps,
patience=args.patience,
min_delta=args.min_delta,
cached_eval_batches=None,
)
# if args.save_model_after_each_eval:
# save_model(f"step_{step}_val_auc_{val_auc:0.5f}")

# monitor_value = get_metrics(logs={"auc": val_auc, "logloss": val_logloss})

if args.save_best_model and save_best:
if rank == 0:
print(f"Save best model: monitor(max): {best_metric:.6f}")
save_model("best_checkpoint")
# stop_training, best_metric, stopping_steps, save_best = early_stop(
# epoch,
# monitor_value,
# best_metric=best_metric,
# stopping_steps=stopping_steps,
# patience=args.patience,
# min_delta=args.min_delta,
# )

if not args.disable_early_stop and stop_training:
break
# if args.save_best_model and save_best:
# if rank == 0:
# print(f"Save best model: monitor(max): {best_metric:.6f}")
# save_model("best_checkpoint")

# if not args.disable_early_stop and stop_training:
# break

dcn_module.train()
last_time = time.time()
@@ -678,5 +723,3 @@ def eval(args, eval_graph, tag="val", cur_step=0, epoch=0, cached_eval_batches=None):
flow.boxing.nccl.enable_all_to_all(True)
args = get_args()
train(args)


53 changes: 40 additions & 13 deletions RecommenderSystems/dcn/train.sh
@@ -1,24 +1,51 @@
DEVICE_NUM_PER_NODE=1
DATA_DIR=your_path/criteo_parquet
PERSISTENT_PATH=your_path/persistent1
MODEL_SAVE_DIR=your_path/model_save_dir
#!/bin/bash
DEVICE_NUM_PER_NODE=4
DATA_DIR=/RAID0/xiexuan/criteo1t_parquet_40M_long
PERSISTENT_PATH=/home/zhengzekang/models/RecommenderSystems/dlrm/init_model


rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/0-4/*
rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/1-4/*
rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/2-4/*
rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/3-4/*

export ONEFLOW_ONE_EMBEDDING_FUSED_MLP_ASYNC_GRAD=1

export ONEFLOW_ONE_EMBEDDING_FUSE_EMBEDDING_INTERACTION=1
export ONEFLOW_ONE_EMBEDDING_GRADIENT_SHUFFLE_USE_FP16=1
export ONEFLOW_FUSE_MODEL_UPDATE_CAST=1
export ONEFLOW_ENABLE_MULTI_TENSOR_MODEL_UPDATE=1
export ONEFLOW_KERNEL_ENABLE_CUDA_GRAPH=1
export ONEFLOW_EAGER_LOCAL_TO_GLOBAL_BALANCED_OVERRIDE=1
export ONEFLOW_ONE_EMBEDDING_USE_SYSTEM_GATHER=0
export ONEFLOW_EP_CUDA_DEVICE_SCHEDULE=2
export ONEFLOW_EP_CUDA_STREAM_NON_BLOCKING=1
export ONEFLOW_ONE_EMBEDDING_ADD_ID_SHUFFLE_COPY_OUT=1
export ONEFLOW_ONE_EMBEDDING_FUSE_EMBEDDING_INTERACTION=1


export ONEFLOW_PROFILER_KERNEL_PROFILE_KERNEL_FORWARD_RANGE=1

# /usr/local/cuda-11.4/nsight-systems-2021.2.4/target-linux-x64/nsys profile --stat=true --force-overwrite true \
# --output="oneflow_dcn_1n4d_55296_fp32" \
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes 1 \
--node_rank 0 \
--master_addr 127.0.0.1 \
dcn_train_eval.py \
--data_dir $DATA_DIR \
--model_save_dir $MODEL_SAVE_DIR \
--persistent_path $PERSISTENT_PATH \
--table_size_array "649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376,1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572" \
--store_type 'cached_host_mem' \
--cache_memory_budget_mb 2048 \
--table_size_array "62866,8001,2901,74623,7530,3391,1400,21705,7937,21,276,1235896,9659,39884301,39040,17291,7421,20263,3,7121,1543,63,38532372,2953790,403302,10,2209,11938,155,4,976,14,39979538,25638302,39665755,585840,12973,108,36" \
--store_type 'device_mem' \
--train_batch_size 55296 \
--train_batches 75000 \
--loss_print_interval 1000 \
--dnn_hidden_units "1000, 1000, 1000, 1000, 1000" \
--crossing_layers 4 \
--embedding_vec_size 16




--learning_rate 0.0025 \
--embedding_vec_size 16 \
--num_train_samples 4195197692 \
--num_valid_samples 89137318 \
--num_test_samples 89137319 \
--net_dropout 0.05