def convert_rtp_fg()

in easy_rec/python/utils/convert_rtp_fg.py [0:0]


def convert_rtp_fg(rtp_fg,
                   embedding_dim=16,
                   batch_size=1024,
                   label_fields=[],
                   num_steps=10,
                   model_type='',
                   separator='\002',
                   incol_separator='\003',
                   train_input_path=None,
                   eval_input_path=None,
                   selected_cols='',
                   input_type='OdpsRTPInput',
                   is_async=False):
  with tf.gfile.GFile(rtp_fg, 'r') as fin:
    rtp_fg = json.load(fin)

  model_dir = rtp_fg.get('model_dir', 'experiments/rtp_fg_demo')
  num_steps = rtp_fg.get('num_steps', num_steps)
  model_type = rtp_fg.get('model_type', model_type)
  label_fields = rtp_fg.get('label_fields', label_fields)
  model_path = rtp_fg.get('model_path', '')
  edit_config_json = rtp_fg.get('edit_config_json', None)
  rtp_features = rtp_fg['features']

  logging.info('model_dir = %s' % model_dir)
  logging.info('num_steps = %d' % num_steps)
  logging.info('model_type = %s' % model_type)
  logging.info('model_path = %s' % model_path)
  logging.info('edit_config_json = %s' % edit_config_json)

  pipeline_config = load_input_field_and_feature_config(rtp_fg, label_fields,
                                                        embedding_dim,
                                                        incol_separator)
  pipeline_config.model_dir = model_dir
  pipeline_config.data_config.separator = separator
  if selected_cols:
    pipeline_config.data_config.selected_cols = selected_cols
  if train_input_path is not None:
    pipeline_config.train_input_path = train_input_path
  if eval_input_path is not None:
    pipeline_config.eval_input_path = eval_input_path

  pipeline_config.data_config.batch_size = batch_size
  pipeline_config.data_config.rtp_separator = ';'
  pipeline_config.data_config.label_fields.extend(label_fields)

  text_format.Merge('input_type: %s' % input_type, pipeline_config.data_config)

  if model_path:
    model_type = None
    with tf.gfile.GFile(model_path, 'r') as fin:
      model_config = fin.read()
      text_format.Merge(model_config, pipeline_config)

  if not pipeline_config.HasField('train_config'):
    train_config_str = """
    train_config {
      log_step_count_steps: 200
      optimizer_config: {
        %s: {
          learning_rate: {
            exponential_decay_learning_rate {
              initial_learning_rate: 0.0001
              decay_steps: 100000
              decay_factor: 0.5
              min_learning_rate: 0.0000001
            }
          }
        }
        use_moving_average: false
      }

      sync_replicas: %s
    }
    """ % ('adam_optimizer' if not is_async else 'adam_async_optimizer',
           'true' if not is_async else 'false')
    text_format.Merge(train_config_str, pipeline_config)

  pipeline_config.train_config.num_steps = num_steps

  if model_type == 'deepfm':
    pipeline_config.model_config.model_class = 'DeepFM'
    wide_group = FeatureGroupConfig()
    wide_group.group_name = 'wide'
    wide_group.wide_deep = WideOrDeep.WIDE
    for feature in rtp_features:
      feature_name = feature['feature_name']
      wide_group.feature_names.append(feature_name)
    pipeline_config.model_config.feature_groups.append(wide_group)
    deep_group = FeatureGroupConfig()
    deep_group.CopyFrom(wide_group)
    deep_group.group_name = 'deep'
    deep_group.wide_deep = WideOrDeep.DEEP
    pipeline_config.model_config.feature_groups.append(deep_group)
    deepfm_config_str = """
    deepfm {
      dnn {
        hidden_units: [128, 64, 32]
      }
      final_dnn {
        hidden_units: [128, 64]
      }
      wide_output_dim: 32
      l2_regularization: 1e-5
    }
    """
    text_format.Merge(deepfm_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 1e-5
  elif model_type == 'wide_and_deep':
    pipeline_config.model_config.model_class = 'WideAndDeep'
    wide_group = FeatureGroupConfig()
    wide_group.group_name = 'wide'
    wide_group.wide_deep = WideOrDeep.WIDE
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = feature.get('group', 'wide_and_deep')
      if group not in ['wide', 'deep', 'wide_and_deep']:
        logging.warning('invalid group %s for %s' % (group, feature_name))
        group = 'wide_and_deep'
      if group in ['wide', 'wide_and_deep']:
        wide_group.feature_names.append(feature_name)
    pipeline_config.model_config.feature_groups.append(wide_group)
    deep_group = FeatureGroupConfig()
    deep_group.group_name = 'deep'
    deep_group.wide_deep = WideOrDeep.DEEP
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = feature.get('group', 'wide_and_deep')
      if group not in ['wide', 'deep', 'wide_and_deep']:
        group = 'wide_and_deep'
      if group in ['deep', 'wide_and_deep']:
        deep_group.feature_names.append(feature_name)
    pipeline_config.model_config.feature_groups.append(deep_group)
    deepfm_config_str = """
    wide_and_deep {
      dnn {
        hidden_units: [128, 64, 32]
      }
      l2_regularization: 1e-5
    }
    """
    text_format.Merge(deepfm_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 1e-5
  elif model_type == 'multi_tower':
    pipeline_config.model_config.model_class = 'MultiTower'

    feature_groups = {}
    group_map = {
        'u': 'user',
        'i': 'item',
        'ctx': 'combo',
        'q': 'combo',
        'comb': 'combo'
    }
    for feature in rtp_features:
      feature_name = feature['feature_name'].strip()
      group_name = ''
      if 'group' in feature:
        group_name = feature['group']
      else:
        toks = feature_name.split('_')
        group_name = toks[0]
        if group_name in group_map:
          group_name = group_map[group_name]
      if group_name in feature_groups:
        feature_groups[group_name].append(feature_name)
      else:
        feature_groups[group_name] = [feature_name]

    logging.info(
        'if group is specified, group will be used as feature group name; '
        'otherwise, the prefix of feature_name in fg.json is used as feature group name'
    )
    logging.info('prefix map: %s' % str(group_map))
    for group_name in feature_groups:
      logging.info('add group = %s' % group_name)
      group = FeatureGroupConfig()
      group.group_name = group_name
      for fea_name in feature_groups[group_name]:
        group.feature_names.append(fea_name)
      group.wide_deep = WideOrDeep.DEEP
      pipeline_config.model_config.feature_groups.append(group)

    multi_tower_config_str = '  multi_tower {\n'
    for group_name in feature_groups:
      multi_tower_config_str += """
      towers {
        input: "%s"
        dnn {
          hidden_units: [256, 192, 128]
        }
      }
      """ % group_name

    multi_tower_config_str = multi_tower_config_str + """
      final_dnn {
        hidden_units: [192, 128, 64]
      }
      l2_regularization: 1e-4
    }
    """
    text_format.Merge(multi_tower_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 1e-5

  elif model_type == 'esmm':
    pipeline_config.model_config.model_class = 'ESMM'

    feature_groups = {}
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = feature.get('group', 'all')
      if group in feature_groups:
        feature_groups[group].append(feature_name)
      else:
        feature_groups[group] = [feature_name]

    for group_name in feature_groups:
      logging.info('add group = %s' % group_name)
      group = FeatureGroupConfig()
      group.group_name = group_name
      for fea_name in feature_groups[group_name]:
        group.feature_names.append(fea_name)
      group.wide_deep = WideOrDeep.DEEP
      pipeline_config.model_config.feature_groups.append(group)

    esmm_config_str = '  esmm {\n'
    for group_name in feature_groups:
      esmm_config_str += """
        groups {
          input: "%s"
          dnn {
            hidden_units: [256, 128, 96, 64]
          }
        }""" % group_name

    esmm_config_str += """
        ctr_tower {
          tower_name: "ctr"
          label_name: "%s"
          dnn {
            hidden_units: [128, 96, 64, 32, 16]
          }
          num_class: 1
          weight: 1.0
          loss_type: CLASSIFICATION
          metrics_set: {
           auc {}
          }
        }
        cvr_tower {
          tower_name: "cvr"
          label_name: "%s"
          dnn {
            hidden_units: [128, 96, 64, 32, 16]
          }
          num_class: 1
          weight: 1.0
          loss_type: CLASSIFICATION
          metrics_set: {
           auc {}
          }
        }
        l2_regularization: 1e-6
      }""" % (label_fields[0], label_fields[1])
    text_format.Merge(esmm_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 5e-5
  elif model_type == 'dbmtl':
    pipeline_config.model_config.model_class = 'DBMTL'

    feature_groups = {}
    for feature in rtp_features:
      feature_name = feature['feature_name']
      group = 'all'
      if group in feature_groups:
        feature_groups[group].append(feature_name)
      else:
        feature_groups[group] = [feature_name]

    for group_name in feature_groups:
      logging.info('add group = %s' % group_name)
      group = FeatureGroupConfig()
      group.group_name = group_name
      for fea_name in feature_groups[group_name]:
        group.feature_names.append(fea_name)
      group.wide_deep = WideOrDeep.DEEP
      pipeline_config.model_config.feature_groups.append(group)

    dbmtl_config_str = """
      dbmtl {
        bottom_dnn {
          hidden_units: [1024]
        }
        expert_dnn {
          hidden_units: [256, 128, 64, 32]
        }
        num_expert: 8
        task_towers {
          tower_name: "ctr"
          label_name: "%s"
          loss_type: CLASSIFICATION
          metrics_set: {
            auc {}
          }
          dnn {
            hidden_units: [256, 128, 64, 32]
          }
          relation_dnn {
            hidden_units: [32]
          }
          weight: 1.0
        }
        task_towers {
          tower_name: "cvr"
          label_name: "%s"
          loss_type: CLASSIFICATION
          metrics_set: {
            auc {}
          }
          dnn {
            hidden_units: [256, 128, 64, 32]
          }
          relation_tower_names: ["ctr"]
          relation_dnn {
            hidden_units: [32]
          }
          weight: 1.0
        }
        l2_regularization: 1e-6
      }
    """ % (label_fields[0], label_fields[1])
    text_format.Merge(dbmtl_config_str, pipeline_config.model_config)
    pipeline_config.model_config.embedding_regularization = 5e-6

  if model_type in ['wide_and_deep', 'deepfm', 'multi_tower']:
    text_format.Merge("""