in tensorflow_io/core/kernels/arrow/arrow_kernels.cc [535:641]
// Opens the single Feather (V1) file named by `input[0]` and records one
// shape / dtype / name entry per column found in the file's metadata.
//
// Expected on-disk layout:
//   FEA1 ..... [metadata][uint32 metadata_length]FEA1
//
// Args:
//   input:        must contain exactly one filename.
//   metadata:     not used by this method.
//   memory_data:  forwarded to SizedRandomAccessFile together with
//   memory_size:  `memory_size`.
//
// Returns:
//   OK on success; InvalidArgument when `input` does not hold exactly one
//   filename or when the file is not a well-formed Feather file; otherwise
//   whatever error the underlying file operations report.
//
// Columns whose Feather type has no mapping below (UTF8, BINARY, CATEGORY,
// TIMESTAMP, DATE, TIME, ...) are still registered, with dtype DT_INVALID.
Status Init(const std::vector<string>& input,
            const std::vector<string>& metadata, const void* memory_data,
            const int64 memory_size) override {
  if (input.empty()) {
    // Guard the input[0] access below.
    return errors::InvalidArgument("no filename is provided");
  }
  if (input.size() > 1) {
    return errors::InvalidArgument("more than 1 filename is not supported");
  }
  const string& filename = input[0];
  file_.reset(
      new SizedRandomAccessFile(env_, filename, memory_data, memory_size));
  TF_RETURN_IF_ERROR(file_->GetFileSize(&file_size_));

  // FEA1.....[metadata][uint32 metadata_length]FEA1
  static constexpr const char* kFeatherMagicBytes = "FEA1";
  const size_t header_length = strlen(kFeatherMagicBytes);
  const size_t footer_length = sizeof(uint32) + strlen(kFeatherMagicBytes);

  string buffer;
  buffer.resize(header_length > footer_length ? header_length
                                              : footer_length);
  StringPiece result;

  // Leading magic bytes.
  TF_RETURN_IF_ERROR(file_->Read(0, header_length, &result, &buffer[0]));
  if (memcmp(buffer.data(), kFeatherMagicBytes, header_length) != 0) {
    return errors::InvalidArgument("not a feather file");
  }

  // The file must be able to hold both the header and the footer; otherwise
  // the unsigned offset arithmetic below would wrap around.
  const uint64 file_size = static_cast<uint64>(file_size_);
  if (file_size < header_length + footer_length) {
    return errors::InvalidArgument("incomplete feather file");
  }

  // Footer: [uint32 metadata_length] followed by the trailing magic bytes.
  TF_RETURN_IF_ERROR(file_->Read(file_size - footer_length, footer_length,
                                 &result, &buffer[0]));
  if (memcmp(buffer.data() + sizeof(uint32), kFeatherMagicBytes,
             footer_length - sizeof(uint32)) != 0) {
    return errors::InvalidArgument("incomplete feather file");
  }

  // Copy the length out byte-wise: the string buffer carries no uint32
  // alignment guarantee. The value is interpreted in host byte order, as
  // before.
  uint32 metadata_length = 0;
  memcpy(&metadata_length, buffer.data(), sizeof(metadata_length));

  // The metadata block has to fit between the header and the footer, and a
  // flatbuffer needs at least its 4-byte root offset.
  if (metadata_length < sizeof(uint32) ||
      metadata_length > file_size - header_length - footer_length) {
    return errors::InvalidArgument("invalid feather metadata length: ",
                                   metadata_length);
  }

  buffer.resize(metadata_length);
  TF_RETURN_IF_ERROR(file_->Read(file_size - footer_length - metadata_length,
                                 metadata_length, &result, &buffer[0]));

  // NOTE(review): the flatbuffer is not run through a flatbuffers::Verifier,
  // so a deliberately corrupted metadata block can still misbehave — consider
  // verifying it here.
  const ::arrow::ipc::feather::fbs::CTable* table =
      ::arrow::ipc::feather::fbs::GetCTable(buffer.data());
  if (table->version() < ::arrow::ipc::feather::kFeatherV1Version) {
    return errors::InvalidArgument("feather file is old: ", table->version(),
                                   " vs. ",
                                   ::arrow::ipc::feather::kFeatherV1Version);
  }

  // Flatbuffers accessors return nullptr for absent optional fields.
  const auto* columns = table->columns();
  if (columns == nullptr) {
    return errors::InvalidArgument("feather file has no column metadata");
  }

  for (size_t i = 0; i < columns->size(); i++) {
    const auto* column = columns->Get(i);
    if (column == nullptr || column->values() == nullptr ||
        column->name() == nullptr) {
      return errors::InvalidArgument("feather column ", i,
                                     " has incomplete metadata");
    }

    ::tensorflow::DataType dtype = ::tensorflow::DataType::DT_INVALID;
    switch (column->values()->type()) {
      case ::arrow::ipc::feather::fbs::Type::BOOL:
        dtype = ::tensorflow::DataType::DT_BOOL;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT8:
        dtype = ::tensorflow::DataType::DT_INT8;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT16:
        dtype = ::tensorflow::DataType::DT_INT16;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT32:
        dtype = ::tensorflow::DataType::DT_INT32;
        break;
      case ::arrow::ipc::feather::fbs::Type::INT64:
        dtype = ::tensorflow::DataType::DT_INT64;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT8:
        dtype = ::tensorflow::DataType::DT_UINT8;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT16:
        dtype = ::tensorflow::DataType::DT_UINT16;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT32:
        dtype = ::tensorflow::DataType::DT_UINT32;
        break;
      case ::arrow::ipc::feather::fbs::Type::UINT64:
        dtype = ::tensorflow::DataType::DT_UINT64;
        break;
      case ::arrow::ipc::feather::fbs::Type::FLOAT:
        dtype = ::tensorflow::DataType::DT_FLOAT;
        break;
      case ::arrow::ipc::feather::fbs::Type::DOUBLE:
        dtype = ::tensorflow::DataType::DT_DOUBLE;
        break;
      // No TensorFlow dtype is assigned for the following types; the column
      // keeps DT_INVALID.
      case ::arrow::ipc::feather::fbs::Type::UTF8:
      case ::arrow::ipc::feather::fbs::Type::BINARY:
      case ::arrow::ipc::feather::fbs::Type::CATEGORY:
      case ::arrow::ipc::feather::fbs::Type::TIMESTAMP:
      case ::arrow::ipc::feather::fbs::Type::DATE:
      case ::arrow::ipc::feather::fbs::Type::TIME:
      // case ::arrow::ipc::feather::fbs::Type::LARGE_UTF8:
      // case ::arrow::ipc::feather::fbs::Type::LARGE_BINARY:
      default:
        break;
    }

    // Every column is recorded as a 1-D tensor of length num_rows.
    const string column_name = column->name()->str();
    shapes_.push_back(TensorShape({static_cast<int64>(table->num_rows())}));
    dtypes_.push_back(dtype);
    columns_.push_back(column_name);
    columns_index_[column_name] = i;
  }
  return Status::OK();
}