mcrouter/ProxyDestination.h (91 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include <array> #include <memory> #include <string> #include <folly/Range.h> #include <folly/SpinLock.h> #include "mcrouter/ProxyDestinationBase.h" #include "mcrouter/TkoLog.h" #include "mcrouter/config.h" #include "mcrouter/lib/Reply.h" #include "mcrouter/lib/mc/msg.h" #include "mcrouter/lib/network/Transport.h" namespace facebook { namespace memcache { struct AccessPoint; struct RpcStatsContext; namespace mcrouter { class ProxyBase; class ProxyDestinationMap; class TkoTracker; struct DestinationRequestCtx { int64_t startTime{0}; int64_t endTime{0}; explicit DestinationRequestCtx(int64_t now) : startTime(now) {} }; template <class Transport> class ProxyDestination : public ProxyDestinationBase { public: using ConnectionStatusCallbacks = typename Transport::ConnectionStatusCallbacks; using RequestStatusCallbacks = typename Transport::RequestStatusCallbacks; using AuthorizationCallbacks = typename Transport::AuthorizationCallbacks; using RequestQueueStats = typename Transport::RequestQueueStats; ~ProxyDestination(); /** * Sends a request to this destination. * NOTE: This is a blocking call that will return reply, once it's ready. * * @param request The request to send. * @param requestContext Context about this request. * @param timeout The timeout of this call. * @param rpcStatsContext Output argument with stats about the RPC */ template <class Request> ReplyT<Request> send( const Request& request, DestinationRequestCtx& requestContext, std::chrono::milliseconds timeout, RpcStatsContext& rpcStatsContext); void resetInactive() override final; /** * Gracefully closes the connection, allowing it to properly drain if * possible. */ void closeGracefully(); RequestQueueStats getRequestStats() const override final; protected: void updateTransportTimeoutsIfShorter( std::chrono::milliseconds shortestConnectTimeout, std::chrono::milliseconds shortestWriteTimeout) override final; carbon::Result sendProbe() override final; std::weak_ptr<ProxyDestinationBase> selfPtr() override final { return selfPtr_; } std::shared_ptr<const AccessPoint> replaceAP( std::shared_ptr<const AccessPoint> newAccessPoint) { auto ret = ProxyDestinationBase::replaceAP(newAccessPoint); closeGracefully(); return ret; } private: std::unique_ptr<Transport, typename Transport::Destructor> transport_; // Ensure proxy thread doesn't reset the Transport // while config and stats threads may be accessing it mutable folly::SpinLock transportLock_; // Retransmits control information uint64_t lastRetransCycles_{0}; // Cycles when restransmits were last fetched uint64_t rxmitsToCloseConnection_{0}; uint64_t lastConnCloseCycles_{0}; // Cycles when connection was last closed /** * Creates a new ProxyDestination. * * @throws std::logic_error If Transport is not compatible with * AccessPoint::getProtocol(). */ static std::shared_ptr<ProxyDestination> create( ProxyBase& proxy, std::shared_ptr<AccessPoint> ap, std::chrono::milliseconds timeout, uint32_t qosClass, uint32_t qosPath); Transport& getTransport(); void initializeTransport(); ProxyDestination( ProxyBase& proxy, std::shared_ptr<AccessPoint> ap, std::chrono::milliseconds timeout, uint32_t qosClass, uint32_t qosPath); // Process tko, stats and duration timer. void onReply( const carbon::Result result, DestinationRequestCtx& destreqCtx, const RpcStatsContext& rpcStatsContext, bool isRequestBufferDirty); void handleRxmittingConnection(const carbon::Result result, uint64_t latency); bool latencyAboveThreshold(uint64_t latency); std::weak_ptr<ProxyDestination> selfPtr_; friend class ProxyDestinationMap; }; } // namespace mcrouter } // namespace memcache } // namespace facebook #include "ProxyDestination-inl.h"