void Compute()

in tensorflow_io/core/kernels/avro_kernels.cc [162:290]


  // Reads a single column out of an Avro container file and emits it as a
  // 1-D tensor (output 0) holding one element per record that was read.
  //
  // Inputs, each read as a scalar:
  //   0  filename  tstring  forwarded to SizedRandomAccessFile
  //   1  schema    tstring  JSON reader schema, compiled with
  //                         avro::compileJsonSchema
  //   2  column    tstring  name of the record field to extract
  //   3  memory    tstring  buffer forwarded (data + size) to
  //                         SizedRandomAccessFile alongside the filename
  //   4  offset    int64    when non-zero, the reader syncs to it before reading
  //   5  length    int64    reading stops once reader->pastSync(offset + length)
  //                         is true; a negative value is replaced by
  //                         (file size - offset)
  //
  // Output dtype by Avro column type:
  //   BOOL -> bool, INT -> int32, LONG -> int64, FLOAT -> float,
  //   DOUBLE -> double, STRING / BYTES / FIXED / ENUM -> tstring
  //   (ENUM yields the symbol name).
  //
  // Errors reported through `context`:
  //   Unimplemented    the schema JSON fails to compile
  //   InvalidArgument  negative offset, schema root is not a record, `column`
  //                    is not a field of the record, or the column has an
  //                    unsupported type
  //   Statuses from GetFileSize() and allocate_output() are forwarded as-is.
  //
  // NOTE(review): the Avro calls below (reader construction, sync, read) are
  // not wrapped in try/catch; presumably they can throw on malformed input --
  // confirm how such exceptions should be surfaced.
  void Compute(OpKernelContext* context) override {
    const Tensor& filename_tensor = context->input(0);
    string filename = filename_tensor.scalar<tstring>()();

    const Tensor& schema_tensor = context->input(1);
    string schema = schema_tensor.scalar<tstring>()();

    const Tensor& column_tensor = context->input(2);
    string column = column_tensor.scalar<tstring>()();

    const Tensor& memory_tensor = context->input(3);
    string memory = memory_tensor.scalar<tstring>()();

    const Tensor& offset_tensor = context->input(4);
    const int64 offset = offset_tensor.scalar<int64>()();
    OP_REQUIRES(
        context, offset >= 0,
        errors::InvalidArgument("offset must be non-negative, got: ", offset));

    const Tensor& length_tensor = context->input(5);
    int64 length = length_tensor.scalar<int64>()();

    avro::ValidSchema reader_schema;

    string error;
    std::istringstream ss(schema);
    OP_REQUIRES(context, avro::compileJsonSchema(ss, reader_schema, error),
                errors::Unimplemented("Avro schema error: ", error));

    // `file` is declared before `reader` so that it is destroyed after it:
    // the input stream created below only borrows the raw file pointer.
    std::unique_ptr<SizedRandomAccessFile> file(new SizedRandomAccessFile(
        env_, filename, memory.data(), memory.size()));
    uint64 size = 0;
    OP_REQUIRES_OK(context, file->GetFileSize(&size));

    // A negative length means "everything from offset to the end of the file".
    if (length < 0) {
      length = static_cast<int64>(size) - offset;
    }

    avro::GenericDatum datum(reader_schema);

    // Validate the schema shape and the requested column up front, so that a
    // bad `schema` / `column` input becomes an error Status rather than a
    // failed cast or an Avro exception further down.
    OP_REQUIRES(context, datum.type() == avro::AVRO_RECORD,
                errors::InvalidArgument(
                    "Avro schema must be a record to read column: ", column));
    OP_REQUIRES(
        context, datum.value<avro::GenericRecord>().hasField(column),
        errors::InvalidArgument("column not found in Avro schema: ", column));

    // The datum was built from the reader schema and nothing has been read
    // into it yet, so this is the column type as declared by the schema.
    const avro::Type column_type =
        datum.value<avro::GenericRecord>().field(column).type();

    std::unique_ptr<avro::InputStream> stream(new AvroInputStream(file.get()));
    std::unique_ptr<avro::DataFileReader<avro::GenericDatum>> reader(
        new avro::DataFileReader<avro::GenericDatum>(std::move(stream),
                                                     reader_schema));

    if (offset != 0) {
      reader->sync(offset);
    }

// Per-type extraction snippets. Each one is expanded inside PROCESS_RECORD's
// read loop, where `field` is the current record's column datum and `records`
// is the accumulating vector of converted values.
#define BOOL_VALUE records.push_back(field.value<bool>())
#define INT32_VALUE records.emplace_back(field.value<int32_t>())
#define INT64_VALUE records.emplace_back(field.value<int64_t>())
#define FLOAT_VALUE records.emplace_back(field.value<float>())
#define DOUBLE_VALUE records.emplace_back(field.value<double>())
#define STRING_VALUE records.emplace_back(field.value<string>())
#define BYTES_VALUE                                                          \
  {                                                                          \
    const std::vector<uint8_t>& value = field.value<std::vector<uint8_t>>(); \
    records.emplace_back(string(value.begin(), value.end()));                \
  }
#define FIXED_VALUE                                           \
  {                                                           \
    const std::vector<uint8_t>& value =                       \
        field.value<avro::GenericFixed>().value();            \
    records.emplace_back(string(value.begin(), value.end())); \
  }
#define ENUM_VALUE \
  records.emplace_back(field.value<avro::GenericEnum>().symbol())

// Reads records until the reader is past the sync point for offset + length
// (or read() reports no more data), converts the requested column of each
// record with VALUE, then copies the results into output 0 as a vector of
// TYPE. The column is looked up by name for every record, exactly as the
// records are delivered by the reader.
#define PROCESS_RECORD(TYPE, VALUE)                                           \
  {                                                                           \
    std::vector<TYPE> records;                                                \
    while (!reader->pastSync(offset + length) && reader->read(datum)) {       \
      const avro::GenericRecord& record = datum.value<avro::GenericRecord>(); \
      const avro::GenericDatum& field = record.field(column);                 \
      VALUE;                                                                  \
    }                                                                         \
    Tensor* output_tensor = nullptr;                                          \
    OP_REQUIRES_OK(context,                                                   \
                   context->allocate_output(                                  \
                       0, TensorShape({static_cast<int64>(records.size())}),  \
                       &output_tensor));                                      \
    auto output = output_tensor->flat<TYPE>();                                \
    for (size_t i = 0; i < records.size(); i++) {                             \
      output(i) = std::move(records[i]);                                      \
    }                                                                         \
  }

    switch (column_type) {
      case avro::AVRO_BOOL:
        PROCESS_RECORD(bool, BOOL_VALUE);
        break;
      case avro::AVRO_INT:
        PROCESS_RECORD(int32, INT32_VALUE);
        break;
      case avro::AVRO_LONG:
        PROCESS_RECORD(int64, INT64_VALUE);
        break;
      case avro::AVRO_FLOAT:
        PROCESS_RECORD(float, FLOAT_VALUE);
        break;
      case avro::AVRO_DOUBLE:
        PROCESS_RECORD(double, DOUBLE_VALUE);
        break;
      case avro::AVRO_STRING:
        PROCESS_RECORD(tstring, STRING_VALUE);
        break;
      case avro::AVRO_BYTES:
        PROCESS_RECORD(tstring, BYTES_VALUE);
        break;
      case avro::AVRO_FIXED:
        PROCESS_RECORD(tstring, FIXED_VALUE);
        break;
      case avro::AVRO_ENUM:
        PROCESS_RECORD(tstring, ENUM_VALUE);
        break;
      default:
        OP_REQUIRES(context, false,
                    errors::InvalidArgument("unsupported data type: ",
                                            column_type));
    }

// The helper macros refer to locals of this function; do not let them leak
// into the rest of the translation unit.
#undef PROCESS_RECORD
#undef ENUM_VALUE
#undef FIXED_VALUE
#undef BYTES_VALUE
#undef STRING_VALUE
#undef DOUBLE_VALUE
#undef FLOAT_VALUE
#undef INT64_VALUE
#undef INT32_VALUE
#undef BOOL_VALUE
  }