easy_rec/python/inference/parquet_predictor.py [42:108]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                                           fg_json_path)
    self._output_sep = output_sep
    self._ds_vector_recall = ds_vector_recall
    input_type = DatasetConfig.InputType.Name(data_config.input_type).lower()
    self.pipeline_config = pipeline_config

    if 'rtp' in input_type:
      self._is_rtp = True
      self._input_sep = data_config.rtp_separator
    else:
      self._is_rtp = False
      self._input_sep = data_config.separator

    if selected_cols and not ds_vector_recall:
      self._selected_cols = [int(x) for x in selected_cols.split(',')]
    elif ds_vector_recall:
      self._selected_cols = selected_cols.split(',')
    else:
      self._selected_cols = None

  def _parse_line(self, line):
    out_dict = {}
    for key in line['feature']:
      out_dict[key] = line['feature'][key]
    if 'reserve' in line:
      out_dict['reserve'] = line['reserve']
    #   for key in line['reserve']:
    #     out_dict[key] = line['reserve'][key]
    return out_dict

  def _get_reserved_cols(self, reserved_cols):
    # already parsed in _get_dataset
    return self._reserved_cols

  def _get_dataset(self, input_path, num_parallel_calls, batch_size, slice_num,
                   slice_id):
    feature_configs = config_util.get_compatible_feature_configs(
        self.pipeline_config)

    kwargs = {}
    if self._reserved_args is not None and len(self._reserved_args) > 0:
      if self._reserved_args == 'ALL_COLUMNS':
        parquet_file = gfile.Glob(input_path.split(',')[0])[0]
        # gfile is not supported here: pd.read_parquet requires random access
        all_data = pd.read_parquet(parquet_file)
        all_cols = list(all_data.columns)
        kwargs['reserve_fields'] = all_cols
        self._all_fields = all_cols
        self._reserved_cols = all_cols
        kwargs['reserve_types'] = input_utils.get_tf_type_from_parquet_file(
            all_cols, parquet_file)
      else:
        self._reserved_cols = [
            x.strip() for x in self._reserved_args.split(',') if x.strip() != ''
        ]
        kwargs['reserve_fields'] = self._reserved_cols
        parquet_file = gfile.Glob(input_path.split(',')[0])[0]
        kwargs['reserve_types'] = input_utils.get_tf_type_from_parquet_file(
            self._reserved_cols, parquet_file)
      logging.info('reserve_fields=%s reserve_types=%s' %
                   (','.join(self._reserved_cols), ','.join(
                       [str(x) for x in kwargs['reserve_types']])))
    else:
      self._reserved_cols = []
    self.pipeline_config.data_config.batch_size = batch_size

    kwargs['is_predictor'] = True
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
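
For orientation, a minimal sketch of what _parse_line above returns for one
batch record. The record layout (a nested 'feature' dict plus an optional
'reserve' dict) comes from the code; the concrete field names are hypothetical.

  # hypothetical record handed to _parse_line by the parquet input pipeline;
  # the field names below are illustrative only
  line = {
      'feature': {'user_id': [101, 102], 'item_id': [7, 9]},
      'reserve': {'request_id': ['req-1', 'req-2']},
  }
  # feature columns become top-level keys; the whole 'reserve' dict is kept
  # under a single 'reserve' key instead of being flattened
  out_dict = dict(line['feature'])
  if 'reserve' in line:
    out_dict['reserve'] = line['reserve']
  # -> {'user_id': [101, 102], 'item_id': [7, 9],
  #     'reserve': {'request_id': ['req-1', 'req-2']}}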



easy_rec/python/inference/parquet_predictor_v2.py [42:108]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                                             fg_json_path)
    self._output_sep = output_sep
    self._ds_vector_recall = ds_vector_recall
    input_type = DatasetConfig.InputType.Name(data_config.input_type).lower()
    self.pipeline_config = pipeline_config

    if 'rtp' in input_type:
      self._is_rtp = True
      self._input_sep = data_config.rtp_separator
    else:
      self._is_rtp = False
      self._input_sep = data_config.separator

    if selected_cols and not ds_vector_recall:
      self._selected_cols = [int(x) for x in selected_cols.split(',')]
    elif ds_vector_recall:
      self._selected_cols = selected_cols.split(',')
    else:
      self._selected_cols = None

  def _parse_line(self, line):
    out_dict = {}
    for key in line['feature']:
      out_dict[key] = line['feature'][key]
    if 'reserve' in line:
      out_dict['reserve'] = line['reserve']
    #   for key in line['reserve']:
    #     out_dict[key] = line['reserve'][key]
    return out_dict

  def _get_reserved_cols(self, reserved_cols):
    # already parsed in _get_dataset
    return self._reserved_cols

  def _get_dataset(self, input_path, num_parallel_calls, batch_size, slice_num,
                   slice_id):
    feature_configs = config_util.get_compatible_feature_configs(
        self.pipeline_config)

    kwargs = {}
    if self._reserved_args is not None and len(self._reserved_args) > 0:
      if self._reserved_args == 'ALL_COLUMNS':
        parquet_file = gfile.Glob(input_path.split(',')[0])[0]
        # gfile is not supported here: pd.read_parquet requires random access
        all_data = pd.read_parquet(parquet_file)
        all_cols = list(all_data.columns)
        kwargs['reserve_fields'] = all_cols
        self._all_fields = all_cols
        self._reserved_cols = all_cols
        kwargs['reserve_types'] = input_utils.get_tf_type_from_parquet_file(
            all_cols, parquet_file)
      else:
        self._reserved_cols = [
            x.strip() for x in self._reserved_args.split(',') if x.strip() != ''
        ]
        kwargs['reserve_fields'] = self._reserved_cols
        parquet_file = gfile.Glob(input_path.split(',')[0])[0]
        kwargs['reserve_types'] = input_utils.get_tf_type_from_parquet_file(
            self._reserved_cols, parquet_file)
      logging.info('reserve_fields=%s reserve_types=%s' %
                   (','.join(self._reserved_cols), ','.join(
                       [str(x) for x in kwargs['reserve_types']])))
    else:
      self._reserved_cols = []
    self.pipeline_config.data_config.batch_size = batch_size

    kwargs['is_predictor'] = True
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
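
Both files share the same reserved-columns logic in _get_dataset. As a
simplified stand-in (not the real EasyRec API), this is how the reserved
arguments resolve into column names: 'ALL_COLUMNS' keeps every column of the
first parquet file, while any other value is treated as a comma-separated list
with blank entries dropped.

  import pandas as pd

  def resolve_reserved_cols(reserved_args, parquet_file):
    # simplified stand-in for the branch in _get_dataset above; the real code
    # also derives TF dtypes via input_utils.get_tf_type_from_parquet_file
    if reserved_args == 'ALL_COLUMNS':
      # pd.read_parquet needs random access, so a locally readable path is assumed
      return list(pd.read_parquet(parquet_file).columns)
    return [x.strip() for x in reserved_args.split(',') if x.strip() != '']

  # e.g. resolve_reserved_cols('user_id, item_id, ', 'part-00000.parquet')
  #   -> ['user_id', 'item_id']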



