in platform/consensus/recovery/recovery.cpp [453:528]
// Reads one recovery log file and replays its contents through the callbacks.
//
// The file is consumed as a sequence of length-prefixed records: a `size_t`
// byte count followed by that many payload bytes.
//   * The first record is decoded with ParseRawData(); its first element is
//     parsed as a SystemInfoData message.
//   * Every following record is decoded with ParseData() into RecoveryData
//     entries.
//
// Parameters:
//   path            - log file to read; it is created (empty) if missing
//                     because the file is opened with O_CREAT.
//   ckpt            - only requests whose seq() is greater than this value are
//                     replayed.
//   file_idx        - `system_callback` is invoked only when this is 0.
//   system_callback - receives the parsed SystemInfoData header.
//   call_back       - receives ownership of the context/request of each
//                     replayed entry; the request is flagged with
//                     set_is_recovery(true) first.
//
// Failure handling: if the file cannot be opened, or the header record cannot
// be read or parsed, the error is logged and the function returns without
// invoking `call_back`. A short read in the middle of the data section stops
// reading but still replays what was collected so far; a record that
// ParseData() decodes to nothing discards everything collected.
void Recovery::ReadLogsFromFiles(
    const std::string& path, int64_t ckpt, int file_idx,
    std::function<void(const SystemInfoData& data)> system_callback,
    std::function<void(std::unique_ptr<Context> context,
                       std::unique_ptr<Request> request)>
        call_back) {
  int fd = open(path.c_str(), O_CREAT | O_RDONLY, 0666);
  if (fd < 0) {
    LOG(ERROR) << " open file fail:" << path;
    // Still trips in debug builds; release builds bail out instead of
    // operating on an invalid descriptor.
    assert(fd >= 0);
    return;
  }

  // Closes the descriptor on every exit path, including the early returns.
  struct FdCloser {
    int fd;
    ~FdCloser() { ::close(fd); }
  } fd_closer{fd};

  LOG(INFO) << "read logs:" << path << " pos:" << lseek(fd, 0, SEEK_CUR);

  size_t data_len = 0;
  if (!Read(fd, sizeof(data_len), reinterpret_cast<char*>(&data_len))) {
    LOG(ERROR) << "Read system info fail";
    return;
  }
  {
    // Read straight into the string's storage: no manual new[]/delete[] and
    // no extra copy. `&data[0]` is valid even when data_len is 0.
    std::string data(data_len, '\0');
    if (!Read(fd, data_len, &data[0])) {
      LOG(ERROR) << "Read system info fail";
      return;
    }
    std::vector<std::string> data_list = ParseRawData(data);
    SystemInfoData info;
    if (data_list.empty() || !info.ParseFromString(data_list[0])) {
      LOG(ERROR) << "parse info fail:" << data.size();
      return;
    }
    LOG(INFO) << "read system info:" << info.DebugString();
    if (file_idx == 0) {
      system_callback(info);
    }
  }

  std::vector<std::unique_ptr<RecoveryData>> request_list;
  while (Read(fd, sizeof(data_len), reinterpret_cast<char*>(&data_len))) {
    std::string data(data_len, '\0');
    if (!Read(fd, data_len, &data[0])) {
      LOG(ERROR) << "Read data log fail";
      break;
    }
    std::vector<std::unique_ptr<RecoveryData>> list = ParseData(data);
    if (list.size() == 0) {
      // An undecodable record invalidates everything read from this file.
      request_list.clear();
      break;
    }
    for (auto& l : list) {
      request_list.push_back(std::move(l));
    }
  }

  if (request_list.size() == 0) {
    // NOTE(review): the descriptor is opened O_RDONLY, and POSIX requires a
    // descriptor open for writing here, so this call is expected to fail.
    // Kept as-is to preserve behavior; confirm whether truncation is intended.
    ftruncate(fd, 0);
  }

  uint64_t max_seq = 0;
  for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
    if (ckpt < recovery_data->request->seq()) {
      recovery_data->request->set_is_recovery(true);
      // Capture the seq before ownership of the request is handed off.
      const uint64_t seq = recovery_data->request->seq();
      if (seq > max_seq) {
        max_seq = seq;
      }
      call_back(std::move(recovery_data->context),
                std::move(recovery_data->request));
    }
  }
  LOG(ERROR) << "read log from files:" << path << " done"
             << " recovery max seq:" << max_seq;
}