src/transport/TcpRemotingClient.h (88 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef __TCPREMOTINGCLIENT_H__ #define __TCPREMOTINGCLIENT_H__ #include <map> #include <mutex> #include <boost/asio.hpp> #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include "ClientRemotingProcessor.h" #include "RemotingCommand.h" #include "ResponseFuture.h" #include "SocketUtil.h" #include "TcpTransport.h" namespace rocketmq { //<!************************************************************************ class TcpRemotingClient { public: TcpRemotingClient(bool enableSsl, const string& sslPropertyFile); TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout, bool enableSsl, const string& sslPropertyFile); virtual ~TcpRemotingClient(); virtual void stopAllTcpTransportThread(); virtual void updateNameServerAddressList(const string& addrs); virtual bool invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis = 3000); // delete outsite; virtual RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000); virtual bool invokeAsync(const string& addr, RemotingCommand& request, std::shared_ptr<AsyncCallbackWrap> cbw, int64 timeoutMilliseconds, int maxRetrySendTimes = 1, int retrySendTimes = 1); virtual void invokeOneway(const string& addr, RemotingCommand& request); virtual void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor); private: static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr); void messageReceived(const MemoryBlock& mem, const string& addr); void ProcessData(const MemoryBlock& mem, const string& addr); void processRequestCommand(RemotingCommand* pCmd, const string& addr); void processResponseCommand(RemotingCommand* pCmd, std::shared_ptr<ResponseFuture> pFuture); void handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque); std::shared_ptr<TcpTransport> GetTransport(const string& addr, bool needResponse); std::shared_ptr<TcpTransport> CreateTransport(const string& addr, bool needResponse); std::shared_ptr<TcpTransport> CreateNameServerTransport(bool needResponse); bool CloseTransport(const string& addr, std::shared_ptr<TcpTransport> pTcp); bool CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp); bool SendCommand(std::shared_ptr<TcpTransport> pTts, RemotingCommand& msg); void addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture); std::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque); void addTimerCallback(boost::asio::deadline_timer* t, int opaque); void eraseTimerCallback(int opaque); void cancelTimerCallback(int opaque); void removeAllTimerCallback(); void boost_asio_work(); private: using RequestMap = map<int, ClientRemotingProcessor*>; using TcpMap = map<string, std::shared_ptr<TcpTransport>>; using ResMap = map<int, std::shared_ptr<ResponseFuture>>; using AsyncTimerMap = map<int, boost::asio::deadline_timer*>; RequestMap m_requestTable; TcpMap m_tcpTable; //<! addr->tcp; std::timed_mutex m_tcpTableLock; ResMap m_futureTable; //<! id->future; std::mutex m_futureTableLock; AsyncTimerMap m_asyncTimerTable; std::mutex m_asyncTimerTableLock; int m_dispatchThreadNum; int m_pullThreadNum; uint64_t m_tcpConnectTimeout; // ms uint64_t m_tcpTransportTryLockTimeout; // s bool m_enableSsl; std::string m_sslPropertyFile; //<! NameServer std::timed_mutex m_namesrvLock; vector<string> m_namesrvAddrList; string m_namesrvAddrChoosed; unsigned int m_namesrvIndex; boost::asio::io_service m_dispatchService; boost::asio::io_service::work m_dispatchServiceWork; boost::thread_group m_dispatchThreadPool; boost::asio::io_service m_handleService; boost::asio::io_service::work m_handleServiceWork; boost::thread_group m_handleThreadPool; boost::asio::io_service m_timerService; unique_ptr<boost::thread> m_timerServiceThread; }; //<!************************************************************************ } // namespace rocketmq #endif