Status Read()

in tensorflow_io/core/kernels/parquet_kernels.cc [123:283]


  // Reads a contiguous range of rows from a single parquet column into a
  // TensorFlow tensor.
  //
  // `component` names the column (must exist in columns_index_), `start[0]`
  // is the first row to read, and `shape.dim_size(0)` is the number of rows.
  // `allocate_func` is invoked once to allocate the output tensor of `shape`;
  // the method then fills it by walking the file's row groups and copying the
  // slice of each group that intersects [start[0], start[0] + rows).
  //
  // Returns InvalidArgument for an unknown column, an unsupported physical
  // type, or when ReadBatch reports a null / short read (see macro bodies);
  // otherwise OK.
  Status Read(const string& component,
              const absl::InlinedVector<int64, 4>& start,
              const TensorShape& shape,
              std::function<Status(const TensorShape& shape, Tensor** value)>
                  allocate_func) {
    // Serialize access to the shared parquet reader/metadata members.
    mutex_lock l(mu_);

    if (columns_index_.find(component) == columns_index_.end()) {
      return errors::InvalidArgument("component ", component, " is invalid");
    }
    const int64 column_index = columns_index_[component];

    // Allocate the output tensor up front; it is filled row group by
    // row group below.
    Tensor* value;
    TF_RETURN_IF_ERROR(allocate_func(shape, &value));

    const string& column = component;
    // Requested half-open row range [element_start, element_stop) in
    // absolute (file-wide) row indices.
    int64 element_start = start[0];
    int64 element_stop = start[0] + shape.dim_size(0);

    // Running absolute row offset of the current row group within the file.
    int64 row_group_offset = 0;
    for (int row_group = 0; row_group < parquet_metadata_->num_row_groups();
         row_group++) {
      std::shared_ptr<parquet::RowGroupReader> row_group_reader =
          parquet_reader_->RowGroup(row_group);
      // Skip if row group is not within [start..stop]
      // NOTE(review): a row group ending exactly at element_start is not
      // skipped here (`<` rather than `<=`); it then yields
      // row_to_read_count == 0 below so nothing is copied -- correct but
      // slightly wasteful.
      if ((row_group_offset + row_group_reader->metadata()->num_rows() <
           element_start) ||
          (element_stop <= row_group_offset)) {
        row_group_offset += row_group_reader->metadata()->num_rows();
        continue;
      }
      // Find row_to_read range: the intersection of the requested range with
      // this row group, still in absolute row indices
      // (start = max(group start, element_start),
      //  final = min(group end, element_stop)).
      int64 row_to_read_start =
          row_group_offset > element_start ? row_group_offset : element_start;
      int64 row_to_read_final =
          (row_group_offset + row_group_reader->metadata()->num_rows()) <
                  (element_stop)
              ? (row_group_offset + row_group_reader->metadata()->num_rows())
              : (element_stop);
      int64 row_to_read_count = row_to_read_final - row_to_read_start;

      // TODO: parquet is RowGroup based so ideally the RowGroup should be
      // cached with the hope of indexing and slicing happens on each row. For
      // now no caching is done yet.
      std::shared_ptr<parquet::ColumnReader> column_reader =
          row_group_reader->Column(column_index);

      // buffer to fill location is value.data()[row_to_read_start - start]
      // Note: ReadBatch may not be able to read the elements requested
      // (row_to_read_count) in one shot, as such we use while loop of
      // `while (row_left > 0) {...}` to read until complete.

// Reads fixed-width values (bool / int32 / int64 / float / double) straight
// into the output tensor's flat buffer. `ptype` is the parquet type class,
// `type` the matching TensorFlow element type. A short read where
// levels_read != values_read (null present) or levels_read == 0 (no more
// data) is reported as InvalidArgument.
// (Comments inside the macros use /* */ so a trailing `\` is never hidden
// inside a // comment.)
#define PARQUET_PROCESS_TYPE(ptype, type)                                     \
  {                                                                           \
    parquet::TypedColumnReader<ptype>* reader =                               \
        static_cast<parquet::TypedColumnReader<ptype>*>(column_reader.get()); \
    /* Position the reader at the first requested row of this row group. */   \
    if (row_to_read_start > row_group_offset) {                               \
      reader->Skip(row_to_read_start - row_group_offset);                     \
    }                                                                         \
    ptype::c_type* value_p = (ptype::c_type*)(void*)(&(                       \
        value->flat<type>().data()[row_to_read_start - element_start]));      \
    int64_t row_left = row_to_read_count;                                     \
    /* ReadBatch may return fewer rows than requested; loop until done. */    \
    while (row_left > 0) {                                                    \
      int64_t values_read;                                                    \
      int64_t levels_read = reader->ReadBatch(                                \
          row_left, nullptr, nullptr, &value_p[row_to_read_count - row_left], \
          &values_read);                                                      \
      if (!(levels_read == values_read && levels_read > 0)) {                 \
        return errors::InvalidArgument("null value in column: ", column);     \
      }                                                                       \
      row_left -= levels_read;                                                \
    }                                                                         \
  }

// Reads variable-length BYTE_ARRAY values into a temporary buffer, then
// converts each element to an owned string in the output tstring tensor via
// ByteArrayToString. Same skip / short-read handling as above.
#define PARQUET_PROCESS_BYTE_ARRAY(ptype)                                     \
  {                                                                           \
    parquet::TypedColumnReader<ptype>* reader =                               \
        static_cast<parquet::TypedColumnReader<ptype>*>(column_reader.get()); \
    if (row_to_read_start > row_group_offset) {                               \
      reader->Skip(row_to_read_start - row_group_offset);                     \
    }                                                                         \
    std::unique_ptr<ptype::c_type[]> value_p(                                 \
        new ptype::c_type[row_to_read_count]);                                \
    int64_t row_left = row_to_read_count;                                     \
    while (row_left > 0) {                                                    \
      int64_t values_read;                                                    \
      int64_t levels_read = reader->ReadBatch(                                \
          row_left, nullptr, nullptr,                                         \
          &value_p.get()[row_to_read_count - row_left], &values_read);        \
      if (!(levels_read == values_read && levels_read > 0)) {                 \
        return errors::InvalidArgument("null value in column: ", column);     \
      }                                                                       \
      row_left -= levels_read;                                                \
    }                                                                         \
    for (int64_t index = 0; index < row_to_read_count; index++) {             \
      value->flat<tstring>()(row_to_read_start - element_start + index) =     \
          ByteArrayToString(value_p[index]);                                  \
    }                                                                         \
  }

// Reads FIXED_LEN_BYTE_ARRAY values into a temporary buffer, then copies
// exactly `len` bytes per element (the column's declared type_length) into
// the output tstring tensor. Same skip / short-read handling as above.
#define PARQUET_PROCESS_FIXED_LEN_BYTE_ARRAY(ptype, len)                      \
  {                                                                           \
    parquet::TypedColumnReader<ptype>* reader =                               \
        static_cast<parquet::TypedColumnReader<ptype>*>(column_reader.get()); \
    if (row_to_read_start > row_group_offset) {                               \
      reader->Skip(row_to_read_start - row_group_offset);                     \
    }                                                                         \
    std::unique_ptr<ptype::c_type[]> value_p(                                 \
        new ptype::c_type[row_to_read_count]);                                \
    int64_t row_left = row_to_read_count;                                     \
    while (row_left > 0) {                                                    \
      int64_t values_read;                                                    \
      int64_t levels_read = reader->ReadBatch(                                \
          row_left, nullptr, nullptr,                                         \
          &value_p.get()[row_to_read_count - row_left], &values_read);        \
      if (!(levels_read == values_read && levels_read > 0)) {                 \
        return errors::InvalidArgument("null value in column: ", column);     \
      }                                                                       \
      row_left -= levels_read;                                                \
    }                                                                         \
    for (int64_t index = 0; index < row_to_read_count; index++) {             \
      value->flat<tstring>()(row_to_read_start - element_start + index) =     \
          string((const char*)value_p[index].ptr, len);                       \
    }                                                                         \
  }

      // Dispatch on the column's physical type; each case copies this row
      // group's slice into the output tensor via one of the macros above.
      switch (
          parquet_metadata_->schema()->Column(column_index)->physical_type()) {
        case parquet::Type::BOOLEAN:
          PARQUET_PROCESS_TYPE(parquet::BooleanType, bool);
          break;
        case parquet::Type::INT32:
          PARQUET_PROCESS_TYPE(parquet::Int32Type, int32);
          break;
        case parquet::Type::INT64:
          PARQUET_PROCESS_TYPE(parquet::Int64Type, int64);
          break;
        case parquet::Type::FLOAT:
          PARQUET_PROCESS_TYPE(parquet::FloatType, float);
          break;
        case parquet::Type::DOUBLE:
          PARQUET_PROCESS_TYPE(parquet::DoubleType, double);
          break;
        case parquet::Type::BYTE_ARRAY:
          PARQUET_PROCESS_BYTE_ARRAY(parquet::ByteArrayType);
          break;
        case parquet::Type::FIXED_LEN_BYTE_ARRAY:
          PARQUET_PROCESS_FIXED_LEN_BYTE_ARRAY(
              parquet::FLBAType,
              parquet_metadata_->schema()->Column(column_index)->type_length());
          break;
        default:
          // INT96 and any future physical types are not handled.
          return errors::InvalidArgument("invalid data type: ",
                                         parquet_metadata_->schema()
                                             ->Column(column_index)
                                             ->physical_type());
      }
      // Advance the running offset past this row group.
      row_group_offset += row_group_reader->metadata()->num_rows();
    }
    return Status::OK();
  }