void Compute()

in tensorflow_io/core/kernels/arrow/arrow_kernels.cc [387:518]

  void Compute(OpKernelContext* context) override {
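    // Inputs: a scalar filename and a scalar string that may hold the file
    // contents in memory.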
    const Tensor& filename_tensor = context->input(0);
    const string filename = filename_tensor.scalar<tstring>()();

    const Tensor& memory_tensor = context->input(1);
    const string& memory = memory_tensor.scalar<tstring>()();
    std::unique_ptr<SizedRandomAccessFile> file(new SizedRandomAccessFile(
        env_, filename, memory.data(), memory.size()));
    uint64 size;
    OP_REQUIRES_OK(context, file->GetFileSize(&size));

    // Feather V1 file layout: "FEA1" magic, column data, flatbuffer
    // [metadata], uint32 metadata_length, trailing "FEA1" magic.
    static constexpr const char* kFeatherMagicBytes = "FEA1";

    size_t header_length = strlen(kFeatherMagicBytes);
    size_t footer_length = sizeof(uint32) + strlen(kFeatherMagicBytes);

    string buffer;
    buffer.resize(header_length > footer_length ? header_length
                                                : footer_length);

    StringPiece result;

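    // Check the leading "FEA1" magic bytes.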
    OP_REQUIRES_OK(context, file->Read(0, header_length, &result, &buffer[0]));
    OP_REQUIRES(context,
                !memcmp(buffer.data(), kFeatherMagicBytes, header_length),
                errors::InvalidArgument("not a feather file"));

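    // The footer is [uint32 metadata_length]["FEA1"]; check the trailing
    // magic bytes.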
    OP_REQUIRES_OK(context, file->Read(size - footer_length, footer_length,
                                       &result, &buffer[0]));
    OP_REQUIRES(context,
                !memcmp(buffer.data() + sizeof(uint32), kFeatherMagicBytes,
                        footer_length - sizeof(uint32)),
                errors::InvalidArgument("incomplete feather file"));

    // The first four bytes of the footer hold the flatbuffer metadata length;
    // copy them out rather than type-punning through reinterpret_cast.
    uint32 metadata_length = 0;
    memcpy(&metadata_length, buffer.data(), sizeof(uint32));

    buffer.resize(metadata_length);

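    // Read the flatbuffer metadata block that sits immediately before the
    // footer.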
    OP_REQUIRES_OK(context, file->Read(size - footer_length - metadata_length,
                                       metadata_length, &result, &buffer[0]));

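    // Parse the metadata as a Feather CTable, which describes the columns
    // and the row count.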
    const ::arrow::ipc::feather::fbs::CTable* table =
        ::arrow::ipc::feather::fbs::GetCTable(buffer.data());

    OP_REQUIRES(context,
                (table->version() >= ::arrow::ipc::feather::kFeatherV1Version),
                errors::InvalidArgument(
                    "feather file is old: ", table->version(), " vs. ",
                    ::arrow::ipc::feather::kFeatherV1Version));

    std::vector<string> columns;
    std::vector<string> dtypes;
    std::vector<int64> counts;
    columns.reserve(table->columns()->size());
    dtypes.reserve(table->columns()->size());
    counts.reserve(table->columns()->size());

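    // Map each Feather column type to the corresponding TensorFlow dtype.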
    for (int64 i = 0; i < table->columns()->size(); i++) {
      DataType dtype = ::tensorflow::DataType::DT_INVALID;
      switch (table->columns()->Get(i)->values()->type()) {
        case ::arrow::ipc::feather::fbs::Type::BOOL:
          dtype = ::tensorflow::DataType::DT_BOOL;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT8:
          dtype = ::tensorflow::DataType::DT_INT8;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT16:
          dtype = ::tensorflow::DataType::DT_INT16;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT32:
          dtype = ::tensorflow::DataType::DT_INT32;
          break;
        case ::arrow::ipc::feather::fbs::Type::INT64:
          dtype = ::tensorflow::DataType::DT_INT64;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT8:
          dtype = ::tensorflow::DataType::DT_UINT8;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT16:
          dtype = ::tensorflow::DataType::DT_UINT16;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT32:
          dtype = ::tensorflow::DataType::DT_UINT32;
          break;
        case ::arrow::ipc::feather::fbs::Type::UINT64:
          dtype = ::tensorflow::DataType::DT_UINT64;
          break;
        case ::arrow::ipc::feather::fbs::Type::FLOAT:
          dtype = ::tensorflow::DataType::DT_FLOAT;
          break;
        case ::arrow::ipc::feather::fbs::Type::DOUBLE:
          dtype = ::tensorflow::DataType::DT_DOUBLE;
          break;
        case ::arrow::ipc::feather::fbs::Type::UTF8:
        case ::arrow::ipc::feather::fbs::Type::BINARY:
        case ::arrow::ipc::feather::fbs::Type::CATEGORY:
        case ::arrow::ipc::feather::fbs::Type::TIMESTAMP:
        case ::arrow::ipc::feather::fbs::Type::DATE:
        case ::arrow::ipc::feather::fbs::Type::TIME:
        // case ::arrow::ipc::feather::fbs::Type::LARGE_UTF8:
        // case ::arrow::ipc::feather::fbs::Type::LARGE_BINARY:
        default:
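          // Unsupported Feather column types are reported as DT_INVALID.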
          break;
      }
      columns.push_back(table->columns()->Get(i)->name()->str());
      dtypes.push_back(::tensorflow::DataTypeString(dtype));
      counts.push_back(table->num_rows());
    }

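    // Outputs 0 (column names) and 1 (dtypes) have one entry per column.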
    TensorShape output_shape = filename_tensor.shape();
    output_shape.AddDim(columns.size());

    Tensor* columns_tensor;
    OP_REQUIRES_OK(context,
                   context->allocate_output(0, output_shape, &columns_tensor));
    Tensor* dtypes_tensor;
    OP_REQUIRES_OK(context,
                   context->allocate_output(1, output_shape, &dtypes_tensor));

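    // Output 2 (shapes) adds a trailing dimension of size 1 holding each
    // column's row count.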
    output_shape.AddDim(1);

    Tensor* shapes_tensor;
    OP_REQUIRES_OK(context,
                   context->allocate_output(2, output_shape, &shapes_tensor));

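    // Fill the column name, dtype, and shape outputs.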
    for (size_t i = 0; i < columns.size(); i++) {
      columns_tensor->flat<tstring>()(i) = columns[i];
      dtypes_tensor->flat<tstring>()(i) = dtypes[i];
      shapes_tensor->flat<int64>()(i) = counts[i];
    }
  }