Status Read()

in tensorflow_io/core/kernels/json_kernels.cc [155:254]


  // Copies rows [start, stop) of the column named `component` into `value`.
  //
  // The requested range is clamped to the column's first dimension as recorded
  // in `shapes_`. When `mode_` is "records" the data is copied from the
  // pre-built `tensors_` (int64 and double columns only); otherwise it is
  // copied chunk by chunk from the Arrow `table_`, with the element type
  // selected by `value->dtype()`.
  //
  // Args:
  //   start, stop: half-open row range to read.
  //   component:   column name; must be a key of `columns_index_`.
  //   record_read: set to the number of rows copied (0 when nothing is read).
  //   value:       output tensor; must hold at least the clamped row count.
  //   label:       not used by this method.
  //
  // Returns InvalidArgument for an unknown component, an out-of-boundary
  // selection, an undersized `value`, or an unsupported/mismatched data type.
  Status Read(const int64 start, const int64 stop, const string& component,
              int64* record_read, Tensor* value, Tensor* label) override {
    // Single lookup instead of find() followed by operator[].
    const auto column_entry = columns_index_.find(component);
    if (column_entry == columns_index_.end()) {
      return errors::InvalidArgument("component ", component, " is invalid");
    }
    const int64 column_index = column_entry->second;

    (*record_read) = 0;
    const int64 column_rows = shapes_[column_index].dim_size(0);
    if (start >= column_rows) {
      return Status::OK();
    }
    // `start` is already known to be below `column_rows`, so only `stop`
    // needs clamping.
    const int64 element_start = start;
    const int64 element_stop = stop < column_rows ? stop : column_rows;

    if (element_start > element_stop) {
      return errors::InvalidArgument("dataset ", component,
                                     " selection is out of boundary");
    }
    if (element_start == element_stop) {
      return Status::OK();
    }
    // A negative start would index before the beginning of the column data.
    if (element_start < 0) {
      return errors::InvalidArgument("dataset ", component,
                                     " selection is out of boundary");
    }

    const int64 element_count = element_stop - element_start;
    // Both copy paths below write `element_count` elements starting at flat
    // index 0 of `value`; refuse to write past the end of its buffer.
    if (value->NumElements() < element_count) {
      return errors::InvalidArgument(
          "output tensor for dataset ", component, " holds ",
          value->NumElements(), " elements but ", element_count,
          " are required");
    }

    if (mode_ == "records") {
      if (dtypes_[column_index] == DT_INT64) {
        memcpy(value->flat<int64>().data(),
               tensors_[column_index].flat<int64>().data() + element_start,
               sizeof(int64) * element_count);
      } else if (dtypes_[column_index] == DT_DOUBLE) {
        memcpy(value->flat<double>().data(),
               tensors_[column_index].flat<double>().data() + element_start,
               sizeof(double) * element_count);
      } else {
        return errors::InvalidArgument("invalid data type: ",
                                       dtypes_[column_index]);
      }
      (*record_read) = element_count;

      return Status::OK();
    }

    // ChunkedArray::Slice takes (offset, length), not (start, stop). Passing
    // the stop index as the length would yield too many rows whenever
    // start > 0 and overrun `value`.
    std::shared_ptr<::arrow::ChunkedArray> slice =
        table_->column(column_index)->Slice(element_start, element_count);

// Copies every element of every chunk in `slice` into `value`, interpreting
// each chunk as ATYPE. Fails instead of dereferencing a null pointer when a
// chunk is not actually an ATYPE.
#define PROCESS_TYPE(TTYPE, ATYPE)                                        \
  {                                                                       \
    auto output = value->flat<TTYPE>();                                   \
    int64 curr_index = 0;                                                 \
    for (const auto& chunk : slice->chunks()) {                           \
      const ATYPE* typed_chunk = dynamic_cast<const ATYPE*>(chunk.get()); \
      if (typed_chunk == nullptr) {                                       \
        return errors::InvalidArgument(                                   \
            "column ", component, " does not hold data of type ",         \
            DataTypeString(value->dtype()));                              \
      }                                                                   \
      for (int64_t item = 0; item < typed_chunk->length(); item++) {      \
        output(curr_index) = typed_chunk->Value(item);                    \
        curr_index++;                                                     \
      }                                                                   \
    }                                                                     \
  }
    switch (value->dtype()) {
      case DT_BOOL:
        PROCESS_TYPE(bool, ::arrow::BooleanArray);
        break;
      case DT_INT8:
        PROCESS_TYPE(int8, ::arrow::NumericArray<::arrow::Int8Type>);
        break;
      case DT_UINT8:
        PROCESS_TYPE(uint8, ::arrow::NumericArray<::arrow::UInt8Type>);
        break;
      case DT_INT16:
        PROCESS_TYPE(int16, ::arrow::NumericArray<::arrow::Int16Type>);
        break;
      case DT_UINT16:
        PROCESS_TYPE(uint16, ::arrow::NumericArray<::arrow::UInt16Type>);
        break;
      case DT_INT32:
        PROCESS_TYPE(int32, ::arrow::NumericArray<::arrow::Int32Type>);
        break;
      case DT_UINT32:
        PROCESS_TYPE(uint32, ::arrow::NumericArray<::arrow::UInt32Type>);
        break;
      case DT_INT64:
        PROCESS_TYPE(int64, ::arrow::NumericArray<::arrow::Int64Type>);
        break;
      case DT_UINT64:
        PROCESS_TYPE(uint64, ::arrow::NumericArray<::arrow::UInt64Type>);
        break;
      case DT_FLOAT:
        PROCESS_TYPE(float, ::arrow::NumericArray<::arrow::FloatType>);
        break;
      case DT_DOUBLE:
        PROCESS_TYPE(double, ::arrow::NumericArray<::arrow::DoubleType>);
        break;
      default:
        return errors::InvalidArgument("data type is not supported: ",
                                       DataTypeString(value->dtype()));
    }
    (*record_read) = element_count;
    return Status::OK();
  }