in cloudwatch_logs_common/src/log_batcher.cpp [45:116]
bool LogBatcher::publishBatchedData() {
static const char * const kFuncName = __func__;
std::lock_guard<std::recursive_mutex> lk(mtx);
// is there anything to send?
if (getCurrentBatchSize() == 0) {
AWS_LOGSTREAM_DEBUG(__func__, "LogBatcher: nothing batched to publish");
return false;
}
std::shared_ptr<LogCollection> log_type = this->batched_data_;
std::shared_ptr<Aws::DataFlow::BasicTask<LogCollection>> log_task = std::make_shared<Aws::DataFlow::BasicTask<LogCollection>>(log_type);
// connect to the log_file_manager_ to write to disk on task failure
if (log_file_manager_) {
// register the task failure function
auto function = [&log_file_manager = this->log_file_manager_](const DataFlow::UploadStatus &upload_status,
const LogCollection &log_messages)
{
if (!log_messages.empty()) {
if (DataFlow::UploadStatus::INVALID_DATA == upload_status) {
// publish indicated the task data was bad, this task should be discarded
AWS_LOG_WARN(kFuncName, "LogBatcher: Task failed due to invalid log data, dropping");
} else if (DataFlow::UploadStatus::SUCCESS != upload_status) {
AWS_LOG_INFO(kFuncName, "LogBatcher: Task failed to upload: writing logs to file. Status = %d", upload_status);
log_file_manager->write(log_messages);
} else {
AWS_LOG_DEBUG(kFuncName, "LogBatcher: Task log upload successful");
}
} else {
AWS_LOG_INFO(kFuncName, "LogBatcher: not publishing task as log_messages is empty");
}
};
log_task->setOnCompleteFunction(function);
}
// dont attempt to queue if not started
if(ServiceState::STARTED != this->getState()) {
AWS_LOG_WARN(__func__, "current service state is not Started, canceling task: %s", Service::getStatusString().c_str());
log_task->cancel();
return false;
}
bool enqueue_success = false;
if (getSink()) {
enqueue_success = getSink()->tryEnqueue(log_task, this->getTryEnqueueDuration());
if (!enqueue_success) {
AWS_LOG_WARN(__func__, "Unable to enqueue log data, canceling task");
}
} else {
// if we can't queue, then cancel (write to disk)
AWS_LOGSTREAM_WARN(__func__, "Unable to obtain queue, canceling task");
}
if (!enqueue_success) {
log_task->cancel();
}
this->resetBatchedData();
return enqueue_success;
}