def load_data_proc()

in easy_rec/python/input/load_parquet.py


import logging
import queue

import numpy as np
import pandas as pd

# helper routines referenced below (_should_stop, _get_one_file, _load_dense,
# _load_and_pad_dense, _pack_sparse_feas, _pack_dense_feas,
# _reshape_dense_feas, _add_to_que) are defined in the same module.

def load_data_proc(proc_id, file_que, data_que, proc_start_que, proc_stop_que,
                   batch_size, label_fields, sparse_fea_names, dense_fea_names,
                   dense_fea_cfgs, reserve_fields, drop_remainder, task_index,
                   task_num, need_pack):
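  """Worker: read parquet files from file_que, slice them into batches of
  batch_size samples and feed the batches into data_que.

  Runs in a separate process; a trailing None pushed to data_que marks the
  end of this worker's output stream.
  """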
  logging.info('data proc %d start, proc_start_que.qsize=%d' %
               (proc_id, proc_start_que.qsize()))
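  # block until the driver signals this worker to start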
  proc_start_que.get()
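  # full column list to read from parquet: features, labels, reserved fields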
  effective_fields = sparse_fea_names + dense_fea_names
  # copy to avoid aliasing effective_fields when reserve fields are appended
  all_fields = list(effective_fields)
  if label_fields is not None:
    all_fields = all_fields + label_fields
  if reserve_fields is not None:
    for tmp in reserve_fields:
      if tmp not in all_fields:
        all_fields.append(tmp)
  logging.info('data proc %d start, file_que.qsize=%d' %
               (proc_id, file_que.qsize()))
  num_files = 0
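  # tail samples (fewer than batch_size) carried over between input files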
  part_data_dict = {}

  is_good = True
  total_batch_cnt = 0
  total_sample_cnt = 0
  while is_good:
    if _should_stop(proc_stop_que):
      is_good = False
      break
    input_file = _get_one_file(file_que, proc_stop_que)
    if input_file is None:
      break
    num_files += 1
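    # read only the required columns of the parquet file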
    input_data = pd.read_parquet(input_file, columns=all_fields)
    data_len = len(input_data[all_fields[0]])
    total_sample_cnt += data_len
    batch_num = data_len // batch_size
    res_num = data_len % batch_size

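    # emit all full batches of batch_size consecutive samples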
    sid = 0
    for batch_id in range(batch_num):
      eid = sid + batch_size
      data_dict = {}

      if label_fields is not None and len(label_fields) > 0:
        _load_dense(input_data, label_fields, sid, eid, data_dict)

      if reserve_fields is not None and len(reserve_fields) > 0:
        data_dict['reserve'] = {}
        _load_dense(input_data, reserve_fields, sid, eid, data_dict['reserve'])

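      # sparse columns are ragged; encode each as a (lengths, flat values) pair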
      if len(sparse_fea_names) > 0:
        for k in sparse_fea_names:
          val = input_data[k][sid:eid]
          if isinstance(val.iloc[0], np.ndarray):
            all_lens = np.array([len(x) for x in val], dtype=np.int32)
            all_vals = np.concatenate(val.to_numpy())
          else:
            all_lens = np.ones([len(val)], dtype=np.int32)
            all_vals = val.to_numpy()
          assert np.sum(all_lens) == len(
              all_vals), 'len(all_vals)=%d np.sum(all_lens)=%d' % (
                  len(all_vals), np.sum(all_lens))
          data_dict[k] = (all_lens, all_vals)

      if len(dense_fea_names) > 0:
        _load_dense(input_data, dense_fea_names, sid, eid, data_dict)

      if need_pack:
        if len(sparse_fea_names) > 0:
          _pack_sparse_feas(data_dict, sparse_fea_names)
        if len(dense_fea_names) > 0:
          _pack_dense_feas(data_dict, dense_fea_names, dense_fea_cfgs)
      else:
        if len(dense_fea_names) > 0:
          _reshape_dense_feas(data_dict, dense_fea_names, dense_fea_cfgs)
      if not _add_to_que(data_dict, data_que, proc_stop_que):
        logging.info('add to que failed')
        is_good = False
        break
      total_batch_cnt += 1
      sid += batch_size

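    # merge this file's tail with the carried-over partial batch; emit a full
    # batch as soon as batch_size samples have accumulated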
    if res_num > 0 and is_good:
      data_dict = {}
      part_data_dict_n = {}

      if label_fields is not None and len(label_fields) > 0:
        _load_and_pad_dense(input_data, label_fields, sid, data_dict,
                            part_data_dict, part_data_dict_n, batch_size)

      if reserve_fields is not None and len(reserve_fields) > 0:
        data_dict['reserve'] = {}
        part_data_dict_n['reserve'] = {}
        # load reserved columns; the carried-over dict has no 'reserve' entry
        # until the first remainder has been seen
        _load_and_pad_dense(input_data, reserve_fields, sid,
                            data_dict['reserve'],
                            part_data_dict.get('reserve', {}),
                            part_data_dict_n['reserve'], batch_size)

      if len(dense_fea_names) > 0:
        _load_and_pad_dense(input_data, dense_fea_names, sid, data_dict,
                            part_data_dict, part_data_dict_n, batch_size)

      if len(sparse_fea_names) > 0:
        for k in sparse_fea_names:
          val = input_data[k][sid:]

          if isinstance(val.iloc[0], np.ndarray):
            all_lens = np.array([len(x) for x in val], dtype=np.int32)
            all_vals = np.concatenate(val.to_numpy())
          else:
            all_lens = np.ones([len(val)], dtype=np.int32)
            all_vals = val.to_numpy()

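          # append to the carried-over partial batch and split off one full
          # batch if enough samples have accumulated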
          if part_data_dict is not None and k in part_data_dict:
            tmp_lens = np.concatenate([part_data_dict[k][0], all_lens], axis=0)
            tmp_vals = np.concatenate([part_data_dict[k][1], all_vals], axis=0)
            if len(tmp_lens) > batch_size:
              tmp_res_lens = tmp_lens[batch_size:]
              tmp_lens = tmp_lens[:batch_size]
              tmp_num_elems = np.sum(tmp_lens)
              tmp_res_vals = tmp_vals[tmp_num_elems:]
              tmp_vals = tmp_vals[:tmp_num_elems]
              part_data_dict_n[k] = (tmp_res_lens, tmp_res_vals)
              data_dict[k] = (tmp_lens, tmp_vals)
            elif len(tmp_lens) == batch_size:
              data_dict[k] = (tmp_lens, tmp_vals)
            else:
              part_data_dict_n[k] = (tmp_lens, tmp_vals)
          else:
            part_data_dict_n[k] = (all_lens, all_vals)

      if effective_fields[0] in data_dict:
        if need_pack:
          if len(sparse_fea_names) > 0:
            _pack_sparse_feas(data_dict, sparse_fea_names)
          if len(dense_fea_names) > 0:
            _pack_dense_feas(data_dict, dense_fea_names, dense_fea_cfgs)
        else:
          if len(dense_fea_names) > 0:
            _reshape_dense_feas(data_dict, dense_fea_names, dense_fea_cfgs)
        if not _add_to_que(data_dict, data_que, proc_stop_que):
          logging.info('add to que failed')
          is_good = False
          break
        total_batch_cnt += 1
      part_data_dict = part_data_dict_n
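  # after all files are consumed, flush any remaining partial batch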
  if effective_fields[0] in part_data_dict and is_good:
    batch_len = len(part_data_dict[effective_fields[0]][0])
    if not drop_remainder:
      if need_pack:
        if len(sparse_fea_names) > 0:
          _pack_sparse_feas(part_data_dict, sparse_fea_names)
        if len(dense_fea_names) > 0:
          _pack_dense_feas(part_data_dict, dense_fea_names, dense_fea_cfgs)
      else:
        if len(dense_fea_names) > 0:
          _reshape_dense_feas(part_data_dict, dense_fea_names, dense_fea_cfgs)
      logging.info('remainder batch: %s sample_num=%d' %
                   (','.join(part_data_dict.keys()), batch_len))
      _add_to_que(part_data_dict, data_que, proc_stop_que)
      total_batch_cnt += 1
    else:
      logging.warning('dropping %d remaining samples as drop_remainder is set' %
                      batch_len)
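  # a trailing None tells the consumer that this worker is done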
  if is_good:
    is_good = _add_to_que(None, data_que, proc_stop_que)
  logging.info(
      'data_proc_id[%d]: is_good = %s, total_batch_cnt=%d, total_sample_cnt=%d'
      % (proc_id, is_good, total_batch_cnt, total_sample_cnt))
  data_que.close(wait_send_finish=is_good)

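  # on failure, drain the remaining file names (up to the None sentinel) so
  # that file_que can be closed cleanly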
  while not is_good:
    try:
      if file_que.get(timeout=1) is None:
        break
    except queue.Empty:
      pass
  file_que.close()
  logging.info('data proc %d done, file_num=%d' % (proc_id, num_files))
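
A minimal, self-contained sketch of the (lengths, values) encoding and the
remainder split used for sparse columns above; the toy column, batch_size and
carried-over values are illustrative, not taken from load_parquet.py:

import numpy as np
import pandas as pd

# a ragged column: each row holds a variable-length array of ids
col = pd.Series([np.array([1, 2]), np.array([3]), np.array([4, 5, 6])])
all_lens = np.array([len(x) for x in col], dtype=np.int32)  # [2 1 3]
all_vals = np.concatenate(col.to_numpy())                   # [1 2 3 4 5 6]
assert np.sum(all_lens) == len(all_vals)

# merge a carried-over tail with the new samples, then split off one full
# batch of batch_size samples, mirroring the remainder branch above
batch_size = 2
carry_lens = np.array([1], dtype=np.int32)
carry_vals = np.array([9])
tmp_lens = np.concatenate([carry_lens, all_lens], axis=0)   # [1 2 1 3]
tmp_vals = np.concatenate([carry_vals, all_vals], axis=0)   # [9 1 2 3 4 5 6]
res_lens = tmp_lens[batch_size:]                            # carried: [1 3]
tmp_lens = tmp_lens[:batch_size]                            # batch:   [1 2]
num_elems = np.sum(tmp_lens)                                # 3
res_vals = tmp_vals[num_elems:]                             # carried: [3 4 5 6]
tmp_vals = tmp_vals[:num_elems]                             # batch:   [9 1 2]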