in tensorflow_io/core/kernels/kafka_kernels_deprecated.cc [854:983]
void Compute(OpKernelContext* context) override {
const Tensor* input_tensor;
OP_REQUIRES_OK(context, context->input("input", &input_tensor));
DecodeAvroResource* resource;
std::unique_ptr<DecodeAvroResource> resource_scope;
if (context->input_dtype(1) == DT_RESOURCE) {
OP_REQUIRES_OK(context,
GetResourceFromContext(context, "schema", &resource));
} else {
const Tensor* schema_tensor;
OP_REQUIRES_OK(context, context->input("schema", &schema_tensor));
const string& schema = schema_tensor->scalar<tstring>()();
resource_scope.reset(new DecodeAvroResource(env_));
OP_REQUIRES_OK(context, resource_scope->Init(schema));
resource_scope->Ref();
resource = resource_scope.get();
}
core::ScopedUnref unref(resource);
std::vector<Tensor*> value;
value.reserve(resource->avro_schema().root()->names());
for (size_t i = 0; i < resource->avro_schema().root()->names(); i++) {
Tensor* value_tensor = nullptr;
OP_REQUIRES_OK(context, context->allocate_output(static_cast<int64>(i),
input_tensor->shape(),
&value_tensor));
value.push_back(value_tensor);
}
avro::GenericDatum datum(resource->avro_schema());
for (int64 entry_index = 0; entry_index < input_tensor->NumElements();
entry_index++) {
const string& entry = input_tensor->flat<tstring>()(entry_index);
std::unique_ptr<avro::InputStream> in =
avro::memoryInputStream((const uint8_t*)entry.data(), entry.size());
avro::DecoderPtr d = avro::binaryDecoder();
d->init(*in);
avro::decode(*d, datum);
const avro::GenericRecord& record = datum.value<avro::GenericRecord>();
for (int i = 0; i < resource->avro_schema().root()->names(); i++) {
const avro::GenericDatum& field = record.fieldAt(i);
switch (field.type()) {
case avro::AVRO_NULL:
switch (context->expected_output_dtype(i)) {
case DT_BOOL:
value[i]->flat<bool>()(entry_index) = false;
break;
case DT_INT32:
value[i]->flat<int32>()(entry_index) = 0;
break;
case DT_INT64:
value[i]->flat<int64>()(entry_index) = 0;
break;
case DT_FLOAT:
value[i]->flat<float>()(entry_index) = 0.0;
break;
case DT_DOUBLE:
value[i]->flat<double>()(entry_index) = 0.0;
break;
case DT_STRING:
value[i]->flat<tstring>()(entry_index) = "";
break;
default:
OP_REQUIRES(context, false,
errors::InvalidArgument(
"unsupported data type against AVRO_NULL: ",
field.type()));
}
break;
case avro::AVRO_BOOL:
value[i]->flat<bool>()(entry_index) = field.value<bool>();
break;
case avro::AVRO_INT:
value[i]->flat<int32>()(entry_index) = field.value<int32_t>();
break;
case avro::AVRO_LONG:
value[i]->flat<int64>()(entry_index) = field.value<int64_t>();
break;
case avro::AVRO_FLOAT:
value[i]->flat<float>()(entry_index) = field.value<float>();
break;
case avro::AVRO_DOUBLE:
value[i]->flat<double>()(entry_index) = field.value<double>();
break;
case avro::AVRO_STRING: {
// make a concrete explicit copy as otherwise avro may override the
// underlying buffer.
const string& field_value = field.value<string>();
string v;
if (field_value.size() > 0) {
v.resize(field_value.size());
memcpy(&v[0], &field_value[0], field_value.size());
}
value[i]->flat<tstring>()(entry_index) = v;
} break;
case avro::AVRO_BYTES: {
const std::vector<uint8_t>& field_value =
field.value<std::vector<uint8_t>>();
string v;
if (field_value.size() > 0) {
v.resize(field_value.size());
memcpy(&v[0], &field_value[0], field_value.size());
}
value[i]->flat<tstring>()(entry_index) = std::move(v);
} break;
case avro::AVRO_FIXED: {
const std::vector<uint8_t>& field_value =
field.value<avro::GenericFixed>().value();
string v;
if (field_value.size() > 0) {
v.resize(field_value.size());
memcpy(&v[0], &field_value[0], field_value.size());
}
value[i]->flat<tstring>()(entry_index) = std::move(v);
} break;
case avro::AVRO_ENUM:
value[i]->flat<tstring>()(entry_index) =
field.value<avro::GenericEnum>().symbol();
break;
default:
OP_REQUIRES(context, false,
errors::InvalidArgument("unsupported data type: ",
field.type()));
}
}
}
}