Status Init()

in tensorflow_io/core/kernels/arrow/arrow_kernels.cc [535:641]


  // Opens the single input as a Feather (V1) file and records the layout of
  // its columns.
  //
  // Expected on-disk layout:
  //   FEA1.....[metadata][uint32 metadata_length]FEA1
  //
  // Args:
  //   input: must contain exactly one filename.
  //   metadata: not used by this method.
  //   memory_data, memory_size: forwarded to SizedRandomAccessFile together
  //     with the filename.
  //
  // On success, one entry per column is appended to shapes_, dtypes_ and
  // columns_, and columns_index_ maps each column name to its position.
  // Columns whose type has no mapping below are recorded as DT_INVALID.
  //
  // Returns InvalidArgument when the input list is not exactly one file, when
  // the magic bytes / footer / metadata are missing or inconsistent with the
  // file size, or when the metadata version is older than kFeatherV1Version.
  // Errors from the underlying file are propagated unchanged.
  Status Init(const std::vector<string>& input,
              const std::vector<string>& metadata, const void* memory_data,
              const int64 memory_size) override {
    if (input.empty()) {
      return errors::InvalidArgument("no filename is provided");
    }
    if (input.size() > 1) {
      return errors::InvalidArgument("more than 1 filename is not supported");
    }

    const string& filename = input[0];
    file_.reset(
        new SizedRandomAccessFile(env_, filename, memory_data, memory_size));
    TF_RETURN_IF_ERROR(file_->GetFileSize(&file_size_));

    // FEA1.....[metadata][uint32 metadata_length]FEA1
    static constexpr const char* kFeatherMagicBytes = "FEA1";

    const size_t header_length = strlen(kFeatherMagicBytes);
    const size_t footer_length = sizeof(uint32) + strlen(kFeatherMagicBytes);

    // Every offset below is computed by subtracting from file_size_, so a file
    // shorter than header + footer must be rejected first to avoid wrap-around.
    if (file_size_ < header_length + footer_length) {
      return errors::InvalidArgument(
          "file is too small to be a feather file: ", file_size_, " bytes");
    }

    string buffer;
    buffer.resize(header_length > footer_length ? header_length
                                                : footer_length);

    // Read() reports the bytes it produced through `result`, which is not
    // guaranteed to alias the scratch buffer, so always inspect `result`.
    StringPiece result;

    TF_RETURN_IF_ERROR(file_->Read(0, header_length, &result, &buffer[0]));
    if (result.size() != header_length ||
        memcmp(result.data(), kFeatherMagicBytes, header_length) != 0) {
      return errors::InvalidArgument("not a feather file");
    }

    TF_RETURN_IF_ERROR(file_->Read(file_size_ - footer_length, footer_length,
                                   &result, &buffer[0]));
    if (result.size() != footer_length ||
        memcmp(result.data() + sizeof(uint32), kFeatherMagicBytes,
               footer_length - sizeof(uint32)) != 0) {
      return errors::InvalidArgument("incomplete feather file");
    }

    // Copy the length out instead of casting the byte pointer: the bytes carry
    // no alignment guarantee. The value is interpreted in host byte order, as
    // before (i.e. a little-endian host is assumed).
    uint32 metadata_length = 0;
    memcpy(&metadata_length, result.data(), sizeof(metadata_length));

    // metadata_length comes from the file and cannot be trusted: it has to fit
    // between the leading magic bytes and the footer, and be large enough to
    // hold at least the flatbuffer root offset.
    if (metadata_length < sizeof(uint32) ||
        metadata_length > file_size_ - header_length - footer_length) {
      return errors::InvalidArgument(
          "feather file has invalid metadata length: ", metadata_length);
    }

    buffer.resize(metadata_length);

    TF_RETURN_IF_ERROR(file_->Read(file_size_ - footer_length - metadata_length,
                                   metadata_length, &result, &buffer[0]));
    if (result.size() != metadata_length) {
      return errors::InvalidArgument("incomplete feather file");
    }

    // NOTE(review): the metadata is still parsed without a structural
    // flatbuffers verification pass; the checks below only guard against
    // absent fields. Consider running a flatbuffers Verifier on this buffer.
    const ::arrow::ipc::feather::fbs::CTable* table =
        ::arrow::ipc::feather::fbs::GetCTable(result.data());

    if (table->version() < ::arrow::ipc::feather::kFeatherV1Version) {
      return errors::InvalidArgument("feather file is old: ", table->version(),
                                     " vs. ",
                                     ::arrow::ipc::feather::kFeatherV1Version);
    }

    const int64 num_rows = static_cast<int64>(table->num_rows());
    if (num_rows < 0) {
      return errors::InvalidArgument(
          "feather file has invalid number of rows: ", num_rows);
    }

    // Flatbuffers accessors return nullptr for fields that are absent; an
    // absent column vector is treated as a table without columns.
    const auto* table_columns = table->columns();
    const size_t num_columns =
        (table_columns == nullptr) ? 0 : table_columns->size();

    for (size_t i = 0; i < num_columns; i++) {
      const auto* column = table_columns->Get(i);
      if (column == nullptr || column->values() == nullptr ||
          column->name() == nullptr) {
        return errors::InvalidArgument(
            "feather file has malformed metadata for column ", i);
      }

      ::tensorflow::DataType dtype = ::tensorflow::DataType::DT_INVALID;
      switch (column->values()->type()) {
        case ::arrow::ipc::feather::fbs::Type::BOOL:
          dtype = ::tensorflow::DataType::DT_BOOL;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT8:
          dtype = ::tensorflow::DataType::DT_INT8;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT16:
          dtype = ::tensorflow::DataType::DT_INT16;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT32:
          dtype = ::tensorflow::DataType::DT_INT32;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT64:
          dtype = ::tensorflow::DataType::DT_INT64;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT8:
          dtype = ::tensorflow::DataType::DT_UINT8;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT16:
          dtype = ::tensorflow::DataType::DT_UINT16;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT32:
          dtype = ::tensorflow::DataType::DT_UINT32;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT64:
          dtype = ::tensorflow::DataType::DT_UINT64;
          break;
        case ::arrow::ipc::feather::fbs::Type::FLOAT:
          dtype = ::tensorflow::DataType::DT_FLOAT;
          break;
        case ::arrow::ipc::feather::fbs::Type::DOUBLE:
          dtype = ::tensorflow::DataType::DT_DOUBLE;
          break;
        // The remaining types have no mapping here; their columns are still
        // listed, with dtype left as DT_INVALID.
        case ::arrow::ipc::feather::fbs::Type::UTF8:
        case ::arrow::ipc::feather::fbs::Type::BINARY:
        case ::arrow::ipc::feather::fbs::Type::CATEGORY:
        case ::arrow::ipc::feather::fbs::Type::TIMESTAMP:
        case ::arrow::ipc::feather::fbs::Type::DATE:
        case ::arrow::ipc::feather::fbs::Type::TIME:
        // case ::arrow::ipc::feather::fbs::Type::LARGE_UTF8:
        // case ::arrow::ipc::feather::fbs::Type::LARGE_BINARY:
        default:
          break;
      }

      const string column_name = column->name()->str();
      shapes_.push_back(TensorShape({num_rows}));
      dtypes_.push_back(dtype);
      columns_.push_back(column_name);
      columns_index_[column_name] = i;
    }

    return Status::OK();
  }