Status Read()

in tensorflow_io/core/kernels/arrow/arrow_kernels.cc [660:760]


  // Copies rows [start, stop) of the column named `component` into `value`.
  //
  // The requested range is clamped to the column's first-dimension size
  // (shapes_[column_index].dim_size(0)); rows are written to `value` starting
  // at flat index 0.
  //
  // Args:
  //   start, stop: half-open row range to read.
  //   component:   column name; must be a key of columns_index_.
  //   record_read: set to the number of rows copied. It is reset to 0 once
  //                `component` has been validated and only updated on success.
  //   value:       destination tensor; its dtype selects the Arrow array type
  //                the column is expected to hold.
  //   label:       unused by this implementation.
  //
  // Returns:
  //   OK (with *record_read == 0) when the range is empty or starts at/after
  //   the end of the column; InvalidArgument for an unknown component, an
  //   out-of-boundary range, a too-small `value`, an unsupported dtype, or a
  //   column whose Arrow type does not match the dtype of `value`; Internal
  //   when the Feather file cannot be opened or read.
  Status Read(const int64 start, const int64 stop, const string& component,
              int64* record_read, Tensor* value, Tensor* label) override {
    // Single lookup instead of find() followed by operator[].
    const auto column_entry = columns_index_.find(component);
    if (column_entry == columns_index_.end()) {
      return errors::InvalidArgument("component ", component, " is invalid");
    }
    const int64 column_index = column_entry->second;

    (*record_read) = 0;
    const int64 total_rows = shapes_[column_index].dim_size(0);
    if (start >= total_rows) {
      return Status::OK();
    }
    // start < total_rows is guaranteed here, so only stop needs clamping.
    const int64 element_start = start;
    const int64 element_stop = stop < total_rows ? stop : total_rows;

    // A negative start would be handed to Arrow as a negative slice offset.
    if (element_start < 0 || element_start > element_stop) {
      return errors::InvalidArgument("dataset selection is out of boundary");
    }
    if (element_start == element_stop) {
      return Status::OK();
    }
    const int64 element_count = element_stop - element_start;

    // The copy loop below writes element_count entries into `value`.
    if (value->NumElements() < element_count) {
      return errors::InvalidArgument(
          "output tensor holds ", value->NumElements(),
          " elements but the selection requires ", element_count);
    }

    // The Feather reader is created on the first call that reaches this point
    // and reused afterwards.
    if (feather_file_.get() == nullptr) {
      feather_file_.reset(new ArrowRandomAccessFile(file_.get(), file_size_));
      arrow::Result<std::shared_ptr<arrow::ipc::feather::Reader>> result =
          arrow::ipc::feather::Reader::Open(feather_file_);
      if (!result.ok()) {
        return errors::Internal(result.status().ToString());
      }
      reader_ = std::move(result).ValueUnsafe();
    }

    std::shared_ptr<arrow::Table> table;
    arrow::Status s = reader_->Read(&table);
    if (!s.ok()) {
      return errors::Internal(s.ToString());
    }
    std::shared_ptr<arrow::ChunkedArray> column = table->column(column_index);

    // ChunkedArray::Slice takes (offset, length), not (start, stop).
    std::shared_ptr<::arrow::ChunkedArray> slice =
        column->Slice(element_start, element_count);

// Copies every element of `slice` into `value`, chunk by chunk. The downcast
// is done once per chunk and checked, so a column whose Arrow type does not
// match the requested dtype yields an error rather than a null dereference.
#define FEATHER_PROCESS_TYPE(TTYPE, ATYPE)                                 \
  {                                                                        \
    auto output = value->flat<TTYPE>();                                    \
    int64 curr_index = 0;                                                  \
    for (const auto& chunk : slice->chunks()) {                            \
      const ATYPE* typed_chunk = dynamic_cast<const ATYPE*>(chunk.get());  \
      if (typed_chunk == nullptr) {                                        \
        return errors::InvalidArgument(                                    \
            "column ", component, " has arrow type ",                      \
            chunk->type()->ToString(), " which does not match ",           \
            DataTypeString(value->dtype()));                               \
      }                                                                    \
      for (int64_t item = 0; item < typed_chunk->length(); item++) {       \
        output(curr_index) = typed_chunk->Value(item);                     \
        curr_index++;                                                      \
      }                                                                    \
    }                                                                      \
  }
    switch (value->dtype()) {
      case DT_BOOL:
        FEATHER_PROCESS_TYPE(bool, ::arrow::BooleanArray);
        break;
      case DT_INT8:
        FEATHER_PROCESS_TYPE(int8, ::arrow::NumericArray<::arrow::Int8Type>);
        break;
      case DT_UINT8:
        FEATHER_PROCESS_TYPE(uint8, ::arrow::NumericArray<::arrow::UInt8Type>);
        break;
      case DT_INT16:
        FEATHER_PROCESS_TYPE(int16, ::arrow::NumericArray<::arrow::Int16Type>);
        break;
      case DT_UINT16:
        FEATHER_PROCESS_TYPE(uint16,
                             ::arrow::NumericArray<::arrow::UInt16Type>);
        break;
      case DT_INT32:
        FEATHER_PROCESS_TYPE(int32, ::arrow::NumericArray<::arrow::Int32Type>);
        break;
      case DT_UINT32:
        FEATHER_PROCESS_TYPE(uint32,
                             ::arrow::NumericArray<::arrow::UInt32Type>);
        break;
      case DT_INT64:
        FEATHER_PROCESS_TYPE(int64, ::arrow::NumericArray<::arrow::Int64Type>);
        break;
      case DT_UINT64:
        FEATHER_PROCESS_TYPE(uint64,
                             ::arrow::NumericArray<::arrow::UInt64Type>);
        break;
      case DT_FLOAT:
        FEATHER_PROCESS_TYPE(float, ::arrow::NumericArray<::arrow::FloatType>);
        break;
      case DT_DOUBLE:
        FEATHER_PROCESS_TYPE(double,
                             ::arrow::NumericArray<::arrow::DoubleType>);
        break;
      default:
        return errors::InvalidArgument("data type is not supported: ",
                                       DataTypeString(value->dtype()));
    }
    (*record_read) = element_count;
    return Status::OK();
  }