platform/consensus/execution/transaction_executor.h (94 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <functional>
#include <thread>
#include "executor/common/transaction_manager.h"
#include "platform/common/queue/lock_free_queue.h"
#include "platform/config/resdb_config.h"
#include "platform/consensus/execution/duplicate_manager.h"
#include "platform/consensus/execution/system_info.h"
#include "platform/proto/resdb.pb.h"
#include "platform/statistic/stats.h"
namespace resdb {
// Execute the requests that may contain system information or user requests.
class TransactionExecutor {
public:
typedef std::function<void(std::unique_ptr<Request>,
std::unique_ptr<BatchUserResponse> resp)>
PostExecuteFunc;
typedef std::function<void(Request*)> PreExecuteFunc;
typedef std::function<void(uint64_t seq)> SeqUpdateNotifyFunc;
TransactionExecutor(const ResDBConfig& config, PostExecuteFunc post_exec_func,
SystemInfo* system_info,
std::unique_ptr<TransactionManager> transaction_manager);
~TransactionExecutor();
void Stop();
bool NeedResponse();
int Commit(std::unique_ptr<Request> request);
// The max seq S that can be executed (have received all the seq before S).
uint64_t GetMaxPendingExecutedSeq();
// When a transaction is ready to be executed (have received all the seq
// before Txn) PreExecute func will be called.
void SetPreExecuteFunc(PreExecuteFunc func);
void SetSeqUpdateNotifyFunc(SeqUpdateNotifyFunc func);
void SetDuplicateManager(DuplicateManager* manager);
void AddExecuteMessage(std::unique_ptr<Request> message);
Storage* GetStorage();
void RegisterExecute(int64_t seq);
void WaitForExecute(int64_t seq);
void FinishExecute(int64_t seq);
void Prepare(std::unique_ptr<Request> request);
private:
void Execute(std::unique_ptr<Request> request, bool need_execute = true);
void OnlyExecute(std::unique_ptr<Request> request);
std::unique_ptr<std::string> DoExecute(const Request& request);
void OrderMessage();
void ExecuteMessage();
void ExecuteMessageOutOfOrder();
void AddNewData(std::unique_ptr<Request> message);
std::unique_ptr<Request> GetNextData();
bool IsStop();
void UpdateMaxExecutedSeq(uint64_t seq);
bool SetFlag(uint64_t uid, int f);
void ClearPromise(uint64_t uid);
void PrepareMessage();
bool AddFuture(uint64_t uid);
std::unique_ptr<std::future<int>> GetFuture(uint64_t uid);
std::promise<int>* GetPromise(uint64_t uid);
protected:
ResDBConfig config_;
private:
std::atomic<uint64_t> next_execute_seq_ = 1;
PreExecuteFunc pre_exec_func_ = nullptr;
SeqUpdateNotifyFunc seq_update_notify_func_ = nullptr;
PostExecuteFunc post_exec_func_ = nullptr;
SystemInfo* system_info_ = nullptr;
std::unique_ptr<TransactionManager> transaction_manager_ = nullptr;
std::map<uint64_t, std::unique_ptr<Request>> candidates_;
std::thread ordering_thread_, execute_OOO_thread_;
std::vector<std::thread> execute_thread_;
LockFreeQueue<Request> commit_queue_, execute_queue_, execute_OOO_queue_;
std::atomic<bool> stop_;
Stats* global_stats_ = nullptr;
DuplicateManager* duplicate_manager_;
int execute_thread_num_ = 10;
static const int blucket_num_ = 1024;
int blucket_[blucket_num_];
std::condition_variable cv_;
std::mutex mutex_;
enum PrepareType {
Start_Prepare = 1,
Start_Execute = 2,
End_Prepare = 4,
};
std::vector<std::thread> prepare_thread_;
static const int mod = 2048;
std::mutex f_mutex_[mod], fd_mutex_[mod];
LockFreeQueue<Request> prepare_queue_;
LockFreeQueue<int64_t> gc_queue_;
typedef std::unique_ptr<std::promise<int>> PromiseType;
std::map<uint64_t, PromiseType> pre_[mod];
std::map<uint64_t, std::unique_ptr<std::future<int>>> pre_f_[mod];
std::map<uint64_t, int> flag_[mod];
std::map<uint64_t, std::unique_ptr<BatchUserRequest>> req_[mod];
std::unordered_map<
uint64_t,
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>>
data_[mod];
};
} // namespace resdb