platform/consensus/ordering/pbft/viewchange_manager.h (118 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #pragma once #include <semaphore.h> #include "common/crypto/signature_verifier.h" #include "platform/config/resdb_config.h" #include "platform/consensus/execution/system_info.h" #include "platform/consensus/ordering/pbft/checkpoint_manager.h" #include "platform/consensus/ordering/pbft/message_manager.h" #include "platform/networkstrate/replica_communicator.h" #include "platform/proto/viewchange_message.pb.h" #include "platform/statistic/stats.h" namespace resdb { enum ViewChangeTimerType { TYPE_COMPLAINT, TYPE_VIEWCHANGE, TYPE_NEWVIEW }; class ViewChangeTimeout { public: ViewChangeTimeout(ViewChangeTimerType type, uint64_t view, uint64_t proxy_id, std::string hash, uint64_t start_time, uint64_t timeout_length_) : type(type), view(view), proxy_id(proxy_id), hash(hash), start_time(start_time), timeout_time(start_time + timeout_length_) {} ViewChangeTimerType type; uint64_t view; uint64_t proxy_id; std::string hash; uint64_t start_time; uint64_t timeout_time; bool operator<(const ViewChangeTimeout& other) const { return timeout_time > other.timeout_time; } }; class ComplaningClients { public: ComplaningClients(); ComplaningClients(uint64_t proxy_id); std::shared_ptr<ViewChangeTimeout> SetComplaining(std::string hash, uint64_t view); void ReleaseComplaining(std::string hash); void set_proxy_id(uint64_t proxy_id) { this->proxy_id = proxy_id; } uint CountViewChangeTimeout(std::string hash); void EraseViewChangeTimeout(std::string hash); protected: uint64_t proxy_id; bool is_complaining; uint64_t timeout_length_; std::mutex complain_state_lock; std::set<std::string> viewchange_timeout_set; }; class ViewChangeManager { public: ViewChangeManager(const ResDBConfig& config, CheckPointManager* checkpoint_manager, MessageManager* message_manager, SystemInfo* system_info, ReplicaCommunicator* replica_communicator, SignatureVerifier* verifier); virtual ~ViewChangeManager(); int ProcessViewChange(std::unique_ptr<Context> context, std::unique_ptr<Request> request); int ProcessNewView(std::unique_ptr<Context> context, std::unique_ptr<Request> request); bool IsInViewChange(); // If the monitor is not running, start to monitor. void MayStart(); enum ViewChangeStatus { NONE = 0, READY_VIEW_CHANGE = 1, READY_NEW_VIEW = 2, VIEW_CHANGE_FAIL = 3, }; void AddComplaintTimer(uint64_t proxy_id, std::string hash); void AddViewChangeTimer(); void AddNewViewTimer(); void CheckComplaintTimeout(); void SetDuplicateManager(DuplicateManager* manager); private: void SendViewChangeMsg(); void SendNewViewMsg(uint64_t view_number); bool IsValidViewChangeMsg(const ViewChangeMessage& view_change_message); uint32_t AddRequest(const ViewChangeMessage& viewchange_message, uint32_t sender); bool IsNextPrimary(uint64_t view_number); void SetCurrentViewAndNewPrimary(uint64_t view_number); std::vector<std::unique_ptr<Request>> GetPrepareMsg( const NewViewMessage& new_view_message, bool need_sign = true); bool ChangeStatue(ViewChangeStatus status); void MonitoringViewChangeTimeOut(); bool CheckTimeOut(ViewChangeTimeout& info); void MonitoringCheckpointState(); protected: ResDBConfig config_; CheckPointManager* checkpoint_manager_; MessageManager* message_manager_; Stats* global_stats_; SystemInfo* system_info_; ReplicaCommunicator* replica_communicator_; SignatureVerifier* verifier_; std::thread monitor_thread_; std::map<uint64_t, std::map<uint32_t, ViewChangeMessage>> viewchange_request_; std::mutex mutex_, status_mutex_; bool new_view_is_sent_ = false; ViewChangeStatus status_; std::atomic<bool> started_; uint32_t view_change_counter_; std::mutex vc_mutex_; std::thread server_checking_timeout_thread_; std::thread checkpoint_state_thread_; sem_t timeout_cnt_; sem_t viewchange_timer_signal_; // LockFreeQueue<ViewChangeTimeout> timeout_info_queue; std::map<uint64_t, std::priority_queue<std::shared_ptr<ViewChangeTimeout>>> viewchange_timeout_min_heap_; std::map<uint64_t, ComplaningClients> complaining_clients_; std::atomic<bool> stop_; uint64_t timeout_length_ = 10000000; LockFreeCollectorPool* collector_pool_; DuplicateManager* duplicate_manager_; }; } // namespace resdb