in tensorflow_io/core/kernels/parquet_kernels.cc [123:283]
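  // Read() copies the half-open row range
  // [start[0], start[0] + shape.dim_size(0)) of one column ("component") into
  // a freshly allocated output tensor, visiting each row group and copying
  // only the slice that overlaps it.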
  Status Read(const string& component,
              const absl::InlinedVector<int64, 4>& start,
              const TensorShape& shape,
              std::function<Status(const TensorShape& shape, Tensor** value)>
                  allocate_func) {
    mutex_lock l(mu_);
    if (columns_index_.find(component) == columns_index_.end()) {
      return errors::InvalidArgument("component ", component, " is invalid");
    }
    const int64 column_index = columns_index_[component];
    Tensor* value;
    TF_RETURN_IF_ERROR(allocate_func(shape, &value));
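    // The caller requests rows [element_start, element_stop) in global row
    // coordinates; row_group_offset tracks the first global row of the row
    // group currently being visited.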
    const string& column = component;
    int64 element_start = start[0];
    int64 element_stop = start[0] + shape.dim_size(0);
    int64 row_group_offset = 0;
    for (int row_group = 0; row_group < parquet_metadata_->num_row_groups();
         row_group++) {
      std::shared_ptr<parquet::RowGroupReader> row_group_reader =
          parquet_reader_->RowGroup(row_group);
      // Skip this row group if it does not overlap the requested half-open
      // range [element_start, element_stop).
      if ((row_group_offset + row_group_reader->metadata()->num_rows() <=
           element_start) ||
          (element_stop <= row_group_offset)) {
        row_group_offset += row_group_reader->metadata()->num_rows();
        continue;
      }
      // Clamp the requested range to this row group:
      // [row_to_read_start, row_to_read_final) is the intersection of
      // [element_start, element_stop) with the group's global row range.
      int64 row_to_read_start =
          row_group_offset > element_start ? row_group_offset : element_start;
      int64 row_to_read_final =
          row_group_offset + row_group_reader->metadata()->num_rows() <
                  element_stop
              ? row_group_offset + row_group_reader->metadata()->num_rows()
              : element_stop;
      int64 row_to_read_count = row_to_read_final - row_to_read_start;
      // TODO: parquet is RowGroup based, so ideally the RowGroup should be
      // cached so that per-row indexing and slicing does not have to reopen
      // it. For now no caching is done.
      std::shared_ptr<parquet::ColumnReader> column_reader =
          row_group_reader->Column(column_index);
      // The buffer location to fill is
      // value->flat<T>().data()[row_to_read_start - element_start].
      // Note: ReadBatch may not be able to read all requested elements
      // (row_to_read_count) in one shot, so we loop `while (row_left > 0)`
      // until the full range has been read.
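// Copies fixed-width primitive values straight into the output tensor. The
// (ptype::c_type*)(void*) cast reinterprets the tensorflow buffer as the
// parquet physical type of the same width and layout.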
#define PARQUET_PROCESS_TYPE(ptype, type) \
  { \
    parquet::TypedColumnReader<ptype>* reader = \
        static_cast<parquet::TypedColumnReader<ptype>*>(column_reader.get()); \
    if (row_to_read_start > row_group_offset) { \
      reader->Skip(row_to_read_start - row_group_offset); \
    } \
    ptype::c_type* value_p = (ptype::c_type*)(void*)(&( \
        value->flat<type>().data()[row_to_read_start - element_start])); \
    int64_t row_left = row_to_read_count; \
    while (row_left > 0) { \
      int64_t values_read; \
      int64_t levels_read = reader->ReadBatch( \
          row_left, nullptr, nullptr, &value_p[row_to_read_count - row_left], \
          &values_read); \
      if (!(levels_read == values_read && levels_read > 0)) { \
        return errors::InvalidArgument("null value in column: ", column); \
      } \
      row_left -= levels_read; \
    } \
  }
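// Variable-length byte arrays cannot be written in place: values are staged
// in a temporary buffer and then copied one by one into the tstring tensor.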
#define PARQUET_PROCESS_BYTE_ARRAY(ptype) \
  { \
    parquet::TypedColumnReader<ptype>* reader = \
        static_cast<parquet::TypedColumnReader<ptype>*>(column_reader.get()); \
    if (row_to_read_start > row_group_offset) { \
      reader->Skip(row_to_read_start - row_group_offset); \
    } \
    std::unique_ptr<ptype::c_type[]> value_p( \
        new ptype::c_type[row_to_read_count]); \
    int64_t row_left = row_to_read_count; \
    while (row_left > 0) { \
      int64_t values_read; \
      int64_t levels_read = reader->ReadBatch( \
          row_left, nullptr, nullptr, \
          &value_p.get()[row_to_read_count - row_left], &values_read); \
      if (!(levels_read == values_read && levels_read > 0)) { \
        return errors::InvalidArgument("null value in column: ", column); \
      } \
      row_left -= levels_read; \
    } \
    for (int64_t index = 0; index < row_to_read_count; index++) { \
      value->flat<tstring>()(row_to_read_start - element_start + index) = \
          ByteArrayToString(value_p[index]); \
    } \
  }
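// Same staging scheme as PARQUET_PROCESS_BYTE_ARRAY, except every value is a
// fixed `len`-byte blob with no length prefix, so the copy takes an explicit
// length.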
#define PARQUET_PROCESS_FIXED_LEN_BYTE_ARRAY(ptype, len) \
  { \
    parquet::TypedColumnReader<ptype>* reader = \
        static_cast<parquet::TypedColumnReader<ptype>*>(column_reader.get()); \
    if (row_to_read_start > row_group_offset) { \
      reader->Skip(row_to_read_start - row_group_offset); \
    } \
    std::unique_ptr<ptype::c_type[]> value_p( \
        new ptype::c_type[row_to_read_count]); \
    int64_t row_left = row_to_read_count; \
    while (row_left > 0) { \
      int64_t values_read; \
      int64_t levels_read = reader->ReadBatch( \
          row_left, nullptr, nullptr, \
          &value_p.get()[row_to_read_count - row_left], &values_read); \
      if (!(levels_read == values_read && levels_read > 0)) { \
        return errors::InvalidArgument("null value in column: ", column); \
      } \
      row_left -= levels_read; \
    } \
    for (int64_t index = 0; index < row_to_read_count; index++) { \
      value->flat<tstring>()(row_to_read_start - element_start + index) = \
          string((const char*)value_p[index].ptr, len); \
    } \
  }
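      // Dispatch on the column's parquet physical type; each case expands one
      // of the macros above over this row group's slice.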
      switch (
          parquet_metadata_->schema()->Column(column_index)->physical_type()) {
        case parquet::Type::BOOLEAN:
          PARQUET_PROCESS_TYPE(parquet::BooleanType, bool);
          break;
        case parquet::Type::INT32:
          PARQUET_PROCESS_TYPE(parquet::Int32Type, int32);
          break;
        case parquet::Type::INT64:
          PARQUET_PROCESS_TYPE(parquet::Int64Type, int64);
          break;
        case parquet::Type::FLOAT:
          PARQUET_PROCESS_TYPE(parquet::FloatType, float);
          break;
        case parquet::Type::DOUBLE:
          PARQUET_PROCESS_TYPE(parquet::DoubleType, double);
          break;
        case parquet::Type::BYTE_ARRAY:
          PARQUET_PROCESS_BYTE_ARRAY(parquet::ByteArrayType);
          break;
        case parquet::Type::FIXED_LEN_BYTE_ARRAY:
          PARQUET_PROCESS_FIXED_LEN_BYTE_ARRAY(
              parquet::FLBAType,
              parquet_metadata_->schema()->Column(column_index)->type_length());
          break;
        default:
          return errors::InvalidArgument("invalid data type: ",
                                         parquet_metadata_->schema()
                                             ->Column(column_index)
                                             ->physical_type());
      }
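      // Advance the global row offset past this row group.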
      row_group_offset += row_group_reader->metadata()->num_rows();
    }
    return Status::OK();
  }
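
// A minimal usage sketch (hypothetical, not part of this file): `reader`
// stands in for whatever object exposes this Read method, and the column
// name, start row, and dtype are made-up values for illustration. Inside a
// function returning Status:
//
//   Tensor tensor;
//   TF_RETURN_IF_ERROR(reader->Read(
//       "price", /*start=*/{100}, TensorShape({50}),
//       [&](const TensorShape& shape, Tensor** value) {
//         tensor = Tensor(DT_DOUBLE, shape);  // dtype must match the column
//         *value = &tensor;
//         return Status::OK();
//       }));
//   // `tensor` now holds rows [100, 150) of column "price".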