in tensorflow_io/core/kernels/json_kernels.cc [155:254]
Status Read(const int64 start, const int64 stop, const string& component,
int64* record_read, Tensor* value, Tensor* label) override {
if (columns_index_.find(component) == columns_index_.end()) {
return errors::InvalidArgument("component ", component, " is invalid");
}
int64 column_index = columns_index_[component];
(*record_read) = 0;
if (start >= shapes_[column_index].dim_size(0)) {
return Status::OK();
}
const string& column = component;
int64 element_start = start < shapes_[column_index].dim_size(0)
? start
: shapes_[column_index].dim_size(0);
int64 element_stop = stop < shapes_[column_index].dim_size(0)
? stop
: shapes_[column_index].dim_size(0);
if (element_start > element_stop) {
return errors::InvalidArgument("dataset ", column,
" selection is out of boundary");
}
if (element_start == element_stop) {
return Status::OK();
}
if (mode_ == "records") {
if (dtypes_[column_index] == DT_INT64) {
memcpy(&value->flat<int64>().data()[0],
&tensors_[column_index].flat<int64>().data()[element_start],
sizeof(int64) * (element_stop - element_start));
} else if (dtypes_[column_index] == DT_DOUBLE) {
memcpy(&value->flat<double>().data()[0],
&tensors_[column_index].flat<double>().data()[element_start],
sizeof(double) * (element_stop - element_start));
} else {
return errors::InvalidArgument("invalid data type: ",
dtypes_[column_index]);
}
(*record_read) = element_stop - element_start;
return Status::OK();
}
std::shared_ptr<::arrow::ChunkedArray> slice =
table_->column(column_index)->Slice(element_start, element_stop);
#define PROCESS_TYPE(TTYPE, ATYPE) \
{ \
int64 curr_index = 0; \
for (auto chunk : slice->chunks()) { \
for (int64_t item = 0; item < chunk->length(); item++) { \
value->flat<TTYPE>()(curr_index) = \
(dynamic_cast<ATYPE*>(chunk.get()))->Value(item); \
curr_index++; \
} \
} \
}
switch (value->dtype()) {
case DT_BOOL:
PROCESS_TYPE(bool, ::arrow::BooleanArray);
break;
case DT_INT8:
PROCESS_TYPE(int8, ::arrow::NumericArray<::arrow::Int8Type>);
break;
case DT_UINT8:
PROCESS_TYPE(uint8, ::arrow::NumericArray<::arrow::UInt8Type>);
break;
case DT_INT16:
PROCESS_TYPE(int16, ::arrow::NumericArray<::arrow::Int16Type>);
break;
case DT_UINT16:
PROCESS_TYPE(uint16, ::arrow::NumericArray<::arrow::UInt16Type>);
break;
case DT_INT32:
PROCESS_TYPE(int32, ::arrow::NumericArray<::arrow::Int32Type>);
break;
case DT_UINT32:
PROCESS_TYPE(uint32, ::arrow::NumericArray<::arrow::UInt32Type>);
break;
case DT_INT64:
PROCESS_TYPE(int64, ::arrow::NumericArray<::arrow::Int64Type>);
break;
case DT_UINT64:
PROCESS_TYPE(uint64, ::arrow::NumericArray<::arrow::UInt64Type>);
break;
case DT_FLOAT:
PROCESS_TYPE(float, ::arrow::NumericArray<::arrow::FloatType>);
break;
case DT_DOUBLE:
PROCESS_TYPE(double, ::arrow::NumericArray<::arrow::DoubleType>);
break;
default:
return errors::InvalidArgument("data type is not supported: ",
DataTypeString(value->dtype()));
}
(*record_read) = element_stop - element_start;
return Status::OK();
}