platform/consensus/recovery/recovery.cpp (434 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/recovery/recovery.h"
#include <fcntl.h>
#include <glog/logging.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <filesystem>
#include "common/utils/utils.h"
namespace resdb {
Recovery::Recovery(const ResDBConfig& config, CheckPoint* checkpoint,
SystemInfo* system_info, Storage* storage)
: config_(config),
checkpoint_(checkpoint),
system_info_(system_info),
storage_(storage) {
recovery_enabled_ = config_.GetConfigData().recovery_enabled();
file_path_ = config_.GetConfigData().recovery_path();
if (file_path_.empty()) {
file_path_ = "./wal_log/log";
}
base_file_path_ = file_path_;
if (config_.GetPublicKeyCertificateInfo()
.public_key()
.public_key_info()
.type() == CertificateKeyInfo::CLIENT) {
recovery_enabled_ = false;
}
if (recovery_enabled_ == false) {
return;
}
buffer_size_ = config_.GetConfigData().recovery_buffer_size();
if (buffer_size_ == 0) {
buffer_size_ = 1024;
}
LOG(INFO) << "file path:" << file_path_
<< " dir:" << std::filesystem::path(file_path_).parent_path();
recovery_ckpt_time_s_ = config_.GetConfigData().recovery_ckpt_time_s();
if (recovery_ckpt_time_s_ == 0) {
recovery_ckpt_time_s_ = 60;
}
int ret =
mkdir(std::filesystem::path(file_path_).parent_path().c_str(), 0777);
if (ret) {
LOG(INFO) << "mkdir fail:" << ret << " error:" << strerror(errno);
}
fd_ = -1;
stop_ = false;
Init();
}
void Recovery::Init() {
GetLastFile();
SwitchFile(file_path_);
ckpt_thread_ = std::thread(&Recovery::UpdateStableCheckPoint, this);
}
Recovery::~Recovery() {
if (recovery_enabled_ == false) {
return;
}
Flush();
close(fd_);
stop_ = true;
if (ckpt_thread_.joinable()) {
ckpt_thread_.join();
}
}
int64_t Recovery::GetMaxSeq() { return max_seq_; }
int64_t Recovery::GetMinSeq() { return min_seq_; }
void Recovery::UpdateStableCheckPoint() {
if (checkpoint_ == nullptr) {
return;
}
while (!stop_) {
int64_t latest_ckpt = checkpoint_->GetStableCheckpoint();
LOG(ERROR) << "get stable ckpt:" << latest_ckpt;
if (last_ckpt_ == latest_ckpt) {
sleep(recovery_ckpt_time_s_);
continue;
}
last_ckpt_ = latest_ckpt;
FinishFile(latest_ckpt);
}
}
void Recovery::GetLastFile() {
std::string dir = std::filesystem::path(file_path_).parent_path();
last_ckpt_ = -1;
int m_time_s = 0;
for (const auto& entry : std::filesystem::directory_iterator(dir)) {
std::string dir = std::filesystem::path(entry.path()).parent_path();
std::string file_name = std::filesystem::path(entry.path()).stem();
std::string ext = std::filesystem::path(entry.path()).extension();
if (ext != ".log") continue;
int pos = file_name.rfind("_");
int64_t ckpt = std::stoll(file_name.substr(pos + 1));
int max_seq_pos = file_name.rfind("_", pos - 1);
int min_seq_pos = file_name.rfind("_", max_seq_pos - 1);
int time_pos = file_name.rfind("_", min_seq_pos - 1);
int64_t min_seq = std::stoll(
file_name.substr(min_seq_pos + 1, max_seq_pos - min_seq_pos - 1));
int64_t time_s =
std::stoll(file_name.substr(time_pos + 1, min_seq_pos - time_pos - 1));
if (min_seq == -1) {
if (last_ckpt_ == -1 || m_time_s < time_s) {
file_path_ = entry.path();
last_ckpt_ = ckpt;
m_time_s = time_s;
LOG(ERROR) << "get last path:" << file_name << " min:" << min_seq;
}
}
}
if (last_ckpt_ == -1) {
last_ckpt_ = 0;
file_path_ = GenerateFile(last_ckpt_, -1, -1);
}
}
std::string Recovery::GenerateFile(int64_t seq, int64_t min_seq,
int64_t max_seq) {
std::string dir = std::filesystem::path(file_path_).parent_path();
std::string file_name = std::filesystem::path(base_file_path_).stem();
int64_t time = GetCurrentTime();
file_name = file_name + "_" + std::to_string(time) + "_" +
std::to_string(min_seq) + "_" + std::to_string(max_seq) + "_" +
std::to_string(seq);
std::string ext = std::filesystem::path(base_file_path_).extension();
if (ext == "") ext = "log";
return dir + "/" + file_name + "." + ext;
}
void Recovery::FinishFile(int64_t seq) {
std::unique_lock<std::mutex> lk(mutex_);
Flush();
if (storage_) {
if (!storage_->Flush()) {
return;
}
}
std::string new_file_path = GenerateFile(seq, min_seq_, max_seq_);
close(fd_);
min_seq_ = -1;
max_seq_ = -1;
std::rename(file_path_.c_str(), new_file_path.c_str());
LOG(INFO) << "rename:" << file_path_ << " to:" << new_file_path;
std::string next_file_path = GenerateFile(seq, -1, -1);
file_path_ = next_file_path;
OpenFile(file_path_);
}
void Recovery::SwitchFile(const std::string& file_path) {
std::unique_lock<std::mutex> lk(mutex_);
min_seq_ = -1;
max_seq_ = -1;
ReadLogsFromFiles(
file_path, 0, 0, [&](const SystemInfoData& data) {},
[&](std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
min_seq_ == -1
? min_seq_ = request->seq()
: std::min(min_seq_, static_cast<int64_t>(request->seq()));
max_seq_ = std::max(max_seq_, static_cast<int64_t>(request->seq()));
});
OpenFile(file_path);
LOG(INFO) << "switch to file:" << file_path << " seq:"
<< "[" << min_seq_ << "," << max_seq_ << "]";
}
void Recovery::OpenFile(const std::string& path) {
if (fd_ >= 0) {
close(fd_);
}
fd_ = open(path.c_str(), O_CREAT | O_WRONLY, 0666);
if (fd_ < 0) {
LOG(ERROR) << "open file fail:" << path << " error:" << strerror(errno);
}
int pos = lseek(fd_, 0, SEEK_END);
LOG(INFO) << "file path:" << path << " len:" << pos << " fd:" << fd_;
if (pos == 0) {
WriteSystemInfo();
}
lseek(fd_, 0, SEEK_END);
LOG(INFO) << "open file:" << path << " pos:" << lseek(fd_, 0, SEEK_CUR)
<< " fd:" << fd_;
assert(fd_ >= 0);
}
void Recovery::WriteSystemInfo() {
int view = system_info_->GetCurrentView();
int primary_id = system_info_->GetPrimaryId();
SystemInfoData data;
data.set_view(view);
data.set_primary_id(primary_id);
std::string data_str;
data.SerializeToString(&data_str);
AppendData(data_str);
Flush();
}
void Recovery::AddRequest(const Context* context, const Request* request) {
if (recovery_enabled_ == false) {
return;
}
switch (request->type()) {
case Request::TYPE_PRE_PREPARE:
case Request::TYPE_PREPARE:
case Request::TYPE_COMMIT:
case Request::TYPE_CHECKPOINT:
case Request::TYPE_NEWVIEW:
return WriteLog(context, request);
default:
break;
}
}
void Recovery::WriteLog(const Context* context, const Request* request) {
std::string data;
if (request) {
request->SerializeToString(&data);
}
std::string sig;
if (context) {
context->signature.SerializeToString(&sig);
}
std::unique_lock<std::mutex> lk(mutex_);
min_seq_ = min_seq_ == -1
? request->seq()
: std::min(min_seq_, static_cast<int64_t>(request->seq()));
max_seq_ = std::max(max_seq_, static_cast<int64_t>(request->seq()));
AppendData(data);
AppendData(sig);
Flush();
}
void Recovery::AppendData(const std::string& data) {
size_t len = data.size();
buffer_.append(reinterpret_cast<const char*>(&len), sizeof(len));
buffer_.append(data);
}
std::vector<std::unique_ptr<Recovery::RecoveryData>> Recovery::ParseData(
const std::string& data) {
std::vector<std::unique_ptr<RecoveryData>> request_list;
std::vector<std::string> data_list;
int pos = 0;
while (pos < data.size()) {
size_t len;
memcpy(&len, data.c_str() + pos, sizeof(len));
pos += sizeof(len);
std::string item = data.substr(pos, len);
pos += len;
data_list.push_back(item);
}
for (size_t i = 0; i < data_list.size(); i += 2) {
std::unique_ptr<RecoveryData> recovery_data =
std::make_unique<RecoveryData>();
recovery_data->request = std::make_unique<Request>();
recovery_data->context = std::make_unique<Context>();
if (!recovery_data->request->ParseFromString(data_list[i])) {
LOG(ERROR) << "Parse from data fail";
break;
}
if (!recovery_data->context->signature.ParseFromString(data_list[i + 1])) {
LOG(ERROR) << "Parse from data fail";
break;
}
request_list.push_back(std::move(recovery_data));
}
return request_list;
}
std::vector<std::string> Recovery::ParseRawData(const std::string& data) {
std::vector<std::string> data_list;
int pos = 0;
while (pos < data.size()) {
size_t len;
memcpy(&len, data.c_str() + pos, sizeof(len));
pos += sizeof(len);
std::string item = data.substr(pos, len);
pos += len;
data_list.push_back(item);
}
return data_list;
}
void Recovery::MayFlush() {
if (buffer_.size() > buffer_size_) {
Flush();
}
}
void Recovery::Flush() {
size_t len = buffer_.size();
if (len == 0) {
return;
}
Write(reinterpret_cast<const char*>(&len), sizeof(len));
Write(reinterpret_cast<const char*>(buffer_.c_str()), len);
buffer_.clear();
fsync(fd_);
}
void Recovery::Write(const char* data, size_t len) {
int pos = 0;
while (len > 0) {
int write_len = write(fd_, data + pos, len);
len -= write_len;
pos += write_len;
}
}
bool Recovery::Read(int fd, size_t len, char* data) {
int pos = 0;
while (len > 0) {
int read_len = read(fd, data + pos, len);
if (read_len <= 0) {
return false;
}
len -= read_len;
pos += read_len;
}
return true;
}
std::pair<std::vector<std::pair<int64_t, std::string>>, int64_t>
Recovery::GetRecoveryFiles() {
std::string dir = std::filesystem::path(file_path_).parent_path();
int64_t last_ckpt = 0;
for (const auto& entry : std::filesystem::directory_iterator(dir)) {
std::string dir = std::filesystem::path(entry.path()).parent_path();
std::string file_name = std::filesystem::path(entry.path()).stem();
std::string ext = std::filesystem::path(entry.path()).extension();
if (ext != ".log") continue;
int pos = file_name.rfind("_");
int64_t ckpt = std::stoll(file_name.substr(pos + 1));
if (ckpt > last_ckpt) {
last_ckpt = ckpt;
}
}
std::vector<std::pair<int64_t, std::string>> list;
std::vector<std::pair<int64_t, std::string>> e_list;
for (const auto& entry : std::filesystem::directory_iterator(dir)) {
std::string dir = std::filesystem::path(entry.path()).parent_path();
std::string file_name = std::filesystem::path(entry.path()).stem();
std::string ext = std::filesystem::path(entry.path()).extension();
if (ext != ".log") continue;
int pos = file_name.rfind("_");
int max_seq_pos = file_name.rfind("_", pos - 1);
int64_t max_seq =
std::stoll(file_name.substr(max_seq_pos + 1, pos - max_seq_pos - 1));
int min_seq_pos = file_name.rfind("_", max_seq_pos - 1);
int64_t min_seq = std::stoll(
file_name.substr(min_seq_pos + 1, max_seq_pos - min_seq_pos - 1));
int time_pos = file_name.rfind("_", min_seq_pos - 1);
int64_t time =
std::stoll(file_name.substr(time_pos + 1, min_seq_pos - time_pos - 1));
if (min_seq == -1) {
e_list.push_back(std::make_pair(time, entry.path()));
} else if ((min_seq <= last_ckpt && max_seq >= last_ckpt)) {
list.push_back(std::make_pair(time, entry.path()));
}
}
sort(e_list.begin(), e_list.end());
list.push_back(e_list.back());
sort(list.begin(), list.end());
return std::make_pair(list, last_ckpt);
}
void Recovery::ReadLogs(
std::function<void(const SystemInfoData& data)> system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back) {
if (recovery_enabled_ == false) {
return;
}
std::unique_lock<std::mutex> lk(mutex_);
auto recovery_files_pair = GetRecoveryFiles();
int64_t ckpt = recovery_files_pair.second;
int idx = 0;
for (auto path : recovery_files_pair.first) {
ReadLogsFromFiles(path.second, ckpt, idx++, system_callback, call_back);
}
}
void Recovery::ReadLogsFromFiles(
const std::string& path, int64_t ckpt, int file_idx,
std::function<void(const SystemInfoData& data)> system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
call_back) {
int fd = open(path.c_str(), O_CREAT | O_RDONLY, 0666);
if (fd < 0) {
LOG(ERROR) << " open file fail:" << path;
}
LOG(INFO) << "read logs:" << path << " pos:" << lseek(fd, 0, SEEK_CUR);
assert(fd >= 0);
size_t data_len = 0;
Read(fd, sizeof(data_len), reinterpret_cast<char*>(&data_len));
{
std::string data;
char* buf = new char[data_len];
if (!Read(fd, data_len, buf)) {
LOG(ERROR) << "Read system info fail";
return;
}
data = std::string(buf, data_len);
delete buf;
std::vector<std::string> data_list = ParseRawData(data);
SystemInfoData info;
if (data_list.empty() || !info.ParseFromString(data_list[0])) {
LOG(ERROR) << "parse info fail:" << data.size();
return;
}
LOG(INFO) << "read system info:" << info.DebugString();
if (file_idx == 0) {
system_callback(info);
}
}
std::vector<std::unique_ptr<RecoveryData>> request_list;
while (Read(fd, sizeof(data_len), reinterpret_cast<char*>(&data_len))) {
std::string data;
char* buf = new char[data_len];
if (!Read(fd, data_len, buf)) {
LOG(ERROR) << "Read data log fail";
break;
}
data = std::string(buf, data_len);
delete buf;
std::vector<std::unique_ptr<RecoveryData>> list = ParseData(data);
if (list.size() == 0) {
request_list.clear();
break;
}
for (auto& l : list) {
request_list.push_back(std::move(l));
}
}
if (request_list.size() == 0) {
ftruncate(fd, 0);
}
uint64_t max_seq = 0;
for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
if (ckpt < recovery_data->request->seq()) {
recovery_data->request->set_is_recovery(true);
max_seq = recovery_data->request->seq();
call_back(std::move(recovery_data->context),
std::move(recovery_data->request));
}
}
LOG(ERROR) << "read log from files:" << path << " done"
<< " recovery max seq:" << max_seq;
close(fd);
}
} // namespace resdb