# torchbenchmark/models/dlrm/dlrm_s_caffe2.py
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
# Description: an implementation of a deep learning recommendation model (DLRM)
# The model input consists of dense and sparse features. The former is a vector
# of floating point values. The latter is a list of sparse indices into
# embedding tables, which consist of vectors of floating point values.
# The selected vectors are passed to mlp networks denoted by triangles;
# in some cases the vectors interact through operators (Ops).
#
# output:
#                         vector of values
# model:                        |
#                              /\
#                             /__\
#                               |
#       _____________________> Op  <___________________
#     /                         |                          \
#    /\                        /\                          /\
#   /__\                      /__\          ...           /__\
#    |                          |                           |
#    |                         Op                          Op
#    |                    ____/__\_____           ____/__\____
#    |                   |_Emb_|____|__|    ...  |_Emb_|__|___|
# input:
# [ dense features ]     [sparse indices] , ..., [sparse indices]
#
# More precise definition of model layers:
# 1) fully connected layers of an mlp
# z = f(y)
# y = Wx + b
#
# 2) embedding lookup (for a list of sparse indices p=[p1,...,pk])
# z = Op(e1,...,ek)
# obtain vectors e1=E[:,p1], ..., ek=E[:,pk]
#
# 3) Operator Op can be one of the following
# Sum(e1,...,ek) = e1 + ... + ek
# Dot(e1,...,ek) = [e1'e1, ..., e1'ek, ..., ek'e1, ..., ek'ek]
# Cat(e1,...,ek) = [e1', ..., ek']'
# where ' denotes transpose operation
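#
# As a rough illustrative sketch (not part of the model code), the three
# operators can be written in numpy for k embedding vectors of size d:
#
#   import numpy as np
#   e1, e2 = np.ones(3), 2 * np.ones(3)  # two toy vectors (k=2, d=3)
#   E = np.stack([e1, e2])               # shape (k, d)
#   z_sum = E.sum(axis=0)                # Sum: elementwise sum, shape (d,)
#   z_dot = E @ E.T                      # Dot: all pairwise dot products, shape (k, k)
#   z_cat = E.reshape(-1)                # Cat: concatenation, shape (k * d,)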
#
# References:
# [1] Maxim Naumov, Dheevatsa Mudigere, Hao-Jun Michael Shi, Jianyu Huang,
# Narayanan Sundaram, Jongsoo Park, Xiaodong Wang, Udit Gupta, Carole-Jean Wu,
# Alisson G. Azzolini, Dmytro Dzhulgakov, Andrey Mallevich, Ilia Cherniavskii,
# Yinghai Lu, Raghuraman Krishnamoorthi, Ansha Yu, Volodymyr Kondratenko,
# Stephanie Pereira, Xianjie Chen, Wenlin Chen, Vijay Rao, Bill Jia, Liang Xiong,
# Misha Smelyanskiy, "Deep Learning Recommendation Model for Personalization and
# Recommendation Systems", CoRR, arXiv:1906.00091, 2019
from __future__ import absolute_import, division, print_function, unicode_literals
import functools
# others
import operator
import sys
import time
import copy
# data generation
import dlrm_data_caffe2 as dc
# numpy
import numpy as np
import sklearn.metrics
# onnx
# The onnx import causes deprecation warnings every time workers
# are spawned during testing. So, we filter out those warnings.
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
import onnx
import caffe2.python.onnx.frontend
# caffe2
from caffe2.proto import caffe2_pb2
from caffe2.python import brew, core, dyndep, model_helper, net_drawer, workspace
# from caffe2.python.predictor import mobile_exporter
"""
# auxiliary routine used to split input on the mini-batch dimension
def where_to_split(mini_batch_size, ndevices, _add_leftover=False):
    n = (mini_batch_size + ndevices - 1) // ndevices  # ceiling
    l = mini_batch_size - n * (ndevices - 1)  # leftover
    ls = [n] * (ndevices - 1)
    if _add_leftover:
        ls += [l if l > 0 else n]
    return ls
"""
### define dlrm in Caffe2 ###
class DLRM_Net(object):
def FeedBlobWrapper(self, tag, val, add_prefix=True, split=False, device_id=-1):
if self.ndevices > 1 and add_prefix:
if split:
# split across devices
mini_batch_size = val.shape[0]
# approach 1: np and caffe2 operators assume the mini-batch size is
# divisible exactly by the number of available devices
                if mini_batch_size % self.ndevices != 0:
                    sys.exit("ERROR: caffe2 net assumes that the mini_batch_size "
                             + str(mini_batch_size)
                             + " is evenly divisible by the number of available devices "
                             + str(self.ndevices))
vals = np.split(val, self.ndevices, axis=0)
"""
# approach 2: np and caffe2 operators do not assume exact divisibility
if args.mini_batch_size != mini_batch_size:
sys.exit("ERROR: caffe2 net was prepared for mini-batch size "
+ str(args.mini_batch_size)
+ " which is different from current mini-batch size "
+ str(mini_batch_size) + " being passed to it. "
+ "This is common for the last mini-batch, when "
+ "mini-batch size does not evenly divided the number of "
+ "elements in the data set.")
ls = where_to_split(mini_batch_size, self.ndevices)
vals = np.split(val, ls, axis=0)
"""
# feed to multiple devices
for d in range(self.ndevices):
tag_on_device = "gpu_" + str(d) + "/" + tag
_d = core.DeviceOption(workspace.GpuDeviceType, d)
workspace.FeedBlob(tag_on_device, vals[d], device_option=_d)
else:
# feed to multiple devices
for d in range(self.ndevices):
tag_on_device = "gpu_" + str(d) + "/" + tag
_d = core.DeviceOption(workspace.GpuDeviceType, d)
workspace.FeedBlob(tag_on_device, val, device_option=_d)
else:
# feed to a single device (named or not)
if device_id >= 0:
_d = core.DeviceOption(workspace.GpuDeviceType, device_id)
workspace.FeedBlob(tag, val, device_option=_d)
else:
workspace.FeedBlob(tag, val)
def FetchBlobWrapper(self, tag, add_prefix=True, reduce_across=None, device_id=-1):
if self.ndevices > 1 and add_prefix:
# fetch from multiple devices
vals = []
for d in range(self.ndevices):
if tag.__class__ == list:
tag_on_device = tag[d]
else:
tag_on_device = "gpu_" + str(0) + "/" + tag
val = workspace.FetchBlob(tag_on_device)
vals.append(val)
# reduce across devices
if reduce_across == "add":
return functools.reduce(operator.add, vals)
elif reduce_across == "concat":
return np.concatenate(vals)
else:
return vals
else:
# fetch from a single device (named or not)
if device_id >= 0:
tag_on_device = "gpu_" + str(device_id) + "/" + tag
return workspace.FetchBlob(tag_on_device)
else:
return workspace.FetchBlob(tag)
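    # Illustrative sketch of the wrapper above (hypothetical tags): with
    # ndevices=2, fetching a scalar tag such as "loss" gathers the blobs
    # "gpu_0/loss" and "gpu_1/loss"; reduce_across="add" sums them,
    # reduce_across="concat" stacks them along axis 0, and None returns the
    # raw per-device list.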
def AddLayerWrapper(self, layer, inp_blobs, out_blobs,
add_prefix=True, reset_grad=False, **kwargs):
# auxiliary routine to adjust tags
def adjust_tag(blobs, on_device):
if blobs.__class__ == str:
_blobs = on_device + blobs
elif blobs.__class__ == list:
_blobs = list(map(lambda tag: on_device + tag, blobs))
else: # blobs.__class__ == model_helper.ModelHelper or something else
_blobs = blobs
return _blobs
if self.ndevices > 1 and add_prefix:
# add layer on multiple devices
ll = []
for d in range(self.ndevices):
# add prefix on_device
on_device = "gpu_" + str(d) + "/"
_inp_blobs = adjust_tag(inp_blobs, on_device)
_out_blobs = adjust_tag(out_blobs, on_device)
                # WARNING: reset_grad option was exclusively designed for WeightedSum
                # with inp_blobs=[w, tag_one, "", lr], where "" will be replaced
if reset_grad:
w_grad = self.gradientMap[_inp_blobs[0]]
_inp_blobs[2] = w_grad
# add layer to the model
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, d)):
if kwargs:
new_layer = layer(_inp_blobs, _out_blobs, **kwargs)
else:
new_layer = layer(_inp_blobs, _out_blobs)
ll.append(new_layer)
return ll
else:
# add layer on a single device
            # WARNING: reset_grad option was exclusively designed for WeightedSum
            # with inp_blobs=[w, tag_one, "", lr], where "" will be replaced
if reset_grad:
w_grad = self.gradientMap[inp_blobs[0]]
inp_blobs[2] = w_grad
# add layer to the model
if kwargs:
new_layer = layer(inp_blobs, out_blobs, **kwargs)
else:
new_layer = layer(inp_blobs, out_blobs)
return new_layer
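    # Illustrative sketch of the wrapper above (hypothetical tags): with
    # ndevices=2, AddLayerWrapper(model.net.FC, ["x", "w", "b"], "y") creates
    # one FC operator per device, mapping ["gpu_0/x", "gpu_0/w", "gpu_0/b"]
    # to "gpu_0/y" under the gpu_0 device scope, and likewise for gpu_1.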
def create_mlp(self, ln, sigmoid_layer, model, tag):
(tag_layer, tag_in, tag_out) = tag
# build MLP layer by layer
layers = []
weights = []
for i in range(1, ln.size):
n = ln[i - 1]
m = ln[i]
# create tags
tag_fc_w = tag_layer + ":::" + "fc" + str(i) + "_w"
tag_fc_b = tag_layer + ":::" + "fc" + str(i) + "_b"
tag_fc_y = tag_layer + ":::" + "fc" + str(i) + "_y"
tag_fc_z = tag_layer + ":::" + "fc" + str(i) + "_z"
if i == ln.size - 1:
tag_fc_z = tag_out
weights.append(tag_fc_w)
weights.append(tag_fc_b)
# initialize the weights
# approach 1: custom Xavier input, output or two-sided fill
mean = 0.0 # std_dev = np.sqrt(variance)
std_dev = np.sqrt(2 / (m + n)) # np.sqrt(1 / m) # np.sqrt(1 / n)
W = np.random.normal(mean, std_dev, size=(m, n)).astype(np.float32)
std_dev = np.sqrt(1 / m) # np.sqrt(2 / (m + 1))
b = np.random.normal(mean, std_dev, size=m).astype(np.float32)
self.FeedBlobWrapper(tag_fc_w, W)
self.FeedBlobWrapper(tag_fc_b, b)
# approach 2: caffe2 xavier
# W = self.AddLayerWrapper(
# model.param_init_net.XavierFill,
# [],
# tag_fc_w,
# shape=[m, n]
# )
# b = self.AddLayerWrapper(
# model.param_init_net.ConstantFill,
# [],
# tag_fc_b,
# shape=[m]
# )
            # save the blob shapes for later (only needed if onnx is requested)
if self.save_onnx:
self.onnx_tsd[tag_fc_w] = (onnx.TensorProto.FLOAT, W.shape)
self.onnx_tsd[tag_fc_b] = (onnx.TensorProto.FLOAT, b.shape)
# approach 1: construct fully connected operator using model.net
fc = self.AddLayerWrapper(
model.net.FC, [tag_in, tag_fc_w, tag_fc_b], tag_fc_y
)
# approach 2: construct fully connected operator using brew
# https://github.com/caffe2/tutorials/blob/master/MNIST.ipynb
# fc = brew.fc(model, layer, tag_fc_w, dim_in=m, dim_out=n)
layers.append(fc)
if i == sigmoid_layer:
# approach 1: construct sigmoid operator using model.net
layer = self.AddLayerWrapper(model.net.Sigmoid, tag_fc_y, tag_fc_z)
# approach 2: using brew (which currently does not support sigmoid)
# tag_sigm = tag_layer + ":::" + "sigmoid" + str(i)
# layer = brew.sigmoid(model,fc,tag_sigmoid)
else:
# approach 1: construct relu operator using model.net
layer = self.AddLayerWrapper(model.net.Relu, tag_fc_y, tag_fc_z)
# approach 2: using brew
# tag_relu = tag_layer + ":::" + "relu" + str(i)
# layer = brew.relu(model,fc,tag_relu)
tag_in = tag_fc_z
layers.append(layer)
        # WARNING: the dependency between layers is implicit in the tags;
        # downstream code only uses the last entry of the layers list,
        # which is later fed into the interactions.
return layers, weights
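    # Illustrative example of create_mlp above: for ln = np.array([4, 3, 2])
    # and tag_layer = "bot", it builds two FC layers (4 -> 3 and 3 -> 2) with
    # weight blobs "bot:::fc1_w" of shape (3, 4) and "bot:::fc2_w" of shape
    # (2, 3), each followed by a ReLU (or a Sigmoid at index sigmoid_layer).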
def create_emb(self, m, ln, model, tag):
(tag_layer, tag_in, tag_out) = tag
emb_l = []
weights_l = []
for i in range(0, ln.size):
n = ln[i]
# select device
if self.ndevices > 1:
d = i % self.ndevices
else:
d = -1
# create tags
on_device = "" if self.ndevices <= 1 else "gpu_" + str(d) + "/"
len_s = on_device + tag_layer + ":::" + "sls" + str(i) + "_l"
ind_s = on_device + tag_layer + ":::" + "sls" + str(i) + "_i"
tbl_s = on_device + tag_layer + ":::" + "sls" + str(i) + "_w"
sum_s = on_device + tag_layer + ":::" + "sls" + str(i) + "_z"
weights_l.append(tbl_s)
# initialize the weights
# approach 1a: custom
W = np.random.uniform(low=-np.sqrt(1 / n),
high=np.sqrt(1 / n),
size=(n, m)).astype(np.float32)
# approach 1b: numpy rand
            # W = np.random.rand(n, m).astype(np.float32)
self.FeedBlobWrapper(tbl_s, W, False, device_id=d)
# approach 2: caffe2 xavier
# with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, d)):
# W = model.param_init_net.XavierFill([], tbl_s, shape=[n, m])
            # save the blob shapes for later (only needed if onnx is requested)
if self.save_onnx:
self.onnx_tsd[tbl_s] = (onnx.TensorProto.FLOAT, W.shape)
# create operator
if self.ndevices <= 1:
EE = model.net.SparseLengthsSum([tbl_s, ind_s, len_s], [sum_s])
else:
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, d)):
EE = model.net.SparseLengthsSum([tbl_s, ind_s, len_s], [sum_s])
emb_l.append(EE)
return emb_l, weights_l
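    # Illustrative sketch of the SparseLengthsSum semantics used above: given
    # a table W of shape (n, m), lengths = [2, 1] and indices = [0, 2, 1], the
    # output has one row per lengths entry: row 0 = W[0] + W[2], row 1 = W[1].
    # Table i is placed on device i % ndevices (model parallelism).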
def create_interactions(self, x, ly, model, tag):
(tag_dense_in, tag_sparse_in, tag_int_out) = tag
if self.arch_interaction_op == "dot":
# concatenate dense and sparse features
tag_int_out_info = tag_int_out + "_info"
T, T_info = model.net.Concat(
x + ly,
[tag_int_out + "_cat_axis0", tag_int_out_info + "_cat_axis0"],
axis=1,
add_axis=1,
)
# perform a dot product
Z = model.net.BatchMatMul([T, T], tag_int_out + "_matmul", trans_b=1)
# append dense feature with the interactions (into a row vector)
# approach 1: all
# Zflat = model.net.Flatten(Z, tag_int_out + "_flatten", axis=1)
# approach 2: unique
Zflat_all = model.net.Flatten(Z, tag_int_out + "_flatten_all", axis=1)
Zflat = model.net.BatchGather(
[Zflat_all, tag_int_out + "_tril_indices"],
tag_int_out + "_flatten"
)
R, R_info = model.net.Concat(
x + [Zflat], [tag_int_out, tag_int_out_info], axis=1
)
elif self.arch_interaction_op == "cat":
# concatenation features (into a row vector)
tag_int_out_info = tag_int_out + "_info"
R, R_info = model.net.Concat(
x + ly, [tag_int_out, tag_int_out_info], axis=1
)
else:
sys.exit("ERROR: --arch-interaction-op="
+ self.arch_interaction_op + " is not supported")
return R
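    # Note on the interaction output width (with d = dense feature size and
    # n = number of features): "dot" yields d + n * (n - 1) // 2 unique
    # pairwise terms (n more if arch_interaction_itself keeps the diagonal),
    # while "cat" yields n * d; the same formulas size the top mlp below.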
def create_sequential_forward_ops(self):
# embeddings
tag = (self.temb, self.tsin, self.tsout)
self.emb_l, self.emb_w = self.create_emb(self.m_spa, self.ln_emb,
self.model, tag)
# bottom mlp
tag = (self.tbot, self.tdin, self.tdout)
self.bot_l, self.bot_w = self.create_mlp(self.ln_bot, self.sigmoid_bot,
self.model, tag)
# interactions
tag = (self.tdout, self.tsout, self.tint)
Z = self.create_interactions([self.bot_l[-1]], self.emb_l, self.model, tag)
# top mlp
tag = (self.ttop, Z, self.tout)
self.top_l, self.top_w = self.create_mlp(self.ln_top, self.sigmoid_top,
self.model, tag)
# debug prints
# print(self.emb_l)
# print(self.bot_l)
# print(self.top_l)
# setup the last output variable
self.last_output = self.top_l[-1]
def create_parallel_forward_ops(self):
# distribute embeddings (model parallelism)
tag = (self.temb, self.tsin, self.tsout)
self.emb_l, self.emb_w = self.create_emb(self.m_spa, self.ln_emb,
self.model, tag)
# replicate mlp (data parallelism)
tag = (self.tbot, self.tdin, self.tdout)
self.bot_l, self.bot_w = self.create_mlp(self.ln_bot, self.sigmoid_bot,
self.model, tag)
# add communication (butterfly shuffle)
t_list = []
for i, emb_output in enumerate(self.emb_l):
# split input
src_d = i % self.ndevices
lo = [emb_output + "_split_" + str(d) for d in range(self.ndevices)]
# approach 1: np and caffe2 operators assume the mini-batch size is
# divisible exactly by the number of available devices
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, src_d)):
self.model.net.Split(emb_output, lo, axis=0)
"""
# approach 2: np and caffe2 operators do not assume exact divisibility
ls = where_to_split(args.mini_batch_size, self.ndevices, _add_leftover=True)
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, src_d)):
emb_output_split = self.model.net.Split(
                    emb_output, lo, split=ls, axis=0
)
"""
# scatter
y = []
for dst_d in range(len(lo)):
src_blob = lo[dst_d]
dst_blob = str(src_blob).replace(
"gpu_" + str(src_d), "gpu_" + str(dst_d), 1
)
if src_blob != dst_blob:
with core.DeviceScope(
core.DeviceOption(workspace.GpuDeviceType, dst_d)
):
blob = self.model.Copy(src_blob, dst_blob)
else:
blob = dst_blob
y.append(blob)
t_list.append(y)
# adjust lists to be ordered per device
x = list(map(lambda x: list(x), zip(*self.bot_l)))
ly = list(map(lambda y: list(y), zip(*t_list)))
# interactions
for d in range(self.ndevices):
on_device = "gpu_" + str(d) + "/"
tag = (on_device + self.tdout, on_device + self.tsout, on_device + self.tint)
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, d)):
self.create_interactions([x[d][-1]], ly[d], self.model, tag)
# replicate mlp (data parallelism)
tag = (self.ttop, self.tint, self.tout)
self.top_l, self.top_w = self.create_mlp(self.ln_top, self.sigmoid_top,
self.model, tag)
# debug prints
# print(self.model.net.Proto(),end='\n')
# sys.exit("ERROR: debugging")
# setup the last output variable
self.last_output = self.top_l[-1]
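    # Illustrative sketch of the butterfly shuffle above for ndevices=2: each
    # embedding output is split in two along the batch dimension on its source
    # device, and half b is copied to gpu_b; afterwards every device holds its
    # batch shard of all embedding outputs next to its bottom-mlp replica, so
    # the interaction and top mlp run locally (data parallelism from here on).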
def __init__(
self,
m_spa,
ln_emb,
ln_bot,
ln_top,
arch_interaction_op,
arch_interaction_itself=False,
sigmoid_bot=-1,
sigmoid_top=-1,
save_onnx=False,
model=None,
test_net=None,
tag=None,
ndevices=-1,
forward_ops=True,
enable_prof=False,
):
super(DLRM_Net, self).__init__()
# init model
if model is None:
global_init_opt = ["caffe2", "--caffe2_log_level=0"]
if enable_prof:
global_init_opt += [
"--logtostderr=0",
"--log_dir=$HOME",
"--caffe2_logging_print_net_summary=1",
]
workspace.GlobalInit(global_init_opt)
self.set_tags()
self.model = model_helper.ModelHelper(name="DLRM", init_params=True)
self.test_net = None
else:
# WARNING: assume that workspace and tags have been initialized elsewhere
self.set_tags(tag[0], tag[1], tag[2], tag[3], tag[4], tag[5], tag[6],
tag[7], tag[8], tag[9])
self.model = model
self.test_net = test_net
# save arguments
self.m_spa = m_spa
self.ln_emb = ln_emb
self.ln_bot = ln_bot
self.ln_top = ln_top
self.arch_interaction_op = arch_interaction_op
self.arch_interaction_itself = arch_interaction_itself
self.sigmoid_bot = sigmoid_bot
self.sigmoid_top = sigmoid_top
self.save_onnx = save_onnx
self.ndevices = ndevices
# onnx types and shapes dictionary
if self.save_onnx:
self.onnx_tsd = {}
# create forward operators
if forward_ops:
if self.ndevices <= 1:
return self.create_sequential_forward_ops()
else:
return self.create_parallel_forward_ops()
def set_tags(
self,
_tag_layer_top_mlp="top",
_tag_layer_bot_mlp="bot",
_tag_layer_embedding="emb",
_tag_feature_dense_in="dense_in",
_tag_feature_dense_out="dense_out",
_tag_feature_sparse_in="sparse_in",
_tag_feature_sparse_out="sparse_out",
_tag_interaction="interaction",
_tag_dense_output="prob_click",
_tag_dense_target="target",
):
# layer tags
self.ttop = _tag_layer_top_mlp
self.tbot = _tag_layer_bot_mlp
self.temb = _tag_layer_embedding
# dense feature tags
self.tdin = _tag_feature_dense_in
self.tdout = _tag_feature_dense_out
# sparse feature tags
self.tsin = _tag_feature_sparse_in
self.tsout = _tag_feature_sparse_out
# output and target tags
self.tint = _tag_interaction
self.ttar = _tag_dense_target
self.tout = _tag_dense_output
def parameters(self):
return self.model
def get_loss(self):
return self.FetchBlobWrapper(self.loss, reduce_across="add")
def get_output(self):
return self.FetchBlobWrapper(self.last_output, reduce_across="concat")
def create(self, X, S_lengths, S_indices, T):
self.create_input(X, S_lengths, S_indices, T)
self.create_model(X, S_lengths, S_indices, T)
def create_input(self, X, S_lengths, S_indices, T):
# feed input data to blobs
self.FeedBlobWrapper(self.tdin, X, split=True)
        # save the blob shapes for later (only needed if onnx is requested)
if self.save_onnx:
self.onnx_tsd[self.tdin] = (onnx.TensorProto.FLOAT, X.shape)
for i in range(len(self.emb_l)):
# select device
if self.ndevices > 1:
d = i % self.ndevices
else:
d = -1
# create tags
on_device = "" if self.ndevices <= 1 else "gpu_" + str(d) + "/"
len_s = on_device + self.temb + ":::" + "sls" + str(i) + "_l"
ind_s = on_device + self.temb + ":::" + "sls" + str(i) + "_i"
self.FeedBlobWrapper(len_s, np.array(S_lengths[i]), False, device_id=d)
self.FeedBlobWrapper(ind_s, np.array(S_indices[i]), False, device_id=d)
            # save the blob shapes for later (only needed if onnx is requested)
if self.save_onnx:
lshape = (len(S_lengths[i]),) # =args.mini_batch_size
ishape = (len(S_indices[i]),)
self.onnx_tsd[len_s] = (onnx.TensorProto.INT32, lshape)
self.onnx_tsd[ind_s] = (onnx.TensorProto.INT32, ishape)
# feed target data to blobs
if T is not None:
zeros_fp32 = np.zeros(T.shape).astype(np.float32)
self.FeedBlobWrapper(self.ttar, zeros_fp32, split=True)
            # save the blob shapes for later (only needed if onnx is requested)
if self.save_onnx:
self.onnx_tsd[self.ttar] = (onnx.TensorProto.FLOAT, T.shape)
def create_model(self, X, S_lengths, S_indices, T):
        # setup tril indices for the interactions
offset = 1 if self.arch_interaction_itself else 0
num_fea = len(self.emb_l) + 1
tril_indices = np.array([j + i * num_fea
for i in range(num_fea) for j in range(i + offset)])
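        # Illustrative example: with num_fea = 3 and offset = 0 this selects
        # the strictly-lower-triangular entries of the flattened 3x3
        # interaction matrix, tril_indices = [3, 6, 7]; with offset = 1 the
        # diagonal is kept as well, giving [0, 3, 4, 6, 7, 8].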
self.FeedBlobWrapper(self.tint + "_tril_indices", tril_indices)
if self.save_onnx:
tish = tril_indices.shape
self.onnx_tsd[self.tint + "_tril_indices"] = (onnx.TensorProto.INT32, tish)
# create compute graph
if T is not None:
# WARNING: RunNetOnce call is needed only if we use brew and ConstantFill.
# We could use direct calls to self.model functions above to avoid it
workspace.RunNetOnce(self.model.param_init_net)
workspace.CreateNet(self.model.net)
if self.test_net is not None:
workspace.CreateNet(self.test_net)
def run(self, X, S_lengths, S_indices, T, test_net=False, enable_prof=False):
# feed input data to blobs
# dense features
self.FeedBlobWrapper(self.tdin, X, split=True)
# sparse features
for i in range(len(self.emb_l)):
# select device
if self.ndevices > 1:
d = i % self.ndevices
else:
d = -1
# create tags
on_device = "" if self.ndevices <= 1 else "gpu_" + str(d) + "/"
len_s = on_device + self.temb + ":::" + "sls" + str(i) + "_l"
ind_s = on_device + self.temb + ":::" + "sls" + str(i) + "_i"
self.FeedBlobWrapper(len_s, np.array(S_lengths[i]), False, device_id=d)
self.FeedBlobWrapper(ind_s, np.array(S_indices[i]), False, device_id=d)
# feed target data to blobs if needed
if T is not None:
self.FeedBlobWrapper(self.ttar, T, split=True)
# execute compute graph
if test_net:
workspace.RunNet(self.test_net)
else:
if enable_prof:
workspace.C.benchmark_net(self.model.net.Name(), 0, 1, True)
else:
workspace.RunNet(self.model.net)
# debug prints
# print("intermediate")
# print(self.FetchBlobWrapper(self.bot_l[-1]))
# for tag_emb in self.emb_l:
# print(self.FetchBlobWrapper(tag_emb))
# print(self.FetchBlobWrapper(self.tint))
def MSEloss(self, scale=1.0):
# add MSEloss to the model
self.AddLayerWrapper(self.model.SquaredL2Distance, [self.tout, self.ttar], "sd")
self.AddLayerWrapper(self.model.Scale, "sd", "sd2", scale=2.0 * scale)
# WARNING: "loss" is a special tag and should not be changed
self.loss = self.AddLayerWrapper(self.model.AveragedLoss, "sd2", "loss")
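        # Note on the factors above: SquaredL2Distance computes
        # 0.5 * ||out - tar||^2 per sample, so scaling by 2.0 * scale yields
        # scale * ||out - tar||^2 and AveragedLoss then averages over the
        # mini-batch, i.e. loss = scale * MSE.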
def BCEloss(self, scale=1.0, threshold=0.0):
        # add BCEloss to the model
if 0.0 < threshold and threshold < 1.0:
self.AddLayerWrapper(self.model.Clip, self.tout, "tout_c",
min=threshold, max=(1.0 - threshold))
self.AddLayerWrapper(self.model.MakeTwoClass, "tout_c", "tout_2c")
else:
self.AddLayerWrapper(self.model.MakeTwoClass, self.tout, "tout_2c")
self.AddLayerWrapper(self.model.LabelCrossEntropy, ["tout_2c", self.ttar], "sd")
# WARNING: "loss" is a special tag and should not be changed
if scale == 1.0:
self.loss = self.AddLayerWrapper(self.model.AveragedLoss, "sd", "loss")
else:
self.AddLayerWrapper(self.model.Scale, "sd", "sd2", scale=scale)
self.loss = self.AddLayerWrapper(self.model.AveragedLoss, "sd2", "loss")
def sgd_optimizer(self, learning_rate,
T=None, _gradientMap=None, sync_dense_params=True):
# create one, it and lr tags (or use them if already present)
if T is not None:
(tag_one, tag_it, tag_lr) = T
else:
(tag_one, tag_it, tag_lr) = ("const_one", "optim_it", "optim_lr")
# approach 1: feed values directly
# self.FeedBlobWrapper(tag_one, np.ones(1).astype(np.float32))
# self.FeedBlobWrapper(tag_it, np.zeros(1).astype(np.int64))
# it = self.AddLayerWrapper(self.model.Iter, tag_it, tag_it)
# lr = self.AddLayerWrapper(self.model.LearningRate, tag_it, tag_lr,
# base_lr=-1 * learning_rate, policy="fixed")
# approach 2: use brew
self.AddLayerWrapper(self.model.param_init_net.ConstantFill,
[], tag_one, shape=[1], value=1.0)
self.AddLayerWrapper(brew.iter, self.model, tag_it)
self.AddLayerWrapper(self.model.LearningRate, tag_it, tag_lr,
base_lr=-1 * learning_rate, policy="fixed")
        # save the blob shapes for later (only needed if onnx is requested)
if self.save_onnx:
self.onnx_tsd[tag_one] = (onnx.TensorProto.FLOAT, (1,))
self.onnx_tsd[tag_it] = (onnx.TensorProto.INT64, (1,))
# create gradient maps (or use them if already present)
if _gradientMap is not None:
self.gradientMap = _gradientMap
else:
if self.loss.__class__ == list:
self.gradientMap = self.model.AddGradientOperators(self.loss)
else:
self.gradientMap = self.model.AddGradientOperators([self.loss])
# update weights
# approach 1: builtin function
# optimizer.build_sgd(self.model, base_learning_rate=learning_rate)
# approach 2: custom code
# top MLP weight and bias
for w in self.top_w:
# allreduce across devices if needed
if sync_dense_params and self.ndevices > 1:
grad_blobs = [
self.gradientMap["gpu_{}/".format(d) + w]
for d in range(self.ndevices)
]
self.model.NCCLAllreduce(grad_blobs, grad_blobs)
# update weights
self.AddLayerWrapper(self.model.WeightedSum,
[w, tag_one, "", tag_lr], w, reset_grad=True)
# bottom MLP weight and bias
for w in self.bot_w:
# allreduce across devices if needed
if sync_dense_params and self.ndevices > 1:
grad_blobs = [
self.gradientMap["gpu_{}/".format(d) + w]
for d in range(self.ndevices)
]
self.model.NCCLAllreduce(grad_blobs, grad_blobs)
# update weights
self.AddLayerWrapper(self.model.WeightedSum,
[w, tag_one, "", tag_lr], w, reset_grad=True)
# update embeddings
for i, w in enumerate(self.emb_w):
# select device
if self.ndevices > 1:
d = i % self.ndevices
# create tags
on_device = "" if self.ndevices <= 1 else "gpu_" + str(d) + "/"
_tag_one = on_device + tag_one
_tag_lr = on_device + tag_lr
# pickup gradient
w_grad = self.gradientMap[w]
# update weights
if self.ndevices > 1:
with core.DeviceScope(core.DeviceOption(workspace.GpuDeviceType, d)):
self.model.ScatterWeightedSum([w, _tag_one, w_grad.indices,
w_grad.values, _tag_lr], w)
else:
self.model.ScatterWeightedSum([w, _tag_one, w_grad.indices,
w_grad.values, _tag_lr], w)
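        # Note on the updates above: WeightedSum computes
        # w = 1.0 * w + base_lr * w_grad with base_lr = -learning_rate (the
        # sign is baked into the LearningRate blob), i.e. plain SGD, while
        # ScatterWeightedSum applies the same update only to the embedding
        # rows touched by the sparse gradient.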
def print_all(self):
# approach 1: all
print(workspace.Blobs(), end='\n')
for _, l in enumerate(workspace.Blobs()):
print(l)
print(self.FetchBlobWrapper(l))
# approach 2: only summary
# for param in self.model.params:
# self.model.Summarize(param, [], to_file=1)
# self.model.Summarize(self.model.param_to_grad[param], [], to_file=1)
def print_weights(self):
for _, l in enumerate(self.emb_w):
# print(l)
print(self.FetchBlobWrapper(l, False))
for _, l in enumerate(self.bot_w):
# print(l)
if self.ndevices > 1:
print(self.FetchBlobWrapper(l, False, device_id=0))
else:
print(self.FetchBlobWrapper(l))
for _, l in enumerate(self.top_w):
# print(l)
if self.ndevices > 1:
print(self.FetchBlobWrapper(l, False, device_id=0))
else:
print(self.FetchBlobWrapper(l))
def print_activations(self):
for _, l in enumerate(self.emb_l):
print(l)
print(self.FetchBlobWrapper(l, False))
for _, l in enumerate(self.bot_l):
print(l)
print(self.FetchBlobWrapper(l))
print(self.tint)
print(self.FetchBlobWrapper(self.tint))
for _, l in enumerate(self.top_l):
print(l)
print(self.FetchBlobWrapper(l))
def define_metrics():
metrics = {
'loss': lambda y_true, y_score:
sklearn.metrics.log_loss(
y_true=y_true,
y_pred=y_score,
labels=[0,1]),
'recall': lambda y_true, y_score:
sklearn.metrics.recall_score(
y_true=y_true,
y_pred=np.round(y_score)
),
'precision': lambda y_true, y_score:
sklearn.metrics.precision_score(
y_true=y_true,
y_pred=np.round(y_score)
),
'f1': lambda y_true, y_score:
sklearn.metrics.f1_score(
y_true=y_true,
y_pred=np.round(y_score)
),
'ap': sklearn.metrics.average_precision_score,
'roc_auc': sklearn.metrics.roc_auc_score,
'accuracy': lambda y_true, y_score:
sklearn.metrics.accuracy_score(
y_true=y_true,
y_pred=np.round(y_score)
),
# 'pre_curve' : sklearn.metrics.precision_recall_curve,
# 'roc_curve' : sklearn.metrics.roc_curve,
}
return metrics
def calculate_metrics(targets, scores):
scores = np.concatenate(scores, axis=0)
targets = np.concatenate(targets, axis=0)
metrics = define_metrics()
# print("Compute time for validation metric : ", end="")
# first_it = True
validation_results = {}
for metric_name, metric_function in metrics.items():
# if first_it:
# first_it = False
# else:
# print(", ", end="")
# metric_compute_start = time_wrap(False)
try:
validation_results[metric_name] = metric_function(
targets,
scores
)
        except Exception as error:
validation_results[metric_name] = -1
print("{} in calculating {}".format(error, metric_name))
# metric_compute_end = time_wrap(False)
# met_time = metric_compute_end - metric_compute_start
# print("{} {:.4f}".format(metric_name, 1000 * (met_time)),
# end="")
# print(" ms")
return validation_results
if __name__ == "__main__":
### import packages ###
import sys
import argparse
### parse arguments ###
parser = argparse.ArgumentParser(
description="Train Deep Learning Recommendation Model (DLRM)"
)
# model related parameters
parser.add_argument("--arch-sparse-feature-size", type=int, default=2)
parser.add_argument("--arch-embedding-size", type=str, default="4-3-2")
parser.add_argument("--arch-mlp-bot", type=str, default="4-3-2")
parser.add_argument("--arch-mlp-top", type=str, default="4-2-1")
parser.add_argument("--arch-interaction-op", type=str, default="dot")
parser.add_argument("--arch-interaction-itself", action="store_true", default=False)
# activations and loss
parser.add_argument("--activation-function", type=str, default="relu")
parser.add_argument("--loss-function", type=str, default="mse") # or bce
parser.add_argument("--loss-threshold", type=float, default=0.0) # 1.0e-7
parser.add_argument("--round-targets", type=bool, default=False)
# data
parser.add_argument("--data-size", type=int, default=1)
parser.add_argument("--num-batches", type=int, default=0)
parser.add_argument("--data-generation", type=str, default="random") # or synthetic or dataset
parser.add_argument("--data-trace-file", type=str, default="./input/dist_emb_j.log")
parser.add_argument("--data-set", type=str, default="kaggle") # or terabyte
parser.add_argument("--raw-data-file", type=str, default="")
parser.add_argument("--processed-data-file", type=str, default="")
parser.add_argument("--data-randomize", type=str, default="total") # or day or none
parser.add_argument("--data-trace-enable-padding", type=bool, default=False)
parser.add_argument("--max-ind-range", type=int, default=-1)
parser.add_argument("--data-sub-sample-rate", type=float, default=0.0) # in [0, 1]
parser.add_argument("--num-indices-per-lookup", type=int, default=10)
parser.add_argument("--num-indices-per-lookup-fixed", type=bool, default=False)
parser.add_argument("--memory-map", action="store_true", default=False)
# training
parser.add_argument("--mini-batch-size", type=int, default=1)
parser.add_argument("--nepochs", type=int, default=1)
parser.add_argument("--learning-rate", type=float, default=0.01)
parser.add_argument("--print-precision", type=int, default=5)
parser.add_argument("--numpy-rand-seed", type=int, default=123)
parser.add_argument("--sync-dense-params", type=bool, default=True)
parser.add_argument("--caffe2-net-type", type=str, default="")
# inference
parser.add_argument("--inference-only", action="store_true", default=False)
# onnx (or protobuf with shapes)
parser.add_argument("--save-onnx", action="store_true", default=False)
parser.add_argument("--save-proto-types-shapes", action="store_true", default=False)
# gpu
parser.add_argument("--use-gpu", action="store_true", default=False)
# debugging and profiling
parser.add_argument("--print-freq", type=int, default=1)
parser.add_argument("--test-freq", type=int, default=-1)
parser.add_argument("--print-time", action="store_true", default=False)
parser.add_argument("--debug-mode", action="store_true", default=False)
parser.add_argument("--enable-profiling", action="store_true", default=False)
parser.add_argument("--plot-compute-graph", action="store_true", default=False)
# mlperf logging (disables other output and stops early)
parser.add_argument("--mlperf-logging", action="store_true", default=False)
    # stop at target accuracy: Kaggle 0.789, Terabyte (sub-sampled=0.875) 0.8107
parser.add_argument("--mlperf-acc-threshold", type=float, default=0.0)
# stop at target AUC Terabyte (no subsampling) 0.8025
parser.add_argument("--mlperf-auc-threshold", type=float, default=0.0)
args = parser.parse_args()
### some basic setup ###
np.random.seed(args.numpy_rand_seed)
np.set_printoptions(precision=args.print_precision)
use_gpu = args.use_gpu
if use_gpu:
device_opt = core.DeviceOption(workspace.GpuDeviceType, 0)
ngpus = workspace.NumGpuDevices() # 1
print("Using {} GPU(s)...".format(ngpus))
else:
device_opt = core.DeviceOption(caffe2_pb2.CPU)
print("Using CPU...")
### prepare training data ###
ln_bot = np.fromstring(args.arch_mlp_bot, dtype=int, sep="-")
if args.data_generation == "dataset":
# input and target from dataset
(nbatches, lX, lS_l, lS_i, lT,
nbatches_test, lX_test, lS_l_test, lS_i_test, lT_test,
ln_emb, m_den) = dc.read_dataset(
args.data_set, args.max_ind_range, args.data_sub_sample_rate,
args.mini_batch_size, args.num_batches, args.data_randomize, "train",
args.raw_data_file, args.processed_data_file, args.memory_map
)
# enforce maximum limit on number of vectors per embedding
if args.max_ind_range > 0:
ln_emb = np.array(list(map(
lambda x: x if x < args.max_ind_range else args.max_ind_range,
ln_emb
)))
ln_bot[0] = m_den
else:
# input and target at random
ln_emb = np.fromstring(args.arch_embedding_size, dtype=int, sep="-")
m_den = ln_bot[0]
(nbatches, lX, lS_l, lS_i, lT) = dc.generate_random_data(
m_den, ln_emb, args.data_size, args.num_batches, args.mini_batch_size,
args.num_indices_per_lookup, args.num_indices_per_lookup_fixed,
1, args.round_targets, args.data_generation, args.data_trace_file,
args.data_trace_enable_padding
)
### parse command line arguments ###
m_spa = args.arch_sparse_feature_size
num_fea = ln_emb.size + 1 # num sparse + num dense features
m_den_out = ln_bot[ln_bot.size - 1]
if args.arch_interaction_op == "dot":
# approach 1: all
# num_int = num_fea * num_fea + m_den_out
# approach 2: unique
if args.arch_interaction_itself:
num_int = (num_fea * (num_fea + 1)) // 2 + m_den_out
else:
num_int = (num_fea * (num_fea - 1)) // 2 + m_den_out
elif args.arch_interaction_op == "cat":
num_int = num_fea * m_den_out
else:
sys.exit("ERROR: --arch-interaction-op="
+ args.arch_interaction_op + " is not supported")
arch_mlp_top_adjusted = str(num_int) + "-" + args.arch_mlp_top
ln_top = np.fromstring(arch_mlp_top_adjusted, dtype=int, sep="-")
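    # Worked example with the defaults above: ln_emb = [4, 3, 2] gives
    # num_fea = 4 (3 sparse + 1 dense) and m_den_out = 2, so "dot" without
    # self-interaction yields num_int = 4 * 3 // 2 + 2 = 8, hence
    # arch_mlp_top_adjusted = "8-4-2-1" and ln_top = [8, 4, 2, 1].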
# sanity check: feature sizes and mlp dimensions must match
if m_den != ln_bot[0]:
sys.exit("ERROR: arch-dense-feature-size "
+ str(m_den) + " does not match first dim of bottom mlp " + str(ln_bot[0]))
if m_spa != m_den_out:
sys.exit("ERROR: arch-sparse-feature-size "
+ str(m_spa) + " does not match last dim of bottom mlp " + str(m_den_out))
if num_int != ln_top[0]:
sys.exit("ERROR: # of feature interactions "
+ str(num_int) + " does not match first dim of top mlp " + str(ln_top[0]))
# test prints (model arch)
if args.debug_mode:
print("model arch:")
print("mlp top arch " + str(ln_top.size - 1)
+ " layers, with input to output dimensions:")
print(ln_top)
print("# of interactions")
print(num_int)
print("mlp bot arch " + str(ln_bot.size - 1)
+ " layers, with input to output dimensions:")
print(ln_bot)
print("# of features (sparse and dense)")
print(num_fea)
print("dense feature size")
print(m_den)
print("sparse feature size")
print(m_spa)
print("# of embeddings (= # of sparse features) " + str(ln_emb.size)
+ ", with dimensions " + str(m_spa) + "x:")
print(ln_emb)
print("data (inputs and targets):")
for j in range(0, nbatches):
print("mini-batch: %d" % j)
print(lX[j])
print(lS_l[j])
print(lS_i[j])
print(lT[j].astype(np.float32))
### construct the neural network specified above ###
# WARNING: to obtain exactly the same initialization for
# the weights we need to start from the same random seed.
# np.random.seed(args.numpy_rand_seed)
ndevices = min(ngpus, args.mini_batch_size, num_fea - 1) if use_gpu else -1
flag_types_shapes = args.save_onnx or args.save_proto_types_shapes
flag_forward_ops = not (use_gpu and ndevices > 1)
with core.DeviceScope(device_opt):
dlrm = DLRM_Net(
m_spa,
ln_emb,
ln_bot,
ln_top,
args.arch_interaction_op,
arch_interaction_itself=args.arch_interaction_itself,
sigmoid_bot=-1,
sigmoid_top=ln_top.size - 1,
save_onnx=flag_types_shapes,
ndevices=ndevices,
# forward_ops = flag_forward_ops
enable_prof=args.enable_profiling,
)
# load nccl if using multiple devices
if args.sync_dense_params and ndevices > 1:
dyndep.InitOpsLibrary("//caffe2/caffe2/contrib/nccl:nccl_ops")
# set the net type for better performance (dag, async_scheduling, etc)
if args.caffe2_net_type:
dlrm.parameters().net.Proto().type = args.caffe2_net_type
# plot compute graph
if args.plot_compute_graph:
graph = net_drawer.GetPydotGraph(
dlrm.parameters().net,
"dlrm_s_caffe2_graph",
"BT"
)
graph.write_pdf(graph.get_name() + ".pdf")
# test prints
if args.debug_mode:
print("initial parameters (weights and bias):")
dlrm.print_weights()
# add training loss if needed
if not args.inference_only:
with core.DeviceScope(device_opt):
            # specify the loss function; scale the per-device loss by
            # 1/ndevices so that the gradients summed across devices by
            # allreduce match the global mini-batch average
            nd = 1.0 if dlrm.ndevices <= 1 else 1.0 / dlrm.ndevices
if args.loss_function == "mse":
dlrm.MSEloss(scale=nd)
elif args.loss_function == "bce":
dlrm.BCEloss(scale=nd, threshold=args.loss_threshold)
else:
sys.exit("ERROR: --loss-function=" + args.loss_function
+ " is not supported")
# define test net (as train net without gradients)
dlrm.test_net = core.Net(copy.deepcopy(dlrm.model.net.Proto()))
# specify the optimizer algorithm
dlrm.sgd_optimizer(
args.learning_rate, sync_dense_params=args.sync_dense_params
)
# init/create
dlrm.create(lX[0], lS_l[0], lS_i[0], lT[0])
### main loop ###
best_gA_test = 0
best_auc_test = 0
total_time = 0
total_loss = 0
total_accu = 0
total_iter = 0
total_samp = 0
k = 0
print("time/loss/accuracy (if enabled):")
while k < args.nepochs:
j = 0
while j < nbatches:
'''
# debug prints
print("input and targets")
print(lX[j])
print(lS_l[j])
print(lS_i[j])
print(lT[j].astype(np.float32))
'''
# forward and backward pass, where the latter runs only
# when gradients and loss have been added to the net
time1 = time.time()
dlrm.run(lX[j], lS_l[j], lS_i[j], lT[j]) # args.enable_profiling
time2 = time.time()
total_time += time2 - time1
            # compute loss and accuracy
Z = dlrm.get_output() # numpy array
T = lT[j] # numpy array
'''
# debug prints
print("output and loss")
print(Z)
print(dlrm.get_loss())
'''
mbs = T.shape[0] # = args.mini_batch_size except maybe for last
A = np.sum((np.round(Z, 0) == T).astype(np.uint8))
total_accu += 0 if args.inference_only else A
total_loss += 0 if args.inference_only else dlrm.get_loss() * mbs
total_iter += 1
total_samp += mbs
# print time, loss and accuracy
should_print = ((j + 1) % args.print_freq == 0) or (j + 1 == nbatches)
should_test = (
(args.test_freq > 0)
and (args.data_generation == "dataset")
and (((j + 1) % args.test_freq == 0) or (j + 1 == nbatches))
)
if should_print or should_test:
gT = 1000. * total_time / total_iter if args.print_time else -1
total_time = 0
gA = total_accu / total_samp
total_accu = 0
gL = total_loss / total_samp
total_loss = 0
str_run_type = "inference" if args.inference_only else "training"
print(
"Finished {} it {}/{} of epoch {}, {:.2f} ms/it,".format(
str_run_type, j + 1, nbatches, k, gT
)
+ " loss {:.6f}, accuracy {:3.3f} %".format(gL, gA * 100)
)
total_iter = 0
total_samp = 0
# debug prints
# print(Z)
# print(T)
# testing
if should_test and not args.inference_only:
# don't measure training iter time in a test iteration
if args.mlperf_logging:
previous_iteration_time = None
test_accu = 0
test_loss = 0
test_samp = 0
if args.mlperf_logging:
scores = []
targets = []
for i in range(nbatches_test):
# early exit if nbatches was set by the user and was exceeded
if nbatches > 0 and i >= nbatches:
break
# forward pass
dlrm.run(lX_test[i], lS_l_test[i], lS_i_test[i], lT_test[i], test_net=True)
Z_test = dlrm.get_output()
T_test = lT_test[i]
if args.mlperf_logging:
scores.append(Z_test)
targets.append(T_test)
else:
                        # compute loss and accuracy
L_test = dlrm.get_loss()
mbs_test = T_test.shape[0] # = mini_batch_size except last
A_test = np.sum((np.round(Z_test, 0) == T_test).astype(np.uint8))
test_accu += A_test
test_loss += L_test * mbs_test
test_samp += mbs_test
# compute metrics (after test loop has finished)
if args.mlperf_logging:
validation_results = calculate_metrics(targets, scores)
gA_test = validation_results['accuracy']
gL_test = validation_results['loss']
else:
gA_test = test_accu / test_samp
gL_test = test_loss / test_samp
# print metrics
is_best = gA_test > best_gA_test
if is_best:
best_gA_test = gA_test
if args.mlperf_logging:
is_best = validation_results['roc_auc'] > best_auc_test
if is_best:
best_auc_test = validation_results['roc_auc']
print(
"Testing at - {}/{} of epoch {},".format(j + 1, nbatches, k)
+ " loss {:.6f}, recall {:.4f}, precision {:.4f},".format(
validation_results['loss'],
validation_results['recall'],
validation_results['precision']
)
+ " f1 {:.4f}, ap {:.4f},".format(
validation_results['f1'],
validation_results['ap'],
)
+ " auc {:.4f}, best auc {:.4f},".format(
validation_results['roc_auc'],
best_auc_test
)
+ " accuracy {:3.3f} %, best accuracy {:3.3f} %".format(
validation_results['accuracy'] * 100,
best_gA_test * 100
)
)
else:
print(
"Testing at - {}/{} of epoch {},".format(j + 1, nbatches, 0)
+ " loss {:.6f}, accuracy {:3.3f} %, best {:3.3f} %".format(
gL_test, gA_test * 100, best_gA_test * 100
)
)
# check thresholds
if (args.mlperf_logging
and (args.mlperf_acc_threshold > 0)
and (best_gA_test > args.mlperf_acc_threshold)):
print("MLPerf testing accuracy threshold "
+ str(args.mlperf_acc_threshold)
+ " reached, stop training")
break
if (args.mlperf_logging
and (args.mlperf_auc_threshold > 0)
and (best_auc_test > args.mlperf_auc_threshold)):
print("MLPerf testing auc threshold "
+ str(args.mlperf_auc_threshold)
+ " reached, stop training")
break
j += 1 # nbatches
k += 1 # nepochs
# test prints
if not args.inference_only and args.debug_mode:
print("updated parameters (weights and bias):")
dlrm.print_weights()
# build onnx model from caffe2
if args.save_onnx:
pnet = dlrm.parameters().net.Proto()
inet = dlrm.parameters().param_init_net.Proto()
value_info = dlrm.onnx_tsd # None
# debug prints
# print(value_info)
        # WARNING: the Caffe2-to-ONNX net transformation currently does not
        # work because ONNX does not support the SparseLengthsSum operator
        # directly. A workaround could be for the Caffe2 ONNX frontend to map
        # this operator to the Gather and ReduceSum ONNX operators, following
        # the PyTorch approach.
c2f = caffe2.python.onnx.frontend.Caffe2Frontend()
dlrm_caffe2_onnx = c2f.caffe2_net_to_onnx_model(pnet, inet, value_info)
# check the onnx model
onnx.checker.check_model(dlrm_caffe2_onnx)
# save model to a file
with open("dlrm_s_caffe2.onnx", "w+") as dlrm_caffe2_onnx_file:
dlrm_caffe2_onnx_file.write(str(dlrm_caffe2_onnx))
# build protobuf with types and shapes
if args.save_proto_types_shapes:
# add types and shapes to protobuf
__TYPE_MAPPING = {
onnx.TensorProto.FLOAT: caffe2_pb2.TensorProto.FLOAT,
onnx.TensorProto.UINT8: caffe2_pb2.TensorProto.UINT8,
onnx.TensorProto.INT8: caffe2_pb2.TensorProto.INT8,
onnx.TensorProto.UINT16: caffe2_pb2.TensorProto.UINT16,
onnx.TensorProto.INT16: caffe2_pb2.TensorProto.INT16,
onnx.TensorProto.INT32: caffe2_pb2.TensorProto.INT32,
onnx.TensorProto.INT64: caffe2_pb2.TensorProto.INT64,
onnx.TensorProto.STRING: caffe2_pb2.TensorProto.STRING,
onnx.TensorProto.BOOL: caffe2_pb2.TensorProto.BOOL,
onnx.TensorProto.FLOAT16: caffe2_pb2.TensorProto.FLOAT16,
onnx.TensorProto.DOUBLE: caffe2_pb2.TensorProto.DOUBLE,
}
pnet = dlrm.parameters().net.Proto()
arg = pnet.arg.add()
arg.name = "input_shape_info"
for i in pnet.external_input:
if i in dlrm.onnx_tsd:
onnx_dtype, shape = dlrm.onnx_tsd[i]
t = arg.tensors.add()
t.name = i
t.data_type = __TYPE_MAPPING[onnx_dtype]
t.dims.extend(shape)
else:
print("Warning: we don't have shape/type info for input: {}".format(i))
# debug print
# print(pnet)
# export the protobuf with types and shapes
with open("dlrm_s_caffe2.proto", "w+") as dlrm_s_proto_file:
dlrm_s_proto_file.write(str(pnet))
"""
# export the protobuf with types and shapes as well as weights
# see https://github.com/pytorch/pytorch/issues/9533
#save
net = dlrm.parameters().net
params = dlrm.parameters().params
init_net, predict_net = mobile_exporter.Export(workspace, net, params)
with open("dlrm_s_caffe2.predict", "wb") as dlrm_s_predict_file:
dlrm_s_predict_file.write(predict_net.SerializeToString())
with open("dlrm_s_caffe2.init", "wb") as dlrm_s_init_file:
dlrm_s_init_file.write(init_net.SerializeToString())
#load
net_def = caffe2_pb2.NetDef()
init_def= caffe2_pb2.NetDef()
with open("dlrm_s_caffe2.predict", "rb") as dlrm_s_predict_file:
net_def.ParseFromString(dlrm_s_predict_file.read())
print(net_def)
with open("dlrm_s_caffe2.init", "rb") as dlrm_s_init_file:
init_def.ParseFromString(dlrm_s_init_file.read())
print(init_def)
"""