in tfx_bsl/cc/coders/example_decoder.cc [1243:1304]
// Assembles everything decoded so far into a single arrow::RecordBatch.
//
// The batch holds the columns produced by FinishTopLevelFeatures() for the
// context features, optionally followed by one struct column named
// `sequence_feature_column_name_` whose children are the finished sequence
// features (one child per entry of `sequence_feature_decoders_`).
//
// Args:
//   result: receives the record batch on success.
//
// Returns:
//   OK on success. A failure from FinishTopLevelFeatures() or from a sequence
//   feature decoder's Finish() goes through TFX_BSL_RETURN_IF_ERROR (assumed
//   to return that status to the caller). A failure to build the struct array
//   is reported as an InternalError. On every error path `*result` is left
//   unassigned and Reset() is not called.
absl::Status SchemalessIncrementalSequenceExamplesDecoder::Finish(
    std::shared_ptr<arrow::RecordBatch>* result) {
  // Top-level (context feature) columns come first.
  std::vector<std::shared_ptr<arrow::Array>> columns;
  std::vector<std::shared_ptr<arrow::Field>> column_fields;
  TFX_BSL_RETURN_IF_ERROR(FinishTopLevelFeatures(
      all_context_features_, context_feature_decoders_,
      num_examples_processed_, &columns, &column_fields));

  // Finish every sequence feature decoder; each one yields a child array of
  // the struct column, paired with a field carrying the feature's name.
  std::vector<std::shared_ptr<arrow::Array>> child_arrays;
  std::vector<std::shared_ptr<arrow::Field>> child_fields;
  for (const auto& name_and_decoder : sequence_feature_decoders_) {
    const auto& decoder = name_and_decoder.second;
    std::shared_ptr<arrow::Array> child;
    // The decoder is a variant: anything that is not a FeatureListDecoder is
    // accessed as an UnknownTypeFeatureListDecoder.
    if (absl::holds_alternative<std::unique_ptr<FeatureListDecoder>>(decoder)) {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<FeatureListDecoder>>(decoder)->Finish(
              &child));
    } else {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<UnknownTypeFeatureListDecoder>>(decoder)
              ->Finish(&child));
    }
    child_arrays.push_back(child);
    child_fields.push_back(
        arrow::field(name_and_decoder.first, child_arrays.back()->type()));
  }

  // Appends the struct column and its matching field to the batch columns.
  const auto add_sequence_column =
      [this, &columns,
       &column_fields](const std::shared_ptr<arrow::Array>& column) {
        columns.push_back(column);
        column_fields.push_back(arrow::field(sequence_feature_column_name_,
                                             columns.back()->type()));
      };

  if (!child_arrays.empty()) {
    const arrow::Result<std::shared_ptr<arrow::StructArray>> struct_or =
        arrow::StructArray::Make(child_arrays, child_fields);
    const absl::Status make_status = FromArrowStatus(struct_or.status());
    if (!make_status.ok()) {
      return absl::InternalError(absl::StrCat(
          "Attempt to make struct array from sequence features failed with "
          "status: ",
          make_status.message()));
    }
    add_sequence_column(struct_or.ValueOrDie());
  } else if (feature_lists_observed_) {
    // Feature lists were seen but none of them contained a sequence feature:
    // still emit the struct column, just without any children (both child
    // vectors are empty here).
    add_sequence_column(std::make_shared<arrow::StructArray>(
        std::make_shared<arrow::StructType>(child_fields),
        num_examples_processed_, child_arrays));
  }

  *result = arrow::RecordBatch::Make(arrow::schema(column_fields),
                                     num_examples_processed_, columns);
  Reset();
  return absl::OkStatus();
}