in tensorflow/tensorflow/core/util/example_proto_fast_parsing.cc [1820:2499]
// Parses a batch of serialized SequenceExample protos into context and
// feature_list outputs using a two-pass strategy:
//   Pass 1: walk each serialized proto once, saving a StringPiece (aliasing
//           the input buffer) for every requested feature and accumulating
//           element counts, so every output tensor can be sized exactly.
//   Pass 2: allocate the outputs, then re-parse each saved feature directly
//           into the pre-allocated buffers.
//
// Args:
//   context_config / feature_list_config: dense & sparse feature specs for
//       the context and feature_lists sections respectively.
//   serialized: the batch of serialized SequenceExample protos.
//   example_names: optional debug names; must be empty or match
//       serialized.size().
//   thread_pool: unused in this implementation (parsing is single-threaded).
//   context_result / feature_list_result: output tensors (sparse triples and
//       dense values).
//   dense_feature_lengths: per-example row counts for each dense
//       feature_list feature, shape {num_examples}.
//
// Returns OK on success; InvalidArgument on malformed input or on any
// mismatch between the parsed data and the configured shapes/types.
Status FastParseSequenceExample(
    const FastParseExampleConfig& context_config,
    const FastParseExampleConfig& feature_list_config,
    gtl::ArraySlice<tstring> serialized, gtl::ArraySlice<tstring> example_names,
    thread::ThreadPool* thread_pool, Result* context_result,
    Result* feature_list_result, std::vector<Tensor>* dense_feature_lengths) {
  int num_examples = serialized.size();
  DCHECK(context_result != nullptr);
  DCHECK(feature_list_result != nullptr);
  DCHECK(dense_feature_lengths != nullptr);

  // Build lookup tables for the context features. For each feature name we
  // track (dtype, length): for sparse features "length" accumulates the total
  // element count across the batch; for dense features it holds the maximum
  // element count seen in any single example.
  size_t num_context_features =
      context_config.sparse.size() + context_config.dense.size();
  absl::flat_hash_map<StringPiece, bool> context_is_sparse;
  context_is_sparse.reserve(num_context_features);
  absl::flat_hash_map<StringPiece, std::pair<DataType, size_t>>
      context_feature_type_and_lengths;
  context_feature_type_and_lengths.reserve(num_context_features);
  if (!example_names.empty() && example_names.size() != num_examples) {
    return errors::InvalidArgument(
        "example_names must be empty or have the correct number of elements");
  }
  for (auto& c : context_config.sparse) {
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
    context_feature_type_and_lengths[c.feature_name] =
        std::make_pair(c.dtype, 0);
    context_is_sparse[c.feature_name] = true;
  }
  for (auto& c : context_config.dense) {
    if (context_is_sparse[c.feature_name]) {
      return errors::InvalidArgument("Context feature " + c.feature_name +
                                     " cannot be both dense and sparse");
    }
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
    // Seed the max length with the default value's size so an all-default
    // column still gets a correctly sized output.
    context_feature_type_and_lengths[c.feature_name] =
        std::make_pair(c.dtype, c.default_value.NumElements());
    if (c.default_value.NumElements() > 0) {
      if (!c.shape.IsCompatibleWith(c.default_value.shape())) {
        return errors::InvalidArgument("Default value for context feature ",
                                       c.feature_name,
                                       " has an incorrect shape: saw ",
                                       c.default_value.shape().DebugString(),
                                       " but expected ", c.shape.DebugString());
      }
    }
  }

  // Same lookup tables for the feature_list (sequence) features.
  size_t num_sequence_features =
      feature_list_config.sparse.size() + feature_list_config.dense.size();
  absl::flat_hash_map<StringPiece, bool> sequence_is_sparse;
  sequence_is_sparse.reserve(num_sequence_features);
  absl::flat_hash_map<StringPiece, std::pair<DataType, size_t>>
      sequence_feature_type_and_lengths;
  sequence_feature_type_and_lengths.reserve(num_sequence_features);
  for (auto& c : feature_list_config.sparse) {
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
    sequence_feature_type_and_lengths[c.feature_name] =
        std::make_pair(c.dtype, 0);
    sequence_is_sparse[c.feature_name] = true;
  }
  for (auto& c : feature_list_config.dense) {
    if (sequence_is_sparse[c.feature_name]) {
      return errors::InvalidArgument("Sequence feature " + c.feature_name +
                                     " cannot be both dense and sparse");
    }
    TF_RETURN_IF_ERROR(CheckConfigDataType(c.dtype));
    sequence_feature_type_and_lengths[c.feature_name] =
        std::make_pair(c.dtype, 0);
  }

  // Pass 1: for every example, record a StringPiece into the serialized bytes
  // for each requested feature, and measure element counts.
  std::vector<absl::flat_hash_map<StringPiece, StringPiece>>
      all_context_features(num_examples);
  std::vector<absl::flat_hash_map<StringPiece, StringPiece>>
      all_sequence_features(num_examples);
  const tstring kUnknown = "<unknown>";
  for (int d = 0; d < num_examples; d++) {
    const tstring& example = serialized[d];
    const tstring& example_name =
        example_names.empty() ? kUnknown : example_names[d];
    auto* context_features = &all_context_features[d];
    auto* sequence_features = &all_sequence_features[d];
    protobuf::io::CodedInputStream stream(
        reinterpret_cast<const uint8*>(example.data()), example.size());
    // Allow parsed StringPieces to alias `example`'s buffer instead of
    // copying; the saved pieces are re-parsed below while `serialized` is
    // still alive. (EnableAliasing is a local helper wrapping the stream's
    // aliasing support.)
    EnableAliasing(&stream);
    // Extract pointers to all features within this serialized example.
    while (!stream.ExpectAtEnd()) {
      absl::flat_hash_map<StringPiece, StringPiece>* features = nullptr;
      const absl::flat_hash_map<StringPiece, std::pair<DataType, size_t>>*
          config = nullptr;
      if (stream.ExpectTag(kDelimitedTag(1))) {
        // SequenceExample field 1: context.
        features = context_features;
        config = &context_feature_type_and_lengths;
      } else if (stream.ExpectTag(kDelimitedTag(2))) {
        // SequenceExample field 2: feature_lists.
        features = sequence_features;
        config = &sequence_feature_type_and_lengths;
      } else if (!SkipExtraneousTag(&stream)) {
        return errors::InvalidArgument(
            "Invalid protocol message input, example id: ", example_name);
      }
      if (features != nullptr) {
        uint32 length;
        if (!stream.ReadVarint32(&length)) {
          return errors::InvalidArgument(
              "Invalid protocol message input, example id: ", example_name);
        }
        auto limit = stream.PushLimit(length);
        // Each entry in the map is a (key, value) submessage: field 1 is the
        // feature name, field 2 the serialized Feature/FeatureList.
        while (!stream.ExpectAtEnd()) {
          StringPiece key, value;
          uint32 kv_length;
          if (!stream.ExpectTag(kDelimitedTag(1)) ||
              !stream.ReadVarint32(&kv_length)) {
            return errors::InvalidArgument(
                "Invalid protocol message input, example id: ", example_name);
          }
          auto kv_limit = stream.PushLimit(kv_length);
          if (!stream.ExpectTag(kDelimitedTag(1)) ||
              !ParseString(&stream, &key) ||
              !stream.ExpectTag(kDelimitedTag(2)) ||
              !ParseString(&stream, &value) || !stream.ExpectAtEnd()) {
            return errors::InvalidArgument(
                "Invalid protocol message input, example id: ", example_name);
          }
          stream.PopLimit(kv_limit);
          // Only save if this feature was requested.
          if (config->count(key) > 0) {
            (*features)[key] = value;
          }
        }
        stream.PopLimit(limit);
      }
    }
    // Count elements in each saved context feature (values are discarded:
    // the Parse*Feature helpers take a null output and just return counts).
    for (const auto& c : *context_features) {
      size_t num_elements = 0;
      if (!c.second.empty()) {
        protobuf::io::CodedInputStream stream(
            reinterpret_cast<const uint8*>(c.second.data()), c.second.size());
        EnableAliasing(&stream);
        DataType dtype = context_feature_type_and_lengths[c.first].first;
        int64 num;
        switch (dtype) {
          case DT_STRING:
            num = ParseBytesFeature(&stream, nullptr);
            break;
          case DT_FLOAT:
            num = ParseFloatFeature(&stream, nullptr);
            break;
          case DT_INT64:
            num = ParseInt64Feature(&stream, nullptr);
            break;
          default:
            num = -1;
            break;
        }
        if (num == -1) {
          return errors::InvalidArgument("Error in context feature ", c.first,
                                         " in example ", example_name);
        }
        num_elements += num;
      }
      if (context_is_sparse[c.first]) {
        // Sparse outputs concatenate across the batch: accumulate totals.
        context_feature_type_and_lengths[c.first].second += num_elements;
      } else {
        // Dense outputs are rectangular: track the per-example maximum.
        size_t current_max = context_feature_type_and_lengths[c.first].second;
        context_feature_type_and_lengths[c.first].second =
            std::max(current_max, num_elements);
      }
    }
    // Count elements in each saved sequence feature. A FeatureList is a
    // repeated Feature; each repetition is a length-delimited submessage.
    for (const auto& c : *sequence_features) {
      size_t num_elements = 0;
      if (!c.second.empty()) {
        protobuf::io::CodedInputStream stream(
            reinterpret_cast<const uint8*>(c.second.data()), c.second.size());
        EnableAliasing(&stream);
        DataType dtype = sequence_feature_type_and_lengths[c.first].first;
        while (!stream.ExpectAtEnd()) {
          uint32 feature_length;
          if (!stream.ExpectTag(kDelimitedTag(1)) ||
              !stream.ReadVarint32(&feature_length)) {
            return errors::InvalidArgument("Error in sequence feature ",
                                           c.first, " in example ",
                                           example_name);
          }
          // A Feature holding an empty value list serializes to exactly 2
          // bytes (inner tag + zero length); anything longer carries values,
          // and 0 is an entirely empty Feature.
          if (feature_length > 2) {
            auto limit = stream.PushLimit(feature_length);
            int64 num;
            switch (dtype) {
              case DT_STRING:
                num = ParseBytesFeature(&stream, nullptr);
                break;
              case DT_FLOAT:
                num = ParseFloatFeature(&stream, nullptr);
                break;
              case DT_INT64:
                num = ParseInt64Feature(&stream, nullptr);
                break;
              default:
                num = -1;
                break;
            }
            if (num == -1) {
              return errors::InvalidArgument("Error in sequence feature ",
                                             c.first, " in example ",
                                             example_name);
            }
            num_elements += num;
            stream.PopLimit(limit);
          } else if (feature_length == 2) {
            if (!SkipEmptyFeature(&stream, dtype)) {
              return errors::InvalidArgument("Error in sequence feature ",
                                             c.first, " in example ",
                                             example_name);
            }
          } else if (feature_length != 0) {
            return errors::InvalidArgument("Error in sequence feature ",
                                           c.first, " in example ",
                                           example_name);
          }
        }
      }
      if (sequence_is_sparse[c.first]) {
        sequence_feature_type_and_lengths[c.first].second += num_elements;
      } else {
        size_t current_max = sequence_feature_type_and_lengths[c.first].second;
        sequence_feature_type_and_lengths[c.first].second =
            std::max(current_max, num_elements);
      }
    }
  }

  // Allocate memory.
  context_result->sparse_values.resize(context_config.sparse.size());
  context_result->sparse_indices.resize(context_config.sparse.size());
  context_result->sparse_shapes.resize(context_config.sparse.size());
  context_result->dense_values.resize(context_config.dense.size());
  feature_list_result->sparse_values.resize(feature_list_config.sparse.size());
  feature_list_result->sparse_indices.resize(feature_list_config.sparse.size());
  feature_list_result->sparse_shapes.resize(feature_list_config.sparse.size());
  feature_list_result->dense_values.resize(feature_list_config.dense.size());
  dense_feature_lengths->resize(feature_list_config.dense.size());

  // NOTE(mrry): Cache the CPU allocator here and use it in Tensor construction,
  // to avoid lock contention in `tensorflow::cpu_allocator()`.
  Allocator* allocator = tensorflow::cpu_allocator();

  // Pass 2a: fill dense context features.
  int t = 0;
  for (const auto& c : context_config.dense) {
    TensorShape dense_shape, example_shape;
    DataType dtype = c.dtype;
    const size_t expected_max_elements =
        context_feature_type_and_lengths[c.feature_name].second;
    if (!c.shape.AsTensorShape(&example_shape) ||
        expected_max_elements != example_shape.num_elements()) {
      // NOTE: previously this reported dense_shape.num_elements(), but
      // dense_shape is still empty here (num_elements() == 1); the value
      // actually compared is example_shape.num_elements().
      return errors::InvalidArgument(
          "Inconsistent number of elements for feature ", c.feature_name, ": ",
          expected_max_elements, " vs ", example_shape.num_elements());
    }
    dense_shape.AddDim(num_examples);
    for (const int dim : c.shape.dim_sizes()) {
      dense_shape.AddDim(dim);
    }
    context_result->dense_values[t] = Tensor(allocator, dtype, dense_shape);

    // TODO(sundberg): Refactor to reduce code duplication, and add bounds
    // checking for the outputs.
    tstring* out_bytes = nullptr;
    float* out_float = nullptr;
    int64* out_int64 = nullptr;
    switch (dtype) {
      case DT_STRING:
        out_bytes = context_result->dense_values[t].flat<tstring>().data();
        break;
      case DT_FLOAT:
        out_float = context_result->dense_values[t].flat<float>().data();
        break;
      case DT_INT64:
        out_int64 = context_result->dense_values[t].flat<int64>().data();
        break;
      default:
        return errors::InvalidArgument("Unexpected dtype ", dtype,
                                       " in feature ", c.feature_name);
    }
    t++;

    // Fill in the values.
    for (int e = 0; e < num_examples; e++) {
      size_t num_elements = 0;
      const auto feature_iter = all_context_features[e].find(c.feature_name);
      const tstring& example_name =
          example_names.empty() ? kUnknown : example_names[e];
      if (feature_iter == all_context_features[e].end()) {
        // Copy the default value, if present. If not, return an error.
        if (c.default_value.NumElements() == 0) {
          return errors::InvalidArgument(
              "Feature: ", c.feature_name,
              " (data type: ", DataTypeString(c.dtype), ")",
              " is required but could not be found.");
        }
        const tstring* in_bytes = nullptr;
        const float* in_float = nullptr;
        const int64* in_int64 = nullptr;
        size_t num = 0;
        switch (dtype) {
          case DT_STRING:
            in_bytes = c.default_value.flat<tstring>().data();
            num = c.default_value.NumElements();
            for (int p = 0; p < num; p++) {
              *out_bytes++ = *in_bytes++;
            }
            break;
          case DT_FLOAT:
            in_float = c.default_value.flat<float>().data();
            num = c.default_value.NumElements();
            for (int p = 0; p < num; p++) {
              *out_float++ = *in_float++;
            }
            break;
          case DT_INT64:
            in_int64 = c.default_value.flat<int64>().data();
            num = c.default_value.NumElements();
            for (int p = 0; p < num; p++) {
              *out_int64++ = *in_int64++;
            }
            break;
          default:
            return errors::InvalidArgument("Unexpected dtype ", dtype,
                                           " in example ", example_name);
        }
        num_elements += num;
      } else if (!feature_iter->second.empty()) {
        const auto& feature = feature_iter->second;
        protobuf::io::CodedInputStream stream(
            reinterpret_cast<const uint8*>(feature.data()), feature.size());
        EnableAliasing(&stream);
        size_t num_added;
        switch (dtype) {
          case DT_STRING:
            num_added = ParseBytesFeature(&stream, out_bytes);
            out_bytes += num_added;
            break;
          case DT_FLOAT:
            num_added = ParseFloatFeature(&stream, out_float);
            out_float += num_added;
            break;
          case DT_INT64:
            num_added = ParseInt64Feature(&stream, out_int64);
            out_int64 += num_added;
            break;
          default:
            return errors::InvalidArgument("Unexpected dtype ", dtype,
                                           " in example ", example_name);
        }
        num_elements += num_added;
      }
      // Dense context features must be exactly rectangular.
      if (num_elements != expected_max_elements) {
        return errors::InvalidArgument(
            "Unexpected number of elements in example ", example_name);
      }
    }
  }

  // Pass 2b: fill sparse context features. Indices are rank-2:
  // (example index, position within example).
  t = 0;
  for (const auto& c : context_config.sparse) {
    TensorShape indices_shape, values_shape;
    DataType dtype = c.dtype;
    size_t expected_num_elements =
        context_feature_type_and_lengths[c.feature_name].second;
    indices_shape.AddDim(expected_num_elements);
    indices_shape.AddDim(2);
    values_shape.AddDim(expected_num_elements);
    context_result->sparse_indices[t] =
        Tensor(allocator, DT_INT64, indices_shape);
    context_result->sparse_values[t] = Tensor(allocator, dtype, values_shape);
    context_result->sparse_shapes[t] =
        Tensor(allocator, DT_INT64, TensorShape({2}));

    // TODO(sundberg): Refactor to reduce code duplication, and add bounds
    // checking for the outputs.
    tstring* out_bytes = nullptr;
    float* out_float = nullptr;
    int64* out_int64 = nullptr;
    switch (dtype) {
      case DT_STRING:
        out_bytes = context_result->sparse_values[t].flat<tstring>().data();
        break;
      case DT_FLOAT:
        out_float = context_result->sparse_values[t].flat<float>().data();
        break;
      case DT_INT64:
        out_int64 = context_result->sparse_values[t].flat<int64>().data();
        break;
      default:
        return errors::InvalidArgument("Unexpected dtype ", dtype,
                                       " in feature ", c.feature_name);
    }
    int64* out_indices = context_result->sparse_indices[t].flat<int64>().data();
    auto out_shape = context_result->sparse_shapes[t].vec<int64>();
    t++;

    // Fill in the values.
    size_t num_elements = 0;
    size_t max_num_cols = 0;
    for (int e = 0; e < num_examples; e++) {
      const auto& feature = all_context_features[e][c.feature_name];
      const tstring& example_name =
          example_names.empty() ? kUnknown : example_names[e];
      if (!feature.empty()) {
        protobuf::io::CodedInputStream stream(
            reinterpret_cast<const uint8*>(feature.data()), feature.size());
        EnableAliasing(&stream);
        size_t num_added;
        switch (dtype) {
          case DT_STRING:
            num_added = ParseBytesFeature(&stream, out_bytes);
            out_bytes += num_added;
            break;
          case DT_FLOAT:
            num_added = ParseFloatFeature(&stream, out_float);
            out_float += num_added;
            break;
          case DT_INT64:
            num_added = ParseInt64Feature(&stream, out_int64);
            out_int64 += num_added;
            break;
          default:
            return errors::InvalidArgument("Unexpected dtype ", dtype,
                                           " in example ", example_name);
        }
        num_elements += num_added;
        max_num_cols = std::max(max_num_cols, num_added);
        for (int i = 0; i < num_added; i++) {
          *out_indices++ = e;
          *out_indices++ = i;
        }
      }
    }
    // Guard against the second parse disagreeing with the counts from pass 1.
    if (num_elements != expected_num_elements) {
      return errors::InvalidArgument(
          "Unexpected total number of elements in feature ", c.feature_name);
    }
    out_shape(0) = num_examples;
    out_shape(1) = max_num_cols;
  }

  // Pass 2c: fill dense feature_list features. Output shape is
  // {num_examples, max_rows} + row_shape; short sequences are padded.
  t = 0;
  TensorShape dense_length_shape({num_examples});
  for (const auto& c : feature_list_config.dense) {
    TensorShape dense_shape, row_shape;
    DataType dtype = c.dtype;
    const size_t expected_max_elements =
        sequence_feature_type_and_lengths[c.feature_name].second;
    // The max element count must be an exact multiple of the row size.
    if (!c.shape.AsTensorShape(&row_shape) ||
        expected_max_elements !=
            (expected_max_elements / row_shape.num_elements()) *
                row_shape.num_elements()) {
      return errors::InvalidArgument("Unexpected shape error in feature ",
                                     c.feature_name);
    }
    int64 expected_max_rows = expected_max_elements / row_shape.num_elements();
    dense_shape.AddDim(num_examples);
    dense_shape.AddDim(expected_max_rows);
    for (const int dim : c.shape.dim_sizes()) {
      dense_shape.AddDim(dim);
    }
    feature_list_result->dense_values[t] =
        Tensor(allocator, dtype, dense_shape);
    (*dense_feature_lengths)[t] =
        Tensor(allocator, DT_INT64, dense_length_shape);
    int64* out_lengths = (*dense_feature_lengths)[t].flat<int64>().data();

    tstring* out_bytes = nullptr;
    float* out_float = nullptr;
    int64* out_int64 = nullptr;
    switch (dtype) {
      case DT_STRING:
        out_bytes = feature_list_result->dense_values[t].flat<tstring>().data();
        break;
      case DT_FLOAT:
        out_float = feature_list_result->dense_values[t].flat<float>().data();
        break;
      case DT_INT64:
        out_int64 = feature_list_result->dense_values[t].flat<int64>().data();
        break;
      default:
        return errors::InvalidArgument("Unexpected dtype ", dtype,
                                       " in feature ", c.feature_name);
    }
    t++;

    // Fill in the values.
    for (int e = 0; e < num_examples; e++) {
      size_t num_elements = 0, num_rows = 0;
      const auto feature_iter = all_sequence_features[e].find(c.feature_name);
      const tstring& example_name =
          example_names.empty() ? kUnknown : example_names[e];
      if (feature_iter == all_sequence_features[e].end()) {
        // Return an error if this feature was not allowed to be missing.
        // Otherwise, we'll pad as needed below.
        if (!c.variable_length) {
          return errors::InvalidArgument("Missing feature ", c.feature_name,
                                         " in example ", example_name);
        }
      } else if (!feature_iter->second.empty()) {
        const auto& feature = feature_iter->second;
        protobuf::io::CodedInputStream stream(
            reinterpret_cast<const uint8*>(feature.data()), feature.size());
        EnableAliasing(&stream);
        while (!stream.ExpectAtEnd()) {
          uint32 feature_length;
          if (!stream.ExpectTag(kDelimitedTag(1)) ||
              !stream.ReadVarint32(&feature_length)) {
            return errors::InvalidArgument("Error in sequence feature ",
                                           c.feature_name, " in example ",
                                           example_name);
          }
          auto limit = stream.PushLimit(feature_length);
          size_t num_added;
          switch (dtype) {
            case DT_STRING:
              num_added = ParseBytesFeature(&stream, out_bytes);
              out_bytes += num_added;
              break;
            case DT_FLOAT:
              num_added = ParseFloatFeature(&stream, out_float);
              out_float += num_added;
              break;
            case DT_INT64:
              num_added = ParseInt64Feature(&stream, out_int64);
              out_int64 += num_added;
              break;
            default:
              return errors::InvalidArgument("Unexpected dtype ", dtype,
                                             " in example ", example_name);
          }
          num_elements += num_added;
          num_rows++;
          // Every timestep must supply exactly one row of the configured
          // shape.
          if (num_added != row_shape.num_elements()) {
            return errors::InvalidArgument(
                "Unexpected number of elements in feature ", c.feature_name,
                ", example ", example_name);
          }
          stream.PopLimit(limit);
        }
      }
      *out_lengths++ = num_rows;
      // Pad as necessary. For strings the tensor elements were
      // default-constructed (empty), so advancing the pointer suffices.
      int num_to_pad = expected_max_elements - num_elements;
      switch (dtype) {
        case DT_STRING:
          out_bytes += num_to_pad;
          break;
        case DT_FLOAT:
          PadFloatFeature(num_to_pad, out_float);
          out_float += num_to_pad;
          break;
        case DT_INT64:
          PadInt64Feature(num_to_pad, out_int64);
          out_int64 += num_to_pad;
          break;
        default:
          return errors::InvalidArgument("Unexpected dtype ", dtype,
                                         " in example ", example_name);
      }
    }
  }

  // Pass 2d: fill sparse feature_list features. Indices are rank-3:
  // (example index, row/timestep, position within row).
  t = 0;
  for (const auto& c : feature_list_config.sparse) {
    TensorShape indices_shape, values_shape;
    DataType dtype = c.dtype;
    size_t expected_num_elements =
        sequence_feature_type_and_lengths[c.feature_name].second;
    indices_shape.AddDim(expected_num_elements);
    indices_shape.AddDim(3);
    values_shape.AddDim(expected_num_elements);
    feature_list_result->sparse_indices[t] =
        Tensor(allocator, DT_INT64, indices_shape);
    feature_list_result->sparse_values[t] =
        Tensor(allocator, dtype, values_shape);
    feature_list_result->sparse_shapes[t] =
        Tensor(allocator, DT_INT64, TensorShape({3}));

    tstring* out_bytes = nullptr;
    float* out_float = nullptr;
    int64* out_int64 = nullptr;
    switch (dtype) {
      case DT_STRING:
        out_bytes =
            feature_list_result->sparse_values[t].flat<tstring>().data();
        break;
      case DT_FLOAT:
        out_float = feature_list_result->sparse_values[t].flat<float>().data();
        break;
      case DT_INT64:
        out_int64 = feature_list_result->sparse_values[t].flat<int64>().data();
        break;
      default:
        return errors::InvalidArgument("Unexpected dtype ", dtype,
                                       " in feature ", c.feature_name);
    }
    int64* out_indices =
        feature_list_result->sparse_indices[t].flat<int64>().data();
    auto out_shape = feature_list_result->sparse_shapes[t].vec<int64>();
    t++;

    // Fill in the values.
    size_t num_elements = 0;
    size_t max_num_rows = 0;
    size_t max_num_cols = 0;
    for (int e = 0; e < num_examples; e++) {
      const auto& feature = all_sequence_features[e][c.feature_name];
      const tstring& example_name =
          example_names.empty() ? kUnknown : example_names[e];
      if (!feature.empty()) {
        protobuf::io::CodedInputStream stream(
            reinterpret_cast<const uint8*>(feature.data()), feature.size());
        EnableAliasing(&stream);
        size_t num_rows = 0;
        while (!stream.ExpectAtEnd()) {
          uint32 feature_length;
          if (!stream.ExpectTag(kDelimitedTag(1)) ||
              !stream.ReadVarint32(&feature_length)) {
            return errors::InvalidArgument("Error in sequence feature ",
                                           c.feature_name, " in example ",
                                           example_name);
          }
          // See pass 1: length 2 is an empty value list, 0 an empty Feature.
          if (feature_length > 2) {
            auto limit = stream.PushLimit(feature_length);
            size_t num_added;
            switch (dtype) {
              case DT_STRING:
                num_added = ParseBytesFeature(&stream, out_bytes);
                out_bytes += num_added;
                break;
              case DT_FLOAT:
                num_added = ParseFloatFeature(&stream, out_float);
                out_float += num_added;
                break;
              case DT_INT64:
                num_added = ParseInt64Feature(&stream, out_int64);
                out_int64 += num_added;
                break;
              default:
                return errors::InvalidArgument("Unexpected dtype ", dtype,
                                               " in example ", example_name);
            }
            num_elements += num_added;
            max_num_cols = std::max(max_num_cols, num_added);
            for (int i = 0; i < num_added; i++) {
              *out_indices++ = e;
              *out_indices++ = num_rows;
              *out_indices++ = i;
            }
            stream.PopLimit(limit);
          } else if (feature_length == 2) {
            if (!SkipEmptyFeature(&stream, dtype)) {
              return errors::InvalidArgument("Error in sequence feature ",
                                             c.feature_name, " in example ",
                                             example_name);
            }
          } else if (feature_length != 0) {
            return errors::InvalidArgument("Error in sequence feature ",
                                           c.feature_name, " in example ",
                                           example_name);
          }
          // Empty timesteps still advance the row index.
          num_rows++;
        }
        max_num_rows = std::max(max_num_rows, num_rows);
      }
    }
    if (num_elements != expected_num_elements) {
      return errors::InvalidArgument(
          "Unexpected number of elements in feature ", c.feature_name);
    }
    out_shape(0) = num_examples;
    out_shape(1) = max_num_rows;
    out_shape(2) = max_num_cols;
  }
  return Status::OK();
}