void KinesisProducer::on_metrics_request()

in aws/kinesis/core/kinesis_producer.cc [282:346]


// Handles a metrics request message and hands the serialized reply to
// ipc_manager_.
//
// Selection: if the request carries a name, only metrics whose first
// dimension (asserted to be keyed "MetricName") has that value are reported;
// otherwise every metric returned by metrics_manager_->all_metrics() is
// reported.
//
// For each reported metric the reply contains:
//   - its name (the value of the first dimension),
//   - the remaining dimensions as key/value pairs, in their original order,
//   - count/sum/min/max/mean read from the metric's accumulator.
// When the request specifies `seconds`, that value is passed to every
// accumulator query and echoed back in the reply; otherwise the no-argument
// accumulator queries are used and the accumulator's elapsed seconds are
// reported instead.
//
// The reply's id comes from ::rand() and its source_id is the id of `m`, so
// the receiver can correlate the reply with the request.
void KinesisProducer::on_metrics_request(
    const aws::kinesis::protobuf::Message& m) {
  // Bind by reference instead of copying the request: it is only read through
  // const accessors, and `m` outlives this function.
  const auto& req = m.metrics_request();
  std::vector<std::shared_ptr<aws::metrics::Metric>> metrics;

  // filter by name, if necessary
  if (req.has_name()) {
    for (const auto& metric : metrics_manager_->all_metrics()) {
      // `const auto&` avoids a copy when all_dimensions() returns a
      // reference, and safely extends the lifetime of the temporary if it
      // returns by value.
      const auto& dims = metric->all_dimensions();

      // Invariant relied upon below: the first dimension is the metric name.
      assert(!dims.empty());
      assert(dims.at(0).first == "MetricName");

      if (dims.at(0).second == req.name()) {
        metrics.push_back(metric);
      }
    }
  } else {
    metrics = metrics_manager_->all_metrics();
  }

  // convert the data into protobuf
  aws::kinesis::protobuf::Message reply;
  reply.set_id(::rand());
  reply.set_source_id(m.id());
  auto res = reply.mutable_metrics_response();

  for (const auto& metric : metrics) {
    const auto& dims = metric->all_dimensions();

    assert(!dims.empty());
    assert(dims.at(0).first == "MetricName");

    auto pm = res->add_metrics();
    pm->set_name(dims.at(0).second);

    // Index 0 is the metric name (already emitted above); everything after it
    // is reported as a regular dimension.
    for (size_t i = 1; i < dims.size(); i++) {
      auto d = pm->add_dimensions();
      d->set_key(dims[i].first);
      d->set_value(dims[i].second);
    }

    auto& accum = metric->accumulator();
    auto stats = pm->mutable_stats();

    if (req.has_seconds()) {
      // Caller asked for a specific window: query with it and echo it back.
      auto s = req.seconds();
      stats->set_count(accum.count(s));
      stats->set_sum(accum.sum(s));
      stats->set_min(accum.min(s));
      stats->set_max(accum.max(s));
      stats->set_mean(accum.mean(s));
      pm->set_seconds(s);
    } else {
      // No window requested: use the accumulator's default statistics and
      // report how many seconds the accumulator says have elapsed.
      stats->set_count(accum.count());
      stats->set_sum(accum.sum());
      stats->set_min(accum.min());
      stats->set_max(accum.max());
      stats->set_mean(accum.mean());
      pm->set_seconds(accum.elapsed<std::chrono::seconds>());
    }
  }

  ipc_manager_->put(reply.SerializeAsString());
}