platform/consensus/recovery/recovery.h (70 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "chain/storage/storage.h"
#include "platform/config/resdb_config.h"
#include "platform/consensus/checkpoint/checkpoint.h"
#include "platform/consensus/execution/system_info.h"
#include "platform/networkstrate/server_comm.h"
#include "platform/proto/resdb.pb.h"
#include "platform/proto/system_info_data.pb.h"
namespace resdb {
// Recovery log manager for the consensus layer.
//
// From the declarations in this header: the class accepts (Context, Request)
// pairs through AddRequest(), can hand previously stored pairs and
// SystemInfoData back to callers through ReadLogs(), and keeps a file
// descriptor, an in-memory buffer and a background thread as state.
// NOTE(review): the exact on-disk format and the flushing / checkpointing
// policy live in the .cc file and are not described here.
class Recovery {
 public:
  // Builds a Recovery bound to the given configuration and collaborators.
  // The three pointers are kept as raw pointers, so this class does not
  // express ownership of them; presumably they must outlive this object --
  // TODO confirm against the callers.
  Recovery(const ResDBConfig& config, CheckPoint* checkpoint,
           SystemInfo* system_info, Storage* storage);
  virtual ~Recovery();

  // The class already cannot be copied because it holds std::thread,
  // std::mutex and std::atomic members; the deleted declarations just make
  // that explicit at the interface.
  Recovery(const Recovery&) = delete;
  Recovery& operator=(const Recovery&) = delete;

  // NOTE(review): presumably prepares the log file and starts the background
  // thread; confirm in the implementation.
  void Init();

  // Hands one (context, request) pair to the recovery log. Both pointers are
  // borrowed (const, raw). Virtual so that subclasses can override it.
  virtual void AddRequest(const Context* context, const Request* request);

  // Reads stored data back and reports it through the two callbacks:
  //   system_callback - receives a SystemInfoData record by const reference.
  //   call_back       - receives ownership of a Context/Request pair.
  // NOTE(review): ordering and how often each callback fires are defined by
  // the implementation.
  void ReadLogs(
      std::function<void(const SystemInfoData& data)> system_callback,
      std::function<void(std::unique_ptr<Context> context,
                         std::unique_ptr<Request> request)>
          call_back);

  // Sequence-number bounds; presumably backed by max_seq_ / min_seq_.
  int64_t GetMaxSeq();
  int64_t GetMinSeq();

 private:
  // One decoded log entry: a request together with its context.
  struct RecoveryData {
    std::unique_ptr<Context> context;
    std::unique_ptr<Request> request;
  };

  // --- Encoding / buffering helpers ---------------------------------------
  void WriteLog(const Context* context, const Request* request);
  void AppendData(const std::string& data);
  std::vector<std::unique_ptr<RecoveryData>> ParseData(
      const std::string& data);
  std::vector<std::string> ParseRawData(const std::string& data);
  void Flush();
  void MayFlush();

  // --- Raw file I/O --------------------------------------------------------
  void Write(const char* data, size_t len);
  // Reads `len` bytes from `fd` into `data`; the bool result presumably
  // signals success -- TODO confirm.
  bool Read(int fd, size_t len, char* data);

  // --- Log file management -------------------------------------------------
  std::string GenerateFile(int64_t seq, int64_t min_seq, int64_t max_seq);
  void GetLastFile();
  void WriteSystemInfo();
  void OpenFile(const std::string& path);
  void FinishFile(int64_t seq);
  void SwitchFile(const std::string& path);
  void UpdateStableCheckPoint();

  // Returns a list of (int64_t, file name/path) pairs plus one extra int64_t.
  // NOTE(review): the meaning of the integers (sequence / checkpoint numbers)
  // is not visible from this header.
  std::pair<std::vector<std::pair<int64_t, std::string>>, int64_t>
  GetRecoveryFiles();

  // Per-file variant of ReadLogs(); takes the same two callbacks.
  void ReadLogsFromFiles(
      const std::string& path, int64_t ckpt, int file_idx,
      std::function<void(const SystemInfoData& data)> system_callback,
      std::function<void(std::unique_ptr<Context> context,
                         std::unique_ptr<Request> request)>
          call_back);

 protected:
  // Every scalar / pointer member carries a default initializer so that no
  // constructor path can leave it indeterminate. Values assigned by the
  // constructor still take precedence.
  // Declaration order is unchanged on purpose: it fixes the member
  // initialization order used by the constructor.
  ResDBConfig config_;
  CheckPoint* checkpoint_ = nullptr;
  std::thread ckpt_thread_;
  bool recovery_enabled_ = false;
  std::string buffer_;
  std::string file_path_, base_file_path_;
  size_t buffer_size_ = 0;
  int fd_ = -1;  // -1 == no open descriptor.
  std::mutex mutex_;
  // NOTE(review): -1 is used here as a "not set yet" sentinel; confirm it
  // matches the sentinel the implementation uses for these fields.
  int64_t last_ckpt_ = -1;
  int64_t min_seq_ = -1, max_seq_ = -1;
  std::mutex ckpt_mutex_;
  // Brace-initialized: copy-initializing an atomic is ill-formed before C++17.
  std::atomic<bool> stop_{false};
  int recovery_ckpt_time_s_ = 0;
  SystemInfo* system_info_ = nullptr;
  Storage* storage_ = nullptr;
};
} // namespace resdb