in tensorflow_io/core/kernels/avro/parse_avro_kernels.cc [169:401]
// Parses a batch of serialized Avro records into sparse and dense tensors.
//
// The records in `serialized` are split into minibatches, every minibatch is
// parsed through `parser_tree` (scheduled via ParallelFor on `thread_pool`),
// and the per-minibatch value buffers are then merged, feature by feature,
// into the tensors appended to `result`.
//
// Args:
//   config:        lists the sparse and dense features to produce.
//   parser_tree:   parses the datums of one minibatch into value buffers.
//   reader_schema: schema forwarded to parser_tree.ParseValues.
//   serialized:    serialized records; its size is used as the batch size.
//   thread_pool:   forwarded to ParallelFor.
//   result:        output; must not be null. Sparse indices/values/shapes and
//                  dense values are appended in the order of config.sparse
//                  and config.dense respectively.
//
// Returns the first non-OK status produced while parsing a minibatch or while
// merging a feature, otherwise Status::OK().
Status ParseAvro(const AvroParserConfig& config,
                 const AvroParserTree& parser_tree,
                 const avro::ValidSchema& reader_schema,
                 const gtl::ArraySlice<tstring>& serialized,
                 thread::ThreadPool* thread_pool, AvroResult* result) {
  DCHECK(result != nullptr);

  // Only elapsed time is measured, so use the monotonic clock; the wall clock
  // may jump while we are parsing.
  using clock = std::chrono::steady_clock;
  using ms = std::chrono::duration<double, std::milli>;

  // Allocate dense output for fixed length dense values
  // (variable-length dense and sparse and ragged have to be buffered).
  /* std::vector<Tensor> fixed_len_dense_values(config.dense.size());
  for (size_t d = 0; d < config.dense.size(); ++d) {
    if (config.dense[d].variable_length) continue;
    TensorShape out_shape;
    out_shape.AddDim(serialized.size());
    for (const int64 dim : config.dense[d].shape.dim_sizes()) {
      out_shape.AddDim(dim);
    }
    fixed_len_dense_values[d] = Tensor(config.dense[d].dtype, out_shape);
  }*/

  // This parameter affects performance in a big and data-dependent way.
  const size_t kMiniBatchSizeBytes = 50000;

  // avro_num_minibatches_ is int64 in the op interface. If not set
  // the default value is 0.
  // It must be initialized here: nothing in this function assigns it, and
  // reading an uninitialized value would override the computed count with
  // garbage. 0 means "no override".
  // NOTE(review): presumably meant to be wired to the op attribute - confirm.
  const size_t avro_num_minibatches_ = 0;

  // Calculate number of minibatches.
  // In main regime make each minibatch around kMiniBatchSizeBytes bytes.
  // Apply 'special logic' below for small and big regimes.
  const size_t num_minibatches = [&] {
    size_t minibatch_count = 0;
    size_t minibatch_bytes = 0;
    for (const auto& record : serialized) {
      if (minibatch_bytes == 0) {  // start minibatch
        ++minibatch_count;
      }
      minibatch_bytes += record.size() + 1;
      if (minibatch_bytes > kMiniBatchSizeBytes) {
        minibatch_bytes = 0;
      }
    }
    if (avro_num_minibatches_) {
      VLOG(5) << "Overriding num_minibatches with " << avro_num_minibatches_;
      minibatch_count = avro_num_minibatches_;
    }
    // This is to ensure users can control the num minibatches all the way down
    // to size of 1(no parallelism).
    // For an empty input the lower bound is 0, i.e. no minibatch is created.
    const size_t min_minibatches = std::min<size_t>(1, serialized.size());
    const size_t max_minibatches = 64;
    return std::max<size_t>(
        min_minibatches, std::min<size_t>(max_minibatches, minibatch_count));
  }();

  // Index of the first record of `minibatch`; minibatch k covers the records
  // [first_of_minibatch(k), first_of_minibatch(k + 1)). Only called from
  // ProcessMiniBatch, i.e. when num_minibatches > 0.
  auto first_of_minibatch = [&](size_t minibatch) -> size_t {
    return (serialized.size() * minibatch) / num_minibatches;
  };

  VLOG(5) << "Computed " << num_minibatches << " minibatches";

  // TODO(lew): A big performance low-hanging fruit here is to improve
  // num_minibatches calculation to take into account actual amount of work
  // needed, as the size in bytes is not perfect. Linear combination of
  // size in bytes and average number of features per example is promising.
  // Even better: measure time instead of estimating, but this is too costly
  // in small batches.
  // Maybe accept outside parameter #num_minibatches?

  // Do minibatches in parallel.
  // TODO(fraudies): Convert dense tensor
  // TODO(fraudies): Might be faster to reformat inside the process minibatch
  // into vector
  // Both vectors are sized up front and each minibatch only writes to the
  // slot at its own index, so the workers never touch the same element.
  std::vector<std::map<string, ValueStoreUniquePtr>> buffers(num_minibatches);
  std::vector<Status> status_of_minibatch(num_minibatches);
  const std::map<string, Tensor>& defaults = CreateTensorDefaults(config);

  auto ProcessMiniBatch = [&](size_t minibatch) {
    const size_t start = first_of_minibatch(minibatch);
    const size_t end = first_of_minibatch(minibatch + 1);
    StringDatumRangeReader range_reader(serialized, start, end);
    auto read_value = [&](avro::GenericDatum& d) {
      return range_reader.read(d);
    };
    VLOG(5) << "Processing minibatch " << minibatch;
    status_of_minibatch[minibatch] = parser_tree.ParseValues(
        &buffers[minibatch], read_value, reader_schema, defaults);
  };

  const auto before_parse = clock::now();
  ParallelFor(ProcessMiniBatch, num_minibatches, thread_pool);
  const auto after_parse = clock::now();
  const ms parse_read_duration = after_parse - before_parse;
  VLOG(5) << "PARSER_TIMING: Time spend reading and parsing "
          << parse_read_duration.count() << " ms ";

  // Report the first minibatch that failed, if any.
  for (const Status& status : status_of_minibatch) {
    TF_RETURN_IF_ERROR(status);
  }

  result->sparse_indices.reserve(config.sparse.size());
  result->sparse_values.reserve(config.sparse.size());
  result->sparse_shapes.reserve(config.sparse.size());
  result->dense_values.reserve(config.dense.size());

  // Merges the buffers of sparse feature `i_sparse` across all minibatches
  // and appends its values, indices and dense shape tensors to `result`.
  auto MergeSparseMinibatches = [&](size_t i_sparse) -> Status {
    const AvroParserConfig::Sparse& sparse = config.sparse[i_sparse];
    const string& feature_name = sparse.feature_name;

    // Take ownership of this feature's buffer from every minibatch.
    std::vector<ValueStoreUniquePtr> values(buffers.size());
    for (size_t i = 0; i < buffers.size(); ++i) {
      values[i] = std::move(buffers[i][feature_name]);
    }

    ValueStoreUniquePtr value_store;
    TF_RETURN_IF_ERROR(MergeAs(value_store, values, sparse.dtype));

    VLOG(5) << "Converting sparse feature " << feature_name;
    VLOG(5) << "Contents of value store " << (*value_store).ToString(10);

    TensorShape value_shape;
    TF_RETURN_IF_ERROR((*value_store).GetSparseValueShape(&value_shape));
    result->sparse_values.emplace_back(sparse.dtype, value_shape);
    Tensor* sparse_tensor_values = &result->sparse_values.back();

    TensorShape index_shape;
    TF_RETURN_IF_ERROR((*value_store).GetSparseIndexShape(&index_shape));
    result->sparse_indices.emplace_back(DT_INT64, index_shape);
    Tensor* sparse_tensor_indices = &result->sparse_indices.back();

    TF_RETURN_IF_ERROR(
        (*value_store).MakeSparse(sparse_tensor_values, sparse_tensor_indices));

    // rank is the 2nd dimension of the index. Read it from the tensor that
    // was just appended rather than by position, so that the lookup does not
    // depend on the result vectors having been empty on entry.
    int64 rank = sparse_tensor_indices->dim_size(1);
    result->sparse_shapes.emplace_back(DT_INT64, TensorShape({rank}));
    Tensor* sparse_tensor_shapes = &result->sparse_shapes.back();
    TF_RETURN_IF_ERROR(
        (*value_store).GetDenseShapeForSparse(sparse_tensor_shapes));

    VLOG(5) << "Sparse values: " << sparse_tensor_values->SummarizeValue(15);
    VLOG(5) << "Sparse indices: " << sparse_tensor_indices->SummarizeValue(15);
    VLOG(5) << "Sparse dense shapes: "
            << sparse_tensor_shapes->SummarizeValue(15);
    VLOG(5) << "Value shape: " << value_shape;
    VLOG(5) << "Index shape: " << index_shape;
    VLOG(5) << "Sparse dense shapes shape: " << sparse_tensor_shapes->shape();

    return Status::OK();
  };

  // Merges the buffers of dense feature `i_dense` across all minibatches and
  // appends the resulting dense tensor to `result`.
  auto MergeDenseMinibatches = [&](size_t i_dense) -> Status {
    // TODO(fraudies): Fixed allocation for fixed length
    // if (!config.dense[d].variable_length) return;
    const AvroParserConfig::Dense& dense = config.dense[i_dense];
    const string& feature_name = dense.feature_name;

    VLOG(5) << "Working on feature: '" << feature_name << "'";

    // Take ownership of this feature's buffer from every minibatch.
    std::vector<ValueStoreUniquePtr> values(buffers.size());
    for (size_t i = 0; i < buffers.size(); ++i) {
      values[i] = std::move(buffers[i][feature_name]);
      VLOG(5) << "Value " << i << ": " << (*values[i]).ToString(10);
    }

    VLOG(5) << "Merge for dense type: " << DataTypeString(dense.dtype);

    ValueStoreUniquePtr value_store;
    TF_RETURN_IF_ERROR(MergeAs(value_store, values, dense.dtype));

    VLOG(5) << "Merged value store: " << value_store->ToString(10);

    const size_t batch_size = serialized.size();
    TensorShape default_shape;
    Tensor default_value;
    // If we can resolve the dense shape add batch, otherwise keep things as
    // they are
    if (ResolveDefaultShape(&default_shape, dense.default_value.shape(),
                            batch_size)) {
      // Repeat the per-record default once for every record of the batch.
      default_value = Tensor(dense.dtype, default_shape);
      TF_RETURN_IF_ERROR(
          tensor::Concat(std::vector<Tensor>(batch_size, dense.default_value),
                         &default_value));
    } else {
      default_value = dense.default_value;
      default_shape = default_value.shape();
    }

    VLOG(5) << "Dense shape is " << dense.shape;
    VLOG(5) << "Default shape is " << default_shape;
    VLOG(5) << "Default value is " << default_value.SummarizeValue(9);

    TensorShape resolved_shape;
    TF_RETURN_IF_ERROR(
        (*value_store)
            .ResolveDenseShapeWithBatch(&resolved_shape, dense.shape,
                                        default_shape, batch_size));

    VLOG(5) << "Creating dense tensor for resolved shape: " << resolved_shape
            << " given the user shape " << dense.shape;

    result->dense_values.emplace_back(dense.dtype, resolved_shape);
    Tensor* dense_tensor = &result->dense_values.back();

    TF_RETURN_IF_ERROR(
        (*value_store).MakeDense(dense_tensor, resolved_shape, default_value));

    VLOG(5) << "Dense tensor " << dense_tensor->SummarizeValue(9);

    return Status::OK();
  };

  const auto before_sparse_merge = clock::now();
  for (size_t d = 0; d < config.sparse.size(); ++d) {
    TF_RETURN_IF_ERROR(MergeSparseMinibatches(d));
  }
  const auto after_sparse_merge = clock::now();
  const ms s_merge_duration = after_sparse_merge - before_sparse_merge;

  for (size_t d = 0; d < config.dense.size(); ++d) {
    TF_RETURN_IF_ERROR(MergeDenseMinibatches(d));
  }
  const auto after_dense_merge = clock::now();
  const ms d_merge_duration = after_dense_merge - after_sparse_merge;

  VLOG(5) << "PARSER_TIMING: Sparse merge duration" << s_merge_duration.count()
          << " ms ";
  VLOG(5) << "PARSER_TIMING: Dense merge duration" << d_merge_duration.count()
          << " ms ";

  return Status::OK();
}