in tfx_bsl/cc/coders/example_decoder.cc [1100:1241]
// Decodes a single SequenceExample and accumulates it into this decoder.
//
// Steps, in order:
//   1. Context features are handed to DecodeTopLevelFeatures().
//   2. Each entry of `feature_lists` is decoded by a per-name decoder stored
//      in `sequence_feature_decoders_`. A decoder is created the first time a
//      name is seen. While every Feature seen so far for that name has no
//      kind set, an UnknownTypeFeatureListDecoder is used; it is converted to
//      a typed FeatureListDecoder once a Feature with a kind shows up.
//   3. FinishFeatureList() is called on every registered decoder.
//   4. `num_examples_processed_` is incremented.
//
// Returns OkStatus on success. Any non-OK status produced by a helper is
// returned immediately; a failure from DecodeFeatureList() is re-wrapped with
// the same code and the sequence feature name appended to the message.
absl::Status SchemalessIncrementalSequenceExamplesDecoder::Add(
    const tensorflow::SequenceExample& sequence_example) {
  TFX_BSL_RETURN_IF_ERROR(DecodeTopLevelFeatures(
      sequence_example.context().feature(), all_context_features_,
      num_examples_processed_, context_feature_decoders_));
  if (sequence_example.has_feature_lists()) {
    feature_lists_observed_ = true;
  }
  for (const auto& p : sequence_example.feature_lists().feature_list()) {
    const std::string& sequence_feature_name = p.first;
    const tensorflow::FeatureList& sequence_feature_list = p.second;
    // Non-owning views of the decoder for this feature. At most one of the
    // two is non-null at any point below.
    FeatureListDecoder* sequence_feature_decoder = nullptr;
    UnknownTypeFeatureListDecoder* unknown_type_sequence_feature_decoder =
        nullptr;
    // Determine if there is an existing decoder for this sequence feature.
    const auto it = sequence_feature_decoders_.find(sequence_feature_name);
    if (it != sequence_feature_decoders_.end()) {
      if (absl::holds_alternative<std::unique_ptr<FeatureListDecoder>>(
              it->second)) {
        sequence_feature_decoder =
            absl::get<std::unique_ptr<FeatureListDecoder>>(it->second).get();
      } else {
        unknown_type_sequence_feature_decoder =
            absl::get<std::unique_ptr<UnknownTypeFeatureListDecoder>>(
                it->second)
                .get();
      }
    }
    // If there was an unknown type decoder for this sequence feature,
    // determine if its type can be determined from the current sequence
    // example. If so, convert it to a feature list decoder of the
    // appropriate type.
    if (unknown_type_sequence_feature_decoder) {
      for (const auto& feature : sequence_feature_list.feature()) {
        if (feature.kind_case() != tensorflow::Feature::KIND_NOT_SET) {
          // NOTE(review): if ConvertToTypedListDecoder can fail after it has
          // already written to its out-parameter, that object is not released
          // here -- confirm against its implementation.
          TFX_BSL_RETURN_IF_ERROR(
              unknown_type_sequence_feature_decoder->ConvertToTypedListDecoder(
                  feature.kind_case(), &sequence_feature_decoder));
          // Overwriting the map entry replaces the unknown-type decoder with
          // the typed one, so the old raw pointer must not be used afterwards.
          sequence_feature_decoders_[sequence_feature_name] =
              absl::WrapUnique(sequence_feature_decoder);
          unknown_type_sequence_feature_decoder = nullptr;
          break;
        }
      }
    }
    if (sequence_feature_decoder == nullptr &&
        unknown_type_sequence_feature_decoder == nullptr) {
      // If there is no existing decoder for this sequence feature, create
      // one.
      if (sequence_feature_list.feature_size() == 0) {
        unknown_type_sequence_feature_decoder =
            UnknownTypeFeatureListDecoder::Make();
      } else {
        // Determine if the type can be identified from any of the features
        // in the feature list. Use the first type found. If there is a type
        // inconsistency, it will be found and addressed in the decoder.
        tensorflow::Feature::KindCase feature_kind_case =
            tensorflow::Feature::KIND_NOT_SET;
        for (const auto& feature : sequence_feature_list.feature()) {
          if (feature.kind_case() != tensorflow::Feature::KIND_NOT_SET) {
            feature_kind_case = feature.kind_case();
            break;
          }
        }
        switch (feature_kind_case) {
          case tensorflow::Feature::kInt64List:
            sequence_feature_decoder = IntListDecoder::Make();
            break;
          case tensorflow::Feature::kFloatList:
            sequence_feature_decoder = FloatListDecoder::Make();
            break;
          case tensorflow::Feature::kBytesList:
            sequence_feature_decoder = BytesListDecoder::Make();
            break;
          case tensorflow::Feature::KIND_NOT_SET:
            unknown_type_sequence_feature_decoder =
                UnknownTypeFeatureListDecoder::Make();
            break;
        }
      }  // end clause processing a feature list with > 0 features.
      if (unknown_type_sequence_feature_decoder) {
        // Handle the situation in which we see a sequence feature of
        // unknown type for the first time after already having processed
        // some sequence examples. In that case, append a null for each
        // sequence example that has already been processed (in which this
        // sequence feature was not seen), excluding the current sequence
        // example.
        for (int i = 0; i < num_examples_processed_; ++i) {
          unknown_type_sequence_feature_decoder->AppendNull();
        }
        sequence_feature_decoders_[sequence_feature_name] =
            absl::WrapUnique(unknown_type_sequence_feature_decoder);
      } else if (sequence_feature_decoder) {
        // Similarly handle the situation in which we see a sequence feature
        // of a known type for the first time after already having processed
        // some sequence examples.
        //
        // Transfer ownership to the map *before* the back-fill loop: the
        // loop can return early on error, and until the pointer is wrapped
        // nothing owns the freshly made decoder, so an early return would
        // leak it. The raw pointer stays valid because the map entry now
        // holds the object.
        sequence_feature_decoders_[sequence_feature_name] =
            absl::WrapUnique(sequence_feature_decoder);
        for (int i = 0; i < num_examples_processed_; ++i) {
          TFX_BSL_RETURN_IF_ERROR(sequence_feature_decoder->AppendNull());
        }
      }
    }  // End adding new decoder.
    // Decode the current feature list using the appropriate feature
    // decoder.
    absl::Status status;
    if (sequence_feature_decoder) {
      status =
          sequence_feature_decoder->DecodeFeatureList(sequence_feature_list);
    } else if (unknown_type_sequence_feature_decoder) {
      status = unknown_type_sequence_feature_decoder->DecodeFeatureList(
          sequence_feature_list);
    }
    if (!status.ok()) {
      // Keep the original status code, but say which feature failed.
      return absl::Status(
          status.code(),
          absl::StrCat(status.message(), " for sequence feature \"",
                       sequence_feature_name, "\""));
    }
  }  // End processing the current feature list.
  // Calling FinishFeatureList ensures that a Null is appended for a given
  // feature if it was not decoded (e.g., because it was not seen) in the
  // current SequenceExample.
  for (const auto& p : sequence_feature_decoders_) {
    if (absl::holds_alternative<std::unique_ptr<FeatureListDecoder>>(
            p.second)) {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<FeatureListDecoder>>(p.second)
              .get()
              ->FinishFeatureList());
    } else {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<UnknownTypeFeatureListDecoder>>(p.second)
              .get()
              ->FinishFeatureList());
    }
  }
  ++num_examples_processed_;
  return absl::OkStatus();
}