absl::Status SchemalessIncrementalSequenceExamplesDecoder::Finish()

in tfx_bsl/cc/coders/example_decoder.cc [1243:1304]


// Assembles the decoded data into a single RecordBatch stored in `*result`.
//
// The batch consists of whatever columns FinishTopLevelFeatures() places in
// the array/field vectors for the context features, plus at most one struct
// column named `sequence_feature_column_name_` that is appended after them.
// That struct column has one child per entry in `sequence_feature_decoders_`;
// if there are no such entries it is only added (without children) when
// `feature_lists_observed_` is set.
//
// On success Reset() is called and OK is returned. On any failure the function
// returns early: `*result` is not assigned and Reset() is not called.
absl::Status SchemalessIncrementalSequenceExamplesDecoder::Finish(
    std::shared_ptr<arrow::RecordBatch>* result) {
  std::vector<std::shared_ptr<arrow::Array>> columns;
  std::vector<std::shared_ptr<arrow::Field>> column_fields;
  TFX_BSL_RETURN_IF_ERROR(FinishTopLevelFeatures(
      all_context_features_, context_feature_decoders_,
      num_examples_processed_, &columns, &column_fields));

  // Finish every sequence feature decoder; each one yields a child array of
  // the sequence-feature struct column.
  std::vector<std::shared_ptr<arrow::Array>> child_arrays;
  std::vector<std::shared_ptr<arrow::Field>> child_fields;
  for (const auto& name_and_decoder : sequence_feature_decoders_) {
    const auto& decoder = name_and_decoder.second;
    std::shared_ptr<arrow::Array> finished;
    // The decoder is held in a variant, so pick the alternative it currently
    // stores before calling Finish() on it.
    if (absl::holds_alternative<std::unique_ptr<FeatureListDecoder>>(
            decoder)) {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<FeatureListDecoder>>(decoder)->Finish(
              &finished));
    } else {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<UnknownTypeFeatureListDecoder>>(decoder)
              ->Finish(&finished));
    }
    child_arrays.push_back(std::move(finished));
    child_fields.push_back(arrow::field(name_and_decoder.first,
                                        child_arrays.back()->type()));
  }

  const bool has_children = !child_arrays.empty();
  if (has_children) {
    const arrow::Result<std::shared_ptr<arrow::StructArray>> struct_or =
        arrow::StructArray::Make(child_arrays, child_fields);
    const absl::Status make_status = FromArrowStatus(struct_or.status());
    if (!make_status.ok()) {
      return absl::InternalError(absl::StrCat(
          "Attempt to make struct array from sequence features failed with "
          "status: ",
          make_status.message()));
    }
    columns.push_back(struct_or.ValueOrDie());
  } else if (feature_lists_observed_) {
    // Feature lists were seen but no sequence feature was: still emit the
    // sequence feature column, as a StructArray with no child arrays.
    columns.push_back(std::make_shared<arrow::StructArray>(
        std::make_shared<arrow::StructType>(child_fields),
        num_examples_processed_, child_arrays));
  }
  if (has_children || feature_lists_observed_) {
    // A struct column was appended above; describe it in the schema.
    column_fields.push_back(arrow::field(sequence_feature_column_name_,
                                         columns.back()->type()));
  }

  *result = arrow::RecordBatch::Make(arrow::schema(column_fields),
                                     num_examples_processed_, columns);

  Reset();
  return absl::OkStatus();
}