easy_rec/python/input/parquet_input.py (297 lines of code) (raw):

# -*- encoding:utf-8 -*- # Copyright (c) Alibaba, Inc. and its affiliates. import logging import multiprocessing import queue # import threading import time # import numpy as np # import pandas as pd import tensorflow as tf # from tensorflow.python.framework import ops from tensorflow.python.ops import array_ops # from tensorflow.python.ops import logging_ops # from tensorflow.python.ops import math_ops from tensorflow.python.platform import gfile from easy_rec.python.compat import queues from easy_rec.python.input import load_parquet from easy_rec.python.input.input import Input class ParquetInput(Input): def __init__(self, data_config, feature_config, input_path, task_index=0, task_num=1, check_mode=False, pipeline_config=None, **kwargs): super(ParquetInput, self).__init__(data_config, feature_config, input_path, task_index, task_num, check_mode, pipeline_config, **kwargs) self._need_pack = True if input_path is None: return self._input_files = [] for sub_path in input_path.strip().split(','): self._input_files.extend(gfile.Glob(sub_path)) logging.info('parquet input_path=%s file_num=%d' % (input_path, len(self._input_files))) mp_ctxt = multiprocessing.get_context('spawn') self._data_que = queues.Queue( name='data_que', ctx=mp_ctxt, maxsize=self._data_config.prefetch_size) file_num = len(self._input_files) logging.info('[task_index=%d] total_file_num=%d task_num=%d' % (task_index, file_num, task_num)) self._my_files = [] for file_id in range(file_num): if (file_id % task_num) == task_index: self._my_files.append(self._input_files[file_id]) # self._my_files = self._input_files logging.info('[task_index=%d] task_file_num=%d' % (task_index, len(self._my_files))) self._file_que = queues.Queue(name='file_que', ctx=mp_ctxt) self._num_proc = 8 if file_num < self._num_proc: self._num_proc = file_num self._proc_start = False self._proc_start_que = queues.Queue(name='proc_start_que', ctx=mp_ctxt) self._proc_stop = False self._proc_stop_que = queues.Queue(name='proc_stop_que', ctx=mp_ctxt) self._reserve_fields = None self._reserve_types = None if 'reserve_fields' in kwargs and 'reserve_types' in kwargs: self._reserve_fields = kwargs['reserve_fields'] self._reserve_types = kwargs['reserve_types'] # indicator whether is called from Predictor, do not go pass if 'is_predictor' in kwargs: self._is_predictor = kwargs['is_predictor'] else: self._is_predictor = False self._proc_arr = None self._sparse_fea_names = [] self._dense_fea_names = [] self._dense_fea_cfgs = [] self._total_dense_fea_dim = 0 for fc in self._feature_configs: feature_type = fc.feature_type if feature_type in [fc.IdFeature, fc.TagFeature]: input_name0 = fc.input_names[0] self._sparse_fea_names.append(input_name0) elif feature_type in [fc.RawFeature]: input_name0 = fc.input_names[0] self._dense_fea_names.append(input_name0) self._dense_fea_cfgs.append(fc) self._total_dense_fea_dim += fc.raw_input_dim else: assert False, 'feature_type[%s] not supported' % str(feature_type) def _rebuild_que(self): mp_ctxt = multiprocessing.get_context('spawn') self._data_que = queues.Queue( name='data_que', ctx=mp_ctxt, maxsize=self._data_config.prefetch_size) self._file_que = queues.Queue(name='file_que', ctx=mp_ctxt) self._proc_start_que = queues.Queue(name='proc_start_que', ctx=mp_ctxt) self._proc_stop_que = queues.Queue(name='proc_stop_que', ctx=mp_ctxt) def _sample_generator(self): if not self._proc_start: self._proc_start = True for proc in (self._proc_arr): self._proc_start_que.put(True) logging.info('task[%s] data_proc=%s is_alive=%s' % (self._task_index, proc, proc.is_alive())) done_proc_cnt = 0 fetch_timeout_cnt = 0 # # for mock purpose # all_samples = [] # while len(all_samples) < 64: # try: # sample = self._data_que.get(block=False) # all_samples.append(sample) # except queue.Empty: # continue # sid = 0 # while True: # yield all_samples[sid] # sid += 1 # if sid >= len(all_samples): # sid = 0 fetch_good_cnt = 0 while True: try: sample = self._data_que.get(timeout=1) if sample is None: done_proc_cnt += 1 else: fetch_good_cnt += 1 yield sample if fetch_good_cnt % 200 == 0: logging.info( 'task[%d] fetch_batch_cnt=%d, fetch_timeout_cnt=%d, qsize=%d' % (self._task_index, fetch_good_cnt, fetch_timeout_cnt, self._data_que.qsize())) except queue.Empty: fetch_timeout_cnt += 1 if done_proc_cnt >= len(self._proc_arr): logging.info('all sample finished, fetch_timeout_cnt=%d' % fetch_timeout_cnt) break except Exception as ex: logging.warning('task[%d] get from data_que exception: %s' % (self._task_index, str(ex))) break logging.info('task[%d] sample_generator: total_batches=%d' % (self._task_index, fetch_good_cnt)) def stop(self): if self._proc_arr is None or len(self._proc_arr) == 0: return logging.info('task[%d] will stop dataset procs, proc_num=%d' % (self._task_index, len(self._proc_arr))) self._file_que.close() if self._proc_start: logging.info('try close data que') for _ in range(len(self._proc_arr)): self._proc_stop_que.put(1) self._proc_stop_que.close() def _any_alive(): for proc in self._proc_arr: if proc.is_alive(): return True return False # to ensure the sender part of the python Queue could exit while _any_alive(): try: self._data_que.get(timeout=1) except Exception: pass time.sleep(1) self._data_que.close() logging.info('data que closed') # import time # time.sleep(10) for proc in self._proc_arr: # proc.terminate() proc.join() logging.info('join proc done') # rebuild for next run, which is necessary for evaluation self._rebuild_que() self._proc_arr = None self._proc_start = False self._proc_stop = False def _to_fea_dict(self, input_dict): fea_dict = {} if len(self._sparse_fea_names) > 0: if self._has_ev: tmp_vals, tmp_lens = input_dict['sparse_fea'][1], input_dict[ 'sparse_fea'][0] fea_dict['sparse_fea'] = (tmp_vals, tmp_lens) else: tmp_vals, tmp_lens = input_dict['sparse_fea'][1], input_dict[ 'sparse_fea'][0] num_buckets = -1 for fc in self._feature_configs: if fc.num_buckets > 0: if num_buckets < 0: num_buckets = fc.num_buckets else: assert num_buckets == fc.num_buckets, 'all features must share the same buckets, but are %d and %s' % ( num_buckets, str(fc)) fea_dict['sparse_fea'] = (tmp_vals % num_buckets, tmp_lens) if len(self._dense_fea_names) > 0: fea_dict['dense_fea'] = input_dict['dense_fea'] output_dict = {'feature': fea_dict} lbl_dict = {} for lbl_name in self._label_fields: if lbl_name in input_dict: lbl_dict[lbl_name] = input_dict[lbl_name] if len(lbl_dict) > 0: output_dict['label'] = lbl_dict if self._reserve_fields is not None: output_dict['reserve'] = input_dict['reserve'] return output_dict def add_fea_type_and_shape(self, out_types, out_shapes): # all features are packed into one tuple sparse_fea # first field: field lengths # second field: field values if len(self._sparse_fea_names) > 0: out_types['sparse_fea'] = (tf.int32, tf.int64) out_shapes['sparse_fea'] = (tf.TensorShape([None]), tf.TensorShape([None ])) if len(self._dense_fea_names) > 0: out_types['dense_fea'] = tf.float32 out_shapes['dense_fea'] = tf.TensorShape( [None, self._total_dense_fea_dim]) def _build(self, mode, params): if mode == tf.estimator.ModeKeys.TRAIN and self._data_config.num_epochs > 1: logging.info('will repeat train data for %d epochs' % self._data_config.num_epochs) my_files = self._my_files * self._data_config.num_epochs else: my_files = self._my_files if mode == tf.estimator.ModeKeys.TRAIN: drop_remainder = self._data_config.drop_remainder lbl_fields = self._label_fields else: lbl_fields = self._label_fields if mode == tf.estimator.ModeKeys.PREDICT: lbl_fields = None drop_remainder = False self._proc_arr = load_parquet.start_data_proc( self._task_index, self._task_num, self._num_proc, self._file_que, self._data_que, self._proc_start_que, self._proc_stop_que, self._batch_size, lbl_fields, # self._effective_fields, self._sparse_fea_names, self._dense_fea_names, self._dense_fea_cfgs, self._reserve_fields, drop_remainder, need_pack=self._need_pack) for input_file in my_files: self._file_que.put(input_file) # add end signal for proc in self._proc_arr: self._file_que.put(None) logging.info('add input_files to file_que, qsize=%d' % self._file_que.qsize()) out_types = {} out_shapes = {} if mode != tf.estimator.ModeKeys.PREDICT: for k in self._label_fields: out_types[k] = tf.int32 out_shapes[k] = tf.TensorShape([None]) if self._reserve_fields is not None: out_types['reserve'] = {} out_shapes['reserve'] = {} for k, t in zip(self._reserve_fields, self._reserve_types): out_types['reserve'][k] = t out_shapes['reserve'][k] = tf.TensorShape([None]) self.add_fea_type_and_shape(out_types, out_shapes) dataset = tf.data.Dataset.from_generator( self._sample_generator, output_types=out_types, output_shapes=out_shapes) num_parallel_calls = self._data_config.num_parallel_calls dataset = dataset.map( self._to_fea_dict, num_parallel_calls=num_parallel_calls) dataset = dataset.prefetch(buffer_size=self._prefetch_size) # Note: Input._preprocess is currently not supported as all features # are concatenated together # dataset = dataset.map( # map_func=self._preprocess, num_parallel_calls=num_parallel_calls) if mode != tf.estimator.ModeKeys.PREDICT: dataset = dataset.map(lambda x: (self._get_features(x), self._get_labels(x))) # initial test show that prefetch to gpu has no performance gain # dataset = dataset.apply(tf.data.experimental.prefetch_to_device('/gpu:0')) else: if self._is_predictor: dataset = dataset.map(self._get_for_predictor) else: dataset = dataset.map(lambda x: self._get_features(x)) dataset = dataset.prefetch(buffer_size=self._prefetch_size) return dataset def _get_for_predictor(self, fea_dict): out_dict = { 'feature': { 'ragged_ids': fea_dict['feature']['sparse_fea'][0], 'ragged_lens': fea_dict['feature']['sparse_fea'][1] } } if self._is_predictor and self._reserve_fields is not None: out_dict['reserve'] = fea_dict['reserve'] return out_dict def create_input(self, export_config=None): def _input_fn(mode=None, params=None, config=None): """Build input_fn for estimator. Args: mode: tf.estimator.ModeKeys.(TRAIN, EVAL, PREDICT) params: `dict` of hyper parameters, from Estimator config: tf.estimator.RunConfig instance Return: if mode is not None, return: features: inputs to the model. labels: groundtruth else, return: tf.estimator.export.ServingInputReceiver instance """ if mode in (tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL, tf.estimator.ModeKeys.PREDICT): # build dataset from self._config.input_path self._mode = mode dataset = self._build(mode, params) return dataset elif mode is None: # serving_input_receiver_fn for export SavedModel inputs, features = {}, {} if len(self._sparse_fea_names) > 0: ragged_ids = array_ops.placeholder( tf.int64, [None], name='ragged_ids') ragged_lens = array_ops.placeholder( tf.int32, [None], name='ragged_lens') inputs = {'ragged_ids': ragged_ids, 'ragged_lens': ragged_lens} if self._has_ev: features = {'ragged_ids': ragged_ids, 'ragged_lens': ragged_lens} else: features = { 'ragged_ids': ragged_ids % self._feature_configs[0].num_buckets, 'ragged_lens': ragged_lens } if len(self._dense_fea_names) > 0: inputs['dense_fea'] = array_ops.placeholder( tf.float32, [None, self._total_dense_fea_dim], name='dense_fea') features['dense_fea'] = inputs['dense_fea'] return tf.estimator.export.ServingInputReceiver(features, inputs) _input_fn.input_creator = self return _input_fn