in file_management/include/file_management/file_upload/file_upload_streamer.h [152:200]
/**
 * Run one iteration of the streamer: obtain an upload task and try to hand it
 * to the configured sink.
 *
 * If no task is pending, this waits on the status condition monitor (bounded
 * by status_monitor_timeout_). When the wait times out and the data reader
 * reports nothing to read, the call returns without doing anything. Otherwise
 * a batch is requested from the data reader using batch_size_ and wrapped in a
 * FileUploadTask whose completion callback is bound to this object's
 * onComplete.
 *
 * If a task is already pending from an earlier call, the wait and read steps
 * are skipped and that same task is offered to the sink again.
 *
 * stored_task_ is cleared only when the sink accepts the task; on rejection it
 * is kept so that the next call retries it. deleteStaleData() is invoked on
 * the data reader after every enqueue attempt, accepted or not.
 */
inline void work() override {
  if (!stored_task_) {
    AWS_LOG_DEBUG(__func__,
                  "Waiting for files and work.");
    auto wait_result = status_condition_monitor_.waitForWork(status_monitor_timeout_);
    // is there data available?
    if (wait_result == std::cv_status::timeout) {
      if (!data_reader_->isDataAvailableToRead()) {
        AWS_LOG_DEBUG(__func__, "Timed out when waiting for work, no data available to read");
        return;
      }
      AWS_LOG_DEBUG(__func__, "Timed out when waiting for work, but data available to read: attempting to publish");
      // otherwise attempt to publish as only the network is down but we have data to send
    }
    // Check for a sink before reading so a batch is not pulled from the reader
    // when there is nowhere to send it.
    if (!OutputStage<TaskPtr<T>>::getSink()) {
      AWS_LOG_WARN(__func__,
                   "No Sink Configured");
      return;
    }
    AWS_LOG_DEBUG(__func__,
                  "Found work, batching");
    FileObject<T> file_object = data_reader_->readBatch(batch_size_);
    total_logs_uploaded += file_object.batch_size;  // todo this is attempted, not truly uploaded
    stored_task_ = std::make_shared<FileUploadTask<T>>(
      std::move(file_object),
      std::bind(
        &FileUploadStreamer<T>::onComplete,
        this,
        std::placeholders::_1,
        std::placeholders::_2));
  } else {
    AWS_LOG_DEBUG(__func__,
                  "Previous task found, retrying upload.");
  }

  // Fetch the sink once and validate it right before use. The retry path above
  // performs no sink check of its own, and re-reading the sink here (instead of
  // trusting the earlier check) ensures the value tested is the value used.
  const auto sink = OutputStage<TaskPtr<T>>::getSink();
  if (!sink) {
    // Keep stored_task_ untouched so a later call can retry the upload.
    AWS_LOG_WARN(__func__,
                 "No Sink Configured");
    return;
  }

  auto is_accepted = sink->tryEnqueue(stored_task_, kTimeout);
  if (is_accepted) {
    AWS_LOG_DEBUG(__func__,
                  "Enqueue_accepted");
    // Ownership has passed to the sink; drop our reference so the next call
    // builds a fresh task.
    stored_task_ = nullptr;
  } else {
    // Leave stored_task_ set so the next call retries the same task.
    AWS_LOG_DEBUG(__func__,
                  "Enqueue failed");
  }
  data_reader_->deleteStaleData();
}