# in easy_rec/python/utils/convert_rtp_fg.py
import json
import logging

import tensorflow as tf
from google.protobuf import text_format

# NOTE: the imports below are inferred from usage in this excerpt; exact
# module paths may differ. load_input_field_and_feature_config is assumed
# to be defined elsewhere in this module.
from easy_rec.python.protos.feature_config_pb2 import FeatureGroupConfig
from easy_rec.python.protos.feature_config_pb2 import WideOrDeep
def convert_rtp_fg(rtp_fg,
                   embedding_dim=16,
                   batch_size=1024,
                   label_fields=None,
                   num_steps=10,
                   model_type='',
                   separator='\002',
                   incol_separator='\003',
                   train_input_path=None,
                   eval_input_path=None,
                   selected_cols='',
                   input_type='OdpsRTPInput',
                   is_async=False):
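  """Convert an RTP fg.json feature config into an EasyRec pipeline config.

  Top-level keys in the fg.json (model_dir, num_steps, model_type,
  label_fields, model_path) override the corresponding arguments.

  Args:
    rtp_fg: path to the RTP fg.json file.
    embedding_dim: default embedding dimension for the generated features.
    batch_size: data_config.batch_size of the generated pipeline.
    label_fields: label column names; esmm and dbmtl expect two
      (ctr label first, cvr label second).
    num_steps: train_config.num_steps.
    model_type: one of 'deepfm', 'wide_and_deep', 'multi_tower', 'esmm',
      'dbmtl'; ignored when the fg.json specifies model_path.
    separator: data_config.separator, separating features in the input.
    incol_separator: separator for multi-value features, passed to
      load_input_field_and_feature_config.
    train_input_path: optional train input path.
    eval_input_path: optional eval input path.
    selected_cols: optional subset of input columns.
    input_type: data_config.input_type enum name, e.g. 'OdpsRTPInput'.
    is_async: if True, use adam_async_optimizer and disable sync_replicas.
  """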
  if label_fields is None:
    label_fields = []
  with tf.gfile.GFile(rtp_fg, 'r') as fin:
    rtp_fg = json.load(fin)
  model_dir = rtp_fg.get('model_dir', 'experiments/rtp_fg_demo')
  num_steps = rtp_fg.get('num_steps', num_steps)
  model_type = rtp_fg.get('model_type', model_type)
  label_fields = rtp_fg.get('label_fields', label_fields)
  model_path = rtp_fg.get('model_path', '')
  edit_config_json = rtp_fg.get('edit_config_json', None)
  rtp_features = rtp_fg['features']
  logging.info('model_dir = %s' % model_dir)
  logging.info('num_steps = %d' % num_steps)
  logging.info('model_type = %s' % model_type)
  logging.info('model_path = %s' % model_path)
  logging.info('edit_config_json = %s' % edit_config_json)
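  # Illustrative fg.json skeleton, inferred from the keys read above (real
  # files typically carry more per-feature attributes than shown):
  # {
  #   "model_dir": "experiments/rtp_fg_demo",
  #   "model_type": "multi_tower",
  #   "num_steps": 100000,
  #   "label_fields": ["clk"],
  #   "features": [
  #     {"feature_name": "u_age", "group": "user"},
  #     {"feature_name": "i_price", "group": "item"}
  #   ]
  # }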
  pipeline_config = load_input_field_and_feature_config(
      rtp_fg, label_fields, embedding_dim, incol_separator)
  pipeline_config.model_dir = model_dir
  pipeline_config.data_config.separator = separator
  if selected_cols:
    pipeline_config.data_config.selected_cols = selected_cols
  if train_input_path is not None:
    pipeline_config.train_input_path = train_input_path
  if eval_input_path is not None:
    pipeline_config.eval_input_path = eval_input_path
  pipeline_config.data_config.batch_size = batch_size
  pipeline_config.data_config.rtp_separator = ';'
  pipeline_config.data_config.label_fields.extend(label_fields)
  # input_type is a proto enum, so set it by merging its name as text.
  text_format.Merge('input_type: %s' % input_type, pipeline_config.data_config)
  if model_path:
    # An explicit model_config file takes precedence: clear model_type so
    # none of the built-in templates below is applied.
    model_type = None
    with tf.gfile.GFile(model_path, 'r') as fin:
      model_config = fin.read()
      text_format.Merge(model_config, pipeline_config)
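  # If no train_config came from a merged model_config file, install a
  # default one: Adam with exponential learning-rate decay.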
  if not pipeline_config.HasField('train_config'):
    train_config_str = """
    train_config {
      log_step_count_steps: 200
      optimizer_config: {
        %s: {
          learning_rate: {
            exponential_decay_learning_rate {
              initial_learning_rate: 0.0001
              decay_steps: 100000
              decay_factor: 0.5
              min_learning_rate: 0.0000001
            }
          }
        }
        use_moving_average: false
      }
      sync_replicas: %s
    }
    """ % ('adam_optimizer' if not is_async else 'adam_async_optimizer',
           'true' if not is_async else 'false')
    text_format.Merge(train_config_str, pipeline_config)
  pipeline_config.train_config.num_steps = num_steps
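  # Build a default model_config according to model_type: each branch below
  # assembles feature_groups from the fg features and merges a model-specific
  # template via text_format.Merge.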
  if model_type == 'deepfm':
    pipeline_config.model_config.model_class = 'DeepFM'
    # DeepFM feeds every feature to both the wide and the deep part.
    wide_group = FeatureGroupConfig()
    wide_group.group_name = 'wide'
    wide_group.wide_deep = WideOrDeep.WIDE
    for feature in rtp_features:
      feature_name = feature['feature_name']
      wide_group.feature_names.append(feature_name)
    pipeline_config.model_config.feature_groups.append(wide_group)
    deep_group = FeatureGroupConfig()
    deep_group.CopyFrom(wide_group)
    deep_group.group_name = 'deep'
    deep_group.wide_deep = WideOrDeep.DEEP
    pipeline_config.model_config.feature_groups.append(deep_group)
    deepfm_config_str = """
    deepfm {
      dnn {
        hidden_units: [128, 64, 32]
      }
      final_dnn {
        hidden_units: [128, 64]
      }
      wide_output_dim: 32
      l2_regularization: 1e-5
    }
    """
    text_format.Merge(deepfm_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 1e-5
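  # For wide_and_deep, a feature may declare group 'wide', 'deep' or
  # 'wide_and_deep' (the default) to choose which side(s) of the model it
  # feeds; unknown values fall back to 'wide_and_deep'.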
  elif model_type == 'wide_and_deep':
    pipeline_config.model_config.model_class = 'WideAndDeep'
    wide_group = FeatureGroupConfig()
    wide_group.group_name = 'wide'
    wide_group.wide_deep = WideOrDeep.WIDE
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = feature.get('group', 'wide_and_deep')
      if group not in ['wide', 'deep', 'wide_and_deep']:
        logging.warning('invalid group %s for %s' % (group, feature_name))
        group = 'wide_and_deep'
      if group in ['wide', 'wide_and_deep']:
        wide_group.feature_names.append(feature_name)
    pipeline_config.model_config.feature_groups.append(wide_group)
    deep_group = FeatureGroupConfig()
    deep_group.group_name = 'deep'
    deep_group.wide_deep = WideOrDeep.DEEP
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = feature.get('group', 'wide_and_deep')
      if group not in ['wide', 'deep', 'wide_and_deep']:
        group = 'wide_and_deep'
      if group in ['deep', 'wide_and_deep']:
        deep_group.feature_names.append(feature_name)
    pipeline_config.model_config.feature_groups.append(deep_group)
    wide_and_deep_config_str = """
    wide_and_deep {
      dnn {
        hidden_units: [128, 64, 32]
      }
      l2_regularization: 1e-5
    }
    """
    text_format.Merge(wide_and_deep_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 1e-5
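  # For multi_tower, features are grouped into towers by their 'group'
  # attribute; without one, the feature_name prefix (before the first '_')
  # is used, mapped through group_map below (u -> user, i -> item, ...).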
  elif model_type == 'multi_tower':
    pipeline_config.model_config.model_class = 'MultiTower'
    feature_groups = {}
    group_map = {
        'u': 'user',
        'i': 'item',
        'ctx': 'combo',
        'q': 'combo',
        'comb': 'combo'
    }
    for feature in rtp_features:
      feature_name = feature['feature_name'].strip()
      if 'group' in feature:
        group_name = feature['group']
      else:
        toks = feature_name.split('_')
        group_name = toks[0]
        if group_name in group_map:
          group_name = group_map[group_name]
      if group_name in feature_groups:
        feature_groups[group_name].append(feature_name)
      else:
        feature_groups[group_name] = [feature_name]
    logging.info(
        'if group is specified, group will be used as the feature group name; '
        'otherwise, the prefix of feature_name in fg.json is used as the '
        'feature group name')
    logging.info('prefix map: %s' % str(group_map))
    for group_name in feature_groups:
      logging.info('add group = %s' % group_name)
      group = FeatureGroupConfig()
      group.group_name = group_name
      for fea_name in feature_groups[group_name]:
        group.feature_names.append(fea_name)
      group.wide_deep = WideOrDeep.DEEP
      pipeline_config.model_config.feature_groups.append(group)
    multi_tower_config_str = 'multi_tower {\n'
    for group_name in feature_groups:
      multi_tower_config_str += """
      towers {
        input: "%s"
        dnn {
          hidden_units: [256, 192, 128]
        }
      }
      """ % group_name
    multi_tower_config_str += """
      final_dnn {
        hidden_units: [192, 128, 64]
      }
      l2_regularization: 1e-4
    }
    """
    text_format.Merge(multi_tower_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 1e-5
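  # The multi-task models below (esmm, dbmtl) build a ctr tower and a cvr
  # tower, so label_fields must contain the ctr label first and the cvr
  # label second.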
  elif model_type == 'esmm':
    pipeline_config.model_config.model_class = 'ESMM'
    feature_groups = {}
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = feature.get('group', 'all')
      if group in feature_groups:
        feature_groups[group].append(feature_name)
      else:
        feature_groups[group] = [feature_name]
    for group_name in feature_groups:
      logging.info('add group = %s' % group_name)
      group = FeatureGroupConfig()
      group.group_name = group_name
      for fea_name in feature_groups[group_name]:
        group.feature_names.append(fea_name)
      group.wide_deep = WideOrDeep.DEEP
      pipeline_config.model_config.feature_groups.append(group)
    esmm_config_str = 'esmm {\n'
    for group_name in feature_groups:
      esmm_config_str += """
      groups {
        input: "%s"
        dnn {
          hidden_units: [256, 128, 96, 64]
        }
      }""" % group_name
    esmm_config_str += """
      ctr_tower {
        tower_name: "ctr"
        label_name: "%s"
        dnn {
          hidden_units: [128, 96, 64, 32, 16]
        }
        num_class: 1
        weight: 1.0
        loss_type: CLASSIFICATION
        metrics_set: {
          auc {}
        }
      }
      cvr_tower {
        tower_name: "cvr"
        label_name: "%s"
        dnn {
          hidden_units: [128, 96, 64, 32, 16]
        }
        num_class: 1
        weight: 1.0
        loss_type: CLASSIFICATION
        metrics_set: {
          auc {}
        }
      }
      l2_regularization: 1e-6
    }""" % (label_fields[0], label_fields[1])
    text_format.Merge(esmm_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 5e-5
  elif model_type == 'dbmtl':
    pipeline_config.model_config.model_class = 'DBMTL'
    # DBMTL puts all features into a single group named 'all'.
    feature_groups = {}
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = 'all'
      if group in feature_groups:
        feature_groups[group].append(feature_name)
      else:
        feature_groups[group] = [feature_name]
    for group_name in feature_groups:
      logging.info('add group = %s' % group_name)
      group = FeatureGroupConfig()
      group.group_name = group_name
      for fea_name in feature_groups[group_name]:
        group.feature_names.append(fea_name)
      group.wide_deep = WideOrDeep.DEEP
      pipeline_config.model_config.feature_groups.append(group)
    dbmtl_config_str = """
    dbmtl {
      bottom_dnn {
        hidden_units: [1024]
      }
      expert_dnn {
        hidden_units: [256, 128, 64, 32]
      }
      num_expert: 8
      task_towers {
        tower_name: "ctr"
        label_name: "%s"
        loss_type: CLASSIFICATION
        metrics_set: {
          auc {}
        }
        dnn {
          hidden_units: [256, 128, 64, 32]
        }
        relation_dnn {
          hidden_units: [32]
        }
        weight: 1.0
      }
      task_towers {
        tower_name: "cvr"
        label_name: "%s"
        loss_type: CLASSIFICATION
        metrics_set: {
          auc {}
        }
        dnn {
          hidden_units: [256, 128, 64, 32]
        }
        relation_tower_names: ["ctr"]
        relation_dnn {
          hidden_units: [32]
        }
        weight: 1.0
      }
      l2_regularization: 1e-6
    }
    """ % (label_fields[0], label_fields[1])
    text_format.Merge(dbmtl_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 5e-6
  if model_type in ['wide_and_deep', 'deepfm', 'multi_tower']:
    text_format.Merge("""