in benchmarks/dlrm/ootb/dlrm_s_pytorch.py [0:0]
def run():
### parse arguments ###
parser = argparse.ArgumentParser(
description="Train Deep Learning Recommendation Model (DLRM)"
)
# model related parameters
parser.add_argument("--arch-sparse-feature-size", type=int, default=2)
parser.add_argument(
"--arch-embedding-size", type=dash_separated_ints, default="4-3-2"
)
# note: the "j" in the --data-trace-file default below ("dist_emb_j.log") is replaced with the table number
parser.add_argument("--arch-mlp-bot", type=dash_separated_ints, default="4-3-2")
parser.add_argument("--arch-mlp-top", type=dash_separated_ints, default="4-2-1")
parser.add_argument(
"--arch-interaction-op", type=str, choices=["dot", "cat"], default="dot"
)
parser.add_argument("--arch-interaction-itself", action="store_true", default=False)
parser.add_argument(
"--weighted-pooling", type=str, choices=["fixed", "learned", None], default=None
)
# embedding table options
parser.add_argument("--md-flag", action="store_true", default=False)
parser.add_argument("--md-threshold", type=int, default=200)
parser.add_argument("--md-temperature", type=float, default=0.3)
parser.add_argument("--md-round-dims", action="store_true", default=False)
parser.add_argument("--qr-flag", action="store_true", default=False)
parser.add_argument("--qr-threshold", type=int, default=200)
parser.add_argument("--qr-operation", type=str, default="mult")
parser.add_argument("--qr-collisions", type=int, default=4)
# activations and loss
parser.add_argument("--activation-function", type=str, default="relu")
parser.add_argument("--loss-function", type=str, default="mse") # or bce or wbce
parser.add_argument(
"--loss-weights", type=dash_separated_floats, default="1.0-1.0"
) # for wbce
parser.add_argument("--loss-threshold", type=float, default=0.0) # 1.0e-7
parser.add_argument("--round-targets", type=bool, default=False)
# data
parser.add_argument("--data-size", type=int, default=1)
parser.add_argument("--num-batches", type=int, default=0)
parser.add_argument(
"--data-generation", type=str, default="random"
) # synthetic or dataset
parser.add_argument(
"--rand-data-dist", type=str, default="uniform"
) # uniform or gaussian
parser.add_argument("--rand-data-min", type=float, default=0)
parser.add_argument("--rand-data-max", type=float, default=1)
parser.add_argument("--rand-data-mu", type=float, default=-1)
parser.add_argument("--rand-data-sigma", type=float, default=1)
parser.add_argument("--data-trace-file", type=str, default="./input/dist_emb_j.log")
parser.add_argument("--data-set", type=str, default="kaggle") # or terabyte
parser.add_argument("--raw-data-file", type=str, default="")
parser.add_argument("--processed-data-file", type=str, default="")
parser.add_argument("--data-randomize", type=str, default="total") # or day or none
parser.add_argument("--data-trace-enable-padding", type=bool, default=False)
parser.add_argument("--max-ind-range", type=int, default=-1)
parser.add_argument("--data-sub-sample-rate", type=float, default=0.0) # in [0, 1]
parser.add_argument("--num-indices-per-lookup", type=int, default=10)
parser.add_argument("--num-indices-per-lookup-fixed", type=bool, default=False)
parser.add_argument("--num-workers", type=int, default=0)
parser.add_argument("--memory-map", action="store_true", default=False)
# training
parser.add_argument("--mini-batch-size", type=int, default=1)
parser.add_argument("--nepochs", type=int, default=1)
parser.add_argument("--learning-rate", type=float, default=0.01)
parser.add_argument("--print-precision", type=int, default=5)
parser.add_argument("--numpy-rand-seed", type=int, default=123)
parser.add_argument("--sync-dense-params", type=bool, default=True)
parser.add_argument("--optimizer", type=str, default="sgd")
parser.add_argument(
"--dataset-multiprocessing",
action="store_true",
default=False,
help="The Kaggle dataset can be multiprocessed in an environment \
with more than 7 CPU cores and more than 20 GB of memory. \n \
The Terabyte dataset can be multiprocessed in an environment \
with more than 24 CPU cores and at least 1 TB of memory.",
)
# inference
parser.add_argument("--inference-only", action="store_true", default=False)
# quantize
parser.add_argument("--quantize-mlp-with-bit", type=int, default=32)
parser.add_argument("--quantize-emb-with-bit", type=int, default=32)
# onnx
parser.add_argument("--save-onnx", action="store_true", default=False)
# gpu
parser.add_argument("--use-gpu", action="store_true", default=False)
parser.add_argument("--use-fbgemm-gpu", action="store_true", default=False)
parser.add_argument(
"--fbgemm-gpu-codegen-pref",
type=str,
choices=["Split", "IntN"],
default="Split",
)
# torch2trt
parser.add_argument("--use-torch2trt-for-mlp", action="store_true", default=False)
# distributed
parser.add_argument("--local_rank", type=int, default=-1)
parser.add_argument("--dist-backend", type=str, default="")
# debugging and profiling
parser.add_argument("--print-freq", type=int, default=1)
parser.add_argument("--test-freq", type=int, default=-1)
parser.add_argument("--test-mini-batch-size", type=int, default=-1)
parser.add_argument("--test-num-workers", type=int, default=-1)
parser.add_argument("--print-time", action="store_true", default=False)
parser.add_argument("--print-wall-time", action="store_true", default=False)
parser.add_argument("--print-accumulated-time", action="store_true", default=False)
parser.add_argument("--debug-mode", action="store_true", default=False)
parser.add_argument("--enable-profiling", action="store_true", default=False)
parser.add_argument("--plot-compute-graph", action="store_true", default=False)
parser.add_argument("--tensor-board-filename", type=str, default="run_kaggle_pt")
# store/load model
parser.add_argument("--save-model", type=str, default="")
parser.add_argument("--load-model", type=str, default="")
# mlperf logging (disables other output and stops early)
parser.add_argument("--mlperf-logging", action="store_true", default=False)
# stop at target accuracy Kaggle 0.789, Terabyte (sub-sampled=0.875) 0.8107
parser.add_argument("--mlperf-acc-threshold", type=float, default=0.0)
# stop at target AUC Terabyte (no subsampling) 0.8025
parser.add_argument("--mlperf-auc-threshold", type=float, default=0.0)
parser.add_argument("--mlperf-bin-loader", action="store_true", default=False)
parser.add_argument("--mlperf-bin-shuffle", action="store_true", default=False)
# mlperf gradient accumulation iterations
parser.add_argument("--mlperf-grad-accum-iter", type=int, default=1)
# LR policy
parser.add_argument("--lr-num-warmup-steps", type=int, default=0)
parser.add_argument("--lr-decay-start-step", type=int, default=0)
parser.add_argument("--lr-num-decay-steps", type=int, default=0)
parser.add_argument("--precache-ml-data", type=int, nargs='?', default=None, const=sys.maxsize)
parser.add_argument("--warmup-steps", type=int, default=0)
# FB5 Logging
parser.add_argument("--fb5logger", type=str, default=None)
parser.add_argument("--fb5config", type=str, default="tiny")
global args
global nbatches
global nbatches_test
global writer
args = parser.parse_args()
if args.dataset_multiprocessing:
assert sys.version_info >= (3, 8), (
"The dataset_multiprocessing "
+ "flag is susceptible to a bug in Python 3.7 and under. "
+ "https://github.com/facebookresearch/dlrm/issues/172"
)
if args.mlperf_logging:
mlperf_logger.log_event(key=mlperf_logger.constants.CACHE_CLEAR, value=True)
mlperf_logger.log_start(
key=mlperf_logger.constants.INIT_START, log_all_ranks=True
)
if args.weighted_pooling is not None:
if args.qr_flag:
sys.exit("ERROR: quotient remainder with weighted pooling is not supported")
if args.md_flag:
sys.exit("ERROR: mixed dimensions with weighted pooling is not supported")
if args.quantize_emb_with_bit in [4, 8]:
if args.qr_flag:
sys.exit(
"ERROR: 4 and 8-bit quantization with quotient remainder is not supported"
)
if args.md_flag:
sys.exit(
"ERROR: 4 and 8-bit quantization with mixed dimensions is not supported"
)
if args.quantize_emb_with_bit in [4, 8, 16] and (
not fbgemm_gpu or not args.use_fbgemm_gpu
):
extra_info = ""
if not fbgemm_gpu:
extra_info += "\nfbgemm_gpu module failed to import.\n\n" + fbgemm_gpu_import_error_msg
if not args.use_fbgemm_gpu:
extra_info += "--use-fbgemm-gpu not set. "
if not args.inference_only:
sys.exit(
"ERROR: Training quantized embeddings requires fbgemm_gpu. "
+ extra_info
)
elif args.use_gpu:
sys.exit(
"ERROR: Quantized embeddings on GPU requires fbgemm_gpu. " + extra_info
)
elif args.quantize_emb_with_bit == 16:
sys.exit(
"ERROR: 16-bit quantized embeddings requires fbgemm_gpu. " + extra_info
)
assert args.quantize_emb_with_bit in [
4,
8,
16,
32,
], "only support 4/8/16/32-bit but got {}".format(args.quantize_emb_with_bit)
if args.use_gpu:
assert torch.cuda.is_available(), "No cuda device is available."
if args.use_fbgemm_gpu:
assert fbgemm_gpu, ("\nfbgemm_gpu module failed to import.\n\n" + fbgemm_gpu_import_error_msg)
use_gpu = args.use_gpu
use_fbgemm_gpu = args.use_fbgemm_gpu
### some basic setup ###
np.random.seed(args.numpy_rand_seed)
np.set_printoptions(precision=args.print_precision)
torch.set_printoptions(precision=args.print_precision)
torch.manual_seed(args.numpy_rand_seed)
if args.test_mini_batch_size < 0:
# if the parameter is not set, use the training batch size
args.test_mini_batch_size = args.mini_batch_size
if args.test_num_workers < 0:
# if the parameter is not set, use the same parameter for training
args.test_num_workers = args.num_workers
if not args.debug_mode:
ext_dist.init_distributed(
local_rank=args.local_rank, use_gpu=use_gpu, backend=args.dist_backend
)
if use_gpu:
torch.cuda.manual_seed_all(args.numpy_rand_seed)
torch.backends.cudnn.deterministic = True
if ext_dist.my_size > 1:
ngpus = 1
device = torch.device("cuda", ext_dist.my_local_rank)
else:
ngpus = torch.cuda.device_count()
device = torch.device("cuda", 0)
print("Using {} GPU(s)...".format(ngpus))
else:
device = torch.device("cpu")
print("Using CPU...")
### prepare training data ###
ln_bot = np.fromstring(args.arch_mlp_bot, dtype=int, sep="-")
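# e.g. --arch-mlp-bot="13-512-256-64-16" parses to array([13, 512, 256, 64, 16])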
# input data
if args.mlperf_logging:
mlperf_logger.barrier()
mlperf_logger.log_end(key=mlperf_logger.constants.INIT_STOP)
mlperf_logger.barrier()
mlperf_logger.log_start(key=mlperf_logger.constants.RUN_START)
mlperf_logger.barrier()
if args.data_generation == "dataset":
train_data, train_ld, test_data, test_ld = dp.make_criteo_data_and_loaders(args)
table_feature_map = {idx: idx for idx in range(len(train_data.counts))}
nbatches = args.num_batches if args.num_batches > 0 else len(train_ld)
nbatches_test = len(test_ld)
ln_emb = train_data.counts
# enforce maximum limit on number of vectors per embedding
if args.max_ind_range > 0:
ln_emb = np.array(
list(
map(
lambda x: x if x < args.max_ind_range else args.max_ind_range,
ln_emb,
)
)
)
else:
ln_emb = np.array(ln_emb)
m_den = train_data.m_den
ln_bot[0] = m_den
else:
# input and target at random
ln_emb = np.fromstring(args.arch_embedding_size, dtype=int, sep="-")
m_den = ln_bot[0]
train_data, train_ld, test_data, test_ld = dp.make_random_data_and_loader(
args, ln_emb, m_den, cache_size=args.precache_ml_data
)
nbatches = args.num_batches if args.num_batches > 0 else len(train_ld)
nbatches_test = len(test_ld)
assert args.num_batches > args.warmup_steps, (f"Change --warmup-steps={args.warmup_steps} to be lower than --num-batches={args.num_batches}.")
args.ln_emb = ln_emb.tolist()
if args.mlperf_logging:
print("command line args: ", json.dumps(vars(args)))
### derive the model architecture from the parsed arguments ###
m_spa = args.arch_sparse_feature_size
ln_emb = np.asarray(ln_emb)
num_fea = ln_emb.size + 1 # num sparse + num dense features
if args.use_fbgemm_gpu:
assert m_spa % 4 == 0, (
f"{m_spa} % 4 is not 0, but fbgemm_gpu requires the embedding dim "
+ "(--arch-sparse-feature-size number) to be evenly divisible by 4."
)
m_den_out = ln_bot[ln_bot.size - 1]
if args.arch_interaction_op == "dot":
# approach 1: all
# num_int = num_fea * num_fea + m_den_out
# approach 2: unique
if args.arch_interaction_itself:
num_int = (num_fea * (num_fea + 1)) // 2 + m_den_out
else:
num_int = (num_fea * (num_fea - 1)) // 2 + m_den_out
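# e.g. for Criteo Kaggle (26 sparse features + 1 dense, so num_fea = 27) with
# m_den_out = 16, this gives 27 * 26 // 2 + 16 = 367 pairwise interaction terms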
elif args.arch_interaction_op == "cat":
num_int = num_fea * m_den_out
else:
sys.exit(
"ERROR: --arch-interaction-op="
+ args.arch_interaction_op
+ " is not supported"
)
arch_mlp_top_adjusted = str(num_int) + "-" + args.arch_mlp_top
ln_top = np.fromstring(arch_mlp_top_adjusted, dtype=int, sep="-")
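# e.g. num_int = 367 and --arch-mlp-top="512-256-1" yield ln_top = [367, 512, 256, 1]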
# sanity check: feature sizes and mlp dimensions must match
if m_den != ln_bot[0]:
sys.exit(
"ERROR: arch-dense-feature-size "
+ str(m_den)
+ " does not match first dim of bottom mlp "
+ str(ln_bot[0])
)
if args.qr_flag:
if args.qr_operation == "concat" and 2 * m_spa != m_den_out:
sys.exit(
"ERROR: 2 arch-sparse-feature-size "
+ str(2 * m_spa)
+ " does not match last dim of bottom mlp "
+ str(m_den_out)
+ " (note that the last dim of bottom mlp must be 2x the embedding dim)"
)
if args.qr_operation != "concat" and m_spa != m_den_out:
sys.exit(
"ERROR: arch-sparse-feature-size "
+ str(m_spa)
+ " does not match last dim of bottom mlp "
+ str(m_den_out)
)
else:
if m_spa != m_den_out:
sys.exit(
"ERROR: arch-sparse-feature-size "
+ str(m_spa)
+ " does not match last dim of bottom mlp "
+ str(m_den_out)
)
if num_int != ln_top[0]:
sys.exit(
"ERROR: # of feature interactions "
+ str(num_int)
+ " does not match first dimension of top mlp "
+ str(ln_top[0])
)
# assign mixed dimensions if applicable
if args.md_flag:
m_spa = md_solver(
torch.tensor(ln_emb),
args.md_temperature, # alpha
d0=m_spa,
round_dim=args.md_round_dims,
).tolist()
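# m_spa is now a per-table list of embedding dims chosen by md_solver from each
# table's cardinality and --md-temperature (optionally rounded if --md-round-dims)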
if use_fbgemm_gpu:
for m in m_spa:
assert m % 4 == 0, (
"Found an incompatible embedding dim in m_spa. "
+ f"{m} % 4 is not 0, but fbgemm_gpu requires the "
+ "embedding dim to be evenly divisible by 4."
)
# test prints (model arch)
if args.debug_mode:
print("model arch:")
print(
"mlp top arch "
+ str(ln_top.size - 1)
+ " layers, with input to output dimensions:"
)
print(ln_top)
print("# of interactions")
print(num_int)
print(
"mlp bot arch "
+ str(ln_bot.size - 1)
+ " layers, with input to output dimensions:"
)
print(ln_bot)
print("# of features (sparse and dense)")
print(num_fea)
print("dense feature size")
print(m_den)
print("sparse feature size")
print(m_spa)
print(
"# of embeddings (= # of sparse features) "
+ str(ln_emb.size)
+ ", with dimensions "
+ str(m_spa)
+ "x:"
)
print(ln_emb)
print("data (inputs and targets):")
for j, inputBatch in enumerate(train_ld):
X, lS_o, lS_i, T, W, CBPP = unpack_batch(inputBatch)
torch.set_printoptions(precision=4)
# early exit if nbatches was set by the user and has been exceeded
if nbatches > 0 and j >= nbatches:
break
print("mini-batch: %d" % j)
print(X.detach().cpu())
# transform offsets to lengths when printing
print(
torch.IntTensor(
[
np.diff(
S_o.detach().cpu().tolist() + list(lS_i[i].shape)
).tolist()
for i, S_o in enumerate(lS_o)
]
)
)
print([S_i.detach().cpu() for S_i in lS_i])
print(T.detach().cpu())
global ndevices
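# number of devices used for the hybrid-parallel model: capped by the mini-batch
# size and by the number of embedding tables (num_fea - 1); -1 selects CPU execution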
ndevices = min(ngpus, args.mini_batch_size, num_fea - 1) if use_gpu else -1
### construct the neural network specified above ###
# WARNING: to obtain exactly the same initialization for
# the weights we need to start from the same random seed.
# np.random.seed(args.numpy_rand_seed)
global dlrm
dlrm = DLRM_Net(
m_spa,
ln_emb,
ln_bot,
ln_top,
arch_interaction_op=args.arch_interaction_op,
arch_interaction_itself=args.arch_interaction_itself,
sigmoid_bot=-1,
sigmoid_top=ln_top.size - 2,
sync_dense_params=args.sync_dense_params,
loss_threshold=args.loss_threshold,
ndevices=ndevices,
qr_flag=args.qr_flag,
qr_operation=args.qr_operation,
qr_collisions=args.qr_collisions,
qr_threshold=args.qr_threshold,
md_flag=args.md_flag,
md_threshold=args.md_threshold,
weighted_pooling=args.weighted_pooling,
loss_function=args.loss_function,
learning_rate=args.learning_rate,
use_gpu=use_gpu,
use_fbgemm_gpu=use_fbgemm_gpu,
fbgemm_gpu_codegen_pref=args.fbgemm_gpu_codegen_pref,
inference_only=args.inference_only,
quantize_mlp_with_bit=args.quantize_mlp_with_bit,
quantize_emb_with_bit=args.quantize_emb_with_bit,
use_torch2trt_for_mlp=args.use_torch2trt_for_mlp,
)
# test prints
if args.debug_mode:
print("initial parameters (weights and bias):")
dlrm.print_weights()
# In dlrm.quantize_embedding called below, the torch quantize calls run
# on cpu tensors only. They cannot quantize tensors stored on the gpu.
# So quantization occurs on cpu tensors before transferring them to gpu if
# use_gpu is enabled.
if args.quantize_emb_with_bit != 32:
dlrm.quantize_embedding(args.quantize_emb_with_bit)
if not args.inference_only:
assert args.quantize_mlp_with_bit == 32, (
"Dynamic quantization for mlp requires "
+ "--inference-only because training is not supported"
)
else:
# Currently only INT8 and FP16 quantized types are supported for quantized MLP inference.
# By default we don't do the quantization: quantize_{mlp,emb}_with_bit == 32 (FP32)
assert args.quantize_mlp_with_bit in [
8,
16,
32,
], "only support 8/16/32-bit but got {}".format(args.quantize_mlp_with_bit)
if not args.use_torch2trt_for_mlp:
if args.quantize_mlp_with_bit == 16 and use_gpu:
dlrm.top_l = dlrm.top_l.half()
dlrm.bot_l = dlrm.bot_l.half()
elif args.quantize_mlp_with_bit in [8, 16]:
assert not use_gpu, (
"Cannot run PyTorch's built-in dynamic quantization for mlp "
+ "with --use-gpu enabled, because DynamicQuantizedLinear's "
+ "forward function calls 'quantized::linear_dynamic', which does not "
+ "support the 'CUDA' backend. To convert to and run quantized mlp layers "
+ "on the gpu, install torch2trt and enable --use-torch2trt-for-mlp. "
+ "Alternatively, disable --use-gpu to use PyTorch's built-in "
+ "cpu quantization ops for the mlp layers. "
)
if args.quantize_mlp_with_bit == 8:
quantize_dtype = torch.qint8
else:
quantize_dtype = torch.float16
dlrm.top_l = torch.quantization.quantize_dynamic(
dlrm.top_l, {torch.nn.Linear}, quantize_dtype
)
dlrm.bot_l = torch.quantization.quantize_dynamic(
dlrm.bot_l, {torch.nn.Linear}, quantize_dtype
)
# Prep work for embedding tables and model transfer:
# Handling single-cpu and single-gpu modes
# NOTE: This also handles dist-backend modes (CLI args --dist-backend=nccl,
# --dist-backend=ccl, and --dist-backend=mpi) because in these modes each
# process runs in single-gpu mode. For example, if 8 processes are launched
# running dlrm_s_pytorch.py with --dist-backend=nccl --use-gpu, each process
# will run in single-gpu mode, resulting in 8 gpus total running distributed
# training or distributed inference if --inference-only is enabled.
if dlrm.ndevices_available <= 1:
if use_fbgemm_gpu:
dlrm.fbgemm_emb_l = nn.ModuleList(
[
fbgemm_gpu_emb_bag_wrapper(
device,
dlrm.emb_l if dlrm.emb_l else dlrm.emb_l_q,
dlrm.m_spa,
dlrm.quantize_bits,
dlrm.learning_rate,
dlrm.fbgemm_gpu_codegen_pref,
dlrm.requires_grad,
)
]
)
if use_gpu:
dlrm = dlrm.to(device)
if dlrm.weighted_pooling == "fixed":
for k, w in enumerate(dlrm.v_W_l):
dlrm.v_W_l[k] = w.cuda()
else:
# Handling multi-GPU mode
dlrm.bot_l = dlrm.bot_l.to(device)
dlrm.top_l = dlrm.top_l.to(device)
dlrm.prepare_parallel_model(ndevices)
if args.use_torch2trt_for_mlp:
if torch2trt and use_gpu and args.inference_only:
bot_l_sample_input = torch.ones([1, ln_bot[0]], dtype=torch.float32).cuda()
top_l_sample_input = torch.ones([1, ln_top[0]], dtype=torch.float32).cuda()
additional_args = {}
if args.quantize_mlp_with_bit == 16:
additional_args['fp16_mode']=True
elif args.quantize_mlp_with_bit == 8:
additional_args['int8_mode']=True
dlrm.bot_l = torch2trt(dlrm.bot_l, (bot_l_sample_input,), **additional_args)
dlrm.top_l = torch2trt(dlrm.top_l, (top_l_sample_input,), **additional_args)
elif torch2trt is None:
sys.exit("\ntorch2trt module failed to import.\n\n" + torch2trt_import_error_msg)
else:
error_msg = "ERROR: When --use-torch2trt-for-mlp is enabled, "
if not use_gpu:
error_msg += "--use-gpu must be enabled, "
if not args.inference_only:
error_msg += "--inference-only must be enabled, "
error_msg = error_msg[:-2] + "."
sys.exit(error_msg)
# distribute data parallel mlps
if ext_dist.my_size > 1:
if use_gpu:
device_ids = [ext_dist.my_local_rank]
dlrm.bot_l = ext_dist.DDP(dlrm.bot_l, device_ids=device_ids)
dlrm.top_l = ext_dist.DDP(dlrm.top_l, device_ids=device_ids)
else:
dlrm.bot_l = ext_dist.DDP(dlrm.bot_l)
dlrm.top_l = ext_dist.DDP(dlrm.top_l)
if not args.inference_only:
# specify the optimizer algorithm
opts = {
"sgd": torch.optim.SGD,
"rwsadagrad": RowWiseSparseAdagrad.RWSAdagrad,
"adagrad": apex.optimizers.FusedAdagrad
if apex
else torch.optim.Adagrad,
}
parameters = (
dlrm.parameters()
if ext_dist.my_size == 1
else [
{
"params": [
p
for emb in (
[e.fbgemm_gpu_emb_bag for e in dlrm.fbgemm_emb_l]
if use_fbgemm_gpu
else dlrm.emb_l_q
if dlrm.quantize_bits != 32
else dlrm.emb_l
)
for p in emb.parameters()
],
"lr": args.learning_rate,
},
# TODO check this lr setup
# bottom mlp has no data parallelism
# need to check how we deal with the top mlp
{
"params": dlrm.bot_l.parameters(),
"lr": args.learning_rate,
},
{
"params": dlrm.top_l.parameters(),
"lr": args.learning_rate,
},
]
)
optimizer = opts[args.optimizer](parameters, lr=args.learning_rate)
lr_scheduler = LRPolicyScheduler(
optimizer,
args.lr_num_warmup_steps,
args.lr_decay_start_step,
args.lr_num_decay_steps,
)
# Guarantee GPU setup has completed before training or inference starts.
if use_gpu:
torch.cuda.synchronize()
### main loop ###
# training or inference
best_acc_test = 0
best_auc_test = 0
skip_upto_epoch = 0
skip_upto_batch = 0
total_time = 0
total_loss = 0
total_iter = 0
total_samp = 0
if args.mlperf_logging:
mlperf_logger.mlperf_submission_log("dlrm")
mlperf_logger.log_event(
key=mlperf_logger.constants.SEED, value=args.numpy_rand_seed
)
mlperf_logger.log_event(
key=mlperf_logger.constants.GLOBAL_BATCH_SIZE, value=args.mini_batch_size
)
# Load model if specified
if not (args.load_model == ""):
print("Loading saved model {}".format(args.load_model))
if use_gpu:
if dlrm.ndevices_available > 1:
# NOTE: when targeting inference on multiple GPUs,
# load the model as is on CPU or GPU, with the move
# to multiple GPUs to be done in parallel_forward
ld_model = torch.load(args.load_model)
else:
# NOTE: when targeting inference on single GPU,
# note that the call to .to(device) has already happened
ld_model = torch.load(
args.load_model,
map_location=torch.device("cuda")
# map_location=lambda storage, loc: storage.cuda(0)
)
else:
# when targeting inference on CPU
ld_model = torch.load(args.load_model, map_location=torch.device("cpu"))
dlrm.load_state_dict(ld_model["state_dict"])
ld_j = ld_model["iter"]
ld_k = ld_model["epoch"]
ld_nepochs = ld_model["nepochs"]
ld_nbatches = ld_model["nbatches"]
ld_nbatches_test = ld_model["nbatches_test"]
ld_train_loss = ld_model["train_loss"]
ld_total_loss = ld_model["total_loss"]
if args.mlperf_logging:
ld_gAUC_test = ld_model["test_auc"]
ld_acc_test = ld_model["test_acc"]
if not args.inference_only:
optimizer.load_state_dict(ld_model["opt_state_dict"])
best_acc_test = ld_acc_test
total_loss = ld_total_loss
skip_upto_epoch = ld_k # epochs
skip_upto_batch = ld_j # batches
else:
args.print_freq = ld_nbatches
args.test_freq = 0
print(
"Saved at: epoch = {:d}/{:d}, batch = {:d}/{:d}, ntbatch = {:d}".format(
ld_k, ld_nepochs, ld_j, ld_nbatches, ld_nbatches_test
)
)
print(
"Training state: loss = {:.6f}".format(
ld_train_loss,
)
)
if args.mlperf_logging:
print(
"Testing state: accuracy = {:3.3f} %, auc = {:.3f}".format(
ld_acc_test * 100, ld_gAUC_test
)
)
else:
print("Testing state: accuracy = {:3.3f} %".format(ld_acc_test * 100))
print("time/loss/accuracy (if enabled):")
if args.mlperf_logging:
# LR is logged twice for now because of a compliance checker bug
mlperf_logger.log_event(
key=mlperf_logger.constants.OPT_BASE_LR, value=args.learning_rate
)
mlperf_logger.log_event(
key=mlperf_logger.constants.OPT_LR_WARMUP_STEPS,
value=args.lr_num_warmup_steps,
)
# use logging keys from the official HP table and not from the logging library
mlperf_logger.log_event(
key="sgd_opt_base_learning_rate", value=args.learning_rate
)
mlperf_logger.log_event(
key="lr_decay_start_steps", value=args.lr_decay_start_step
)
mlperf_logger.log_event(
key="sgd_opt_learning_rate_decay_steps", value=args.lr_num_decay_steps
)
mlperf_logger.log_event(key="sgd_opt_learning_rate_decay_poly_power", value=2)
tb_file = "./" + args.tensor_board_filename
writer = SummaryWriter(tb_file)
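# TensorBoard event files go under ./<--tensor-board-filename>; Train/Loss is
# logged at every print interval in the loop below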
# Pre-cache samples.
if args.precache_ml_data:
for _ in (test_ld if args.inference_only else train_ld):
pass
ext_dist.barrier()
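# the profiler context below is a no-op unless --enable-profiling is set
# (the first positional argument of torch.autograd.profiler.profile is `enabled`)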
with torch.autograd.profiler.profile(
args.enable_profiling, use_cuda=use_gpu, record_shapes=True
) as prof:
if not args.inference_only:
bmlogger = get_bmlogger() # default to Nop logger
if args.fb5logger is not None:
bmlogger = get_bmlogger(args.fb5logger)
bmlogger.header("DLRM", "OOTB", "train", args.fb5config, score_metric=loggerconstants.EXPS)
k = 0
while k < args.nepochs:
if args.mlperf_logging:
mlperf_logger.barrier()
mlperf_logger.log_start(
key=mlperf_logger.constants.BLOCK_START,
metadata={
mlperf_logger.constants.FIRST_EPOCH_NUM: (k + 1),
mlperf_logger.constants.EPOCH_COUNT: 1,
},
)
mlperf_logger.barrier()
mlperf_logger.log_start(
key=mlperf_logger.constants.EPOCH_START,
metadata={mlperf_logger.constants.EPOCH_NUM: (k + 1)},
)
if k < skip_upto_epoch:
# advance k so that resuming from a saved epoch cannot loop forever
k += 1
continue
if args.print_accumulated_time:
accum_time_begin = time_wrap(use_gpu)
if args.mlperf_logging:
previous_iteration_time = None
for j, inputBatch in enumerate(train_ld):
if j == 0 and args.save_onnx:
X_onnx, lS_o_onnx, lS_i_onnx, _, _, _ = unpack_batch(inputBatch)
if j < skip_upto_batch:
continue
if k == 0 and j == args.warmup_steps:
bmlogger.run_start()
X, lS_o, lS_i, T, W, CBPP = unpack_batch(inputBatch)
if args.mlperf_logging:
current_time = time_wrap(use_gpu)
if previous_iteration_time:
iteration_time = current_time - previous_iteration_time
else:
iteration_time = 0
previous_iteration_time = current_time
else:
t1 = time_wrap(use_gpu)
# early exit if nbatches was set by the user and has been exceeded
if nbatches > 0 and j >= nbatches:
break
# Skip the batch if its size is not a multiple of the total number of ranks
if ext_dist.my_size > 1 and X.size(0) % ext_dist.my_size != 0:
print(
"Warning: Skiping the batch %d with size %d"
% (j, X.size(0))
)
continue
mbs = T.shape[0] # = args.mini_batch_size except maybe for last
# forward pass
Z = dlrm_wrap(
X,
lS_o,
lS_i,
use_gpu,
device,
ndevices=ndevices,
)
if ext_dist.my_size > 1:
T = T[ext_dist.get_my_slice(mbs)]
W = W[ext_dist.get_my_slice(mbs)]
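# in distributed runs each rank computes Z only for its shard of the batch, so the
# targets (T) and per-sample weights (W) are sliced to match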
# loss
E = loss_fn_wrap(Z, T, use_gpu, device)
# compute loss and accuracy
L = E.detach().cpu().numpy() # numpy array
# training accuracy is not computed here (the code below is kept for reference)
# S = Z.detach().cpu().numpy() # numpy array
# T = T.detach().cpu().numpy() # numpy array
# # print("res: ", S)
# # print("j, train: BCE", j, L)
# mbs = T.shape[0] # = args.mini_batch_size except maybe for last
# A = np.sum((np.round(S, 0) == T).astype(np.uint8))
with record_function("DLRM backward"):
# Update optimizer parameters to train weights instantiated lazily in
# the parallel_forward call.
if dlrm.ndevices_available > 1 and dlrm.add_new_weights_to_params:
# Pop any prior extra parameters. Priors may exist because
# self.parallel_model_is_not_prepared is set back to True
# when self.parallel_model_batch_size != batch_size.
# Search "self.parallel_model_batch_size != batch_size" in code.
if "lazy_params" in optimizer.param_groups[-1].keys():
optimizer.param_groups.pop()
# dlrm.v_W_l_l is a list of nn.ParameterLists, one ParameterList per gpu.
# Flatten the list of nn.ParameterList to one nn.ParameterList,
# and add it to the trainable params list.
lazy_params = nn.ParameterList()
if dlrm.weighted_pooling == "learned":
lazy_params.extend(
nn.ParameterList(
[p for p_l in dlrm.v_W_l_l for p in p_l]
)
)
if dlrm.use_fbgemm_gpu:
lazy_params.extend(
nn.ParameterList(
[
emb
for emb_ in dlrm.fbgemm_emb_l
for emb in emb_.fbgemm_gpu_emb_bag.parameters()
]
)
)
lazy_params_dict = optimizer.param_groups[0]
lazy_params_dict["lazy_params"] = True
lazy_params_dict["params"] = lazy_params
optimizer.param_groups.append(lazy_params_dict)
dlrm.add_new_weights_to_params = False
# Run "[[t.device.type for t in grp['params']] for grp in optimizer.param_groups]"
# to view devices used by tensors in the param groups.
# scaled error gradient propagation
# (where we do not accumulate gradients across mini-batches)
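# with --mlperf-logging, zero_grad()/step() run only every --mlperf-grad-accum-iter
# iterations, so gradients accumulate in between; otherwise every iteration steps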
if (
args.mlperf_logging
and (j + 1) % args.mlperf_grad_accum_iter == 0
) or not args.mlperf_logging:
optimizer.zero_grad()
# backward pass
E.backward()
# optimizer
if (
args.mlperf_logging
and (j + 1) % args.mlperf_grad_accum_iter == 0
) or not args.mlperf_logging:
optimizer.step()
lr_scheduler.step()
if args.mlperf_logging:
total_time += iteration_time
else:
t2 = time_wrap(use_gpu)
total_time += t2 - t1
total_loss += L * mbs
total_iter += 1
total_samp += mbs
should_print = ((j + 1) % args.print_freq == 0) or (
j + 1 == nbatches
)
should_test = (
(args.test_freq > 0)
and (args.data_generation in ["dataset", "random"])
and (((j + 1) % args.test_freq == 0) or (j + 1 == nbatches))
)
# print time, loss and accuracy
if should_print or should_test:
gT = 1000.0 * total_time / total_iter if args.print_time else -1
total_time = 0
train_loss = total_loss / total_samp
total_loss = 0
str_run_type = (
"inference" if args.inference_only else "training"
)
wall_time = ""
if args.print_wall_time:
wall_time = " ({})".format(time.strftime("%H:%M"))
print(
"Finished {} it {}/{} of epoch {}, {:.2f} ms/it,".format(
str_run_type, j + 1, nbatches, k, gT
)
+ " loss {:.6f}".format(train_loss)
+ wall_time,
flush=True,
)
if args.print_accumulated_time and ext_dist.my_rank < 2:
current_unix_time = time_wrap(use_gpu)
ext_dist.orig_print(
"Accumulated time so far: {} for process {} for step {} at {}".format(
current_unix_time - accum_time_begin,
ext_dist.my_rank,
j + 1,
current_unix_time,
)
)
log_iter = nbatches * k + j + 1
writer.add_scalar("Train/Loss", train_loss, log_iter)
total_iter = 0
total_samp = 0
# testing
if should_test:
epoch_num_float = (j + 1) / len(train_ld) + k + 1
if args.mlperf_logging:
mlperf_logger.barrier()
mlperf_logger.log_start(
key=mlperf_logger.constants.EVAL_START,
metadata={
mlperf_logger.constants.EPOCH_NUM: epoch_num_float
},
)
# don't measure training iter time in a test iteration
if args.mlperf_logging:
previous_iteration_time = None
print(
"Testing at - {}/{} of epoch {},".format(j + 1, nbatches, k)
)
model_metrics_dict, is_best = inference(
args,
dlrm,
best_acc_test,
best_auc_test,
test_ld,
device,
use_gpu,
log_iter,
)
if (
is_best
and not (args.save_model == "")
and not args.inference_only
):
model_metrics_dict["epoch"] = k
model_metrics_dict["iter"] = j + 1
model_metrics_dict["train_loss"] = train_loss
model_metrics_dict["total_loss"] = total_loss
model_metrics_dict[
"opt_state_dict"
] = optimizer.state_dict()
print("Saving model to {}".format(args.save_model))
torch.save(model_metrics_dict, args.save_model)
if args.mlperf_logging:
mlperf_logger.barrier()
mlperf_logger.log_end(
key=mlperf_logger.constants.EVAL_STOP,
metadata={
mlperf_logger.constants.EPOCH_NUM: epoch_num_float
},
)
# Uncomment the line below to print out the total time with overhead
# print("Total test time for this group: {}" \
# .format(time_wrap(use_gpu) - accum_test_time_begin))
if (
args.mlperf_logging
and (args.mlperf_acc_threshold > 0)
and (best_acc_test > args.mlperf_acc_threshold)
):
print(
"MLPerf testing accuracy threshold "
+ str(args.mlperf_acc_threshold)
+ " reached, stop training"
)
break
if (
args.mlperf_logging
and (args.mlperf_auc_threshold > 0)
and (best_auc_test > args.mlperf_auc_threshold)
):
print(
"MLPerf testing auc threshold "
+ str(args.mlperf_auc_threshold)
+ " reached, stop training"
)
if args.mlperf_logging:
mlperf_logger.barrier()
mlperf_logger.log_end(
key=mlperf_logger.constants.RUN_STOP,
metadata={
mlperf_logger.constants.STATUS: mlperf_logger.constants.SUCCESS
},
)
break
if k == 0:
bmlogger.run_stop(nbatches - args.warmup_steps, args.mini_batch_size)
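# FB5 logging: run_start() fired at j == args.warmup_steps in epoch 0, so throughput
# is scored over the remaining nbatches - warmup_steps iterations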
if args.mlperf_logging:
mlperf_logger.barrier()
mlperf_logger.log_end(
key=mlperf_logger.constants.EPOCH_STOP,
metadata={mlperf_logger.constants.EPOCH_NUM: (k + 1)},
)
mlperf_logger.barrier()
mlperf_logger.log_end(
key=mlperf_logger.constants.BLOCK_STOP,
metadata={mlperf_logger.constants.FIRST_EPOCH_NUM: (k + 1)},
)
k += 1 # nepochs
if args.mlperf_logging and best_auc_test <= args.mlperf_auc_threshold:
mlperf_logger.barrier()
mlperf_logger.log_end(
key=mlperf_logger.constants.RUN_STOP,
metadata={
mlperf_logger.constants.STATUS: mlperf_logger.constants.ABORTED
},
)
else:
print("Testing for inference only")
inference(
args,
dlrm,
best_acc_test,
best_auc_test,
test_ld,
device,
use_gpu,
)
# profiling
if args.enable_profiling:
time_stamp = str(datetime.datetime.now()).replace(" ", "_")
with open("dlrm_s_pytorch" + time_stamp + "_shape.prof", "w") as prof_f:
prof_f.write(
prof.key_averages(group_by_input_shape=True).table(
sort_by="self_cpu_time_total"
)
)
with open("dlrm_s_pytorch" + time_stamp + "_total.prof", "w") as prof_f:
prof_f.write(prof.key_averages().table(sort_by="self_cpu_time_total"))
prof.export_chrome_trace("dlrm_s_pytorch" + time_stamp + ".json")
# print(prof.key_averages().table(sort_by="cpu_time_total"))
# plot compute graph
if args.plot_compute_graph:
sys.exit(
"ERROR: Please install pytorchviz package in order to use the"
+ " visualization. Then, uncomment its import above as well as"
+ " three lines below and run the code again."
)
# V = Z.mean() if args.inference_only else E
# dot = make_dot(V, params=dict(dlrm.named_parameters()))
# dot.render('dlrm_s_pytorch_graph') # write .pdf file
# test prints
if not args.inference_only and args.debug_mode:
print("updated parameters (weights and bias):")
dlrm.print_weights()
# export the model in onnx
if args.save_onnx:
"""
# workaround 1: tensor -> list
if torch.is_tensor(lS_i_onnx):
lS_i_onnx = [lS_i_onnx[j] for j in range(len(lS_i_onnx))]
# workaround 2: list -> tensor
lS_i_onnx = torch.stack(lS_i_onnx)
"""
# debug prints
# print("inputs", X_onnx, lS_o_onnx, lS_i_onnx)
# print("output", dlrm_wrap(X_onnx, lS_o_onnx, lS_i_onnx, use_gpu, device))
dlrm_pytorch_onnx_file = "dlrm_s_pytorch.onnx"
print("X_onnx.shape", X_onnx.shape)
if torch.is_tensor(lS_o_onnx):
print("lS_o_onnx.shape", lS_o_onnx.shape)
else:
for oo in lS_o_onnx:
print("oo.shape", oo.shape)
if torch.is_tensor(lS_i_onnx):
print("lS_i_onnx.shape", lS_i_onnx.shape)
else:
for ii in lS_i_onnx:
print("ii.shape", ii.shape)
# name inputs and outputs
o_inputs = (
["offsets"]
if torch.is_tensor(lS_o_onnx)
else ["offsets_" + str(i) for i in range(len(lS_o_onnx))]
)
i_inputs = (
["indices"]
if torch.is_tensor(lS_i_onnx)
else ["indices_" + str(i) for i in range(len(lS_i_onnx))]
)
all_inputs = ["dense_x"] + o_inputs + i_inputs
# debug prints
print("inputs", all_inputs)
# create dynamic_axis dictionaries
do_inputs = (
[{"offsets": {1: "batch_size"}}]
if torch.is_tensor(lS_o_onnx)
else [
{"offsets_" + str(i): {0: "batch_size"}} for i in range(len(lS_o_onnx))
]
)
di_inputs = (
[{"indices": {1: "batch_size"}}]
if torch.is_tensor(lS_i_onnx)
else [
{"indices_" + str(i): {0: "batch_size"}} for i in range(len(lS_i_onnx))
]
)
dynamic_axes = {"dense_x": {0: "batch_size"}, "pred": {0: "batch_size"}}
for do in do_inputs:
dynamic_axes.update(do)
for di in di_inputs:
dynamic_axes.update(di)
# debug prints
print(dynamic_axes)
# export model
torch.onnx.export(
dlrm,
(X_onnx, lS_o_onnx, lS_i_onnx),
dlrm_pytorch_onnx_file,
verbose=True,
use_external_data_format=True,
opset_version=11,
input_names=all_inputs,
output_names=["pred"],
dynamic_axes=dynamic_axes,
)
# recover the model back
dlrm_pytorch_onnx = onnx.load(dlrm_pytorch_onnx_file)
# check the onnx model
onnx.checker.check_model(dlrm_pytorch_onnx)
total_time_end = time_wrap(use_gpu)