in PriorityQueue.cpp [966:1209]
/**
 * Write eligible unsaved queue files to disk while honoring the file system quota,
 * delete files that are no longer needed, and save (or remove) cursor files.
 *
 * The lock is released around blocking calls (statvfs and file I/O) and re-acquired
 * before shared containers and statistics are modified.
 *
 * @param lock        Lock that must be owned on entry; it is owned again on return.
 * @param save_delay  Age in milliseconds that the newest unsaved entry of a priority
 *                    must reach before it is written (ignored once the queue is closed).
 * @param final_save  When true, the current buckets are flushed into the unsaved set first.
 * @return true if every entry selected for saving was written, false if any bytes
 *         were left unsaved (quota exceeded or I/O error).
 */
bool PriorityQueue::save(std::unique_lock<std::mutex>& lock, long save_delay, bool final_save) {
    update_min_seq();

    if (final_save) {
        // Flush non-empty current_buckets into unsaved
        flush_current_buckets();
    }

    struct statvfs st;
    ::memset(&st, 0, sizeof(st));
    // Stays 0 when no quota could be established (save not needed or statvfs failed).
    uint64_t fs_bytes_allowed = 0;

    if (save_needed(save_delay)) {
        // Unlock while getting fs stats
        lock.unlock();
        if (statvfs(_data_dir.c_str(), &st) != 0) {
            Logger::Error("PriorityQueue::save(): statvfs(%s) failed: %s", _data_dir.c_str(),
                          std::strerror(errno));
            st.f_blocks = 0;
        }
        // Relock
        lock.lock();

        if (st.f_blocks > 0) {
            // POSIX expresses f_blocks and f_bavail in units of f_frsize (not f_bsize).
            const double fragment_size = static_cast<double>(st.f_frsize);
            // Total filesystem size
            const double fs_size = static_cast<double>(st.f_blocks) * fragment_size;
            // Amount of free space available to unprivileged users
            const double fs_free = static_cast<double>(st.f_bavail) * fragment_size;

            // All of the following are fractions in [0, 1], not percentages.
            const double free_fraction = fs_free / fs_size;
            const double min_free_fraction = _min_fs_free_pct / 100.0;
            // Fraction of the fs that can still be used without violating _min_fs_free_pct
            double usable_free_fraction = 0;
            if (free_fraction > min_free_fraction) {
                usable_free_fraction = free_fraction - min_free_fraction;
            }

            // Max space that can be used based on _max_fs_consumed_pct.
            // The comparison is done in double first because fs_size may theoretically
            // exceed the range of uint64_t; only a value known to be smaller is cast.
            const double consumed_limit = fs_size * (_max_fs_consumed_pct / 100.0);
            uint64_t max_allowed_fs = 0;
            if (static_cast<double>(_max_fs_consumed_bytes) < consumed_limit) {
                max_allowed_fs = _max_fs_consumed_bytes;
            } else {
                max_allowed_fs = static_cast<uint64_t>(consumed_limit);
            }

            // Max space that can be used based on _min_fs_free_pct.
            // usable_free_fraction is already a fraction, so it is not divided by 100 again.
            const double free_limit = fs_size * usable_free_fraction;
            uint64_t max_allowed_free = 0;
            if (static_cast<double>(_max_fs_consumed_bytes) < free_limit) {
                max_allowed_free = _max_fs_consumed_bytes;
            } else {
                max_allowed_free = static_cast<uint64_t>(free_limit);
            }

            // Minimum of all possible fs limits
            fs_bytes_allowed = std::min(max_allowed_fs, max_allowed_free);
            _stats._fs_size = fs_size;
            _stats._fs_free = fs_free;
            _stats._fs_allowed_bytes = fs_bytes_allowed;
        }
    }

    std::vector<std::shared_ptr<QueueFile>> to_remove;
    std::vector<std::shared_ptr<QueueFile>> can_remove;
    std::vector<std::shared_ptr<QueueFile>> unsaved_to_remove;
    uint64_t bytes_saved = 0;
    bool have_saved_data = false;

    // Find files that are no longer needed and count total bytes saved. Files queued in
    // to_remove are subtracted again further down once they have actually been deleted.
    // Also fill in can_remove with items in the order they can be removed to make space
    // for higher priority data (the highest priority index is visited first).
    for (int32_t p = static_cast<int32_t>(_files.size()) - 1; p >= 0; --p) {
        auto min_seq = _min_seq[p];
        auto& pf = _files[p];
        for (auto& f : pf) {
            const bool obsolete = f.first <= min_seq;
            if (f.second->Saved()) {
                bytes_saved += f.second->FileSize();
                if (obsolete) {
                    to_remove.emplace_back(f.second);
                } else {
                    have_saved_data = true;
                    can_remove.emplace_back(f.second);
                }
            } else if (obsolete) {
                _unsaved[p].erase(f.second->Sequence());
                // Erasing from pf here would invalidate the iteration; defer it.
                unsaved_to_remove.emplace_back(f.second);
            }
        }
    }

    // Remove unsaved files that are not needed
    for (auto& f : unsaved_to_remove) {
        _files[f->Priority()].erase(f->Sequence());
    }

    std::vector<_UnsavedEntry> to_save;
    auto now = std::chrono::steady_clock::now();
    auto min_age = now - std::chrono::milliseconds(save_delay);
    // Set min_age to now if closed
    if (_closed) {
        min_age = now;
    }

    // Get the list of buckets that can be saved, in the order they need to be saved.
    for (auto& p : _unsaved) {
        uint64_t last_seq = 0xFFFFFFFFFFFFFFFF;
        if (!p.empty()) {
            last_seq = p.rbegin()->first;
        }
        for (auto& f : p) {
            // If the entry is not the last or it is older than min_age then include in to_save
            if (f.first != last_seq || f.second._ts <= min_age) {
                to_save.emplace_back(f.second);
            }
        }
    }

    // Get cursors to save
    std::vector<QueueCursorFile> cursors_to_save;
    std::vector<QueueCursorFile> cursors_to_remove;
    if (have_saved_data || !to_save.empty()) {
        // Only save the cursors if there are files saved to disk
        for (auto& e : _cursors) {
            if (e.second->_need_save || !(e.second->_saved)) {
                cursors_to_save.emplace_back(e.second->_path, e.second->_committed);
                e.second->_need_save = false;
                e.second->_saved = true;
            }
        }
    } else {
        // Remove cursor files if there is no data saved to disk
        for (auto& e : _cursors) {
            if (e.second->_saved) {
                cursors_to_remove.emplace_back(e.second->_path);
                e.second->_saved = false;
            }
        }
    }

    // Unlock before doing IO
    lock.unlock();

    std::vector<std::shared_ptr<QueueFile>> removed;
    std::vector<std::shared_ptr<QueueFile>> saved;

    // Remove files that are not needed
    for (auto& f : to_remove) {
        if (f->Remove()) {
            removed.emplace_back(f);
            bytes_saved -= f->FileSize();
        }
    }

    // can_remove[0, ridx) are the files evicted to make room; to_save[0, sidx) were processed.
    size_t ridx = 0;
    size_t sidx = 0;
    uint64_t bytes_removed = 0;
    uint64_t cannot_save_bytes = 0;
    bool save_failed = false;

    // Iterate through to_save.
    // For each bucket to save, if the save would exceed the quota, remove from can_remove
    // until below quota.
    for (; sidx < to_save.size(); ++sidx) {
        auto& ue = to_save[sidx];

        // Only evict when eviction can help: a file that is larger than the whole quota
        // (including a quota of 0 after a failed statvfs) can never fit, so deleting
        // lower priority data for it would be pure data loss.
        if (bytes_saved + ue._file->FileSize() > fs_bytes_allowed
            && ue._file->FileSize() <= fs_bytes_allowed) {
            // Loop through can_remove, but stop at first higher priority file.
            while (ridx < can_remove.size()
                   && bytes_saved + ue._file->FileSize() > fs_bytes_allowed
                   && can_remove[ridx]->Priority() >= ue._file->Priority()) {
                auto& remove_target = can_remove[ridx];
                if (!remove_target->Remove()) {
                    // Remove failed, do not proceed
                    save_failed = true;
                    break;
                }
                removed.emplace_back(remove_target);
                bytes_saved -= remove_target->FileSize();
                bytes_removed += remove_target->FileSize();
                ridx += 1;
            }
        }

        if (bytes_saved + ue._file->FileSize() > fs_bytes_allowed) {
            // Either remove failed, or not enough lower priority data could be removed
            // to make room for this file.
            break;
        }

        if (!ue._file->Save()) {
            // Save failed, do not proceed. Record it so the warning below reports an
            // error instead of blaming the quota.
            save_failed = true;
            break;
        }
        saved.emplace_back(ue._file);
        bytes_saved += ue._file->FileSize();
    }

    // Tally up unsaved bytes count
    for (; sidx < to_save.size(); ++sidx) {
        cannot_save_bytes += to_save[sidx]._file->FileSize();
    }

    // Save (or remove) cursors
    for (auto& cfile : cursors_to_remove) {
        cfile.Remove();
    }
    for (auto& cfile : cursors_to_save) {
        cfile.Write();
    }

    // Relock before touching shared state (_files, _unsaved, _stats)
    lock.lock();

    // Per-priority statistics are applied here, with the lock held, rather than during
    // the unlocked I/O phase above.
    for (size_t i = 0; i < ridx; ++i) {
        auto& evicted = can_remove[i];
        _stats._priority_stats[evicted->Priority()]._bytes_dropped += evicted->DataSize();
    }
    for (auto& f : saved) {
        _stats._priority_stats[f->Priority()]._bytes_written += f->FileSize();
    }

    // erase from _files and _unsaved the files that were removed.
    for (auto& f : removed) {
        _files[f->Priority()].erase(f->Sequence());
        _unsaved[f->Priority()].erase(f->Sequence());
    }

    // erase from _unsaved the files that were saved.
    for (auto& f : saved) {
        _unsaved[f->Priority()].erase(f->Sequence());
    }

    // NOTE(review): the messages below pair %ld with uint64_t arguments — confirm this
    // matches what Logger expects on all supported platforms.
    if (bytes_removed > 0) {
        Logger::Warn("PriorityQueue: Removed (%ld) bytes of unconsumed lower priority data to make room for new higher priority data", bytes_removed);
    }

    if (cannot_save_bytes > 0) {
        // Rate-limit the warning
        if (now - _last_save_warning > std::chrono::milliseconds(MIN_SAVE_WARNING_GAP_MS)) {
            _last_save_warning = now;
            if (save_failed) {
                Logger::Warn("PriorityQueue: Errors encountered while saving data, (%ld) bytes left unsaved", cannot_save_bytes);
            } else {
                Logger::Warn("PriorityQueue: File System quota (%ld) would be exceeded, (%ld) bytes left unsaved", fs_bytes_allowed, cannot_save_bytes);
            }
        }
    }

    return cannot_save_bytes == 0;
}