Status ReadRecord()

in tensorflow_io/core/kernels/bigquery/bigquery_lib.h [341:563]


  // Decodes the next Avro record from `decoder_` into `datum_` and converts the
  // requested columns into tensors.
  //
  // On the first successful call (while `column_indices_` is still empty) each
  // name in `columns` is resolved to its field index in the record and the
  // field's Avro type is checked against `output_types`; the resolved indices
  // are cached in `column_indices_` for later calls.
  //
  // Args:
  //   ctx: iterator context; its allocator backs every produced tensor.
  //   out_tensors: cleared, then filled with one tensor per entry of `columns`.
  //     Primitive/string-like fields yield scalars, AVRO_ARRAY fields yield
  //     rank-1 tensors, AVRO_NULL fields yield the column's default value.
  //   columns: names of the record fields to read.
  //   output_types: tensor dtype for each column (indexed like `columns`).
  //   typed_default_values: value substituted for AVRO_NULL fields (indexed
  //     like `columns`); must hold the C++ type matching `output_types[i]`.
  //
  // Returns:
  //   Unknown if the decoded datum is not a record; InvalidArgument for an
  //   unknown column, an unsupported Avro/output type, or a mismatch between
  //   a field's type and the requested output type; OK otherwise.
  //
  // NOTE(review): field types are only validated against `output_types` on the
  // first record, and an AVRO_NULL field passes that validation for any output
  // type -- confirm callers guarantee consistent types for nullable columns.
  // NOTE(review): avro::decode presumably throws on malformed input; confirm
  // where that is handled.
  Status ReadRecord(IteratorContext *ctx, std::vector<Tensor> *out_tensors,
                    const std::vector<string> &columns,
                    const std::vector<DataType> &output_types,
                    const std::vector<absl::any> &typed_default_values)
      TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) override {
    avro::decode(*this->decoder_, *this->datum_);
    if (this->datum_->type() != avro::AVRO_RECORD) {
      return errors::Unknown("record is not of AVRO_RECORD type");
    }
    const avro::GenericRecord &record =
        this->datum_->template value<avro::GenericRecord>();

    if (this->column_indices_.size() == 0) {
      // Resolve into a local container and publish it only after every column
      // validated. Filling `column_indices_` directly would leave it partially
      // populated on an early error return, and the next call would then skip
      // validation and index past its end.
      decltype(this->column_indices_) resolved_indices;
      resolved_indices.reserve(columns.size());
      for (size_t i = 0; i < columns.size(); i++) {
        const string &column = columns[i];
        // fieldIndex() reports a missing field by throwing, so check first and
        // surface the problem as a Status instead.
        if (!record.hasField(column)) {
          return errors::InvalidArgument("unknown column: ", column);
        }
        const size_t column_index = record.fieldIndex(column);
        resolved_indices.emplace_back(column_index);

        const avro::GenericDatum &field = record.fieldAt(column_index);
        DataType dtype;
        switch (field.type()) {
          case avro::AVRO_BOOL:
            dtype = DT_BOOL;
            break;
          case avro::AVRO_INT:
            dtype = DT_INT32;
            break;
          case avro::AVRO_LONG:
            dtype = DT_INT64;
            break;
          case avro::AVRO_FLOAT:
            dtype = DT_FLOAT;
            break;
          case avro::AVRO_DOUBLE:
            dtype = DT_DOUBLE;
            break;
          case avro::AVRO_STRING:
          case avro::AVRO_BYTES:
          case avro::AVRO_FIXED:
          case avro::AVRO_ENUM:
            // All string-like Avro types are surfaced as DT_STRING.
            dtype = DT_STRING;
            break;
          case avro::AVRO_ARRAY: {
            // Bind by reference: copying the element vector is unnecessary.
            const auto &values = field.value<avro::GenericArray>().value();
            if (values.empty()) {
              // No element to inspect, so the requested type is accepted.
              dtype = output_types[i];
            } else {
              // The element type is inferred from the first element only.
              const auto value_type = values[0].type();
              if (value_type == avro::AVRO_BOOL)
                dtype = DT_BOOL;
              else if (value_type == avro::AVRO_INT)
                dtype = DT_INT32;
              else if (value_type == avro::AVRO_LONG)
                dtype = DT_INT64;
              else if (value_type == avro::AVRO_FLOAT)
                dtype = DT_FLOAT;
              else if (value_type == avro::AVRO_DOUBLE)
                dtype = DT_DOUBLE;
              else if (value_type == avro::AVRO_STRING)
                dtype = DT_STRING;
              else
                return errors::InvalidArgument(
                    "unsupported data type within AVRO_ARRAY ", value_type);
            }
          } break;
          case avro::AVRO_NULL:
            // A null carries no type information; accept the requested type.
            dtype = output_types[i];
            break;
          default:
            return errors::InvalidArgument("unsupported data type: ",
                                           field.type());
        }
        if (dtype != output_types[i]) {
          return errors::InvalidArgument(
              "output type mismatch for column: ", columns[i],
              " expected type: ", DataType_Name(dtype),
              " actual type: ", DataType_Name(output_types[i]));
        }
      }
      this->column_indices_ = std::move(resolved_indices);
    }

    // Builds a string from raw bytes. The iterator-range constructor is used
    // because it is well-defined for an empty vector, unlike `&bytes[0]`.
    const auto bytes_to_string = [](const std::vector<uint8_t> &bytes) {
      return string(bytes.begin(), bytes.end());
    };

    // Copies every element of an Avro array into a new rank-1 tensor of type
    // `dtype`. The two trailing tag arguments only carry types (tensor element
    // type and Avro C++ value type); their values are never read.
    const auto array_to_tensor = [ctx](
                                     const std::vector<avro::GenericDatum>
                                         &values,
                                     DataType dtype, auto tensor_tag,
                                     auto avro_tag) {
      using TensorT = decltype(tensor_tag);
      using AvroT = decltype(avro_tag);
      const int64 size = static_cast<int64>(values.size());
      Tensor result(ctx->allocator({}), dtype, {size});
      auto flat = result.flat<TensorT>();
      for (int64 idx = 0; idx < size; idx++) {
        flat(idx) = values[idx].value<AvroT>();
      }
      return result;
    };

    out_tensors->clear();
    out_tensors->reserve(columns.size());
    for (size_t i = 0; i < columns.size(); i++) {
      // Start with a scalar of the requested type; array columns replace it
      // with a rank-1 tensor below.
      Tensor tensor(ctx->allocator({}), output_types[i], {});
      out_tensors->emplace_back(std::move(tensor));
      const avro::GenericDatum &field =
          record.fieldAt(this->column_indices_[i]);
      switch (field.type()) {
        case avro::AVRO_BOOL:
          ((*out_tensors)[i]).scalar<bool>()() = field.value<bool>();
          break;
        case avro::AVRO_INT:
          ((*out_tensors)[i]).scalar<int32>()() = field.value<int32_t>();
          break;
        case avro::AVRO_LONG:
          ((*out_tensors)[i]).scalar<int64>()() = field.value<int64_t>();
          break;
        case avro::AVRO_FLOAT:
          ((*out_tensors)[i]).scalar<float>()() = field.value<float>();
          break;
        case avro::AVRO_DOUBLE:
          ((*out_tensors)[i]).scalar<double>()() = field.value<double>();
          break;
        case avro::AVRO_STRING:
          ((*out_tensors)[i]).scalar<tstring>()() = field.value<string>();
          break;
        case avro::AVRO_ENUM:
          ((*out_tensors)[i]).scalar<tstring>()() =
              field.value<avro::GenericEnum>().symbol();
          break;
        case avro::AVRO_BYTES:
          ((*out_tensors)[i]).scalar<tstring>()() =
              bytes_to_string(field.value<std::vector<uint8_t>>());
          break;
        case avro::AVRO_FIXED:
          // Validation maps AVRO_FIXED to DT_STRING, so it must be readable
          // here as well; expose the raw bytes like AVRO_BYTES.
          ((*out_tensors)[i]).scalar<tstring>()() =
              bytes_to_string(field.value<avro::GenericFixed>().value());
          break;
        case avro::AVRO_ARRAY: {
          const auto &values = field.value<avro::GenericArray>().value();
          switch (output_types[i]) {
            case DT_BOOL:
              (*out_tensors)[i] =
                  array_to_tensor(values, DT_BOOL, bool{}, bool{});
              break;
            case DT_INT32:
              (*out_tensors)[i] =
                  array_to_tensor(values, DT_INT32, int32{}, int32_t{});
              break;
            case DT_INT64:
              (*out_tensors)[i] =
                  array_to_tensor(values, DT_INT64, int64{}, int64_t{});
              break;
            case DT_FLOAT:
              (*out_tensors)[i] =
                  array_to_tensor(values, DT_FLOAT, float{}, float{});
              break;
            case DT_DOUBLE:
              (*out_tensors)[i] =
                  array_to_tensor(values, DT_DOUBLE, double{}, double{});
              break;
            case DT_STRING:
              (*out_tensors)[i] =
                  array_to_tensor(values, DT_STRING, tstring{}, string{});
              break;
            default:
              // Previously this fell through and emitted an uninitialized
              // scalar tensor; report the unsupported combination instead.
              return errors::InvalidArgument(
                  "unsupported output type for AVRO_ARRAY column: ",
                  columns[i], " type: ", DataType_Name(output_types[i]));
          }
        } break;
        case avro::AVRO_NULL:
          // Null field: substitute the caller-provided default for the column.
          switch (output_types[i]) {
            case DT_BOOL:
              ((*out_tensors)[i]).scalar<bool>()() =
                  absl::any_cast<bool>(typed_default_values[i]);
              break;
            case DT_INT32:
              ((*out_tensors)[i]).scalar<int32>()() =
                  absl::any_cast<int32_t>(typed_default_values[i]);
              break;
            case DT_INT64:
              ((*out_tensors)[i]).scalar<int64>()() =
                  absl::any_cast<int64_t>(typed_default_values[i]);
              break;
            case DT_FLOAT:
              ((*out_tensors)[i]).scalar<float>()() =
                  absl::any_cast<float>(typed_default_values[i]);
              break;
            case DT_DOUBLE:
              ((*out_tensors)[i]).scalar<double>()() =
                  absl::any_cast<double>(typed_default_values[i]);
              break;
            case DT_STRING:
              ((*out_tensors)[i]).scalar<tstring>()() =
                  absl::any_cast<string>(typed_default_values[i]);
              break;
            default:
              return errors::InvalidArgument(
                  "unsupported data type against AVRO_NULL: ", output_types[i]);
          }
          break;
        default:
          return errors::InvalidArgument("unsupported data type: ",
                                         field.type());
      }
    }

    return Status::OK();
  }