in tfx_bsl/cc/coders/example_encoder.cc [366:421]
// Converts every row of `record_batch` into a serialized tensorflow::Example.
//
// Each column is handled by an encoder obtained from MakeFeatureEncoder. A
// column whose name is a key of `nested_features` is written to the features
// named by the corresponding map entry; any other column is written to a single
// feature that carries the column's own name.
//
// Args:
//   record_batch: the columns and rows to convert.
//   nested_features: maps a column name to the names of the features that the
//     column produces.
//   serialized_examples: output; resized to `record_batch.num_rows()`, element
//     i receives the serialization of row i. Must not be null.
//
// Returns the first error reported by name validation, encoder construction,
// produced-feature-count validation or encoding, DataLossError if an example
// cannot be serialized, and OkStatus otherwise.
absl::Status RecordBatchToExamples(
    const arrow::RecordBatch& record_batch,
    const FeatureNameToColumnsMap& nested_features,
    std::vector<std::string>* serialized_examples) {
  // Everything needed to encode one column, resolved once up front so that the
  // per-row loop below performs no lookups in `nested_features`.
  struct ColumnPlan {
    std::string name;
    std::unique_ptr<FeatureEncoderInterface> encoder;
    // Names of the produced features when the column is listed in
    // `nested_features`; nullptr when the column maps to one feature named
    // `name`. Points into `nested_features`, which outlives this function's
    // use of it and is never modified here.
    const FeatureNameToColumnsMap::mapped_type* nested_names;
  };
  std::vector<ColumnPlan> columns;
  columns.reserve(record_batch.num_columns());

  const std::vector<std::string> field_names =
      record_batch.schema()->field_names();
  TFX_BSL_RETURN_IF_ERROR(
      ValidateProducedFeatureNames(field_names, nested_features));

  for (const auto& name : field_names) {
    const std::shared_ptr<arrow::Array> array =
        record_batch.GetColumnByName(name);
    columns.push_back(ColumnPlan{name, nullptr, nullptr});
    ColumnPlan& column = columns.back();
    TFX_BSL_RETURN_IF_ERROR(MakeFeatureEncoder(array, &column.encoder));

    // Single lookup; the iterator is reused instead of calling at() again.
    const auto nested_it = nested_features.find(name);
    if (nested_it != nested_features.end()) {
      column.nested_names = &nested_it->second;
      // Check that actual nested depth of a field is consistent with
      // `nested_features`.
      TFX_BSL_RETURN_IF_ERROR(column.encoder->ValidateNumProducedFeatures(
          column.nested_names->size(), name));
    } else {
      // Check that fields that are not nested are mapped into exactly one
      // feature.
      TFX_BSL_RETURN_IF_ERROR(
          column.encoder->ValidateNumProducedFeatures(1, name));
    }
  }

  const int64_t num_rows = record_batch.num_rows();
  serialized_examples->resize(num_rows);

  // Scratch list of destination features, reused for every (row, column) pair;
  // clear() keeps the capacity so no reallocation happens after warm-up.
  std::vector<tensorflow::Feature*> features;
  for (int64_t example_index = 0; example_index < num_rows; ++example_index) {
    // A fresh arena per row: the Example and everything it owns is released
    // when `arena` goes out of scope at the end of the iteration.
    google::protobuf::Arena arena;
    auto* example =
        google::protobuf::Arena::CreateMessage<tensorflow::Example>(&arena);
    auto* feature_map = example->mutable_features()->mutable_feature();

    for (const ColumnPlan& column : columns) {
      features.clear();
      if (column.nested_names != nullptr) {
        for (const auto& component_name : *column.nested_names) {
          // operator[] creates the entry if it does not exist yet.
          features.push_back(&(*feature_map)[component_name]);
        }
      } else {
        features.push_back(&(*feature_map)[column.name]);
      }
      TFX_BSL_RETURN_IF_ERROR(
          column.encoder->EncodeFeatures(example_index, features));
    }

    if (!example->SerializeToString(&(*serialized_examples)[example_index])) {
      return absl::DataLossError("Unable to serialize example");
    }
  }
  return absl::OkStatus();
}