in tensorflow_io/core/kernels/bigquery/bigquery_lib.h [341:563]
// Decodes the next Avro datum from `decoder_` into `datum_` and converts the
// requested columns of that record into tensors.
//
// Args:
//   ctx: supplies the allocator used for every output tensor.
//   out_tensors: cleared, then filled with one tensor per entry of `columns`
//     (scalar for scalar/NULL fields, rank-1 for AVRO_ARRAY fields).
//   columns: names of the record fields to read, in output order.
//   output_types: requested dtype for each column; must match the dtype
//     implied by the Avro value.
//   typed_default_values: per-column value emitted when the field is
//     AVRO_NULL; each entry must hold the C++ type matching output_types[i].
//
// Returns:
//   Unknown if the decoded datum is not a record; InvalidArgument for an
//   unsupported Avro type or a column whose value does not match the requested
//   output type; OK otherwise.
//
// The caller must hold `mu_` (see the lock annotation). Field indices are
// resolved on the first record and cached in `column_indices_`.
Status ReadRecord(IteratorContext *ctx, std::vector<Tensor> *out_tensors,
                  const std::vector<string> &columns,
                  const std::vector<DataType> &output_types,
                  const std::vector<absl::any> &typed_default_values)
    TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) override {
  avro::decode(*this->decoder_, *this->datum_);
  if (this->datum_->type() != avro::AVRO_RECORD) {
    return errors::Unknown("record is not of AVRO_RECORD type");
  }
  const avro::GenericRecord &record =
      this->datum_->template value<avro::GenericRecord>();

  // Infers the TensorFlow dtype implied by the Avro value held in `field`.
  // NULL values and empty arrays carry no type information of their own, so
  // `requested` is reported for them.
  const auto infer_dtype = [](const avro::GenericDatum &field,
                              DataType requested, DataType *dtype) -> Status {
    switch (field.type()) {
      case avro::AVRO_BOOL:
        *dtype = DT_BOOL;
        break;
      case avro::AVRO_INT:
        *dtype = DT_INT32;
        break;
      case avro::AVRO_LONG:
        *dtype = DT_INT64;
        break;
      case avro::AVRO_FLOAT:
        *dtype = DT_FLOAT;
        break;
      case avro::AVRO_DOUBLE:
        *dtype = DT_DOUBLE;
        break;
      case avro::AVRO_STRING:
      case avro::AVRO_BYTES:
      case avro::AVRO_FIXED:
      case avro::AVRO_ENUM:
        *dtype = DT_STRING;
        break;
      case avro::AVRO_ARRAY: {
        // Bind by reference: copying the element vector per record is wasted
        // work.
        const std::vector<avro::GenericDatum> &elements =
            field.value<avro::GenericArray>().value();
        if (elements.empty()) {
          *dtype = requested;
          break;
        }
        // Only the first element is inspected; the remaining elements are
        // assumed to share its type.
        const avro::Type value_type = elements[0].type();
        switch (value_type) {
          case avro::AVRO_BOOL:
            *dtype = DT_BOOL;
            break;
          case avro::AVRO_INT:
            *dtype = DT_INT32;
            break;
          case avro::AVRO_LONG:
            *dtype = DT_INT64;
            break;
          case avro::AVRO_FLOAT:
            *dtype = DT_FLOAT;
            break;
          case avro::AVRO_DOUBLE:
            *dtype = DT_DOUBLE;
            break;
          case avro::AVRO_STRING:
            *dtype = DT_STRING;
            break;
          default:
            return errors::InvalidArgument(
                "unsupported data type within AVRO_ARRAY ", value_type);
        }
      } break;
      case avro::AVRO_NULL:
        *dtype = requested;
        break;
      default:
        return errors::InvalidArgument("unsupported data type: ",
                                       field.type());
    }
    return Status::OK();
  };

  // Resolve the field index of every requested column once. The indices are
  // built locally and published only when complete, so an interrupted first
  // call can never leave `column_indices_` non-empty but too short.
  if (this->column_indices_.size() == 0) {
    std::vector<size_t> resolved_indices;
    resolved_indices.reserve(columns.size());
    for (const string &column : columns) {
      resolved_indices.emplace_back(record.fieldIndex(column));
    }
    this->column_indices_.assign(resolved_indices.begin(),
                                 resolved_indices.end());
  }

  // Validate every record, not just the first one: a column that was NULL (or
  // an empty array) in the first record has no observable type there, and a
  // later mismatching value would otherwise reach scalar<T>()/flat<T>() with
  // the wrong dtype instead of producing an error status.
  for (size_t i = 0; i < columns.size(); i++) {
    const avro::GenericDatum &field =
        record.fieldAt(this->column_indices_[i]);
    DataType dtype = DT_INVALID;
    const Status status = infer_dtype(field, output_types[i], &dtype);
    if (!status.ok()) {
      return status;
    }
    if (dtype != output_types[i]) {
      return errors::InvalidArgument(
          "output type mismatch for column: ", columns[i],
          " expected type: ", DataType_Name(dtype),
          " actual type: ", DataType_Name(output_types[i]));
    }
  }

  out_tensors->clear();
  out_tensors->reserve(columns.size());
  for (size_t i = 0; i < columns.size(); i++) {
    Tensor scalar_tensor(ctx->allocator({}), output_types[i], {});
    out_tensors->emplace_back(std::move(scalar_tensor));
    // Stays valid for this iteration: capacity was reserved above.
    Tensor &out = out_tensors->back();
    const avro::GenericDatum &field =
        record.fieldAt(this->column_indices_[i]);
    switch (field.type()) {
      case avro::AVRO_BOOL:
        out.scalar<bool>()() = field.value<bool>();
        break;
      case avro::AVRO_INT:
        out.scalar<int32>()() = field.value<int32_t>();
        break;
      case avro::AVRO_LONG:
        out.scalar<int64>()() = field.value<int64_t>();
        break;
      case avro::AVRO_FLOAT:
        out.scalar<float>()() = field.value<float>();
        break;
      case avro::AVRO_DOUBLE:
        out.scalar<double>()() = field.value<double>();
        break;
      case avro::AVRO_STRING:
        out.scalar<tstring>()() = field.value<string>();
        break;
      case avro::AVRO_ENUM:
        out.scalar<tstring>()() = field.value<avro::GenericEnum>().symbol();
        break;
      case avro::AVRO_BYTES: {
        const std::vector<uint8_t> &bytes =
            field.value<std::vector<uint8_t>>();
        // Iterator-range construction is well defined for an empty payload,
        // unlike taking the address of element 0.
        out.scalar<tstring>()() = string(bytes.begin(), bytes.end());
      } break;
      case avro::AVRO_FIXED: {
        // Accepted as DT_STRING by the type check, so it must be readable
        // here as well.
        const std::vector<uint8_t> &bytes =
            field.value<avro::GenericFixed>().value();
        out.scalar<tstring>()() = string(bytes.begin(), bytes.end());
      } break;
      case avro::AVRO_ARRAY: {
        const std::vector<avro::GenericDatum> &elements =
            field.value<avro::GenericArray>().value();
        const unsigned int size = elements.size();
        switch (output_types[i]) {
          case DT_BOOL: {
            Tensor array_tensor(ctx->allocator({}), DT_BOOL, {size});
            auto flat = array_tensor.flat<bool>();
            for (unsigned int idx = 0; idx < size; idx++) {
              flat(idx) = elements[idx].value<bool>();
            }
            out = std::move(array_tensor);
          } break;
          case DT_INT32: {
            Tensor array_tensor(ctx->allocator({}), DT_INT32, {size});
            auto flat = array_tensor.flat<int32>();
            for (unsigned int idx = 0; idx < size; idx++) {
              flat(idx) = elements[idx].value<int32_t>();
            }
            out = std::move(array_tensor);
          } break;
          case DT_INT64: {
            Tensor array_tensor(ctx->allocator({}), DT_INT64, {size});
            auto flat = array_tensor.flat<int64>();
            for (unsigned int idx = 0; idx < size; idx++) {
              flat(idx) = elements[idx].value<int64_t>();
            }
            out = std::move(array_tensor);
          } break;
          case DT_FLOAT: {
            Tensor array_tensor(ctx->allocator({}), DT_FLOAT, {size});
            auto flat = array_tensor.flat<float>();
            for (unsigned int idx = 0; idx < size; idx++) {
              flat(idx) = elements[idx].value<float>();
            }
            out = std::move(array_tensor);
          } break;
          case DT_DOUBLE: {
            Tensor array_tensor(ctx->allocator({}), DT_DOUBLE, {size});
            auto flat = array_tensor.flat<double>();
            for (unsigned int idx = 0; idx < size; idx++) {
              flat(idx) = elements[idx].value<double>();
            }
            out = std::move(array_tensor);
          } break;
          case DT_STRING: {
            Tensor array_tensor(ctx->allocator({}), DT_STRING, {size});
            auto flat = array_tensor.flat<tstring>();
            for (unsigned int idx = 0; idx < size; idx++) {
              flat(idx) = elements[idx].value<string>();
            }
            out = std::move(array_tensor);
          } break;
          default:
            // Only reachable for an empty array with an output type this
            // reader cannot produce; report it instead of emitting an
            // uninitialized scalar.
            return errors::InvalidArgument(
                "unsupported data type against AVRO_ARRAY: ",
                output_types[i]);
        }
      } break;
      case avro::AVRO_NULL:
        // No value present: emit the caller-supplied default for the column.
        switch (output_types[i]) {
          case DT_BOOL:
            out.scalar<bool>()() =
                absl::any_cast<bool>(typed_default_values[i]);
            break;
          case DT_INT32:
            out.scalar<int32>()() =
                absl::any_cast<int32_t>(typed_default_values[i]);
            break;
          case DT_INT64:
            out.scalar<int64>()() =
                absl::any_cast<int64_t>(typed_default_values[i]);
            break;
          case DT_FLOAT:
            out.scalar<float>()() =
                absl::any_cast<float>(typed_default_values[i]);
            break;
          case DT_DOUBLE:
            out.scalar<double>()() =
                absl::any_cast<double>(typed_default_values[i]);
            break;
          case DT_STRING:
            out.scalar<tstring>()() =
                absl::any_cast<string>(typed_default_values[i]);
            break;
          default:
            return errors::InvalidArgument(
                "unsupported data type against AVRO_NULL: ", output_types[i]);
        }
        break;
      default:
        return errors::InvalidArgument("unsupported data type: ",
                                       field.type());
    }
  }
  return Status::OK();
}