bool MetricBatcher::publishBatchedData()

in cloudwatch_metrics_common/src/metric_batcher.cpp [46:117]


/**
 * Package the currently batched metrics into a task and try to hand it to the sink.
 *
 * Holds the recursive mutex `mtx` for the whole call.
 *
 * When a metric file manager is configured, the task gets an on-complete callback that
 * writes the metrics to file for any upload status other than SUCCESS or INVALID_DATA.
 *
 * @return true if the task was enqueued on the sink; false if nothing was batched,
 *         the service is not in the STARTED state, no sink is available, or the
 *         enqueue attempt failed.
 */
bool MetricBatcher::publishBatchedData() {
  // __func__ inside the lambda below would name the closure's call operator,
  // so keep the enclosing function's name available for logging there.
  static const char * const kFuncName = __func__;

  std::lock_guard<std::recursive_mutex> lk(mtx);

  // is there anything to send?
  if (getCurrentBatchSize() == 0) {
    AWS_LOGSTREAM_DEBUG(__func__, "Nothing batched to publish");
    return false;
  }

  auto metrics_to_publish = this->batched_data_;
  auto metric_task = std::make_shared<Aws::DataFlow::BasicTask<MetricDatumCollection>>(metrics_to_publish);

  // connect to the file manager to write to disk on fail / cancel
  if (metric_file_manager_) {

    // Capture the file manager handle BY VALUE: the callback is stored in the task, which
    // can complete after this function returns (and potentially after this batcher is
    // gone or its file manager member has been replaced). A by-reference capture of the
    // member would dangle in that case.
    auto on_complete = [metric_file_manager = this->metric_file_manager_](
                           const DataFlow::UploadStatus & upload_status,
                           const MetricDatumCollection & task_metrics)
    {
      if (task_metrics.empty()) {
        AWS_LOG_INFO(kFuncName, "MetricBatcher: not publishing task as metrics_to_publish is empty");
        return;
      }

      if (DataFlow::UploadStatus::INVALID_DATA == upload_status) {
        // publish indicated the task data was bad, this task should be discarded
        AWS_LOG_WARN(kFuncName, "MetricBatcher: Task failed due to invalid metric data, dropping");
      } else if (DataFlow::UploadStatus::SUCCESS != upload_status) {
        // any other non-success status: persist the metrics to file
        AWS_LOG_INFO(kFuncName, "MetricBatcher: Task failed: writing metrics to file");
        metric_file_manager->write(task_metrics);
      } else {
        AWS_LOG_DEBUG(kFuncName, "MetricBatcher: Task metric upload successful");
      }
    };

    metric_task->setOnCompleteFunction(on_complete);
  }

  // dont attempt to queue if not started
  // NOTE(review): this path cancels the task but does not call resetBatchedData(), unlike
  // the enqueue-failure path below. Presumably cancel() triggers the on-complete callback;
  // confirm whether keeping the batch here is intentional.
  if (ServiceState::STARTED != this->getState()) {
    AWS_LOG_WARN(__func__, "current service state is not Started, canceling task: %s", Service::getStatusString().c_str());
    metric_task->cancel();
    return false;
  }

  bool enqueue_success = false;

  // try to enqueue
  if (getSink()) {

    enqueue_success = getSink()->tryEnqueue(metric_task, this->getTryEnqueueDuration());

    if (!enqueue_success) {
      AWS_LOG_WARN(__func__, "Unable to enqueue data, canceling task");
    }

  } else {
    AWS_LOGSTREAM_WARN(__func__, "Unable to obtain queue, canceling task");
  }

  if (!enqueue_success) {
    metric_task->cancel();
  }

  // The batch is cleared whether or not the enqueue succeeded.
  this->resetBatchedData();
  return enqueue_success;
}