in cloudwatch_metrics_common/src/metric_batcher.cpp [46:117]
/// Publish the currently batched metric data as an asynchronous upload task.
///
/// Snapshots the batched metrics, wraps them in a BasicTask, wires a
/// completion callback that persists the metrics to disk on failure, and
/// tries to hand the task to the sink (upload queue). The batch is reset
/// regardless of whether the enqueue succeeded.
///
/// @return true if the task was successfully enqueued for upload;
///         false if there was nothing to publish, the service was not
///         started, no sink was available, or the enqueue timed out.
bool MetricBatcher::publishBatchedData() {
  // __func__ is function-local; stash it in a static so the completion
  // lambda (which runs later, outside this frame) can use it as a log tag.
  static const char * const kFuncName = __func__;
  std::lock_guard<std::recursive_mutex> lk(mtx);
  // is there anything to send?
  if (getCurrentBatchSize() == 0) {
    AWS_LOGSTREAM_DEBUG(__func__, "Nothing batched to publish");
    return false;
  }
  // Copy (don't move) the batch: resetBatchedData() below owns clearing it.
  auto metrics_to_publish = this->batched_data_;
  std::shared_ptr<Aws::DataFlow::BasicTask<MetricDatumCollection>> metric_task = std::make_shared<Aws::DataFlow::BasicTask<MetricDatumCollection>>(metrics_to_publish);
  // connect to the file manager to write to disk on fail / cancel
  if (metric_file_manager_) {
    // Register the completion function. NOTE: capture the shared_ptr BY VALUE,
    // not by reference to the member — the lambda is stored on the task and
    // may outlive this MetricBatcher; a reference capture would dangle.
    auto function = [metric_file_manager = this->metric_file_manager_](const DataFlow::UploadStatus &upload_status,
                                                                       const MetricDatumCollection &metrics_to_publish)
    {
      if (!metrics_to_publish.empty()) {
        if (DataFlow::UploadStatus::INVALID_DATA == upload_status) {
          // publish indicated the task data was bad, this task should be discarded
          AWS_LOG_WARN(kFuncName, "MetricBatcher: Task failed due to invalid metric data, dropping");
        } else if (DataFlow::UploadStatus::SUCCESS != upload_status) {
          // any non-success, non-invalid outcome: persist metrics for later retry
          AWS_LOG_INFO(kFuncName, "MetricBatcher: Task failed: writing metrics to file");
          metric_file_manager->write(metrics_to_publish);
        } else {
          AWS_LOG_DEBUG(kFuncName, "MetricBatcher: Task metric upload successful");
        }
      } else {
        AWS_LOG_INFO(kFuncName, "MetricBatcher: not publishing task as metrics_to_publish is empty");
      }
    };
    metric_task->setOnCompleteFunction(function);
  }
  // don't attempt to queue if not started; cancel routes the data to the
  // completion handler (and thus to disk) instead of silently dropping it
  if (ServiceState::STARTED != this->getState()) {
    AWS_LOG_WARN(__func__, "current service state is not Started, canceling task: %s", Service::getStatusString().c_str());
    metric_task->cancel();
    return false;
  }
  bool enqueue_success = false;
  // try to enqueue, waiting at most the configured try-enqueue duration
  if (getSink()) {
    enqueue_success = getSink()->tryEnqueue(metric_task, this->getTryEnqueueDuration());
    if (!enqueue_success) {
      AWS_LOG_WARN(__func__, "Unable to enqueue data, canceling task");
    }
  } else {
    AWS_LOGSTREAM_WARN(__func__, "Unable to obtain queue, canceling task");
  }
  if (!enqueue_success) {
    metric_task->cancel();
  }
  // Clear the batch either way: on failure the data was already handed to
  // the task (and persisted via the completion callback on cancel).
  this->resetBatchedData();
  return enqueue_success;
}