Status ParseAvro()

in tensorflow_io/core/kernels/avro/parse_avro_kernels.cc [169:401]


Status ParseAvro(const AvroParserConfig& config,
                 const AvroParserTree& parser_tree,
                 const avro::ValidSchema& reader_schema,
                 const gtl::ArraySlice<tstring>& serialized,
                 thread::ThreadPool* thread_pool, AvroResult* result) {
  DCHECK(result != nullptr);
  using clock = std::chrono::steady_clock;  // monotonic clock for interval timing
  using ms = std::chrono::duration<double, std::milli>;
  const auto before = clock::now();
  // Allocate dense output up front for fixed-length dense values
  // (variable-length dense, sparse, and ragged values have to be buffered).
  // This fast path is currently disabled:
  /*  std::vector<Tensor> fixed_len_dense_values(config.dense.size());
    for (size_t d = 0; d < config.dense.size(); ++d) {
      if (config.dense[d].variable_length) continue;
      TensorShape out_shape;
      out_shape.AddDim(serialized.size());
      for (const int64 dim : config.dense[d].shape.dim_sizes()) {
        out_shape.AddDim(dim);
      }
      fixed_len_dense_values[d] = Tensor(config.dense[d].dtype, out_shape);
    }*/

  // This parameter affects performance in a big and data-dependent way.
  const size_t kMiniBatchSizeBytes = 50000;
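  // At 50,000 bytes, records of roughly 1 KB each land about 50 per
  // minibatch; an illustrative figure, not a tuned one (see the TODO below).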

  // avro_num_minibatches_ is int64 in the op interface; if it is not set,
  // the default value is 0, meaning the count is derived from the data size.
  size_t avro_num_minibatches_ = 0;

  // Calculate number of minibatches.
  // In main regime make each minibatch around kMiniBatchSizeBytes bytes.
  // Apply 'special logic' below for small and big regimes.
  const size_t num_minibatches = [&] {
    size_t count = 0;
    size_t minibatch_bytes = 0;
    for (size_t i = 0; i < serialized.size(); i++) {
      if (minibatch_bytes == 0) {  // start a new minibatch
        count++;
      }
      minibatch_bytes += serialized[i].size() + 1;
      if (minibatch_bytes > kMiniBatchSizeBytes) {
        minibatch_bytes = 0;
      }
    }
    if (avro_num_minibatches_) {
      VLOG(5) << "Overriding num_minibatches with " << avro_num_minibatches_;
      count = avro_num_minibatches_;
    }
    // Ensure users can control the number of minibatches all the way down
    // to 1 (no parallelism).
    const size_t min_minibatches = std::min<size_t>(1, serialized.size());
    const size_t max_minibatches = 64;
    return std::max<size_t>(min_minibatches,
                            std::min<size_t>(max_minibatches, count));
  }();
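  // Worked example (assuming ~1 KB records): 10,000 records yield about 200
  // byte-sized minibatches, which the clamp above reduces to 64.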

  auto first_of_minibatch = [&](size_t minibatch) -> size_t {
    return (serialized.size() * minibatch) / num_minibatches;
  };
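  // Example: 100 records across 8 minibatches puts minibatch 3 at
  // [first_of_minibatch(3), first_of_minibatch(4)) = [37, 50).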

  VLOG(5) << "Computed " << num_minibatches << " minibatches";

  // TODO(lew): A big performance low-hanging fruit here is to improve
  //   num_minibatches calculation to take into account actual amount of work
  //   needed, as the size in bytes is not perfect. Linear combination of
  //   size in bytes and average number of features per example is promising.
  //   Even better: measure time instead of estimating, but this is too costly
  //   in small batches.
  //   Maybe accept outside parameter #num_minibatches?

  // Do minibatches in parallel.
  // TODO(fraudies): Convert dense tensor.
  // TODO(fraudies): Might be faster to reformat into a vector inside the
  // minibatch processing.

  // Note: using a vector here is thread safe because each minibatch writes
  // only to its own element; no element is shared across threads.
  std::vector<std::map<string, ValueStoreUniquePtr>> buffers(num_minibatches);

  std::vector<Status> status_of_minibatch(num_minibatches);

  const std::map<string, Tensor>& defaults = CreateTensorDefaults(config);

  // Parses one contiguous range [start, end) of serialized records into this
  // minibatch's per-feature value buffers; failures are recorded per
  // minibatch and surfaced after the parallel region.
  auto ProcessMiniBatch = [&](size_t minibatch) {
    size_t start = first_of_minibatch(minibatch);
    size_t end = first_of_minibatch(minibatch + 1);
    StringDatumRangeReader range_reader(serialized, start, end);
    auto read_value = [&](avro::GenericDatum& d) {
      return range_reader.read(d);
    };
    VLOG(5) << "Processing minibatch " << minibatch;
    status_of_minibatch[minibatch] = parser_tree.ParseValues(
        &buffers[minibatch], read_value, reader_schema, defaults);
  };
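
  // ParallelFor is not shown in this excerpt; a sketch of the pattern it is
  // assumed to implement follows the function below.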
  const auto before_parse = clock::now();
  ParallelFor(ProcessMiniBatch, num_minibatches, thread_pool);
  const auto after_parse = clock::now();
  const ms parse_read_duration = after_parse - before_parse;
  VLOG(5) << "PARSER_TIMING: Time spend reading and parsing "
          << parse_read_duration.count() << " ms ";
  for (Status& status : status_of_minibatch) {
    TF_RETURN_IF_ERROR(status);
  }

  result->sparse_indices.reserve(config.sparse.size());
  result->sparse_values.reserve(config.sparse.size());
  result->sparse_shapes.reserve(config.sparse.size());
  result->dense_values.reserve(config.dense.size());

  // Merges the per-minibatch buffers for one sparse feature into the final
  // sparse tensors (values, indices, and dense shape).
  auto MergeSparseMinibatches = [&](size_t i_sparse) -> Status {
    const AvroParserConfig::Sparse& sparse = config.sparse[i_sparse];
    const string& feature_name = sparse.feature_name;

    std::vector<ValueStoreUniquePtr> values(buffers.size());
    for (size_t i = 0; i < buffers.size(); ++i) {
      values[i] = std::move(buffers[i][feature_name]);
    }
    ValueStoreUniquePtr value_store;
    TF_RETURN_IF_ERROR(MergeAs(value_store, values, sparse.dtype));

    VLOG(5) << "Converting sparse feature " << feature_name;
    VLOG(5) << "Contents of value store " << (*value_store).ToString(10);

    TensorShape value_shape;
    TF_RETURN_IF_ERROR((*value_store).GetSparseValueShape(&value_shape));
    result->sparse_values.emplace_back(sparse.dtype, value_shape);
    Tensor* sparse_tensor_values = &result->sparse_values.back();
    TensorShape index_shape;
    TF_RETURN_IF_ERROR((*value_store).GetSparseIndexShape(&index_shape));
    result->sparse_indices.emplace_back(DT_INT64, index_shape);
    Tensor* sparse_tensor_indices = &result->sparse_indices.back();
    TF_RETURN_IF_ERROR(
        (*value_store).MakeSparse(sparse_tensor_values, sparse_tensor_indices));

    // Rank is the second dimension of the index tensor.
    int64 rank = result->sparse_indices[i_sparse].dim_size(1);
    result->sparse_shapes.emplace_back(DT_INT64, TensorShape({rank}));
    Tensor* sparse_tensor_shapes = &result->sparse_shapes.back();
    TF_RETURN_IF_ERROR(
        value_store->GetDenseShapeForSparse(sparse_tensor_shapes));

    VLOG(5) << "Sparse values: "
            << result->sparse_values[i_sparse].SummarizeValue(15);
    VLOG(5) << "Sparse indices: "
            << result->sparse_indices[i_sparse].SummarizeValue(15);
    VLOG(5) << "Sparse dense shapes: "
            << result->sparse_shapes[i_sparse].SummarizeValue(15);
    VLOG(5) << "Value shape: " << value_shape;
    VLOG(5) << "Index shape: " << index_shape;
    VLOG(5) << "Sparse dense shapes shape: "
            << result->sparse_shapes[i_sparse].shape();

    return Status::OK();
  };

  // Merges the per-minibatch buffers for one dense feature into the final
  // dense tensor, filling gaps from the feature's default value.
  auto MergeDenseMinibatches = [&](size_t i_dense) -> Status {
    // TODO(fraudies): Fixed allocation for fixed length
    // if (!config.dense[d].variable_length) return;
    const AvroParserConfig::Dense& dense = config.dense[i_dense];
    const string& feature_name = dense.feature_name;

    VLOG(5) << "Working on feature: '" << feature_name << "'";

    std::vector<ValueStoreUniquePtr> values(buffers.size());
    for (size_t i = 0; i < buffers.size(); ++i) {
      values[i] = std::move(buffers[i][feature_name]);
      VLOG(5) << "Value " << i << ": " << (*values[i]).ToString(10);
    }

    VLOG(5) << "Merge for dense type: " << DataTypeString(dense.dtype);

    ValueStoreUniquePtr value_store;
    TF_RETURN_IF_ERROR(MergeAs(value_store, values, dense.dtype));

    VLOG(5) << "Merged value store: " << value_store->ToString(10);

    size_t batch_size = serialized.size();
    TensorShape default_shape;
    Tensor default_value;
    // If the dense shape can be resolved, prepend the batch dimension and
    // tile the per-example default across the batch; otherwise keep the
    // default value as-is.
    if (ResolveDefaultShape(&default_shape, dense.default_value.shape(),
                            batch_size)) {
      default_value = Tensor(dense.dtype, default_shape);
      TF_RETURN_IF_ERROR(
          tensor::Concat(std::vector<Tensor>(batch_size, dense.default_value),
                         &default_value));
    } else {
      default_value = dense.default_value;
      default_shape = default_value.shape();
    }

    VLOG(5) << "Dense shape is " << dense.shape;
    VLOG(5) << "Default shape is " << default_shape;
    VLOG(5) << "Default value is " << default_value.SummarizeValue(9);

    TensorShape resolved_shape;
    TF_RETURN_IF_ERROR(value_store->ResolveDenseShapeWithBatch(
        &resolved_shape, dense.shape, default_shape, batch_size));

    VLOG(5) << "Creating dense tensor for resolved shape: " << resolved_shape
            << " given the user shape " << dense.shape;

    result->dense_values.emplace_back(dense.dtype, resolved_shape);
    Tensor* dense_tensor = &result->dense_values.back();

    TF_RETURN_IF_ERROR(
        value_store->MakeDense(dense_tensor, resolved_shape, default_value));

    VLOG(5) << "Dense tensor " << dense_tensor->SummarizeValue(9);

    return Status::OK();
  };
  const auto before_sparse_merge = clock::now();
  for (size_t d = 0; d < config.sparse.size(); ++d) {
    TF_RETURN_IF_ERROR(MergeSparseMinibatches(d));
  }
  const auto after_sparse_merge = clock::now();
  const ms s_merge_duration = after_sparse_merge - before_sparse_merge;
  for (size_t d = 0; d < config.dense.size(); ++d) {
    TF_RETURN_IF_ERROR(MergeDenseMinibatches(d));
  }
  const auto after_dense_merge = clock::now();
  const ms d_merge_duration = after_dense_merge - after_sparse_merge;
  VLOG(5) << "PARSER_TIMING: Sparse merge duration" << s_merge_duration.count()
          << " ms ";

  VLOG(5) << "PARSER_TIMING: Dense merge duration" << d_merge_duration.count()
          << " ms ";
  return Status::OK();
}
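
ParallelFor is referenced above but is not part of this excerpt. The sketch
below is an assumption about its shape, modeled on the analogous helper in
TensorFlow's example-parsing kernels: it runs f(0), ..., f(n - 1), scheduling
all but the first call on the thread pool and falling back to a serial loop
when no pool is supplied.

// Sketch only -- assumed, not taken from parse_avro_kernels.cc.
#include <cstddef>
#include <functional>

#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/lib/core/threadpool.h"

void ParallelFor(const std::function<void(size_t)>& f, size_t n,
                 tensorflow::thread::ThreadPool* thread_pool) {
  if (n == 0) return;
  if (thread_pool == nullptr) {
    // No pool: process every minibatch serially on the calling thread.
    for (size_t i = 0; i < n; ++i) f(i);
    return;
  }
  // Schedule minibatches 1..n-1 on the pool, run minibatch 0 inline, and
  // wait until all scheduled calls have finished.
  tensorflow::BlockingCounter counter(static_cast<int>(n) - 1);
  for (size_t i = 1; i < n; ++i) {
    thread_pool->Schedule([i, &f, &counter] {
      f(i);
      counter.DecrementCount();
    });
  }
  f(0);
  counter.Wait();
}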