in tensorflow/tensorflow/core/util/example_proto_fast_parsing.cc [1276:1625]
// Parses a single serialized `Example` proto (in `serialized`) into `result`,
// following the dense/sparse feature specification in `config`.
//
// High-level flow:
//   1. Validate the dtypes declared in `config`.
//   2. Build a presized cuckoo hash index from feature name -> (index, kind),
//      reseeding the hasher on (extremely unlikely) collisions.
//   3. Preallocate output tensors where shapes are statically known.
//   4. Walk the parsed feature map *backwards* so that, for duplicate keys,
//      the last entry wins (matching standard protobuf map semantics).
//   5. Fill in defaults / empty tensors for features absent from the input.
//
// Returns a non-OK Status on malformed input or on a config/feature dtype
// mismatch. On success, `result->dense_values`, `result->sparse_indices`,
// `result->sparse_values` and `result->sparse_shapes` are fully populated.
Status FastParseSingleExample(const Config& config,
                              absl::string_view serialized, Result* result) {
  DCHECK(result != nullptr);
  // Check config so we can safely CHECK(false) in switches on config.*.dtype
  for (auto& c : config.sparse) {
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
  }
  for (auto& c : config.dense) {
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
  }

  // Optional per-example statistics (feature count, value count), collected
  // only when the caller asked for them.
  PerExampleFeatureStats* stats = nullptr;
  if (config.collect_feature_stats) {
    result->feature_stats.emplace_back();
    stats = &result->feature_stats.back();
  }

  // TODO(mrry): Cache the construction of this map at Op construction time.
  size_t config_size = config.dense.size() + config.sparse.size();

  SeededHasher hasher;
  // Build config index: maps hash(feature_name) -> (position within the
  // dense/sparse config vector, Dense/Sparse tag). If InsertUnique ever
  // reports a collision, retry with a new hash seed; 1000 attempts bounds
  // the (pathological) worst case.
  PresizedCuckooMap<std::pair<size_t, Type>> config_index(config_size);
  bool ok = true;
  for (size_t i = 0; i < 1000; ++i) {
    for (size_t d = 0; d < config.dense.size(); ++d) {
      ok &= config_index.InsertUnique(hasher(config.dense[d].feature_name),
                                      {d, Type::Dense});
    }
    for (size_t d = 0; d < config.sparse.size(); ++d) {
      ok &= config_index.InsertUnique(hasher(config.sparse[d].feature_name),
                                      {d, Type::Sparse});
    }
    if (ok) break;
    LOG(WARNING) << "Collision found. This should happen only if you have "
                    "around 2^32 entries in your config.";
    hasher.seed++;
    config_index.Clear(config_size);
    ok = true;
  }
  if (!ok) {
    return errors::Internal(
        "Could not avoid collision. This should not happen.");
  }

  // Allocate dense output tensors. Fixed-length features get their final
  // shape now; variable-length ones are sized once the input is parsed.
  for (size_t d = 0; d < config.dense.size(); ++d) {
    if (!config.dense[d].variable_length) {
      TensorShape values_shape;
      if (!config.dense[d].shape.AsTensorShape(&values_shape)) {
        return errors::Internal(
            "Fixed-length shape was not a statically defined shape.");
      }
      result->dense_values.emplace_back(config.dense[d].dtype, values_shape);
    } else {
      // Variable-length tensor will be allocated later.
      result->dense_values.emplace_back();
    }
  }

  // Allocate sparse output tensors.
  for (size_t d = 0; d < config.sparse.size(); ++d) {
    // The dense_shape is always a vector of length 1.
    result->sparse_shapes.emplace_back(DT_INT64, TensorShape({1}));
    // Variable-length tensors will be allocated later.
    result->sparse_indices.emplace_back();
    result->sparse_values.emplace_back();
  }

  parsed::Example parsed_example;
  if (!ParseExample(serialized, &parsed_example)) {
    return errors::InvalidArgument("Could not parse example input, value: '",
                                   serialized, "'");
  }

  // Tracks which configured features we've already consumed, so duplicate
  // occurrences in the input are logged and skipped rather than re-parsed.
  std::vector<bool> sparse_feature_already_seen(config.sparse.size(), false);
  std::vector<bool> dense_feature_already_seen(config.dense.size(), false);

  if (stats) {
    // TODO(b/111553342): This may over-count the number of features if there
    // are duplicate keys in the feature map. Consider deduplicating the keys
    // before computing the count.
    stats->features_count = parsed_example.size();
  }

  // Handle features present in the example.
  const size_t parsed_example_size = parsed_example.size();
  for (size_t i = 0; i < parsed_example_size; ++i) {
    // This is a logic that standard protobuf parsing is implementing.
    // I.e. last entry in the map overwrites all the previous ones.
    // Iterating back-to-front plus the "already seen" flags above implements
    // exactly that: only the last occurrence of each key is parsed.
    parsed::FeatureMapEntry& name_and_feature =
        parsed_example[parsed_example_size - i - 1];

    const StringPiece feature_name = name_and_feature.first;
    parsed::Feature& feature = name_and_feature.second;

    std::pair<size_t, Type> d_and_type;
    uint64 h = hasher(feature_name);
    // Features not mentioned in the config are simply ignored.
    if (!config_index.Find(h, &d_and_type)) continue;

    size_t d = d_and_type.first;
    bool is_dense = d_and_type.second == Type::Dense;

    {
      // Testing for PresizedCuckooMap collision: a hash hit is not enough,
      // so confirm the actual name matches before treating it as configured.
      // TODO(lew): Use dense_hash_map and avoid this and hasher creation.
      const string& config_feature_name = is_dense
                                              ? config.dense[d].feature_name
                                              : config.sparse[d].feature_name;
      if (feature_name != config_feature_name) continue;
    }

    auto example_error = [feature_name](StringPiece suffix) {
      return errors::InvalidArgument("Key: ", feature_name, ". ", suffix);
    };

    auto parse_error = [feature_name] {
      return errors::InvalidArgument("Key: ", feature_name,
                                     ". Can't parse serialized Example.");
    };

    DataType example_dtype;
    TF_RETURN_IF_ERROR(feature.ParseDataType(&example_dtype));
    // DT_INVALID means the feature has no value list; treat it as absent.
    if (example_dtype == DT_INVALID) continue;

    if (is_dense && !config.dense[d].variable_length) {
      // Fixed-length dense feature: parse directly into the preallocated
      // output tensor and require exactly elements_per_stride values.

      // If feature was already visited, skip.
      // Compare comment at the beginning of the loop.
      if (dense_feature_already_seen[d]) {
        LogDenseFeatureDataLoss(feature_name);
        continue;
      }
      dense_feature_already_seen[d] = true;

      if (example_dtype != config.dense[d].dtype) {
        return example_error(strings::StrCat(
            "Data types don't match. Data type: ",
            DataTypeString(example_dtype),
            " but expected type: ", DataTypeString(config.dense[d].dtype)));
      }

      Tensor* out = &result->dense_values[d];
      const std::size_t num_elements = config.dense[d].elements_per_stride;
      if (stats) {
        // TODO(b/111553342): If desirable, we could add support for counting
        // elements in the features that aren't parsed, but this could add
        // considerable runtime cost.
        stats->feature_values_count += num_elements;
      }
      switch (example_dtype) {
        case DT_INT64: {
          auto out_p = out->flat<int64>().data();
          LimitedArraySlice<int64> slice(out_p, num_elements);
          if (!feature.ParseInt64List(&slice)) return parse_error();
          // EndDistance() != 0 means the input had too few or too many
          // values for the fixed-length shape.
          if (slice.EndDistance() != 0) {
            return parse_error();
          }
          break;
        }
        case DT_FLOAT: {
          auto out_p = out->flat<float>().data();
          LimitedArraySlice<float> slice(out_p, num_elements);
          if (!feature.ParseFloatList(&slice)) return parse_error();
          if (slice.EndDistance() != 0) {
            return parse_error();
          }
          break;
        }
        case DT_STRING: {
          auto out_p = out->flat<tstring>().data();
          LimitedArraySlice<tstring> slice(out_p, num_elements);
          if (!feature.ParseBytesList(&slice)) return parse_error();
          if (slice.EndDistance() != 0) {
            return parse_error();
          }
          break;
        }
        default:
          // Unreachable: dtypes were validated by CheckConfigDataType above.
          LOG(FATAL) << "Should not happen.";
      }

    } else {  // if variable length
      // Variable-length dense or sparse feature: parse into a temporary
      // list first, then size and fill the output tensor.
      SmallVector<tstring> bytes_list;
      TensorVector<float> float_list;
      SmallVector<int64> int64_list;

      // For varlen dense features the value count must be a multiple of
      // elements_per_stride; sparse features accept any count.
      const size_t num_elements_divisor =
          is_dense ? config.dense[d].elements_per_stride : 1;
      size_t num_elements;

      if (is_dense) {
        // If feature was already visited, skip.
        // Compare comment at the beginning of the loop.
        if (dense_feature_already_seen[d]) {
          LogDenseFeatureDataLoss(feature_name);
          continue;
        }
        dense_feature_already_seen[d] = true;

        if (example_dtype != config.dense[d].dtype) {
          return example_error(strings::StrCat(
              "Data types don't match. Data type: ",
              DataTypeString(example_dtype),
              " but expected type: ", DataTypeString(config.dense[d].dtype)));
        }
      } else {
        // If feature was already visited, skip.
        // Compare comment at the beginning of the loop.
        if (sparse_feature_already_seen[d]) {
          LogSparseFeatureDataLoss(feature_name);
          continue;
        }
        sparse_feature_already_seen[d] = true;

        // Handle sparse features.
        if (example_dtype != DT_INVALID &&
            example_dtype != config.sparse[d].dtype) {
          return example_error(strings::StrCat(
              "Data types don't match. ",
              "Expected type: ", DataTypeString(config.sparse[d].dtype),
              ", Actual type: ", DataTypeString(example_dtype)));
        }
      }

      switch (example_dtype) {
        case DT_INT64: {
          // TODO(mrry): Use the fact that the `int64_list` is packed to read
          // out the length and pre-allocate the output tensor.
          if (!feature.ParseInt64List(&int64_list)) return parse_error();
          num_elements = int64_list.size();
          break;
        }
        case DT_FLOAT: {
          if (!feature.ParseFloatList(&float_list)) return parse_error();
          num_elements = float_list.size();
          break;
        }
        case DT_STRING: {
          // Pre-count the bytes entries so the vector is reserved once
          // before the (potentially string-copying) parse.
          int actual_num_elements = 0;
          if (!feature.GetNumElementsInBytesList(&actual_num_elements)) {
            return parse_error();
          }
          bytes_list.reserve(actual_num_elements);
          if (!feature.ParseBytesList(&bytes_list)) return parse_error();
          num_elements = bytes_list.size();
          break;
        }
        default:
          // Unreachable: dtypes were validated by CheckConfigDataType above.
          // (LOG(FATAL) aborts, so `num_elements` cannot be read
          // uninitialized below.)
          LOG(FATAL) << "Should not happen. " << DataTypeString(example_dtype);
      }

      if (num_elements % num_elements_divisor != 0) {
        return parse_error();
      }

      if (stats) {
        stats->feature_values_count += num_elements;
      }

      Tensor* out;
      DataType out_dtype;
      TensorShape out_shape;
      if (is_dense) {
        // Varlen dense output shape: leading dim is the number of strides,
        // trailing dims come from the configured per-stride shape.
        out_shape.AddDim(num_elements / num_elements_divisor);
        for (int i = 1; i < config.dense[d].shape.dims(); ++i) {
          out_shape.AddDim(config.dense[d].shape.dim_size(i));
        }

        out = &result->dense_values[d];
        out_dtype = config.dense[d].dtype;
      } else {
        // Sparse output: indices are [num_elements, 1] holding 0..n-1
        // (single-example rank-1 sparse tensor), dense_shape is [n].
        Tensor* out_indices = &result->sparse_indices[d];
        Tensor* out_dense_shape = &result->sparse_shapes[d];

        // TODO(mrry): Investigate the possibility of not materializing
        // the indices (and perhaps dense_shape) until they are needed.
        *out_indices = Tensor(
            DT_INT64, TensorShape({static_cast<int64>(num_elements), 1}));
        auto indices_flat = out_indices->flat<int64>();
        for (size_t i = 0; i < num_elements; ++i) {
          indices_flat(i) = static_cast<int64>(i);
        }

        *out_dense_shape = Tensor(DT_INT64, TensorShape({1}));
        auto shapes_shape_t = out_dense_shape->vec<int64>();
        shapes_shape_t(0) = num_elements;

        out = &result->sparse_values[d];
        out_dtype = config.sparse[d].dtype;
        out_shape.AddDim(num_elements);
      }

      switch (example_dtype) {
        case DT_INT64: {
          *out = Tensor(out_dtype, out_shape);
          CopyOrMoveBlock(int64_list.begin(), int64_list.end(),
                          out->flat<int64>().data());
          break;
        }
        case DT_FLOAT: {
          // float_list already owns a tensor; share its buffer via CopyFrom
          // instead of copying element-by-element.
          if (!out->CopyFrom(float_list.tensor(), out_shape)) {
            return parse_error();
          }
          break;
        }
        case DT_STRING: {
          *out = Tensor(out_dtype, out_shape);
          CopyOrMoveBlock(bytes_list.begin(), bytes_list.end(),
                          out->flat<tstring>().data());
          break;
        }
        default:
          LOG(FATAL) << "Should not happen.";
      }
    }
  }

  // Handle missing dense features.
  for (size_t d = 0; d < config.dense.size(); ++d) {
    if (!dense_feature_already_seen[d]) {
      if (!config.dense[d].variable_length) {
        // Handle missing fixed-length dense feature: the default value is
        // required; an empty default means the feature was mandatory.
        if (config.dense[d].default_value.NumElements() == 0) {
          return errors::InvalidArgument(
              "Feature: ", config.dense[d].feature_name,
              " (data type: ", DataTypeString(config.dense[d].dtype), ")",
              " is required but could not be found.");
        }
        result->dense_values[d] = config.dense[d].default_value;
      } else {
        // Handle missing varlen dense feature: emit an empty tensor with
        // zero strides but the configured trailing dimensions.
        TensorShape empty_shape;
        empty_shape.AddDim(0);
        for (int i = 1; i < config.dense[d].shape.dims(); ++i) {
          empty_shape.AddDim(config.dense[d].shape.dim_size(i));
        }
        result->dense_values[d] = Tensor(config.dense[d].dtype, empty_shape);
      }
    }
  }

  // Handle missing sparse features: empty indices/values, dense_shape [0].
  for (size_t d = 0; d < config.sparse.size(); ++d) {
    if (!sparse_feature_already_seen[d]) {
      result->sparse_indices[d] = Tensor(DT_INT64, TensorShape({0, 1}));
      result->sparse_values[d] =
          Tensor(config.sparse[d].dtype, TensorShape({0}));
      result->sparse_shapes[d].vec<int64>()(0) = 0;
    }
  }

  return Status::OK();
}