lib/stats/ProducerStatsImpl.cc (97 lines of code) (raw):
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "ProducerStatsImpl.h"
#include <array>
#include "lib/ExecutorService.h"
#include "lib/LogUtils.h"
#include "lib/Utils.h"
namespace pulsar {
DECLARE_LOG_OBJECT();
static const std::array<double, 4> probs = {{0.5, 0.9, 0.99, 0.999}};
std::string ProducerStatsImpl::latencyToString(const LatencyAccumulator& obj) {
    boost::accumulators::detail::extractor_result<
        LatencyAccumulator, boost::accumulators::tag::extended_p_square>::type latencies =
        boost::accumulators::extended_p_square(obj);
    std::stringstream os;
    os << "Latencies [ 50pct: " << latencies[0] / 1e3 << "ms"
       << ", 90pct: " << latencies[1] / 1e3 << "ms"
       << ", 99pct: " << latencies[2] / 1e3 << "ms"
       << ", 99.9pct: " << latencies[3] / 1e3 << "ms"
       << "]";
    return os.str();
}
ProducerStatsImpl::ProducerStatsImpl(std::string producerStr, ExecutorServicePtr executor,
                                     unsigned int statsIntervalInSeconds)
    : producerStr_(producerStr),
      latencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
      totalLatencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
      timer_(executor->createDeadlineTimer()),
      statsIntervalInSeconds_(statsIntervalInSeconds) {}
ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
    : producerStr_(stats.producerStr_),
      numMsgsSent_(stats.numMsgsSent_),
      numBytesSent_(stats.numBytesSent_),
      sendMap_(stats.sendMap_),
      latencyAccumulator_(stats.latencyAccumulator_),
      totalMsgsSent_(stats.totalMsgsSent_),
      totalBytesSent_(stats.totalBytesSent_),
      totalSendMap_(stats.totalSendMap_),
      totalLatencyAccumulator_(stats.totalLatencyAccumulator_),
      statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {}
void ProducerStatsImpl::start() { scheduleTimer(); }
void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
    if (ec) {
        LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
        return;
    }
    std::unique_lock<std::mutex> lock(mutex_);
    std::ostringstream oss;
    oss << *this;
    numMsgsSent_ = 0;
    numBytesSent_ = 0;
    sendMap_.clear();
    latencyAccumulator_ =
        LatencyAccumulator(boost::accumulators::tag::extended_p_square::probabilities = probs);
    lock.unlock();
    scheduleTimer();
    LOG_INFO(oss.str());
}
void ProducerStatsImpl::messageSent(const Message& msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    numMsgsSent_++;
    totalMsgsSent_++;
    numBytesSent_ += msg.getLength();
    totalBytesSent_ += msg.getLength();
}
void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) {
    boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time();
    double diffInMicros = (currentTime - publishTime).total_microseconds();
    std::lock_guard<std::mutex> lock(mutex_);
    totalLatencyAccumulator_(diffInMicros);
    latencyAccumulator_(diffInMicros);
    sendMap_[res] += 1;       // Value will automatically be initialized to 0 in the constructor
    totalSendMap_[res] += 1;  // Value will automatically be initialized to 0 in the constructor
}
ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }
void ProducerStatsImpl::scheduleTimer() {
    timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
    std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
    timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
        auto self = weakSelf.lock();
        if (!self) {
            return;
        }
        flushAndReset(ec);
    });
}
std::ostream& operator<<(std::ostream& os, const ProducerStatsImpl& obj) {
    os << "Producer " << obj.producerStr_ << ", ProducerStatsImpl ("
       << "numMsgsSent_ = " << obj.numMsgsSent_ << ", numBytesSent_ = " << obj.numBytesSent_
       << ", sendMap_ = " << obj.sendMap_
       << ", latencyAccumulator_ = " << ProducerStatsImpl::latencyToString(obj.latencyAccumulator_)
       << ", totalMsgsSent_ = " << obj.totalMsgsSent_ << ", totalBytesSent_ = " << obj.totalBytesSent_
       << ", totalAcksReceived_ = "
       << ", totalSendMap_ = " << obj.totalSendMap_
       << ", totalLatencyAccumulator_ = " << ProducerStatsImpl::latencyToString(obj.totalLatencyAccumulator_)
       << ")";
    return os;
}
}  // namespace pulsar