core/checkpoint/CheckpointManagerV2.cpp (426 lines of code) (raw):
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "CheckpointManagerV2.h"

#include <chrono>
#include <set>
#include <thread>
#include <unordered_set>

#include "leveldb/write_batch.h"

#include "app_config/AppConfig.h"
#include "checkpoint/CheckPointManager.h"
#include "common/Flags.h"
#include "common/ScopeInvoker.h"
#include "common/TimeUtil.h"
#include "logger/Logger.h"
#include "monitor/AlarmManager.h"
DEFINE_FLAG_INT32(logtail_checkpoint_check_gc_interval_sec, "60 seconds", 60);
DEFINE_FLAG_INT32(logtail_checkpoint_gc_threshold_sec, "30 minutes", 30 * 60);
DEFINE_FLAG_DOUBLE(logtail_checkpoint_max_gc_count_ratio_per_round, "10%", 0.1);
DEFINE_FLAG_INT64(logtail_checkpoint_max_used_time_per_round_in_msec, "500ms", 500);
DEFINE_FLAG_INT32(logtail_checkpoint_expired_threshold_sec, "6 hours", 6 * 60 * 60);
DECLARE_FLAG_INT32(max_exactly_once_concurrency);
namespace logtail {
namespace detail {
// Filesystem location of the exactly-once checkpoint database, as reported by
// GetExactlyOnceCheckpoint().
std::string getDatabasePath() {
    std::string path = GetExactlyOnceCheckpoint();
    return path;
}
// Reports a failed checkpoint-database operation twice: once to the local error log and
// once as a CHECKPOINT_V2_ALARM.
//
// @param op  short operation name, e.g. "read", "write", "open", "batch_delete".
// @param key key involved in the operation (for batch operations callers pass the item count).
// @param s   status returned by leveldb.
void logDatabaseError(const std::string& op, const std::string& key, const leveldb::Status& s) {
    const std::string headline = "error when access checkpoint database";
    const std::string description = "op:" + op + ", key:" + key + ", status:" + s.ToString();
    LOG_ERROR(sLogger, (headline, description));
    AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_V2_ALARM, headline + ", " + description);
}
// Extracts the primary key from a range key of the form "<primaryKey>_<idx>_r"
// (the format produced by makeRangeKey()). The range key is passed as data pointer
// plus size to avoid a copy.
//
// The primary key is everything before the second '_' counted from the end, so the
// primary key itself may contain underscores.
//
// @return the primary key, or empty if the key has fewer than two underscores or the
//         primary-key part would be empty.
std::string extractPrimaryKeyFromRangeKey(const char* data, size_t size) {
    const int32_t kExpectedUnderscoreCount = 2;
    int32_t underscoreCount = 0;
    // Walk backwards. On break, data[size - 1] is the '_' that separates the primary
    // key from "<idx>_r".
    for (; size > 0; --size) {
        if (data[size - 1] == '_' && ++underscoreCount == kExpectedUnderscoreCount) {
            break;
        }
    }
    // size - 1 is the primary-key length; require it to be non-empty (a one-character
    // primary key is valid).
    if (size > 1) {
        return std::string(data, size - 1);
    }
    return "";
}
// Turns `key` (a primary key) into the key of its idx-th range checkpoint by appending
// the suffix "_<idx>_r" in place.
void makeRangeKey(std::string& key, uint32_t idx) {
    key += '_';
    key += std::to_string(idx);
    key += "_r";
}
// Returns true if [data, data + len) ends with the range-checkpoint suffix "_r" that
// makeRangeKey() always appends.
//
// The full two-character suffix is checked rather than only a trailing 'r'. This way a
// primary key that merely ends in 'r' is not misclassified as a range key. A
// misclassified key would be looked up as an orphaned range key by scanCheckpoints() and
// possibly deleted.
bool isRangeKey(const char* data, size_t len) {
    return len >= 2 && data[len - 2] == '_' && data[len - 1] == 'r';
}
} // namespace detail
// Early-returns `false` from the enclosing function when the checkpoint database handle
// (mDatabase) is null, e.g. because open() failed. Despite its name it does not inspect
// any leveldb::Status; it is only usable in member functions whose return type accepts
// `false`.
#define ASSERT_LEVELDB_STATUS \
do { \
if (nullptr == mDatabase) { \
return false; \
} \
} while (0)
// Builds the key of the idx-th range checkpoint belonging to `primaryKey`
// ("<primaryKey>_<idx>_r").
std::string CheckpointManagerV2::MakeRangeKey(const std::string& primaryKey, uint32_t idx) {
    std::string rangeKey(primaryKey);
    detail::makeRangeKey(rangeKey, idx);
    return rangeKey;
}
// Opens the checkpoint database and, only if that succeeds, starts the background GC
// thread. Writes are synchronous when AppConfig enables checkpoint sync-write.
CheckpointManagerV2::CheckpointManagerV2() {
    mDefaultWriteOption.sync = AppConfig::GetInstance()->EnableCheckpointSyncWrite();
    if (!open()) {
        return;
    }
    // The thread runs until the destructor sets mStopGCThread and joins it.
    mGCThreadPtr.reset(new std::thread([this]() { runGCLoop(); }));
}
// Signals the GC thread to stop, waits for it to finish, then closes the database.
// The join happens before close() so the GC thread never touches a deleted handle.
CheckpointManagerV2::~CheckpointManagerV2() {
    mStopGCThread = true;
    if (mGCThreadPtr != nullptr) {
        mGCThreadPtr->join();
    }
    mGCThreadPtr.reset();
    close();
}
// Appends the keys of range checkpoints 0 .. rangeCptCount-1 of `primaryKey` to `keys`.
// A single scratch buffer is reused: it is truncated back to the primary key before
// each suffix is appended.
void CheckpointManagerV2::AppendRangeKeys(const std::string& primaryKey,
                                          uint32_t rangeCptCount,
                                          std::vector<std::string>& keys) {
    const size_t prefixLength = primaryKey.size();
    std::string buffer(primaryKey);
    uint32_t index = 0;
    while (index < rangeCptCount) {
        buffer.erase(prefixLength);
        detail::makeRangeKey(buffer, index);
        keys.emplace_back(buffer);
        ++index;
    }
}
// Appends every key that belongs to one checkpoint to `keys`: the primary key first,
// followed by its `rgCptCount` range keys in index order.
void CheckpointManagerV2::appendCheckpointKeys(const std::string& primaryKey,
                                               uint32_t rgCptCount,
                                               std::vector<std::string>& keys) {
    keys.emplace_back(primaryKey);
    AppendRangeKeys(primaryKey, rgCptCount, keys);
}
// Walks the checkpoint database (on a snapshot) and classifies every key.
//
// Modes:
//  * Full scan (exactlyOnceConfigs non-empty): iterates the whole database with no
//    time limit. It also deletes primary checkpoints whose config is not listed and
//    skips primary checkpoints that still have a v1 checkpoint.
//  * Incremental scan (exactlyOnceConfigs empty): resumes at the first key not examined
//    by the previous incremental scan and stops once limitScanTimeInMs has elapsed
//    (0 = no limit). After reaching the end of the database, the next incremental scan
//    starts again from the beginning.
//
// @param exactlyOnceConfigs  names of configs that are still alive; empty selects incremental mode.
// @param checkpoints         if non-null, receives (key, parsed PB) for every valid primary checkpoint.
// @param shouldDeleteCptKeys cleared, then filled with keys to delete: range keys that are
//                            malformed or whose primary key is missing, and primary checkpoints
//                            (plus their range keys) that are unparsable, belong to an unknown
//                            config (full scan only), or are expired with no alive range checkpoint.
// @param limitScanTimeInMs   time budget for incremental mode; forced to 0 for full scans.
// @return time spent scanning, in milliseconds.
//
// Requires mDatabase to be open. NOTE(review): the resume position is a function-local
// static shared by all instances and not synchronized; concurrent incremental scans
// (the GC thread and ScanCheckpoints() with an empty config list) could race on it.
int64_t CheckpointManagerV2::scanCheckpoints(const std::vector<std::string>& exactlyOnceConfigs,
                                             std::vector<std::pair<std::string, PrimaryCheckpointPB>>* checkpoints,
                                             std::vector<std::string>& shouldDeleteCptKeys,
                                             uint64_t limitScanTimeInMs) {
    bool doFullScan = !exactlyOnceConfigs.empty();
    if (doFullScan) {
        limitScanTimeInMs = 0;
    }
    shouldDeleteCptKeys.clear();
    const std::set<std::string> configNameSet(exactlyOnceConfigs.begin(), exactlyOnceConfigs.end());

    leveldb::ReadOptions options;
    options.snapshot = mDatabase->GetSnapshot();
    auto iter = mDatabase->NewIterator(options);
    ScopeInvoker invoker([&]() {
        delete iter;
        mDatabase->ReleaseSnapshot(options.snapshot);
    });

    // First key NOT yet examined by the previous incremental scan; empty means start from
    // the beginning of the database.
    static std::string sNextScanKey;
    auto initIterator = [&]() {
        if (doFullScan || sNextScanKey.empty()) {
            iter->SeekToFirst();
        } else {
            iter->Seek(sNextScanKey);
            // No key >= the resume key any more: wrap around.
            if (!iter->Valid()) {
                iter->SeekToFirst();
            }
        }
    };
    auto const scanStartTime = GetCurrentTimeInMilliSeconds();
    int64_t scannedCount = 0;
    auto canAccessIterator = [&]() {
        if (doFullScan || limitScanTimeInMs == 0) {
            return iter->Valid();
        }
        auto r = (GetCurrentTimeInMilliSeconds() - scanStartTime > limitScanTimeInMs) ? false : iter->Valid();
        if (!r) {
            LOG_DEBUG(sLogger, ("scanned count", scannedCount));
        }
        return r;
    };
    auto forwardIterator = [&]() {
        ++scannedCount;
        iter->Next();
    };

    std::unordered_set<std::string> missingPrimaryKeysCache;
    for (initIterator(); canAccessIterator(); forwardIterator()) {
        const auto& key = iter->key();
        // Range key: check if the primary key is still alive; if not, delete it.
        if (detail::isRangeKey(key.data(), key.size())) {
            auto primaryKey = detail::extractPrimaryKeyFromRangeKey(key.data(), key.size());
            if (primaryKey.empty()) {
                LOG_ERROR(sLogger, ("invalid range checkpoint, delete", key.ToString()));
                shouldDeleteCptKeys.push_back(key.ToString());
                continue;
            }
            bool missing = false;
            std::string ignoreVal;
            if (missingPrimaryKeysCache.find(primaryKey) != missingPrimaryKeysCache.end()) {
                missing = true;
            } else if (!readDatabase(primaryKey, ignoreVal)) {
                // Bounded cache so many range keys of one missing primary key cost a
                // single database lookup.
                const size_t kMissingCacheSize = 100;
                missing = true;
                if (missingPrimaryKeysCache.size() >= kMissingCacheSize) {
                    missingPrimaryKeysCache.erase(missingPrimaryKeysCache.begin());
                }
                missingPrimaryKeysCache.insert(primaryKey);
            }
            if (missing) {
                LOG_ERROR(sLogger, ("primary key is missing, delete", key.ToString()));
                shouldDeleteCptKeys.push_back(key.ToString());
            }
            continue;
        }

        // Read primary checkpoint and validate it.
        PrimaryCheckpointPB cpt;
        if (!cpt.ParseFromArray(iter->value().data(), iter->value().size())) {
            LOG_ERROR(sLogger, ("parse primary checkpoint error, delete", key.ToString()));
            appendCheckpointKeys(key.ToString(), INT32_FLAG(max_exactly_once_concurrency), shouldDeleteCptKeys);
            continue;
        }

        // Only full scan should validate config and v1 checkpoint.
        if (doFullScan) {
            if (configNameSet.find(cpt.config_name()) == configNameSet.end()) {
                LOG_INFO(sLogger, ("config name not found, delete", key.ToString())("checkpoint", cpt.DebugString()));
                appendCheckpointKeys(key.ToString(), cpt.concurrency(), shouldDeleteCptKeys);
                continue;
            }
            static auto sV1CptM = CheckPointManager::Instance();
            {
                CheckPointPtr v1Cpt;
                if (sV1CptM->GetCheckPoint(DevInode(cpt.dev(), cpt.inode()), cpt.config_name(), v1Cpt)) {
                    LOG_DEBUG(sLogger, ("existing v1 checkpoint, skip", key.ToString()));
                    continue;
                }
            }
        }

        // Validate range checkpoints to decide whether the checkpoint should be loaded.
        // When the primary checkpoint and all of its range checkpoints expire, delete them.
        auto curTime = time(NULL);
        if (curTime - cpt.update_time() >= INT32_FLAG(logtail_checkpoint_expired_threshold_sec)) {
            int32_t aliveRangeCptCount = 0;
            std::vector<std::string> rangeCptKeys;
            AppendRangeKeys(key.ToString(), cpt.concurrency(), rangeCptKeys);
            RangeCheckpointPB rgCpt;
            for (auto& rangeKey : rangeCptKeys) {
                if (!GetPB(rangeKey, rgCpt)) {
                    continue;
                }
                if (curTime - rgCpt.update_time() >= INT32_FLAG(logtail_checkpoint_expired_threshold_sec)) {
                    LOG_DEBUG(sLogger, ("range checkpoint expired", rangeKey)("update time", rgCpt.update_time()));
                    continue;
                }
                ++aliveRangeCptCount;
            }
            if (0 == aliveRangeCptCount) {
                LOG_INFO(sLogger, ("no more alive range checkpoint, delete", key.ToString()));
                appendCheckpointKeys(key.ToString(), cpt.concurrency(), shouldDeleteCptKeys);
                continue;
            }
        }

        // Valid primary checkpoint.
        if (checkpoints) {
            checkpoints->resize(checkpoints->size() + 1);
            checkpoints->back().first.assign(key.data(), key.size());
            checkpoints->back().second.Swap(&cpt);
        }
    }

    // Record where an incremental scan stopped. A valid iterator means the time budget ran
    // out, so the next round resumes at this not-yet-examined key. An invalid iterator means
    // the end of the database was reached, so the next round wraps around to the beginning
    // instead of re-seeking to the last key forever.
    if (!doFullScan) {
        if (iter->Valid()) {
            sNextScanKey.assign(iter->key().data(), iter->key().size());
        } else {
            sNextScanKey.clear();
        }
    }
    return GetCurrentTimeInMilliSeconds() - scanStartTime;
}
std::vector<std::pair<std::string, PrimaryCheckpointPB>>
CheckpointManagerV2::ScanCheckpoints(const std::vector<std::string>& exactlyOnceConfigs) {
std::vector<std::pair<std::string, PrimaryCheckpointPB>> checkpoints;
LOG_INFO(sLogger,
("begin to scan checkpoints for exactly once configs", "")("config count", exactlyOnceConfigs.size()));
if (nullptr == mDatabase) {
LOG_ERROR(sLogger, ("scan exacly once checkpoints error", "uninitialized"));
return checkpoints;
}
std::vector<std::string> toDeleteKeys;
auto scanUsedTimeInMs = scanCheckpoints(exactlyOnceConfigs, &checkpoints, toDeleteKeys);
auto deleteUsedTimeInMs = DeleteCheckpoints(toDeleteKeys);
LOG_INFO(sLogger,
("finish scanning checkpoints for exactly once configs", "")("checkpoint count", checkpoints.size())(
"scan used time", scanUsedTimeInMs)("delete count", toDeleteKeys.size())("delete used time",
deleteUsedTimeInMs));
return checkpoints;
}
// Deletes all `keys` from the checkpoint database using one leveldb::WriteBatch.
//
// @return time spent on the write in milliseconds. Returns 0 when there is nothing to
//         delete or the database is not open. A failed write is logged and alarmed;
//         the elapsed time is still returned.
uint64_t CheckpointManagerV2::DeleteCheckpoints(const std::vector<std::string>& keys) {
    // mDatabase is null when open() failed. This method is public (and reachable through
    // DeletePrimaryCheckpoints()), so guard before dereferencing the handle.
    if (keys.empty() || nullptr == mDatabase) {
        return 0;
    }
    auto const startTimeInMs = GetCurrentTimeInMilliSeconds();
    leveldb::WriteBatch batch;
    for (const auto& k : keys) {
        batch.Delete(k);
    }
    auto status = mDatabase->Write(mDefaultWriteOption, &batch);
    auto const usedTimeInMs = GetCurrentTimeInMilliSeconds() - startTimeInMs;
    if (status.ok()) {
        LOG_DEBUG(sLogger, ("delete checkpoints, count", keys.size()));
    } else {
        detail::logDatabaseError("batch_delete", std::to_string(keys.size()), status);
    }
    return usedTimeInMs;
}
// Serializes and writes the given primary checkpoints in one leveldb::WriteBatch.
// Entries that fail to serialize are logged and skipped; the rest are still written.
//
// @return time spent in milliseconds on success. Returns 0 if the write failed (logged
//         and alarmed) or the database is not open.
uint64_t CheckpointManagerV2::UpdatePrimaryCheckpoints(
    const std::vector<std::pair<std::string, PrimaryCheckpointPB>*>& checkpoints) {
#define METHOD_LOG_PATTERN ("method", "UpdatePrimaryCheckpoints")("count", checkpoints.size())
    // mDatabase is null when open() failed; writing through it would crash. This matches
    // the silent `false` of read()/write() in that state.
    if (nullptr == mDatabase) {
        return 0;
    }
    auto const startTimeInMs = GetCurrentTimeInMilliSeconds();
    leveldb::WriteBatch batch;
    for (auto& cptPair : checkpoints) {
        auto& key = cptPair->first;
        auto& cpt = cptPair->second;
        std::string data;
        if (!cpt.SerializeToString(&data)) {
            LOG_ERROR(sLogger, METHOD_LOG_PATTERN("serialize error", key)("checkpoint", cpt.DebugString()));
            continue;
        }
        batch.Put(key, data);
    }
    auto status = mDatabase->Write(mDefaultWriteOption, &batch);
    if (status.ok()) {
        return GetCurrentTimeInMilliSeconds() - startTimeInMs;
    } else {
        detail::logDatabaseError("batch_update", std::to_string(checkpoints.size()), status);
        return 0;
    }
#undef METHOD_LOG_PATTERN
}
// Deletes each given primary checkpoint together with all of its range checkpoints
// (as many as its recorded concurrency) in a single DeleteCheckpoints() call.
//
// @return the value of DeleteCheckpoints(): write time in ms, or 0 if nothing was done.
uint64_t CheckpointManagerV2::DeletePrimaryCheckpoints(
    const std::vector<std::pair<std::string, PrimaryCheckpointPB>*>& checkpoints) {
    std::vector<std::string> allKeys;
    for (const auto* entry : checkpoints) {
        appendCheckpointKeys(entry->first, entry->second.concurrency(), allKeys);
    }
    return DeleteCheckpoints(allKeys);
}
bool CheckpointManagerV2::open() {
const auto databasePath = detail::getDatabasePath();
#define METHOD_LOG_PATTERN ("path", databasePath)("method", "open")
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, databasePath, &mDatabase);
if (!status.ok()) {
detail::logDatabaseError("open", databasePath, status);
mDatabase = nullptr;
return false;
}
LOG_DEBUG(sLogger, METHOD_LOG_PATTERN("checkpoint database opened", ""));
return true;
#undef METHOD_LOG_PATTERN
}
// Closes the database if it is open.
//
// @return true if a database was open (and has now been closed), false otherwise.
bool CheckpointManagerV2::close() {
    if (mDatabase == nullptr) {
        return false;
    }
    delete mDatabase;
    mDatabase = nullptr;
    return true;
}
bool CheckpointManagerV2::readDatabase(const std::string& key, std::string& value) {
ASSERT_LEVELDB_STATUS;
leveldb::Status s = mDatabase->Get(leveldb::ReadOptions(), key, &value);
if (s.ok()) {
return true;
}
LOG_DEBUG(sLogger, ("not ok when read checkpoint", key)("status", s.ToString()));
if (!s.IsNotFound()) {
detail::logDatabaseError("read", key, s);
}
return false;
}
// Reads a checkpoint. A successful read of a primary (non-range) checkpoint also removes
// it from the pending-GC set, because it is evidently in use again.
//
// @return whether the key was read successfully.
bool CheckpointManagerV2::read(const std::string& key, std::string& value) {
    const bool found = readDatabase(key, value);
    if (!found || detail::isRangeKey(key.data(), key.size())) {
        return found;
    }
    std::lock_guard<std::mutex> guard(mMutex);
    if (mGCItems.erase(key) > 0) {
        LOG_DEBUG(sLogger, ("bring checkpoint back from GC", key));
    }
    return true;
}
bool CheckpointManagerV2::write(const std::string& key, const std::string& value) {
ASSERT_LEVELDB_STATUS;
leveldb::Status s = mDatabase->Put(mDefaultWriteOption, key, value);
if (s.ok()) {
return true;
}
detail::logDatabaseError("write", key, s);
return false;
}
// Schedules a primary checkpoint for garbage collection, remembering when it was marked.
// Marking an already-pending key is a no-op (the original mark time is kept). The debug
// log is emitted after the mutex has been released.
void CheckpointManagerV2::MarkGC(const std::string& primaryKey) {
    std::unique_lock<std::mutex> lock(mMutex);
    if (mGCItems.count(primaryKey) != 0) {
        return;
    }
    mGCItems[primaryKey] = time(NULL);
    lock.unlock();
    LOG_DEBUG(sLogger, ("checkpoint", "mark gc")("key", primaryKey));
}
// One GC round over the pending-GC set, performed while holding mMutex.
//
// An item whose mark is at least logtail_checkpoint_gc_threshold_sec old is removed from
// the set. If its primary checkpoint can be read and parsed, that checkpoint and its range
// checkpoints are also deleted from the database. The round stops early when the GC
// thread is asked to stop, when the per-round time budget is used up, or when the
// per-round deletion cap (a ratio of the pending items, at least 1) is reached.
void CheckpointManagerV2::checkGCItems() {
    const auto nowSec = time(NULL);
    const auto roundStartMs = GetCurrentTimeInMilliSeconds();
    int32_t deletedCount = 0;
    PrimaryCheckpointPB cpt;
    std::vector<std::string> keys;
    std::lock_guard<std::mutex> lock(mMutex);
    const int32_t maxDeleteCount
        = 1 + static_cast<int32_t>(mGCItems.size() * DOUBLE_FLAG(logtail_checkpoint_max_gc_count_ratio_per_round));

    auto it = mGCItems.begin();
    while (it != mGCItems.end()) {
        const bool stopRound = mStopGCThread
            || (GetCurrentTimeInMilliSeconds() - roundStartMs
                >= static_cast<uint64_t>(INT64_FLAG(logtail_checkpoint_max_used_time_per_round_in_msec)))
            || (deletedCount >= maxDeleteCount);
        if (stopRound) {
            break;
        }

        const auto& key = it->first;
        const auto markTime = it->second;
        const bool oldEnough
            = nowSec >= markTime && nowSec - markTime >= INT32_FLAG(logtail_checkpoint_gc_threshold_sec);
        if (!oldEnough) {
            ++it;
            continue;
        }

        // GC the primary checkpoint and its corresponding range checkpoints.
        std::string value;
        if (readDatabase(key, value) && cpt.ParseFromArray(value.data(), value.size())) {
            keys.clear();
            keys.reserve(1 + cpt.concurrency());
            appendCheckpointKeys(key, cpt.concurrency(), keys);
            DeleteCheckpoints(keys);
            LOG_INFO(sLogger, ("GC checkpoint", key)("time", nowSec - markTime)("checkpoint", cpt.DebugString()));
            ++deletedCount;
        } else {
            LOG_ERROR(sLogger, ("read or parse error when GC checkpoint", key));
        }
        // The item leaves the pending set whether or not it could be GC'ed.
        it = mGCItems.erase(it);
    }
}
// Body of the background GC thread. Every logtail_checkpoint_check_gc_interval_sec seconds
// it processes the pending-GC set, then runs a time-limited (100 ms) incremental scan and
// deletes the stale keys that scan found. Exits immediately if the database is not open.
//
// While waiting between rounds, the stop flag is polled every 100 ms. This way the
// destructor's join() returns promptly instead of blocking for up to a whole GC interval,
// and no GC/scan round is started after a stop was requested.
void CheckpointManagerV2::runGCLoop() {
    if (nullptr == mDatabase) {
        LOG_ERROR(sLogger, ("runGCLoop exit", "checkpoint database is closed"));
        return;
    }
    const auto kStopCheckInterval = std::chrono::milliseconds(100);
    while (!mStopGCThread) {
        const auto wakeUpTime = std::chrono::steady_clock::now()
            + std::chrono::seconds(INT32_FLAG(logtail_checkpoint_check_gc_interval_sec));
        while (!mStopGCThread && std::chrono::steady_clock::now() < wakeUpTime) {
            std::this_thread::sleep_for(kStopCheckInterval);
        }
        if (mStopGCThread) {
            break;
        }

        checkGCItems();
        std::vector<std::string> toDeleteCptKeys;
        auto scanUsedTimeInMs = scanCheckpoints(std::vector<std::string>(), nullptr, toDeleteCptKeys, 100);
        auto deleteUsedTimeInMs = DeleteCheckpoints(toDeleteCptKeys);
        if (!toDeleteCptKeys.empty()) {
            LOG_INFO(sLogger,
                     ("delete checkpoints", toDeleteCptKeys.size())("scan used time", scanUsedTimeInMs)(
                         "delete used time", deleteUsedTimeInMs));
        }
    }
    LOG_INFO(sLogger, ("runGCLoop exit", "done"));
}
#ifdef APSARA_UNIT_TEST_MAIN
// Unit-test helper: wipes the on-disk checkpoint database. If a database was open before
// the call, a fresh empty one is opened afterwards. The DestroyDB status is ignored.
void CheckpointManagerV2::rebuild() {
    const bool wasOpened = close();
    const std::string path = detail::getDatabasePath();
    leveldb::DestroyDB(path, leveldb::Options());
    if (!wasOpened) {
        return;
    }
    open();
}
#endif
} // namespace logtail