in tensorflow_io/core/kernels/avro_kernels.cc [162:290]
// Reads one column of an Avro container file into a rank-1 output tensor.
//
// Scalar inputs, by index:
//   0: filename (string) - passed to SizedRandomAccessFile.
//   1: schema   (string) - JSON reader schema, compiled with
//                          avro::compileJsonSchema.
//   2: column   (string) - name of the record field to extract.
//   3: memory   (string) - bytes passed to SizedRandomAccessFile next to the
//                          filename (presumably in-memory file contents;
//                          confirm against SizedRandomAccessFile).
//   4: offset   (int64)  - when non-zero, the reader is synced to this
//                          position before reading.
//   5: length   (int64)  - reading stops once the reader is past the sync
//                          point at offset + length; a negative value means
//                          "file size - offset".
//
// Output 0 holds one element per record read. Its element type follows the
// Avro type of the column: bool, int32, int64, float, double, or tstring for
// string/bytes/fixed/enum columns.
//
// Failures reported through the context:
//   - Unimplemented   if the schema cannot be compiled (status kept as is for
//                     compatibility with existing callers).
//   - InvalidArgument if the schema is not a record, the column is not a field
//                     of that record, or the column type is unsupported.
//   - Any status returned by GetFileSize or allocate_output.
//
// NOTE(review): the Avro reader calls below (constructor, sync, read) are not
// wrapped in any error handling here; confirm how failures on malformed files
// are expected to surface.
void Compute(OpKernelContext* context) override {
  const Tensor& filename_tensor = context->input(0);
  string filename = filename_tensor.scalar<tstring>()();
  const Tensor& schema_tensor = context->input(1);
  const string schema = schema_tensor.scalar<tstring>()();
  const Tensor& column_tensor = context->input(2);
  const string column = column_tensor.scalar<tstring>()();
  const Tensor& memory_tensor = context->input(3);
  string memory = memory_tensor.scalar<tstring>()();
  const Tensor& offset_tensor = context->input(4);
  const int64 offset = offset_tensor.scalar<int64>()();
  const Tensor& length_tensor = context->input(5);
  int64 length = length_tensor.scalar<int64>()();

  avro::ValidSchema reader_schema;
  string error;
  std::istringstream ss(schema);
  OP_REQUIRES(context, avro::compileJsonSchema(ss, reader_schema, error),
              errors::Unimplemented("Avro schema error: ", error));

  std::unique_ptr<SizedRandomAccessFile> file(new SizedRandomAccessFile(
      env_, filename, memory.data(), memory.size()));
  uint64 size = 0;
  OP_REQUIRES_OK(context, file->GetFileSize(&size));
  if (length < 0) {
    // Negative length: read from `offset` through the end of the file.
    length = static_cast<int64>(size) - offset;
  }

  avro::GenericDatum datum(reader_schema);

  // Validate the schema/column combination up front. Without these checks the
  // GenericRecord accessors below would throw for a non-record schema or an
  // unknown field name instead of reporting an error status.
  OP_REQUIRES(context, datum.type() == avro::AVRO_RECORD,
              errors::InvalidArgument(
                  "Avro schema must be a record to read column: ", column));
  OP_REQUIRES(context, datum.value<avro::GenericRecord>().hasField(column),
              errors::InvalidArgument("Avro schema has no column: ", column));

  // Resolve the column once so the per-record loop can index the field
  // directly instead of looking it up by name for every record.
  const size_t column_index =
      datum.value<avro::GenericRecord>().fieldIndex(column);
  const avro::Type column_type =
      datum.value<avro::GenericRecord>().fieldAt(column_index).type();

  std::unique_ptr<avro::InputStream> stream(new AvroInputStream(file.get()));
  std::unique_ptr<avro::DataFileReader<avro::GenericDatum>> reader(
      new avro::DataFileReader<avro::GenericDatum>(std::move(stream),
                                                   reader_schema));
  if (offset != 0) {
    reader->sync(offset);
  }

// Per-type extractors: each appends the value of the current `field` to the
// local `records` vector declared by PROCESS_RECORD.
// NOTE(review): these helper macros are not #undef'd after use; confirm that
// nothing later in the file relies on them before adding #undef directives.
#define BOOL_VALUE records.push_back(field.value<bool>())
#define INT32_VALUE records.emplace_back(field.value<int32_t>())
#define INT64_VALUE records.emplace_back(field.value<int64_t>())
#define FLOAT_VALUE records.emplace_back(field.value<float>())
#define DOUBLE_VALUE records.emplace_back(field.value<double>())
#define STRING_VALUE records.emplace_back(field.value<string>())
#define BYTES_VALUE                                                          \
  {                                                                          \
    const std::vector<uint8_t>& value = field.value<std::vector<uint8_t>>(); \
    string v;                                                                \
    if (value.size() > 0) {                                                  \
      v.resize(value.size());                                                \
      memcpy(&v[0], &value[0], value.size());                                \
    }                                                                        \
    records.emplace_back(v);                                                 \
  }
#define FIXED_VALUE                               \
  {                                               \
    const std::vector<uint8_t>& value =           \
        field.value<avro::GenericFixed>().value(); \
    string v;                                     \
    if (value.size() > 0) {                       \
      v.resize(value.size());                     \
      memcpy(&v[0], &value[0], value.size());     \
    }                                             \
    records.emplace_back(v);                      \
  }
#define ENUM_VALUE \
  records.emplace_back(field.value<avro::GenericEnum>().symbol())

// Reads records until the reader passes the sync point at offset + length or
// runs out of data, collects the column values, then copies them into output
// 0. TYPE is the tensor element type; ATYPE is accepted but not used by the
// expansion; VALUE is one of the extractors above.
#define PROCESS_RECORD(TYPE, ATYPE, VALUE)                                    \
  {                                                                           \
    std::vector<TYPE> records;                                                \
    while (!reader->pastSync(offset + length) && reader->read(datum)) {       \
      const avro::GenericRecord& record = datum.value<avro::GenericRecord>(); \
      const avro::GenericDatum& field = record.fieldAt(column_index);         \
      VALUE;                                                                  \
    }                                                                         \
    Tensor* output_tensor = nullptr;                                          \
    OP_REQUIRES_OK(context,                                                   \
                   context->allocate_output(                                  \
                       0, TensorShape({static_cast<int64>(records.size())}),  \
                       &output_tensor));                                      \
    auto output_flat = output_tensor->flat<TYPE>();                           \
    for (size_t i = 0; i < records.size(); i++) {                             \
      output_flat(i) = std::move(records[i]);                                 \
    }                                                                         \
  }

  switch (column_type) {
    case avro::AVRO_BOOL:
      PROCESS_RECORD(bool, bool, BOOL_VALUE);
      break;
    case avro::AVRO_INT:
      PROCESS_RECORD(int32, int32_t, INT32_VALUE);
      break;
    case avro::AVRO_LONG:
      PROCESS_RECORD(int64, int64_t, INT64_VALUE);
      break;
    case avro::AVRO_FLOAT:
      PROCESS_RECORD(float, float, FLOAT_VALUE);
      break;
    case avro::AVRO_DOUBLE:
      PROCESS_RECORD(double, double, DOUBLE_VALUE);
      break;
    case avro::AVRO_STRING:
      PROCESS_RECORD(tstring, string, STRING_VALUE);
      break;
    case avro::AVRO_BYTES:
      PROCESS_RECORD(tstring, string, BYTES_VALUE);
      break;
    case avro::AVRO_FIXED:
      PROCESS_RECORD(tstring, string, FIXED_VALUE);
      break;
    case avro::AVRO_ENUM:
      PROCESS_RECORD(tstring, string, ENUM_VALUE);
      break;
    default:
      OP_REQUIRES(context, false,
                  errors::InvalidArgument("unsupported data type: ",
                                          column_type));
  }
}