easy_rec/python/tools/create_config_from_excel.py:

# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import logging
import math
import sys

import numpy as np
import pandas as pd

from easy_rec.python.utils import config_util

logging.basicConfig(
    level=logging.INFO, format='[%(asctime)s][%(levelname)s] %(message)s')


class ModelConfigConverter:

  def __init__(self, excel_path, output_path, model_type, column_separator,
               incol_separator, train_input_path, eval_input_path, model_dir):
    self._excel_path = excel_path
    self._output_path = output_path
    self._model_type = model_type
    self._column_separator = column_separator
    self._incol_separator = incol_separator

    self._dict_global = self._parse_global()
    self._tower_dicts = {}
    self._feature_names = []
    self._feature_details = {}
    self._label = ''

    self._train_input_path = train_input_path
    self._eval_input_path = eval_input_path
    self._model_dir = model_dir
    if not self._model_dir:
      self._model_dir = 'experiments/demo'
      logging.warning('model_dir is not specified, set to %s' %
                      self._model_dir)

  def _get_type_name(self, input_name):
    type_dict = {
        'bigint': 'INT64',
        'double': 'DOUBLE',
        'float': 'FLOAT',
        'string': 'STRING',
        'bool': 'BOOL'
    }
    return type_dict[input_name]

  def _get_type_default(self, input_name):
    type_dict = {
        'bigint': '0',
        'double': '0.0',
        'float': '0.0',
        'string': '',
        'bool': 'false'
    }
    return type_dict[input_name]

  def _parse_global(self):
    df = pd.read_excel(self._excel_path, sheet_name='global')
    dict_global = {}
    for i, row in df.iterrows():
      field = {}
      name = field['name'] = row['name'].strip()
      field['type_name'] = row['type']
      field['hash_bucket_size'] = row['hash_bucket_size']
      field['embedding_dim'] = row['embedding_dim']
      field['default_value'] = row['default_value']
      dict_global[name] = field
    return dict_global

  def _add_to_tower(self, tower_name, field):
    if tower_name.lower() == 'nan':
      return
    if tower_name != 'label':
      if self._model_type == 'deepfm':
        if tower_name == 'deep':
          tower_names = ['deep']
        elif tower_name == 'wide':
          tower_names = ['wide']
        elif tower_name == 'wide_and_deep':
          tower_names = ['wide', 'deep']
        else:
          raise ValueError(
              'invalid tower_name[%s] for deepfm model, '
              'only [label, deep, wide, wide_and_deep] are supported' %
              tower_name)
        for tower_name in tower_names:
          if tower_name in self._tower_dicts:
            self._tower_dicts[tower_name].append(field)
          else:
            self._tower_dicts[tower_name] = [field]
      else:
        if tower_name in self._tower_dicts:
          self._tower_dicts[tower_name].append(field)
        else:
          self._tower_dicts[tower_name] = [field]

  def _is_str(self, v):
    if isinstance(v, str):
      return True
    try:
      # python2 compatibility: unicode values also count as strings
      if isinstance(v, unicode):  # noqa: F821
        return True
    except NameError:
      return False
    return False

  def _parse_features(self):
    df = pd.read_excel(self._excel_path, sheet_name='features')
    for i, row in df.iterrows():
      field = {}
      name = field['name'] = row['name'].strip()
      self._feature_names.append(name)
      field['data_type'] = row['data_type'].strip()
      field['type'] = row['type'].strip()
      g = str(row['global']).strip()
      if g and g != 'nan':
        field['global'] = g
      field['field_name'] = name
      if row['type'].strip() == 'label':
        self._label = name
      if 'global' in field and field['global'] in self._dict_global:
        # if the referenced global entry has values, copy them over

        def _is_good(v):
          return str(v) not in ['nan', '']

        if _is_good(self._dict_global[field['global']]['default_value']):
          field['default_value'] = self._dict_global[
              field['global']]['default_value']
        if _is_good(self._dict_global[field['global']]['hash_bucket_size']):
          field['hash_bucket_size'] = self._dict_global[
              field['global']]['hash_bucket_size']
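        # features pointing at the same 'global' entry also inherit its
        # embedding_dim and are written with embedding_name set to the
        # global name, so they share a single embedding table in EasyRec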
        if _is_good(self._dict_global[field['global']]['embedding_dim']):
          field['embedding_dim'] = self._dict_global[
              field['global']]['embedding_dim']
        field['embedding_name'] = field['global']

      for t in [
          'type', 'global', 'hash_bucket_size', 'embedding_dim',
          'default_value', 'weights', 'boundaries'
      ]:
        if t not in row:
          continue
        v = row[t]
        if v not in ['', ' ', 'NaN', np.nan, 'nan']:
          if self._is_str(v):
            field[t] = v.strip()
          elif not math.isnan(v):
            field[t] = int(v)
        if t == 'default_value' and t not in field:
          field[t] = ''
          if field['type'] == 'dense':
            field[t] = 0.0
          if field['type'] == 'weights':
            field['default_value'] = '1'

      tower_name = row['group']
      if name in self._dict_global:
        field['type'] = 'category'
        field['hash_bucket_size'] = self._dict_global[name]['hash_bucket_size']
        field['embedding_dim'] = self._dict_global[name]['embedding_dim']
        field['default_value'] = self._dict_global[name]['default_value']
        if field['data_type'] == 'bigint':
          field['default_value'] = 0
        elif field['data_type'] == 'double':
          field['default_value'] = 0.0

      if field['type'] not in ['notneed', 'not_need', 'not_needed']:
        tower_name = str(tower_name).strip()
        self._add_to_tower(tower_name, field)

      self._feature_details[name] = field

    # check that tag feature weights refer to one of the input fields
    for name, config in self._feature_details.items():
      if config['type'] == 'tags':
        if 'weights' in config and config[
            'weights'] not in self._feature_details:
          raise ValueError(config['weights'] + ' not in field names')

  def _write_train_eval_config(self, fout):
    fout.write('train_input_path: "%s"\n' % self._train_input_path)
    fout.write('eval_input_path: "%s"\n' % self._eval_input_path)
    fout.write("""
model_dir: "%s"

train_config {
  log_step_count_steps: 200
  # fine_tune_checkpoint: ""
  optimizer_config: {
    adam_optimizer: {
      learning_rate: {
        exponential_decay_learning_rate {
          initial_learning_rate: 0.0001
          decay_steps: 10000
          decay_factor: 0.5
          min_learning_rate: 0.0000001
        }
      }
    }
  }
  num_steps: 2000
  sync_replicas: true
}

eval_config {
  metrics_set: {
    auc {}
  }
}""" % self._model_dir)

  def _write_deepfm_config(self, fout):
    # write model_config
    fout.write('model_config:{\n')
    fout.write('  model_class: "DeepFM"\n')
    # write feature group configs
    tower_names = list(self._tower_dicts.keys())
    tower_names.sort()
    for tower_name in tower_names:
      fout.write('  feature_groups: {\n')
      fout.write('    group_name: "%s"\n' % tower_name)
      curr_feas = self._tower_dicts[tower_name]
      for fea in curr_feas:
        if fea['type'] == 'weights':
          continue
        fout.write('    feature_names: "%s"\n' % fea['name'])
      fout.write('    wide_deep:%s\n' % tower_name.upper())
      fout.write('  }\n')
    # write deepfm configs
    fout.write("""
  deepfm {
    dnn {
      hidden_units: [128, 64, 32]
    }
    final_dnn {
      hidden_units: [128, 64]
    }
    wide_output_dim: 16
    l2_regularization: 1e-5
  }
  embedding_regularization: 1e-5
}
""")

  def _write_multi_tower_config(self, fout):
    # write model_config
    fout.write('model_config:{\n')
    fout.write('  model_class: "MultiTower"\n')
    # write each tower's features
    tower_names = list(self._tower_dicts.keys())
    tower_names.sort()
    for tower_name in tower_names:
      fout.write('  feature_groups: {\n')
      fout.write('    group_name: "%s"\n' % tower_name)
      curr_feas = self._tower_dicts[tower_name]
      for fea in curr_feas:
        if fea['type'] == 'weights':
          continue
        fout.write('    feature_names: "%s"\n' % fea['name'])
      fout.write('    wide_deep:DEEP\n')
      fout.write('  }\n')
    # write each tower's dnn configs
    fout.write('  multi_tower {\n')
    for tower_name in tower_names:
      fout.write("""
    towers {
      input: "%s"
      dnn {
        hidden_units: [256, 192, 128]
      }
    }""" % tower_name)
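    # each tower above gets its own dnn over its feature group; the tower
    # outputs are then combined by the shared final_dnn written below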
    fout.write("""
    final_dnn {
      hidden_units: [192, 128, 64]
    }
    l2_regularization: 1e-5
  }
  embedding_regularization: 1e-5
}""")

  def _write_data_config(self, fout):
    fout.write('data_config {\n')
    fout.write('  separator: "%s"\n' % self._column_separator)
    for name in self._feature_names:
      fout.write('  input_fields: {\n')
      fout.write('    input_name: "%s"\n' % name)
      fout.write('    input_type: %s\n' %
                 self._get_type_name(self._feature_details[name]['data_type']))
      if 'default_value' in self._feature_details[name]:
        fout.write('    default_val:"%s"\n' %
                   self._feature_details[name]['default_value'])
      fout.write('  }\n')
    fout.write('  label_fields: "%s"\n' % self._label)
    fout.write("""
  batch_size: 1024
  prefetch_size: 32
  input_type: CSVInput
}""")

  def _write_feature_config(self, fout):
    for name in self._feature_names:
      feature = self._feature_details[name]
      if feature['type'] in ['weights', 'notneed', 'label']:
        continue
      if name == self._label:
        continue
      fout.write('feature_configs: {\n')
      fout.write('  input_names: "%s"\n' % name)
      if feature['type'] == 'category':
        fout.write('  feature_type: IdFeature\n')
        fout.write('  embedding_dim: %d\n' % feature['embedding_dim'])
        fout.write('  hash_bucket_size: %d\n' % feature['hash_bucket_size'])
        if 'embedding_name' in feature:
          fout.write('  embedding_name: "%s"\n' % feature['embedding_name'])
      elif feature['type'] == 'dense':
        fout.write('  feature_type: RawFeature\n')
        if self._model_type == 'deepfm':
          assert feature['boundaries'] != '', \
              'raw features must be discretized by specifying boundaries'
        if 'boundaries' in feature and feature['boundaries'] != '':
          fout.write('  boundaries: [%s]\n' %
                     str(feature['boundaries']).strip())
          fout.write('  embedding_dim: %d\n' % int(feature['embedding_dim']))
      elif feature['type'] == 'tags':
        if 'weights' in feature:
          fout.write('  input_names: "%s"\n' % feature['weights'])
        fout.write('  feature_type: TagFeature\n')
        fout.write('  hash_bucket_size: %d\n' % feature['hash_bucket_size'])
        fout.write('  embedding_dim: %d\n' % feature['embedding_dim'])
        if 'embedding_name' in feature:
          fout.write('  embedding_name: "%s"\n' % feature['embedding_name'])
        fout.write('  separator: "%s"\n' % self._incol_separator)
      elif feature['type'] == 'indexes':
        fout.write('  feature_type: TagFeature\n')
        assert 'hash_bucket_size' in feature
        fout.write('  num_buckets: %d\n' % feature['hash_bucket_size'])
        if 'embedding_dim' in feature:
          fout.write('  embedding_dim: %d\n' % feature['embedding_dim'])
        if 'embedding_name' in feature:
          fout.write('  embedding_name: "%s"\n' % feature['embedding_name'])
        fout.write('  separator: "%s"\n' % self._incol_separator)
      else:
        assert False, 'invalid feature types: %s' % feature['type']
      fout.write('}\n')

  def convert(self):
    self._parse_features()
    logging.info(
        'TOWERS[%d]: %s' %
        (len(self._tower_dicts), ','.join(list(self._tower_dicts.keys()))))
    with open(self._output_path, 'w') as fout:
      self._write_train_eval_config(fout)
      self._write_data_config(fout)
      self._write_feature_config(fout)
      if self._model_type == 'deepfm':
        self._write_deepfm_config(fout)
      elif self._model_type == 'multi_tower':
        self._write_multi_tower_config(fout)
      else:
        logging.warning(
            'the model_config could not be generated automatically, '
            'you have to write the model_config manually.')
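
    # the file written above is plain concatenated text; the round trip
    # through config_util below parses it back (surfacing any syntax errors)
    # and re-serializes it in a normalized form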
    # reformat the config
    pipeline_config = config_util.get_configs_from_pipeline_file(
        self._output_path)
    config_util.save_message(pipeline_config, self._output_path)


model_types = ['deepfm', 'multi_tower']

if __name__ == '__main__':
  import argparse
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--model_type',
      type=str,
      choices=model_types,
      help='model type, currently supported: %s' % ','.join(model_types))
  parser.add_argument('--excel_path', type=str, help='excel config path')
  parser.add_argument('--output_path', type=str, help='generated config path')
  parser.add_argument(
      '--column_separator',
      type=str,
      default=',',
      help='column separator, separator between features')
  parser.add_argument(
      '--incol_separator',
      type=str,
      default='|',
      help='separator within features, such as tag features')
  parser.add_argument(
      '--train_input_path', type=str, default='', help='train input path')
  parser.add_argument(
      '--eval_input_path', type=str, default='', help='eval input path')
  parser.add_argument('--model_dir', type=str, default='', help='model dir')
  args = parser.parse_args()

  if not args.excel_path or not args.output_path:
    parser.print_usage()
    sys.exit(1)

  logging.info('column_separator = %s in_column_separator = %s' %
               (args.column_separator, args.incol_separator))

  converter = ModelConfigConverter(args.excel_path, args.output_path,
                                   args.model_type, args.column_separator,
                                   args.incol_separator, args.train_input_path,
                                   args.eval_input_path, args.model_dir)
  converter.convert()

  logging.info('Conversion done')
  logging.info('Tips:')
  if args.train_input_path == '' or args.eval_input_path == '':
    logging.info('* you have to update train_input_path, eval_input_path')
  logging.info('* you may need to adjust dnn config or final_dnn config')
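
# Illustrative usage (paths below are hypothetical):
#   python -m easy_rec.python.tools.create_config_from_excel \
#     --model_type multi_tower \
#     --excel_path feature_def.xls \
#     --output_path multi_tower.config \
#     --train_input_path data/train.csv \
#     --eval_input_path data/eval.csv
#
# Based on the columns read above, the excel file is expected to contain:
#   * a 'features' sheet with columns: name, data_type, type, group, global,
#     hash_bucket_size, embedding_dim, default_value, weights, boundaries
#   * a 'global' sheet with columns: name, type, hash_bucket_size,
#     embedding_dim, default_value, holding shared embedding settings that
#     are referenced through the 'global' column of the 'features' sheet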