in tensorflow_io/core/kernels/arrow/arrow_kernels.cc [660:760]
// Reads rows [start, stop) of the Feather column named `component` into
// `value`.
//
// `stop` is clamped to the column's row count (dim 0 of
// shapes_[column_index]). On success `*record_read` holds the number of rows
// copied. It is 0 when `start` is at or past the end of the column, or when
// the clamped range is empty. `label` is not used by this method.
//
// Returns InvalidArgument when:
//   - `component` is unknown,
//   - `start > stop`,
//   - `value` has fewer elements than the selected rows,
//   - the column's Arrow array type does not match `value->dtype()`, or
//   - the dtype is unsupported.
// Returns Internal when the Feather reader fails to open or read.
Status Read(const int64 start, const int64 stop, const string& component,
            int64* record_read, Tensor* value, Tensor* label) override {
  auto column_it = columns_index_.find(component);
  if (column_it == columns_index_.end()) {
    return errors::InvalidArgument("component ", component, " is invalid");
  }
  const int64 column_index = column_it->second;
  (*record_read) = 0;
  const int64 total_rows = shapes_[column_index].dim_size(0);
  if (start >= total_rows) {
    return Status::OK();
  }
  // start < total_rows is guaranteed here; only stop needs clamping.
  const int64 element_start = start;
  const int64 element_stop = stop < total_rows ? stop : total_rows;
  if (element_start > element_stop) {
    return errors::InvalidArgument("dataset selection is out of boundary");
  }
  if (element_start == element_stop) {
    return Status::OK();
  }
  // Open the Feather reader lazily on first use. If opening fails, drop the
  // file wrapper too, so a later call retries the open instead of
  // dereferencing a null reader_.
  if (feather_file_.get() == nullptr) {
    feather_file_.reset(new ArrowRandomAccessFile(file_.get(), file_size_));
    arrow::Result<std::shared_ptr<arrow::ipc::feather::Reader>> result =
        arrow::ipc::feather::Reader::Open(feather_file_);
    if (!result.ok()) {
      feather_file_.reset();
      return errors::Internal(result.status().ToString());
    }
    reader_ = std::move(result).ValueUnsafe();
  }
  // Materialize only the requested column rather than the whole table.
  const std::vector<int> column_indices = {static_cast<int>(column_index)};
  std::shared_ptr<arrow::Table> table;
  arrow::Status s = reader_->Read(column_indices, &table);
  if (!s.ok()) {
    return errors::Internal(s.ToString());
  }
  std::shared_ptr<arrow::ChunkedArray> column = table->column(0);
  // ChunkedArray::Slice takes (offset, length), not (start, stop).
  std::shared_ptr<::arrow::ChunkedArray> slice =
      column->Slice(element_start, element_stop - element_start);
  // Refuse to write past the end of the output tensor.
  if (slice->length() > value->NumElements()) {
    return errors::InvalidArgument(
        "value tensor has ", value->NumElements(),
        " elements but the selection contains ", slice->length(), " rows");
  }
// Copies every element of `slice` into value->flat<TTYPE>(), in order.
// The Arrow array type is checked once per chunk; a mismatch is reported
// as an error instead of dereferencing a null cast result.
#define FEATHER_PROCESS_TYPE(TTYPE, ATYPE)                                 \
  {                                                                        \
    auto flat = value->flat<TTYPE>();                                      \
    int64 curr_index = 0;                                                  \
    for (const auto& chunk : slice->chunks()) {                            \
      const ATYPE* typed = dynamic_cast<const ATYPE*>(chunk.get());        \
      if (typed == nullptr) {                                              \
        return errors::InvalidArgument(                                    \
            "column ", component, " does not match requested dtype ",      \
            DataTypeString(value->dtype()));                               \
      }                                                                    \
      for (int64_t item = 0; item < typed->length(); item++) {             \
        flat(curr_index) = typed->Value(item);                             \
        curr_index++;                                                      \
      }                                                                    \
    }                                                                      \
  }
  switch (value->dtype()) {
    case DT_BOOL:
      FEATHER_PROCESS_TYPE(bool, ::arrow::BooleanArray);
      break;
    case DT_INT8:
      FEATHER_PROCESS_TYPE(int8, ::arrow::NumericArray<::arrow::Int8Type>);
      break;
    case DT_UINT8:
      FEATHER_PROCESS_TYPE(uint8, ::arrow::NumericArray<::arrow::UInt8Type>);
      break;
    case DT_INT16:
      FEATHER_PROCESS_TYPE(int16, ::arrow::NumericArray<::arrow::Int16Type>);
      break;
    case DT_UINT16:
      FEATHER_PROCESS_TYPE(uint16,
                           ::arrow::NumericArray<::arrow::UInt16Type>);
      break;
    case DT_INT32:
      FEATHER_PROCESS_TYPE(int32, ::arrow::NumericArray<::arrow::Int32Type>);
      break;
    case DT_UINT32:
      FEATHER_PROCESS_TYPE(uint32,
                           ::arrow::NumericArray<::arrow::UInt32Type>);
      break;
    case DT_INT64:
      FEATHER_PROCESS_TYPE(int64, ::arrow::NumericArray<::arrow::Int64Type>);
      break;
    case DT_UINT64:
      FEATHER_PROCESS_TYPE(uint64,
                           ::arrow::NumericArray<::arrow::UInt64Type>);
      break;
    case DT_FLOAT:
      FEATHER_PROCESS_TYPE(float, ::arrow::NumericArray<::arrow::FloatType>);
      break;
    case DT_DOUBLE:
      FEATHER_PROCESS_TYPE(double,
                           ::arrow::NumericArray<::arrow::DoubleType>);
      break;
    default:
      return errors::InvalidArgument("data type is not supported: ",
                                     DataTypeString(value->dtype()));
  }
#undef FEATHER_PROCESS_TYPE
  (*record_read) = element_stop - element_start;
  return Status::OK();
}