in tensorflow_io/core/kernels/csv_kernels.cc [155:265]
// Copies rows [start, stop) of column `component` into `value` and, when
// `label` is non-null, records in `label` whether each of those rows is null.
//
// `stop` is clamped to the number of rows in the column. On success
// `*record_read` holds the number of rows copied; it is 0 when `start` is at or
// past the end of the column, or when the clamped range is empty.
//
// Returns InvalidArgument when `component` is not a known column, when `start`
// is negative or greater than the clamped `stop`, when `value` or `label` has
// fewer elements than the rows being read, when `value`'s dtype is not
// supported, or when a chunk's Arrow array type does not match `value`'s dtype.
Status Read(const int64 start, const int64 stop, const string& component,
            int64* record_read, Tensor* value, Tensor* label) override {
  const auto column_it = columns_index_.find(component);
  if (column_it == columns_index_.end()) {
    return errors::InvalidArgument("component ", component, " is invalid");
  }
  const int64 column_index = column_it->second;
  (*record_read) = 0;
  const string& column = component;
  // A negative offset would be handed straight to Arrow's Slice, which does
  // not support it; reject it like any other out-of-range selection.
  if (start < 0) {
    return errors::InvalidArgument("dataset ", column,
                                   " selection is out of boundary");
  }
  const int64 total_rows = shapes_[column_index].dim_size(0);
  if (start >= total_rows) {
    return Status::OK();
  }
  // `start` is already known to be in range here; only `stop` needs clamping.
  const int64 element_start = start;
  const int64 element_stop = stop < total_rows ? stop : total_rows;
  if (element_start > element_stop) {
    return errors::InvalidArgument("dataset ", column,
                                   " selection is out of boundary");
  }
  if (element_start == element_stop) {
    return Status::OK();
  }
  const int64 count = element_stop - element_start;
  // Guard the raw flat() writes below: turn a too-small output buffer into an
  // error instead of an out-of-bounds write.
  if (value != nullptr && value->NumElements() < count) {
    return errors::InvalidArgument("dataset ", column,
                                   " value tensor is too small for ", count,
                                   " records");
  }
  if (label != nullptr && label->NumElements() < count) {
    return errors::InvalidArgument("dataset ", column,
                                   " label tensor is too small for ", count,
                                   " records");
  }
  // ChunkedArray::Slice takes (offset, length), not (start, stop).
  std::shared_ptr<::arrow::ChunkedArray> slice =
      table_->column(column_index)->Slice(element_start, count);
  if (value != nullptr) {
    Status status;
    switch (value->dtype()) {
      case DT_BOOL:
        status = CopyArrowNumeric<bool, ::arrow::BooleanArray>(column, *slice,
                                                               value);
        break;
      case DT_INT8:
        status =
            CopyArrowNumeric<int8, ::arrow::NumericArray<::arrow::Int8Type>>(
                column, *slice, value);
        break;
      case DT_UINT8:
        status =
            CopyArrowNumeric<uint8, ::arrow::NumericArray<::arrow::UInt8Type>>(
                column, *slice, value);
        break;
      case DT_INT16:
        status =
            CopyArrowNumeric<int16, ::arrow::NumericArray<::arrow::Int16Type>>(
                column, *slice, value);
        break;
      case DT_UINT16:
        status = CopyArrowNumeric<uint16,
                                  ::arrow::NumericArray<::arrow::UInt16Type>>(
            column, *slice, value);
        break;
      case DT_INT32:
        status =
            CopyArrowNumeric<int32, ::arrow::NumericArray<::arrow::Int32Type>>(
                column, *slice, value);
        break;
      case DT_UINT32:
        status = CopyArrowNumeric<uint32,
                                  ::arrow::NumericArray<::arrow::UInt32Type>>(
            column, *slice, value);
        break;
      case DT_INT64:
        status =
            CopyArrowNumeric<int64, ::arrow::NumericArray<::arrow::Int64Type>>(
                column, *slice, value);
        break;
      case DT_UINT64:
        status = CopyArrowNumeric<uint64,
                                  ::arrow::NumericArray<::arrow::UInt64Type>>(
            column, *slice, value);
        break;
      case DT_FLOAT:
        status =
            CopyArrowNumeric<float, ::arrow::NumericArray<::arrow::FloatType>>(
                column, *slice, value);
        break;
      case DT_DOUBLE:
        status = CopyArrowNumeric<double,
                                  ::arrow::NumericArray<::arrow::DoubleType>>(
            column, *slice, value);
        break;
      case DT_STRING:
        status = CopyArrowValues<tstring, ::arrow::StringArray>(
            column, *slice,
            [](const ::arrow::StringArray& array, int64_t item) {
              return array.GetString(item);
            },
            value);
        break;
      default:
        return errors::InvalidArgument("data type is not supported: ",
                                       DataTypeString(value->dtype()));
    }
    if (!status.ok()) {
      return status;
    }
  }
  if (label != nullptr) {
    auto label_flat = label->flat<bool>();
    int64 curr_index = 0;
    for (const auto& chunk : slice->chunks()) {
      for (int64_t item = 0; item < chunk->length(); item++) {
        label_flat(curr_index++) = chunk->IsNull(item);
      }
    }
  }
  (*record_read) = count;
  return Status::OK();
}

// Copies every element of `slice`, in order, into the flat view of `value`.
// `value` must hold at least slice.length() elements of type T. Each chunk is
// downcast to ArrowArray once per chunk, not once per element, and element
// `item` is read as `get(array, item)`. Returns InvalidArgument if a chunk is
// not an ArrowArray, for example when the column type does not match dtype.
template <typename T, typename ArrowArray, typename Getter>
static Status CopyArrowValues(const string& column,
                              const ::arrow::ChunkedArray& slice, Getter get,
                              Tensor* value) {
  auto flat = value->flat<T>();
  int64 curr_index = 0;
  for (const auto& chunk : slice.chunks()) {
    const ArrowArray* array = dynamic_cast<const ArrowArray*>(chunk.get());
    if (array == nullptr) {
      return errors::InvalidArgument(
          "dataset ", column, " has arrow type ", chunk->type()->ToString(),
          " which does not match the requested dtype ",
          DataTypeString(value->dtype()));
    }
    for (int64_t item = 0; item < array->length(); item++) {
      flat(curr_index++) = get(*array, item);
    }
  }
  return Status::OK();
}

// CopyArrowValues for Arrow arrays whose elements are read through Value(i).
template <typename T, typename ArrowArray>
static Status CopyArrowNumeric(const string& column,
                               const ::arrow::ChunkedArray& slice,
                               Tensor* value) {
  return CopyArrowValues<T, ArrowArray>(
      column, slice,
      [](const ArrowArray& array, int64_t item) { return array.Value(item); },
      value);
}