in tensorflow_io/core/kernels/arrow/arrow_kernels.cc [387:518]
void Compute(OpKernelContext* context) override {
  // Reads only the header/footer metadata of a Feather (V1) file and emits,
  // per column: name (output 0), dtype string (output 1, "invalid" for
  // unsupported Arrow types), and row count (output 2, shaped [..., 1]).
  //
  // Inputs:
  //   0: scalar string filename
  //   1: scalar string with optional in-memory file contents
  //      (SizedRandomAccessFile serves reads from `memory` when non-empty)
  const Tensor& filename_tensor = context->input(0);
  const string filename = filename_tensor.scalar<tstring>()();
  const Tensor& memory_tensor = context->input(1);
  const string& memory = memory_tensor.scalar<tstring>()();
  std::unique_ptr<SizedRandomAccessFile> file(new SizedRandomAccessFile(
      env_, filename, memory.data(), memory.size()));
  uint64 size;
  OP_REQUIRES_OK(context, file->GetFileSize(&size));

  // Feather V1 layout: FEA1.....[metadata][uint32 metadata_length]FEA1
  static constexpr const char* kFeatherMagicBytes = "FEA1";
  const size_t header_length = strlen(kFeatherMagicBytes);
  const size_t footer_length = sizeof(uint32) + strlen(kFeatherMagicBytes);
  // Reject truncated files up front: otherwise `size - footer_length` below
  // wraps around (uint64) and we would read from a bogus offset.
  OP_REQUIRES(context, size >= header_length + footer_length,
              errors::InvalidArgument("not a feather file: only ", size,
                                      " bytes"));
  string buffer;
  buffer.resize(header_length > footer_length ? header_length
                                              : footer_length);
  StringPiece result;
  OP_REQUIRES_OK(context, file->Read(0, header_length, &result, &buffer[0]));
  OP_REQUIRES(context,
              !memcmp(buffer.data(), kFeatherMagicBytes, header_length),
              errors::InvalidArgument("not a feather file"));
  OP_REQUIRES_OK(context, file->Read(size - footer_length, footer_length,
                                     &result, &buffer[0]));
  OP_REQUIRES(context,
              !memcmp(buffer.data() + sizeof(uint32), kFeatherMagicBytes,
                      footer_length - sizeof(uint32)),
              errors::InvalidArgument("incomplete feather file"));
  // memcpy instead of reinterpret_cast: the buffer is not guaranteed to be
  // suitably aligned for a uint32 load (UB on strict-alignment targets).
  uint32 metadata_length;
  memcpy(&metadata_length, buffer.data(), sizeof(uint32));
  // metadata_length comes from the (untrusted) file; it must fit between
  // the header magic and the footer, or the offset below underflows.
  OP_REQUIRES(context,
              metadata_length <= size - header_length - footer_length,
              errors::InvalidArgument("invalid feather metadata length: ",
                                      metadata_length));
  buffer.resize(metadata_length);
  OP_REQUIRES_OK(context, file->Read(size - footer_length - metadata_length,
                                     metadata_length, &result, &buffer[0]));
  const ::arrow::ipc::feather::fbs::CTable* table =
      ::arrow::ipc::feather::fbs::GetCTable(buffer.data());
  OP_REQUIRES(context,
              (table->version() >= ::arrow::ipc::feather::kFeatherV1Version),
              errors::InvalidArgument(
                  "feather file is old: ", table->version(), " vs. ",
                  ::arrow::ipc::feather::kFeatherV1Version));
  // An absent flatbuffer vector field is returned as nullptr; fail cleanly
  // instead of segfaulting on a malformed file.
  OP_REQUIRES(context, table->columns() != nullptr,
              errors::InvalidArgument("feather file has no columns"));
  std::vector<string> columns;
  std::vector<string> dtypes;
  std::vector<int64> counts;
  columns.reserve(table->columns()->size());
  dtypes.reserve(table->columns()->size());
  counts.reserve(table->columns()->size());
  for (int64 i = 0; i < table->columns()->size(); i++) {
    // Map the Arrow primitive type to a TensorFlow dtype; anything not
    // representable as a dense TF tensor stays DT_INVALID.
    DataType dtype = ::tensorflow::DataType::DT_INVALID;
    switch (table->columns()->Get(i)->values()->type()) {
      case ::arrow::ipc::feather::fbs::Type::BOOL:
        dtype = ::tensorflow::DataType::DT_BOOL;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT8:
        dtype = ::tensorflow::DataType::DT_INT8;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT16:
        dtype = ::tensorflow::DataType::DT_INT16;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT32:
        dtype = ::tensorflow::DataType::DT_INT32;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT64:
        dtype = ::tensorflow::DataType::DT_INT64;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT8:
        dtype = ::tensorflow::DataType::DT_UINT8;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT16:
        dtype = ::tensorflow::DataType::DT_UINT16;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT32:
        dtype = ::tensorflow::DataType::DT_UINT32;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT64:
        dtype = ::tensorflow::DataType::DT_UINT64;
        break;
      case ::arrow::ipc::feather::fbs::Type::FLOAT:
        dtype = ::tensorflow::DataType::DT_FLOAT;
        break;
      case ::arrow::ipc::feather::fbs::Type::DOUBLE:
        dtype = ::tensorflow::DataType::DT_DOUBLE;
        break;
      case ::arrow::ipc::feather::fbs::Type::UTF8:
      case ::arrow::ipc::feather::fbs::Type::BINARY:
      case ::arrow::ipc::feather::fbs::Type::CATEGORY:
      case ::arrow::ipc::feather::fbs::Type::TIMESTAMP:
      case ::arrow::ipc::feather::fbs::Type::DATE:
      case ::arrow::ipc::feather::fbs::Type::TIME:
      // case ::arrow::ipc::feather::fbs::Type::LARGE_UTF8:
      // case ::arrow::ipc::feather::fbs::Type::LARGE_BINARY:
      default:
        break;
    }
    columns.push_back(table->columns()->Get(i)->name()->str());
    dtypes.push_back(::tensorflow::DataTypeString(dtype));
    counts.push_back(table->num_rows());
  }
  // Outputs 0/1 have the input's shape plus a [num_columns] dim; output 2
  // gets an extra trailing [1] dim (each column is a rank-1 array whose
  // only extent is the row count).
  TensorShape output_shape = filename_tensor.shape();
  output_shape.AddDim(columns.size());
  Tensor* columns_tensor;
  OP_REQUIRES_OK(context,
                 context->allocate_output(0, output_shape, &columns_tensor));
  Tensor* dtypes_tensor;
  OP_REQUIRES_OK(context,
                 context->allocate_output(1, output_shape, &dtypes_tensor));
  output_shape.AddDim(1);
  Tensor* shapes_tensor;
  OP_REQUIRES_OK(context,
                 context->allocate_output(2, output_shape, &shapes_tensor));
  for (size_t i = 0; i < columns.size(); i++) {
    columns_tensor->flat<tstring>()(i) = columns[i];
    dtypes_tensor->flat<tstring>()(i) = dtypes[i];
    shapes_tensor->flat<int64>()(i) = counts[i];
  }
}