void Compute()

in tensorflow_io/core/kernels/kafka_kernels_deprecated.cc [854:983]


  // Decodes every element of the "input" string tensor as one Avro
  // binary-encoded datum and scatters the fields of the resulting record into
  // the op outputs, one output per top-level schema field.
  //
  // Inputs:
  //   "input":  string tensor; each element is decoded independently.
  //   "schema": when input 1 has dtype DT_RESOURCE, a handle to an existing
  //             DecodeAvroResource; otherwise a scalar string from which a
  //             temporary DecodeAvroResource is initialised for this call.
  //             NOTE(review): input index 1 is presumed to be "schema".
  // Outputs:
  //   Output i has the same shape as "input" and receives field i of each
  //   decoded record. A null field is stored as false / 0 / 0.0 / "" according
  //   to the output dtype; string, bytes, fixed and enum fields are stored as
  //   strings.
  //
  // Reports InvalidArgument (instead of aborting the process) when an entry
  // cannot be decoded, does not decode to a record, holds a field whose Avro
  // type does not match the dtype of its output, or holds an unsupported type.
  void Compute(OpKernelContext* context) override {
    const Tensor* input_tensor;
    OP_REQUIRES_OK(context, context->input("input", &input_tensor));

    DecodeAvroResource* resource = nullptr;
    std::unique_ptr<DecodeAvroResource> resource_scope;
    if (context->input_dtype(1) == DT_RESOURCE) {
      OP_REQUIRES_OK(context,
                     GetResourceFromContext(context, "schema", &resource));
    } else {
      const Tensor* schema_tensor;
      OP_REQUIRES_OK(context, context->input("schema", &schema_tensor));
      const string& schema = schema_tensor->scalar<tstring>()();

      resource_scope.reset(new DecodeAvroResource(env_));
      OP_REQUIRES_OK(context, resource_scope->Init(schema));
      // Take an extra reference so the ScopedUnref below can be used for both
      // branches; the locally built resource stays owned by resource_scope.
      resource_scope->Ref();
      resource = resource_scope.get();
    }
    core::ScopedUnref unref(resource);

    // The number of top-level fields is fixed by the schema; read it once.
    const size_t field_count = resource->avro_schema().root()->names();

    std::vector<Tensor*> value;
    value.reserve(field_count);
    for (size_t i = 0; i < field_count; i++) {
      Tensor* value_tensor = nullptr;
      OP_REQUIRES_OK(context, context->allocate_output(static_cast<int>(i),
                                                       input_tensor->shape(),
                                                       &value_tensor));
      value.push_back(value_tensor);
    }

    // Builds the error returned when an Avro field cannot be stored in the
    // output tensor that was allocated for it.
    const auto type_mismatch = [](size_t field_index, avro::Type avro_type,
                                  DataType dtype) {
      return errors::InvalidArgument(
          "avro field ", field_index, " of type ", avro_type,
          " does not match output data type ", DataTypeString(dtype));
    };

    const auto entries = input_tensor->flat<tstring>();
    const int64 entry_count = input_tensor->NumElements();

    avro::GenericDatum datum(resource->avro_schema());
    for (int64 entry_index = 0; entry_index < entry_count; entry_index++) {
      // Bind to the tensor element directly; no per-entry string copy needed.
      const tstring& entry = entries(entry_index);
      std::unique_ptr<avro::InputStream> in = avro::memoryInputStream(
          reinterpret_cast<const uint8_t*>(entry.data()), entry.size());

      avro::DecoderPtr decoder = avro::binaryDecoder();
      decoder->init(*in);
      // Avro signals truncated or malformed data by throwing; turn that into
      // an op error rather than letting it terminate the process.
      try {
        avro::decode(*decoder, datum);
      } catch (const std::exception& e) {
        OP_REQUIRES(context, false,
                    errors::InvalidArgument("unable to decode avro entry ",
                                            entry_index, ": ", e.what()));
      }

      // value<GenericRecord>() is only valid when the datum holds a record.
      OP_REQUIRES(context, datum.type() == avro::AVRO_RECORD,
                  errors::InvalidArgument("avro entry ", entry_index,
                                          " is not a record: ", datum.type()));
      const avro::GenericRecord& record = datum.value<avro::GenericRecord>();

      for (size_t i = 0; i < field_count; i++) {
        const avro::GenericDatum& field = record.fieldAt(i);
        Tensor* out = value[i];
        // flat<T>() CHECK-fails when T does not match the tensor dtype, so
        // every branch validates the dtype before touching the tensor.
        const DataType dtype =
            context->expected_output_dtype(static_cast<int>(i));
        switch (field.type()) {
          case avro::AVRO_NULL:
            // A null field is stored as the zero value of the output dtype.
            switch (dtype) {
              case DT_BOOL:
                out->flat<bool>()(entry_index) = false;
                break;
              case DT_INT32:
                out->flat<int32>()(entry_index) = 0;
                break;
              case DT_INT64:
                out->flat<int64>()(entry_index) = 0;
                break;
              case DT_FLOAT:
                out->flat<float>()(entry_index) = 0.0;
                break;
              case DT_DOUBLE:
                out->flat<double>()(entry_index) = 0.0;
                break;
              case DT_STRING:
                out->flat<tstring>()(entry_index) = "";
                break;
              default:
                OP_REQUIRES(context, false,
                            errors::InvalidArgument(
                                "unsupported data type against AVRO_NULL: ",
                                DataTypeString(dtype)));
            }
            break;
          case avro::AVRO_BOOL:
            OP_REQUIRES(context, dtype == DT_BOOL,
                        type_mismatch(i, field.type(), dtype));
            out->flat<bool>()(entry_index) = field.value<bool>();
            break;
          case avro::AVRO_INT:
            OP_REQUIRES(context, dtype == DT_INT32,
                        type_mismatch(i, field.type(), dtype));
            out->flat<int32>()(entry_index) = field.value<int32_t>();
            break;
          case avro::AVRO_LONG:
            OP_REQUIRES(context, dtype == DT_INT64,
                        type_mismatch(i, field.type(), dtype));
            out->flat<int64>()(entry_index) = field.value<int64_t>();
            break;
          case avro::AVRO_FLOAT:
            OP_REQUIRES(context, dtype == DT_FLOAT,
                        type_mismatch(i, field.type(), dtype));
            out->flat<float>()(entry_index) = field.value<float>();
            break;
          case avro::AVRO_DOUBLE:
            OP_REQUIRES(context, dtype == DT_DOUBLE,
                        type_mismatch(i, field.type(), dtype));
            out->flat<double>()(entry_index) = field.value<double>();
            break;
          case avro::AVRO_STRING: {
            OP_REQUIRES(context, dtype == DT_STRING,
                        type_mismatch(i, field.type(), dtype));
            // Make a concrete explicit copy as otherwise avro may override
            // the underlying buffer (datum is reused for the next entry).
            const string& field_value = field.value<string>();
            string v(field_value.data(), field_value.size());
            out->flat<tstring>()(entry_index) = std::move(v);
          } break;
          case avro::AVRO_BYTES: {
            OP_REQUIRES(context, dtype == DT_STRING,
                        type_mismatch(i, field.type(), dtype));
            const std::vector<uint8_t>& field_value =
                field.value<std::vector<uint8_t>>();
            string v(field_value.begin(), field_value.end());
            out->flat<tstring>()(entry_index) = std::move(v);
          } break;
          case avro::AVRO_FIXED: {
            OP_REQUIRES(context, dtype == DT_STRING,
                        type_mismatch(i, field.type(), dtype));
            const std::vector<uint8_t>& field_value =
                field.value<avro::GenericFixed>().value();
            string v(field_value.begin(), field_value.end());
            out->flat<tstring>()(entry_index) = std::move(v);
          } break;
          case avro::AVRO_ENUM:
            OP_REQUIRES(context, dtype == DT_STRING,
                        type_mismatch(i, field.type(), dtype));
            // Enums are exported by symbol name.
            out->flat<tstring>()(entry_index) =
                field.value<avro::GenericEnum>().symbol();
            break;
          default:
            OP_REQUIRES(context, false,
                        errors::InvalidArgument("unsupported data type: ",
                                                field.type()));
        }
      }
    }
  }