absl::Status RecordBatchToExamples()

in tfx_bsl/cc/coders/example_encoder.cc [366:421]


// Encodes every row of `record_batch` as a serialized tf.Example.
//
// Each column gets a FeatureEncoderInterface (via MakeFeatureEncoder).
// - A column whose name is a key of `nested_features` writes into the features
//   named by the corresponding entry, one per nested component.
// - Every other column writes into a single feature named after the column.
//
// Args:
//   record_batch: the batch to encode; row i becomes (*serialized_examples)[i].
//   nested_features: maps a column name to the names of the features its
//     nested components are written to.
//   serialized_examples: output; resized to record_batch.num_rows().
//
// Returns the first error from:
// - feature-name validation;
// - encoder construction;
// - the nesting-depth checks;
// - per-row encoding;
// - or absl::DataLossError if an example fails to serialize.
// On error, `serialized_examples` may already have been resized and partially
// filled.
absl::Status RecordBatchToExamples(
    const arrow::RecordBatch& record_batch,
    const FeatureNameToColumnsMap& nested_features,
    std::vector<std::string>* serialized_examples) {
  // A column's encoder together with the names of the tf.Example features it
  // writes. The names are resolved once here instead of being looked up in
  // `nested_features` for every row.
  struct ColumnEncoder {
    std::unique_ptr<FeatureEncoderInterface> encoder;
    std::vector<std::string> output_feature_names;
  };

  const std::vector<std::string> field_names =
      record_batch.schema()->field_names();
  TFX_BSL_RETURN_IF_ERROR(
      ValidateProducedFeatureNames(field_names, nested_features));

  std::vector<ColumnEncoder> column_encoders;
  column_encoders.reserve(record_batch.num_columns());
  for (int i = 0; i < record_batch.num_columns(); ++i) {
    const std::string& name = field_names[i];
    // Fetch the column by position: arrow's GetColumnByName returns null when
    // the name is not unique in the schema.
    const std::shared_ptr<arrow::Array> array = record_batch.column(i);
    column_encoders.emplace_back();
    ColumnEncoder& column = column_encoders.back();
    TFX_BSL_RETURN_IF_ERROR(MakeFeatureEncoder(array, &column.encoder));

    const auto nested_it = nested_features.find(name);
    if (nested_it != nested_features.end()) {
      // Check that actual nested depth of a field is consistent with
      // `nested_features`.
      TFX_BSL_RETURN_IF_ERROR(column.encoder->ValidateNumProducedFeatures(
          nested_it->second.size(), name));
      column.output_feature_names.reserve(nested_it->second.size());
      for (const auto& component_name : nested_it->second) {
        column.output_feature_names.push_back(component_name);
      }
    } else {
      // Check that fields that are not nested are mapped into exactly one
      // feature, named after the field itself.
      TFX_BSL_RETURN_IF_ERROR(
          column.encoder->ValidateNumProducedFeatures(1, name));
      column.output_feature_names.push_back(name);
    }
  }

  serialized_examples->resize(record_batch.num_rows());
  // Reused for every column of every row; cleared before each use.
  std::vector<tensorflow::Feature*> features;
  for (int64_t example_index = 0; example_index < record_batch.num_rows();
       ++example_index) {
    // A fresh arena per row: the example allocated in it is freed when the
    // arena goes out of scope at the end of the iteration.
    google::protobuf::Arena arena;
    auto* example =
        google::protobuf::Arena::CreateMessage<tensorflow::Example>(&arena);
    auto* feature_map = example->mutable_features()->mutable_feature();
    for (const ColumnEncoder& column : column_encoders) {
      features.clear();
      for (const std::string& feature_name : column.output_feature_names) {
        features.push_back(&(*feature_map)[feature_name]);
      }
      TFX_BSL_RETURN_IF_ERROR(
          column.encoder->EncodeFeatures(example_index, features));
    }
    if (!example->SerializeToString(&(*serialized_examples)[example_index])) {
      return absl::DataLossError("Unable to serialize example");
    }
  }
  return absl::OkStatus();
}