absl::Status SchemalessIncrementalSequenceExamplesDecoder::Add()

in tfx_bsl/cc/coders/example_decoder.cc [1100:1241]


absl::Status SchemalessIncrementalSequenceExamplesDecoder::Add(
    const tensorflow::SequenceExample& sequence_example) {
  TFX_BSL_RETURN_IF_ERROR(DecodeTopLevelFeatures(
      sequence_example.context().feature(), all_context_features_,
      num_examples_processed_, context_feature_decoders_));
  if (sequence_example.has_feature_lists()) {
    feature_lists_observed_ = true;
  }
  for (const auto& p : sequence_example.feature_lists().feature_list()) {
    const std::string& sequence_feature_name = p.first;
    const tensorflow::FeatureList& sequence_feature_list = p.second;
    FeatureListDecoder* sequence_feature_decoder = nullptr;
    UnknownTypeFeatureListDecoder* unknown_type_sequence_feature_decoder =
        nullptr;
    // Determine if there is an existing decoder for this sequence feature.
    const auto it = sequence_feature_decoders_.find(sequence_feature_name);
    if (it != sequence_feature_decoders_.end()) {
      if (absl::holds_alternative<std::unique_ptr<FeatureListDecoder>>(
              it->second)) {
        sequence_feature_decoder =
            absl::get<std::unique_ptr<FeatureListDecoder>>(it->second).get();
      } else {
        unknown_type_sequence_feature_decoder =
            absl::get<std::unique_ptr<UnknownTypeFeatureListDecoder>>(
                it->second)
                .get();
      }
    }
    // If there was an unknown type decoder for this sequence feature,
    // determine if its type can be determined from the current sequence
    // example. If so, convert it to a feature list decoder of the
    // appropriate type.
    if (unknown_type_sequence_feature_decoder) {
      for (const auto& feature : sequence_feature_list.feature()) {
        if (feature.kind_case() != tensorflow::Feature::KIND_NOT_SET) {
          TFX_BSL_RETURN_IF_ERROR(
              unknown_type_sequence_feature_decoder->ConvertToTypedListDecoder(
                  feature.kind_case(), &sequence_feature_decoder));
          sequence_feature_decoders_[sequence_feature_name] =
              absl::WrapUnique(sequence_feature_decoder);
          unknown_type_sequence_feature_decoder = nullptr;
          break;
        }
      }
    }
    if (sequence_feature_decoder == nullptr &&
        unknown_type_sequence_feature_decoder == nullptr) {
      // If there is no existing decoder for this sequence feature, create
      // one.
      if (sequence_feature_list.feature_size() == 0) {
        unknown_type_sequence_feature_decoder =
            UnknownTypeFeatureListDecoder::Make();
      } else {
        // Determine if the type can be identified from any of the features
        // in the feature list. Use the first type found. If there is a type
        // inconsistency, it will be found and addressed in the decoder.
        tensorflow::Feature::KindCase feature_kind_case =
            tensorflow::Feature::KIND_NOT_SET;
        for (const auto& feature : sequence_feature_list.feature()) {
          if (feature.kind_case() != tensorflow::Feature::KIND_NOT_SET) {
            feature_kind_case = feature.kind_case();
            break;
          }
        }
        switch (feature_kind_case) {
          case tensorflow::Feature::kInt64List:
            sequence_feature_decoder = IntListDecoder::Make();
            break;
          case tensorflow::Feature::kFloatList:
            sequence_feature_decoder = FloatListDecoder::Make();
            break;
          case tensorflow::Feature::kBytesList:
            sequence_feature_decoder = BytesListDecoder::Make();
            break;
          case tensorflow::Feature::KIND_NOT_SET:
            unknown_type_sequence_feature_decoder =
                UnknownTypeFeatureListDecoder::Make();
            break;
        }
      }  // end clause processing a feature list with > 0 features.
      if (unknown_type_sequence_feature_decoder) {
        // Handle the situation in which we see a sequence feature of
        // unknown type for the first time after already having processed
        // some sequence examples. In that case, append a null for each
        // sequence example that has already been processed (in which this
        // sequence feature was not seen), excluding the current sequence
        // example.
        for (int i = 0; i < num_examples_processed_; ++i) {
          unknown_type_sequence_feature_decoder->AppendNull();
        }
        sequence_feature_decoders_[sequence_feature_name] =
            absl::WrapUnique(unknown_type_sequence_feature_decoder);
      } else if (sequence_feature_decoder) {
        // Similarly handle the situation in which we see a sequence feature
        // of a known type for the first time after already having processed
        // some sequence examples.
        for (int i = 0; i < num_examples_processed_; ++i) {
          TFX_BSL_RETURN_IF_ERROR(sequence_feature_decoder->AppendNull());
        }
        sequence_feature_decoders_[sequence_feature_name] =
            absl::WrapUnique(sequence_feature_decoder);
      }
    }  // End adding new decoder.
    // Decode the current feature list using the appropriate feature
    // decoder.
    absl::Status status;
    if (sequence_feature_decoder) {
      status =
          sequence_feature_decoder->DecodeFeatureList(sequence_feature_list);
    } else if (unknown_type_sequence_feature_decoder) {
      status = unknown_type_sequence_feature_decoder->DecodeFeatureList(
          sequence_feature_list);
    }
    if (!status.ok()) {
      return absl::Status(
          status.code(),
          absl::StrCat(status.message(), " for sequence feature \"",
                       sequence_feature_name, "\""));
    }
  }  // End processing the current feature list.

  // Calling FinishFeatureList ensures that a Null is appended for a given
  // feature if it was not decoded (e.g., because it was not seen) in the
  // current SequenceExample.
  for (const auto& p : sequence_feature_decoders_) {
    if (absl::holds_alternative<std::unique_ptr<FeatureListDecoder>>(
            p.second)) {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<FeatureListDecoder>>(p.second)
              .get()
              ->FinishFeatureList());
    } else {
      TFX_BSL_RETURN_IF_ERROR(
          absl::get<std::unique_ptr<UnknownTypeFeatureListDecoder>>(p.second)
              .get()
              ->FinishFeatureList());
    }
  }
  ++num_examples_processed_;

  return absl::OkStatus();
}