Status FastParseSingleExample()

in tensorflow/tensorflow/core/util/example_proto_fast_parsing.cc [1276:1625]
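
Parses one serialized Example proto according to config: each dense feature yields a tensor in result->dense_values, and each sparse feature yields an indices/values/shape triple across result->sparse_indices, sparse_values, and sparse_shapes. Missing features fall back to the configured default (fixed-length dense), an empty tensor (variable-length dense), or an empty sparse triple.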


Status FastParseSingleExample(const Config& config,
                              absl::string_view serialized, Result* result) {
  DCHECK(result != nullptr);
  // Check config so we can safely CHECK(false) in switches on config.*.dtype
  for (auto& c : config.sparse) {
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
  }
  for (auto& c : config.dense) {
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
  }

  PerExampleFeatureStats* stats = nullptr;
  if (config.collect_feature_stats) {
    result->feature_stats.emplace_back();
    stats = &result->feature_stats.back();
  }

  // TODO(mrry): Cache the construction of this map at Op construction time.
  size_t config_size = config.dense.size() + config.sparse.size();
  SeededHasher hasher;
  // Build config index.
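  // The index maps the hash of each feature name to its position and kind
  // (dense vs. sparse) so per-feature lookups during parsing are O(1).
  // InsertUnique fails on a hash collision, in which case the loop below
  // clears the map and retries with a fresh seed.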
  PresizedCuckooMap<std::pair<size_t, Type>> config_index(config_size);
  bool ok = true;
  for (size_t i = 0; i < 1000; ++i) {
    for (size_t d = 0; d < config.dense.size(); ++d) {
      ok &= config_index.InsertUnique(hasher(config.dense[d].feature_name),
                                      {d, Type::Dense});
    }
    for (size_t d = 0; d < config.sparse.size(); ++d) {
      ok &= config_index.InsertUnique(hasher(config.sparse[d].feature_name),
                                      {d, Type::Sparse});
    }
    if (ok) break;
    LOG(WARNING) << "Collision found. This should happen only if you have "
                    "around 2^32 entries in your config.";
    hasher.seed++;
    config_index.Clear(config_size);
    ok = true;
  }
  if (!ok) {
    return errors::Internal(
        "Could not avoid collision. This should not happen.");
  }

  // Allocate dense output tensors.
  for (size_t d = 0; d < config.dense.size(); ++d) {
    if (!config.dense[d].variable_length) {
      TensorShape values_shape;
      if (!config.dense[d].shape.AsTensorShape(&values_shape)) {
        return errors::Internal(
            "Fixed-length shape was not a statically defined shape.");
      }
      result->dense_values.emplace_back(config.dense[d].dtype, values_shape);
    } else {
      // Variable-length tensor will be allocated later.
      result->dense_values.emplace_back();
    }
  }

  // Allocate sparse output tensors.
  for (size_t d = 0; d < config.sparse.size(); ++d) {
    // The dense_shape is always a vector of length 1.
    result->sparse_shapes.emplace_back(DT_INT64, TensorShape({1}));
    // Variable-length tensors will be allocated later.
    result->sparse_indices.emplace_back();
    result->sparse_values.emplace_back();
  }

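  // Decode the Example wire format into a flat list of (name, feature)
  // entries; the feature value lists themselves are decoded lazily below,
  // only for names that match a config entry.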
  parsed::Example parsed_example;
  if (!ParseExample(serialized, &parsed_example)) {
    return errors::InvalidArgument("Could not parse example input, value: '",
                                   serialized, "'");
  }
  std::vector<bool> sparse_feature_already_seen(config.sparse.size(), false);
  std::vector<bool> dense_feature_already_seen(config.dense.size(), false);

  if (stats) {
    // TODO(b/111553342): This may over-count the number of features if there
    // are duplicate keys in the feature map. Consider deduplicating the keys
    // before computing the count.
    stats->features_count = parsed_example.size();
  }

  // Handle features present in the example.
  const size_t parsed_example_size = parsed_example.size();
  for (size_t i = 0; i < parsed_example_size; ++i) {
    // Iterate in reverse so that, matching standard protobuf map-parsing
    // semantics, the last entry for a given key wins; earlier duplicates are
    // skipped via the *_feature_already_seen flags below.
    parsed::FeatureMapEntry& name_and_feature =
        parsed_example[parsed_example_size - i - 1];

    const StringPiece feature_name = name_and_feature.first;
    parsed::Feature& feature = name_and_feature.second;

    std::pair<size_t, Type> d_and_type;
    uint64 h = hasher(feature_name);
    if (!config_index.Find(h, &d_and_type)) continue;

    size_t d = d_and_type.first;
    bool is_dense = d_and_type.second == Type::Dense;

    {
      // Guard against a PresizedCuckooMap hash collision (a false-positive
      // lookup) by comparing the actual feature name.
      // TODO(lew): Use dense_hash_map and avoid this and hasher creation.
      const string& config_feature_name = is_dense
                                              ? config.dense[d].feature_name
                                              : config.sparse[d].feature_name;
      if (feature_name != config_feature_name) continue;
    }

    auto example_error = [feature_name](StringPiece suffix) {
      return errors::InvalidArgument("Key: ", feature_name, ".  ", suffix);
    };

    auto parse_error = [feature_name] {
      return errors::InvalidArgument("Key: ", feature_name,
                                     ".  Can't parse serialized Example.");
    };

    DataType example_dtype;
    TF_RETURN_IF_ERROR(feature.ParseDataType(&example_dtype));
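    // A feature whose value list is unset parses as DT_INVALID and is
    // treated as missing rather than as an error.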
    if (example_dtype == DT_INVALID) continue;

    if (is_dense && !config.dense[d].variable_length) {
      // If this feature was already seen, skip the duplicate; see the comment
      // at the top of the loop (the last entry wins).
      if (dense_feature_already_seen[d]) {
        LogDenseFeatureDataLoss(feature_name);
        continue;
      }
      dense_feature_already_seen[d] = true;

      if (example_dtype != config.dense[d].dtype) {
        return example_error(strings::StrCat(
            "Data types don't match. Data type: ",
            DataTypeString(example_dtype),
            " but expected type: ", DataTypeString(config.dense[d].dtype)));
      }

      Tensor* out = &result->dense_values[d];
      const std::size_t num_elements = config.dense[d].elements_per_stride;
      if (stats) {
        // TODO(b/111553342): If desirable, we could add support for counting
        // elements in the features that aren't parsed, but this could add
        // considerable runtime cost.
        stats->feature_values_count += num_elements;
      }
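      // Parse straight into the preallocated output buffer. LimitedArraySlice
      // bounds writes at num_elements; a nonzero EndDistance() afterwards
      // means the serialized list did not hold exactly num_elements values.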
      switch (example_dtype) {
        case DT_INT64: {
          auto out_p = out->flat<int64>().data();
          LimitedArraySlice<int64> slice(out_p, num_elements);
          if (!feature.ParseInt64List(&slice)) return parse_error();
          if (slice.EndDistance() != 0) {
            return parse_error();
          }
          break;
        }
        case DT_FLOAT: {
          auto out_p = out->flat<float>().data();
          LimitedArraySlice<float> slice(out_p, num_elements);
          if (!feature.ParseFloatList(&slice)) return parse_error();
          if (slice.EndDistance() != 0) {
            return parse_error();
          }
          break;
        }
        case DT_STRING: {
          auto out_p = out->flat<tstring>().data();
          LimitedArraySlice<tstring> slice(out_p, num_elements);
          if (!feature.ParseBytesList(&slice)) return parse_error();
          if (slice.EndDistance() != 0) {
            return parse_error();
          }
          break;
        }
        default:
          LOG(FATAL) << "Should not happen.";
      }

    } else {  // Variable-length dense feature, or any sparse feature.
      SmallVector<tstring> bytes_list;
      TensorVector<float> float_list;
      SmallVector<int64> int64_list;

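      // For variable-length dense features, elements_per_stride is the
      // product of the fixed trailing dimensions; the leading dimension is
      // inferred further below as num_elements / num_elements_divisor.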
      const size_t num_elements_divisor =
          is_dense ? config.dense[d].elements_per_stride : 1;
      size_t num_elements;

      if (is_dense) {
        // If this feature was already seen, skip the duplicate; see the
        // comment at the top of the loop (the last entry wins).
        if (dense_feature_already_seen[d]) {
          LogDenseFeatureDataLoss(feature_name);
          continue;
        }
        dense_feature_already_seen[d] = true;
        if (example_dtype != config.dense[d].dtype) {
          return example_error(strings::StrCat(
              "Data types don't match. Data type: ",
              DataTypeString(example_dtype),
              " but expected type: ", DataTypeString(config.dense[d].dtype)));
        }
      } else {
        // If this feature was already seen, skip the duplicate; see the
        // comment at the top of the loop (the last entry wins).
        if (sparse_feature_already_seen[d]) {
          LogSparseFeatureDataLoss(feature_name);
          continue;
        }
        sparse_feature_already_seen[d] = true;

        // Handle sparse features.
        if (example_dtype != DT_INVALID &&
            example_dtype != config.sparse[d].dtype) {
          return example_error(strings::StrCat(
              "Data types don't match. ",
              "Expected type: ", DataTypeString(config.sparse[d].dtype),
              ", Actual type: ", DataTypeString(example_dtype)));
        }
      }

      switch (example_dtype) {
        case DT_INT64: {
          // TODO(mrry): Use the fact that the `int64_list` is packed to read
          // out the length and pre-allocate the output tensor.
          if (!feature.ParseInt64List(&int64_list)) return parse_error();
          num_elements = int64_list.size();
          break;
        }
        case DT_FLOAT: {
          if (!feature.ParseFloatList(&float_list)) return parse_error();
          num_elements = float_list.size();
          break;
        }
        case DT_STRING: {
          int actual_num_elements = 0;
          if (!feature.GetNumElementsInBytesList(&actual_num_elements)) {
            return parse_error();
          }
          bytes_list.reserve(actual_num_elements);
          if (!feature.ParseBytesList(&bytes_list)) return parse_error();
          num_elements = bytes_list.size();
          break;
        }
        default:
          LOG(FATAL) << "Should not happen. " << DataTypeString(example_dtype);
      }

      if (num_elements % num_elements_divisor != 0) {
        return parse_error();
      }

      if (stats) {
        stats->feature_values_count += num_elements;
      }

      Tensor* out;
      DataType out_dtype;
      TensorShape out_shape;
      if (is_dense) {
        out_shape.AddDim(num_elements / num_elements_divisor);
        for (int i = 1; i < config.dense[d].shape.dims(); ++i) {
          out_shape.AddDim(config.dense[d].shape.dim_size(i));
        }

        out = &result->dense_values[d];
        out_dtype = config.dense[d].dtype;
      } else {
        Tensor* out_indices = &result->sparse_indices[d];
        Tensor* out_dense_shape = &result->sparse_shapes[d];

        // TODO(mrry): Investigate the possibility of not materializing
        // the indices (and perhaps dense_shape) until they are needed.
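        // For a single example the sparse tensor is rank 1, so the indices
        // are simply 0..num_elements-1 and the dense shape is [num_elements].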
        *out_indices = Tensor(
            DT_INT64, TensorShape({static_cast<int64>(num_elements), 1}));
        auto indices_flat = out_indices->flat<int64>();
        for (size_t i = 0; i < num_elements; ++i) {
          indices_flat(i) = static_cast<int64>(i);
        }

        *out_dense_shape = Tensor(DT_INT64, TensorShape({1}));
        auto shapes_shape_t = out_dense_shape->vec<int64>();
        shapes_shape_t(0) = num_elements;

        out = &result->sparse_values[d];
        out_dtype = config.sparse[d].dtype;
        out_shape.AddDim(num_elements);
      }

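      // Materialize the parsed values. The float path parsed directly into a
      // Tensor-backed buffer (TensorVector), so CopyFrom reshapes and shares
      // that buffer rather than copying element by element.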
      switch (example_dtype) {
        case DT_INT64: {
          *out = Tensor(out_dtype, out_shape);
          CopyOrMoveBlock(int64_list.begin(), int64_list.end(),
                          out->flat<int64>().data());
          break;
        }
        case DT_FLOAT: {
          if (!out->CopyFrom(float_list.tensor(), out_shape)) {
            return parse_error();
          }
          break;
        }
        case DT_STRING: {
          *out = Tensor(out_dtype, out_shape);
          CopyOrMoveBlock(bytes_list.begin(), bytes_list.end(),
                          out->flat<tstring>().data());
          break;
        }
        default:
          LOG(FATAL) << "Should not happen.";
      }
    }
  }

  // Handle missing dense features.
  for (size_t d = 0; d < config.dense.size(); ++d) {
    if (!dense_feature_already_seen[d]) {
      if (!config.dense[d].variable_length) {
        // Handle missing fixed-length dense feature.
        if (config.dense[d].default_value.NumElements() == 0) {
          return errors::InvalidArgument(
              "Feature: ", config.dense[d].feature_name,
              " (data type: ", DataTypeString(config.dense[d].dtype), ")",
              " is required but could not be found.");
        }
        result->dense_values[d] = config.dense[d].default_value;
      } else {
        // Handle missing varlen dense feature.
        TensorShape empty_shape;
        empty_shape.AddDim(0);
        for (int i = 1; i < config.dense[d].shape.dims(); ++i) {
          empty_shape.AddDim(config.dense[d].shape.dim_size(i));
        }
        result->dense_values[d] = Tensor(config.dense[d].dtype, empty_shape);
      }
    }
  }

  // Handle missing sparse features.
  for (size_t d = 0; d < config.sparse.size(); ++d) {
    if (!sparse_feature_already_seen[d]) {
      result->sparse_indices[d] = Tensor(DT_INT64, TensorShape({0, 1}));
      result->sparse_values[d] =
          Tensor(config.sparse[d].dtype, TensorShape({0}));
      result->sparse_shapes[d].vec<int64>()(0) = 0;
    }
  }

  return Status::OK();
}
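
A minimal usage sketch for context (not part of the source file). It assumes
the declarations in example_proto_fast_parsing.h at the same revision:
FastParseExampleConfig (the Config alias used above) with Dense/Sparse entry
constructors taking the fields in the order shown, and the Result struct.

#include "tensorflow/core/example/example.pb.h"
#include "tensorflow/core/util/example_proto_fast_parsing.h"

tensorflow::Status ParseOne() {
  namespace te = tensorflow::example;

  // One required fixed-length dense int64 feature "age" of shape [1]
  // (an empty default_value makes it required), plus one sparse string
  // feature "tags". Field order follows the assumed Dense constructor.
  te::FastParseExampleConfig config;
  config.dense.emplace_back(
      /*feature_name=*/"age", tensorflow::DT_INT64,
      /*shape=*/tensorflow::PartialTensorShape({1}),
      /*default_value=*/tensorflow::Tensor(),
      /*variable_length=*/false, /*elements_per_stride=*/1);
  config.sparse.emplace_back(/*feature_name=*/"tags", tensorflow::DT_STRING);

  // Build and serialize a matching tf.Example proto.
  tensorflow::Example example;
  auto& feature = *example.mutable_features()->mutable_feature();
  feature["age"].mutable_int64_list()->add_value(42);
  feature["tags"].mutable_bytes_list()->add_value("cat");
  std::string serialized;
  example.SerializeToString(&serialized);

  te::Result result;
  TF_RETURN_IF_ERROR(
      te::FastParseSingleExample(config, serialized, &result));
  // result.dense_values[0]: int64 tensor of shape [1] holding 42.
  // result.sparse_values[0] / sparse_indices[0] / sparse_shapes[0]: "tags".
  return tensorflow::Status::OK();
}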