in tensorflow_io/core/kernels/avro_kernels.cc [439:547]
// Reads the records with indices in [start, stop) of the column named
// `component` and stores them into `value`.
//
// Args:
//   start:       index of the first record to read (inclusive).
//   stop:        index one past the last record to read; clamped to the
//                size of dimension 0 of the column's shape.
//   component:   name of the column (record field) to read.
//   record_read: output; set to the number of records covered by the
//                clamped range, or 0 when nothing is read.
//   value:       output tensor; record `k` of the selection is written to
//                flat position `k - start`. The element type written depends
//                on the Avro type of the field (bool, int32, int64, float,
//                double, or tstring for string/bytes/fixed/enum).
//                NOTE(review): the caller is presumably responsible for
//                allocating `value` with a matching dtype and size - confirm.
//   label:       not used by this method.
//
// Returns:
//   OK on success (including an empty selection), InvalidArgument when the
//   component is unknown, the range is inverted, or the field has an
//   unsupported Avro type, and Internal when a record cannot be read.
Status Read(const int64 start, const int64 stop, const string& component,
            int64* record_read, Tensor* value, Tensor* label) override {
  // Single lookup: reuse the iterator instead of find() followed by [].
  const auto column_it = columns_index_.find(component);
  if (column_it == columns_index_.end()) {
    return errors::InvalidArgument("component ", component, " is invalid");
  }
  const int64 column_index = column_it->second;
  (*record_read) = 0;

  const int64 total = shapes_[column_index].dim_size(0);
  if (start >= total) {
    return Status::OK();
  }
  const string& column = component;
  // `start < total` is guaranteed by the early return above, so only `stop`
  // needs clamping.
  const int64 element_start = start;
  const int64 element_stop = stop < total ? stop : total;
  if (element_start > element_stop) {
    return errors::InvalidArgument("dataset ", column,
                                   " selection is out of boundary");
  }
  if (element_start == element_stop) {
    return Status::OK();
  }

  avro::GenericDatum datum(reader_schema_);
  // Walk the blocks. positions_[i].first is used as the number of records in
  // block i and positions_[i].second as the position to seek to for that
  // block. `item_index_sync` is the index of the first record of block i.
  //
  // The increment must add the *current* block's count before advancing `i`;
  // advancing first would add the next block's count and index one past the
  // end of positions_ on the final iteration.
  int64 item_index_sync = 0;
  for (size_t i = 0; i < positions_.size();
       item_index_sync += positions_[i].first, i++) {
    if (item_index_sync >= element_stop) {
      // Block starts are non-decreasing, so no later block can intersect.
      break;
    }
    const int64 block_end = item_index_sync + positions_[i].first;
    if (block_end <= element_start) {
      // Block lies entirely before the requested range.
      continue;
    }
    // TODO: Avro is sync point partitioned and each block is very similar to
    // Row Group of parquet. Ideally each block should be cached with the hope
    // that slicing and indexing will happen around the same block across
    // multiple rows. Caching is not done yet.
    // Seek to sync
    reader_->seek(positions_[i].second);
    for (int64 item_index = item_index_sync;
         item_index < block_end && item_index < element_stop; item_index++) {
      // Records before element_start within the block still have to be read
      // to advance the reader to the first requested record.
      if (!reader_->read(datum)) {
        return errors::Internal("unable to read record at: ", item_index);
      }
      // Assign only when in range
      if (item_index < element_start) {
        continue;
      }
      const int64 out_index = item_index - element_start;
      const avro::GenericRecord& record = datum.value<avro::GenericRecord>();
      const avro::GenericDatum& field = record.field(column);
      switch (field.type()) {
        case avro::AVRO_BOOL:
          value->flat<bool>()(out_index) = field.value<bool>();
          break;
        case avro::AVRO_INT:
          value->flat<int32>()(out_index) = field.value<int32_t>();
          break;
        case avro::AVRO_LONG:
          value->flat<int64>()(out_index) = field.value<int64_t>();
          break;
        case avro::AVRO_FLOAT:
          value->flat<float>()(out_index) = field.value<float>();
          break;
        case avro::AVRO_DOUBLE:
          value->flat<double>()(out_index) = field.value<double>();
          break;
        case avro::AVRO_STRING:
          value->flat<tstring>()(out_index) = field.value<string>();
          break;
        case avro::AVRO_BYTES: {
          const std::vector<uint8_t>& field_value =
              field.value<std::vector<uint8_t>>();
          // Iterator-range construction is well-defined for an empty
          // payload, unlike taking the address of field_value[0].
          value->flat<tstring>()(out_index) =
              string(field_value.begin(), field_value.end());
        } break;
        case avro::AVRO_FIXED: {
          const std::vector<uint8_t>& field_value =
              field.value<avro::GenericFixed>().value();
          value->flat<tstring>()(out_index) =
              string(field_value.begin(), field_value.end());
        } break;
        case avro::AVRO_ENUM:
          value->flat<tstring>()(out_index) =
              field.value<avro::GenericEnum>().symbol();
          break;
        default:
          return errors::InvalidArgument("unsupported data type: ",
                                         field.type());
      }
    }
  }
  (*record_read) = element_stop - element_start;
  return Status::OK();
}