in cloudwatch_logs_common/src/utils/log_file_manager.cpp [36:90]
FileObject<LogCollection> LogFileManager::readBatch(
size_t batch_size)
{
FileManagement::DataToken data_token;
AWS_LOG_INFO(__func__, "Reading Logbatch");
std::priority_queue<std::tuple<Timestamp, std::string, FileManagement::DataToken>> pq;
for (size_t i = 0; i < batch_size; ++i) {
std::string line;
if (!file_manager_strategy_->isDataAvailable()) {
break;
}
data_token = read(line);
Aws::String aws_line(line.c_str());
Aws::Utils::Json::JsonValue value(aws_line);
Aws::CloudWatchLogs::Model::InputLogEvent input_event(value);
pq.push(std::make_tuple(input_event.GetTimestamp(), line, data_token));
}
latestTime = std::get<0>(pq.top());
LogCollection log_data;
std::list<FileManagement::DataToken> data_tokens;
while(!pq.empty()){
Timestamp curTime = std::get<0>(pq.top());
std::string line = std::get<1>(pq.top());
FileManagement::DataToken new_data_token = std::get<2>(pq.top());
if(latestTime - curTime < ONE_DAY_IN_MILLISEC){
Aws::String aws_line(line.c_str());
Aws::Utils::Json::JsonValue value(aws_line);
Aws::CloudWatchLogs::Model::InputLogEvent input_event(value);
log_data.push_front(input_event);
data_tokens.push_back(new_data_token);
}
else if(file_manager_strategy_->isDeleteStaleData() && latestTime - curTime > TWO_WEEK_IN_MILLISEC){
{
std::lock_guard<std::mutex> lock(active_delete_stale_data_mutex_);
stale_data_.push_back(new_data_token);
}
}
pq.pop();
}
if(batch_size != log_data.size()){
AWS_LOG_WARN(__func__, "%d logs were not batched since the time"
" difference was > 24 hours. Will try again in a separate batch."
, batch_size - log_data.size()
);
}
FileObject<LogCollection> file_object;
file_object.batch_data = log_data;
file_object.batch_size = log_data.size();
file_object.data_tokens = data_tokens;
return file_object;
}