in aws/kinesis/core/kinesis_producer.cc [282:346]
// Handles a message carrying a metrics request.
//
// Collects the metrics known to metrics_manager_ (only those whose first
// dimension value equals the requested name, when the request has a name),
// converts each one into an entry of a MetricsResponse, and puts the
// serialized reply message on ipc_manager_.
//
// m: the incoming message. Its metrics_request() payload is read, and its
//    id() is copied into the reply's source_id field.
void KinesisProducer::on_metrics_request(
    const aws::kinesis::protobuf::Message& m) {
  // Bind by reference instead of copying: the request is owned by `m`, which
  // outlives this call, and it is only read here.
  const auto& req = m.metrics_request();

  std::vector<std::shared_ptr<aws::metrics::Metric>> metrics;

  // filter by name, if necessary
  if (req.has_name()) {
    for (auto& metric : metrics_manager_->all_metrics()) {
      // Read-only view of the dimensions; no per-metric vector copy.
      const auto& dims = metric->all_dimensions();
      // The first dimension is expected to be the ("MetricName", <name>) pair.
      assert(!dims.empty());
      assert(dims.at(0).first == "MetricName");
      if (dims.at(0).second == req.name()) {
        metrics.push_back(metric);
      }
    }
  } else {
    metrics = metrics_manager_->all_metrics();
  }

  // convert the data into protobuf
  aws::kinesis::protobuf::Message reply;
  reply.set_id(::rand());
  // Echo the request's id so the reply can be matched to it.
  reply.set_source_id(m.id());
  auto res = reply.mutable_metrics_response();

  for (const auto& metric : metrics) {
    const auto& dims = metric->all_dimensions();
    assert(!dims.empty());
    assert(dims.at(0).first == "MetricName");

    auto pm = res->add_metrics();
    // Dimension 0 supplies the metric's name; the remaining ones are emitted
    // as key/value dimension entries.
    pm->set_name(dims.at(0).second);
    for (size_t i = 1; i < dims.size(); i++) {
      auto d = pm->add_dimensions();
      d->set_key(dims[i].first);
      d->set_value(dims[i].second);
    }

    auto& accum = metric->accumulator();
    auto stats = pm->mutable_stats();
    if (req.has_seconds()) {
      // The request carries a seconds value: pass it to every accumulator
      // query and report the same value back.
      // NOTE(review): presumably this limits the statistics to the most
      // recent `s` seconds - confirm against the accumulator implementation.
      auto s = req.seconds();
      stats->set_count(accum.count(s));
      stats->set_sum(accum.sum(s));
      stats->set_min(accum.min(s));
      stats->set_max(accum.max(s));
      stats->set_mean(accum.mean(s));
      pm->set_seconds(s);
    } else {
      // No seconds value requested: use the argument-less accumulator queries
      // and report the accumulator's elapsed time in seconds.
      stats->set_count(accum.count());
      stats->set_sum(accum.sum());
      stats->set_min(accum.min());
      stats->set_max(accum.max());
      stats->set_mean(accum.mean());
      pm->set_seconds(accum.elapsed<std::chrono::seconds>());
    }
  }

  ipc_manager_->put(reply.SerializeAsString());
}