bool PriorityQueue::save()

in PriorityQueue.cpp [966:1209]


bool PriorityQueue::save(std::unique_lock<std::mutex>& lock, long save_delay, bool final_save) {
    update_min_seq();

    if (final_save) {
        // Flush non-empty current_buckets into unsaved
        flush_current_buckets();
    }

    struct statvfs st;
    ::memset(&st, 0, sizeof(st));

    uint64_t fs_bytes_allowed = 0;
    if (save_needed(save_delay)) {
        // Unlock while getting fs stats
        lock.unlock();

        if (statvfs(_data_dir.c_str(), &st) != 0) {
            Logger::Error("PriorityQueue::save(): statvfs(%s) failed: %s", _data_dir.c_str(),
                          std::strerror(errno));
            st.f_blocks = 0;
        }

        // Relock
        lock.lock();

        if (st.f_blocks > 0) {
            // Total filesystem size
            double fs_size = static_cast<double>(st.f_blocks) * static_cast<double>(st.f_frsize);
            // Amount of free space
            double fs_free = static_cast<double>(st.f_bavail) * static_cast<double>(st.f_bsize);
            // Percent of free space
            double pct_free = fs_free / fs_size;
            // Percent of fs that can be used (based on _min_fs_free_pct);
            double pct_free_avail = 0;
            if (pct_free > (_min_fs_free_pct/100)) {
                pct_free_avail = pct_free - (_min_fs_free_pct/100);
            }

            // Max space that can be used based on _max_fs_consumed_pct
            uint64_t max_allowed_fs = 0;
            // It is theoretically possible for fs_size to exceed the limits of uint64_t
            if (static_cast<double>(_max_fs_consumed_bytes) < fs_size * (_max_fs_consumed_pct / 100)) {
                max_allowed_fs = _max_fs_consumed_bytes;
            } else {
                max_allowed_fs = static_cast<uint64_t>(fs_size * (_max_fs_consumed_pct / 100));
            }
            // Max space that can be used based on _min_fs_free_pct
            uint64_t max_allowed_free = 0;
            // It is theoretically possible for fs_size to exceed the limits of uint64_t
            if (static_cast<double>(_max_fs_consumed_bytes) < fs_size * (pct_free_avail / 100)) {
                max_allowed_free = _max_fs_consumed_bytes;
            } else {
                max_allowed_free = static_cast<uint64_t>(fs_size * (pct_free_avail / 100));
            }
            // Minimum of all possible fs limits
            fs_bytes_allowed = std::min(max_allowed_fs, max_allowed_free);

            _stats._fs_size = fs_size;
            _stats._fs_free = fs_free;
            _stats._fs_allowed_bytes = fs_bytes_allowed;
        }
    }

    std::vector<std::shared_ptr<QueueFile>> to_remove;
    std::vector<std::shared_ptr<QueueFile>> can_remove;
    std::vector<std::shared_ptr<QueueFile>> unsaved_to_remove;

    uint64_t bytes_saved = 0;
    bool have_saved_data = false;
    // Find files that are no longer needed and count total bytes saved (excluding those that will be deleted)
    // Also fill in can_remove with items in the order they can be removed to make space for higher priority data
    for (int32_t p = _files.size()-1; p >= 0; --p) {
        auto min_seq = _min_seq[p];
        auto &pf = _files[p];
        for (auto& f : pf) {
            if (f.second->Saved()) {
                bytes_saved += f.second->FileSize();
                if (f.first <= min_seq) {
                    to_remove.emplace_back(f.second);
                } else {
                    have_saved_data = true;
                    can_remove.emplace_back(f.second);
                }
            } else {
                if (f.first <= min_seq) {
                    _unsaved[p].erase(f.second->Sequence());
                    unsaved_to_remove.emplace_back(f.second);
                }
            }
        }
    }

    // Remove unsaved files that are not needed
    for (auto& f : unsaved_to_remove) {
        _files[f->Priority()].erase(f->Sequence());
    }

    std::vector<_UnsavedEntry> to_save;

    auto now = std::chrono::steady_clock::now();
    auto min_age = now - std::chrono::milliseconds(save_delay);

    // Set min_age to now if closed
    if (_closed) {
        min_age = now;
    }

    // Get the list of buckets that can be saved, in the order they need to be saved.
    for (auto& p : _unsaved) {
        uint64_t last_seq = 0xFFFFFFFFFFFFFFFF;
        if (!p.empty()) {
            last_seq = p.rbegin()->first;
        }

        for (auto& f: p) {
            // If the entry is not the last or it is older than min_age then include in to_save
            if (f.first != last_seq || f.second._ts <= min_age) {
                to_save.emplace_back(f.second);
            }
        }
    }

    // Get cursors to save
    std::vector<QueueCursorFile> cursors_to_save;
    std::vector<QueueCursorFile> cursors_to_remove;
    if (have_saved_data || !to_save.empty()) {
        // Only save the cursors if there are files saved to disk
        for (auto &e : _cursors) {
            if (e.second->_need_save || !(e.second->_saved)) {
                cursors_to_save.emplace_back(e.second->_path, e.second->_committed);
                e.second->_need_save = false;
                e.second->_saved = true;
            }
        }
    } else {
        // Remove cursor files if there is no data saved to disk
        for (auto &e : _cursors) {
            if (e.second->_saved) {
                cursors_to_remove.emplace_back(e.second->_path);
                e.second->_saved = false;
            }
        }
    }

    // Unlock before doing IO
    lock.unlock();

    std::vector<std::shared_ptr<QueueFile>> removed;
    std::vector<std::shared_ptr<QueueFile>> saved;

    // Remove files that are not needed
    for (auto& f : to_remove) {
        if (f->Remove()) {
            removed.emplace_back(f);
            bytes_saved -= f->FileSize();
        }
    }

    int ridx = 0;
    int sidx = 0;
    uint64_t bytes_removed = 0;
    uint64_t cannot_save_bytes = 0;
    bool save_failed = false;

    // Iterate through to_save
    // for each bucket to save, if the save would exceed the quote, remove from can_remove until below quota
    for (; sidx < to_save.size(); ++sidx) {
        auto& ue = to_save[sidx];
        if (bytes_saved + ue._file->FileSize() > fs_bytes_allowed) {
            // Loop through can_remove, but stop at first higher priority file.
            while (ridx < can_remove.size() && bytes_saved + ue._file->FileSize() > fs_bytes_allowed && can_remove[ridx]->Priority() >= ue._file->Priority()) {
                auto& remove_target = can_remove[ridx];
                if (remove_target->Remove()) {
                    removed.emplace_back(remove_target);
                    bytes_saved -= remove_target->FileSize();
                    bytes_removed += remove_target->FileSize();
                    ridx += 1;
                    _stats._priority_stats[remove_target->Priority()]._bytes_dropped += remove_target->DataSize();
                } else {
                    // Remove failed, do not proceed
                    save_failed = true;
                    break;
                }
            }
        }
        if (bytes_saved + ue._file->FileSize() > fs_bytes_allowed) {
            // Either remove failed, or non enough lower priority data could be removed to make room for this file.
            break;
        } else {
            if (ue._file->Save()) {
                saved.emplace_back(ue._file);
                bytes_saved += ue._file->FileSize();
                _stats._priority_stats[ue._file->Priority()]._bytes_written += ue._file->FileSize();
            } else {
                // Save failed, do not proceed
                break;
            }
        }
    }

    // Tally up unsaved bytes count
    for (; sidx < to_save.size(); ++sidx) {
        cannot_save_bytes += to_save[sidx]._file->FileSize();
    }

    // Save (or remove) cursors
    for (auto &cfile : cursors_to_remove) {
        cfile.Remove();
    }
    for (auto &cfile : cursors_to_save) {
        cfile.Write();
    }

    // Relock before removing items from _unsaved;
    lock.lock();

    // erase from _files and _unsaved the files that where removed.
    for (auto& f : removed) {
        _files[f->Priority()].erase(f->Sequence());
        _unsaved[f->Priority()].erase(f->Sequence());
    }

    // erase from _unsaved the files that where saved.
    for (auto& f : saved) {
        _unsaved[f->Priority()].erase(f->Sequence());
    }

    if (bytes_removed > 0) {
        Logger::Warn("PriorityQueue: Removed (%ld) bytes of unconsumed lower priority data to make room for new higher priority data", bytes_removed);
    }

    if (cannot_save_bytes > 0) {
        if (now - _last_save_warning > std::chrono::milliseconds(MIN_SAVE_WARNING_GAP_MS)) {
            _last_save_warning = now;
            if (save_failed) {
                Logger::Warn("PriorityQueue: Errors encountered while saving data, (%ld) bytes left unsaved", cannot_save_bytes);
            } else {
                Logger::Warn("PriorityQueue: File System quota (%ld) would be exceeded, (%ld) bytes left unsaved", fs_bytes_allowed, cannot_save_bytes);
            }
        }
    }

    return cannot_save_bytes == 0;
}