host/cxpslib/session.h (292 lines of code) (raw):

/// /// \file session.h /// /// \brief manages sessions (connections) with a requester /// #ifndef SESSION_H #define SESSION_H #include <string> #include <cstddef> #include <boost/asio.hpp> #include <boost/enable_shared_from_this.hpp> #include <boost/filesystem/path.hpp> #include "connection.h" #include "requesthandler.h" #include "serveroptions.h" #include "cxps.h" #include "Telemetry/cxpstelemetrylogger.h" /// \brief basic session for managing requests for a given connection class BasicSession { public: /// \brief pointer type typedef boost::shared_ptr<BasicSession> ptr; ///< session pointer type /// \brief constructor explicit BasicSession(boost::asio::io_service& ioService, ///< io servie to use ConnectionAbc* connection, ///< connection to use serverOptionsPtr serverOptions ///< server options from conf file ); /// \brief constructor /// /// used when creating a new session from the connect back session explicit BasicSession(boost::asio::io_service& ioService, ///< io servie to use ConnectionAbc::ptr connection, ///< connection to be used serverOptionsPtr serverOptions, ///< server options from conf file std::string const& sessionId, ///< session id to be used std::string const& snonce, ///< server nonce to be used boost::uint32_t reqId, ///< request id to be used std::string const& cnonce, ///< client nonce to be used std::string const& hostId ///< host id to be used ); /// \brief destructor virtual ~BasicSession() { cancelTimeout(); if (!m_requestTelemetryData.IsEmpty()) { if (!CxpsTelemetry::CxpsTelemetryLogger::GetInstance().AreServersStopping()) { BOOST_ASSERT(false); m_requestTelemetryData.SetRequestFailure(RequestFailure_SessionDestroyed); } else { // If the server is stopping, currently ioService.stop() is invoked. // This drops all the handlers as soon as possible without signalling errors to // the ongoing and pending requests. m_requestTelemetryData.SetRequestFailure(RequestFailure_ServerStopping); } m_requestTelemetryData.SessionLoggingOut(); } } /// \brief get a reference to the lowest layer socket /// /// \return reference to boost::asio::ip::tcp::socket::lowest_layer_type boost::asio::ip::tcp::socket::lowest_layer_type& lowestLayerSocket() { return m_connection->lowestLayerSocket(); } /// \brief start the session processing virtual void start() = 0; /// \brief start async read from socket virtual void startReading() { setPeerIpAddress(); // TODO-SanKumar-1710: m_requestTelemetryData is not updated as this is only used for CfsConnectBacks. asyncReadSome(initialReadSize()); } /// \brief gets session id /// /// \return std::string that holds the session id std::string sessionId() { return m_sessionId; } /// \brief checks if passed in file is open /// /// \return /// \li \c true : file is open /// \li \c false: file is not open std::string isFileOpen(boost::filesystem::path const& fileName, ///< name of file to check if opened bool forceClose, ///< file to check bool checkGetFile ///< check getfile if true ) { return m_requestHandler->isFileOpen(fileName, forceClose, checkGetFile); } /// \brief make sure all timeout timers are stopped void stopTimeouts() { m_requestHandler->cancelTimeout(); cancelTimeout(); } /// \brief checks if a request is currently queued for aysnc request /// /// \return bool true: request is queue, false: request not queued bool isQueued() { return m_requestHandler->isQueued(); } /// \brief gets the endpoints of the connection void getConnectionEndpoints(ConnectionEndpoints& connectionEndpoints) ///< filled in with connection endpoints { connection()->connectionEndpoints(connectionEndpoints); } /// \brief gets the native socket /// /// \return boost::asio::ip::tcp::socket::native_type set to the connects native socket boost::asio::ip::tcp::socket::native_handle_type nativeSocket() { return connection()->lowestLayerSocket().native_handle(); } /// \brief assigns the native socket to the connection void setNativeSocket(boost::asio::ip::tcp::socket::native_handle_type nativeSocket) ///< native socket to assign to connection { connection()->lowestLayerSocket().assign(boost::asio::ip::tcp::v4(), nativeSocket); } /// \brief logs out the session void sessionLogout() { m_requestHandler->sessionLogout(); } /// \brief gets the session id /// /// \returns std::string that has the session id std::string sesionId() { return m_sessionId; } /// \brief get if ssl being used or not virtual bool usingSsl() = 0; /// \brief gets the peer ip address /// /// \return std::string that has the peer ip address std::string peerIpAddress() { return connection()->peerIpAddress(); } /// \brief gets endpoint info as a string /// /// \return std::tsring that has the connection info formated as string std::string endpointInfoAsString() { return connection()->endpointInfoAsString(); } /// \brief sends cfs connect back request bool sendCfsConnectBack(std::string const& cfsSessionId, ///< cfs session id making the request bool secure) ///< indicates if the connect back should use secure true: secure, false: non secure { return m_requestHandler->sendCfsConnectBack(cfsSessionId, secure); } /// \brief sends cfs heartbeat to prevent cfs session from closing during long periods of inactivity bool sendCfsHeartbeat() { return m_requestHandler->sendCfsHeartbeat(); } const std::string& peerHostId() { return m_requestHandler->peerHostId(); } protected: CxpsTelemetry::RequestTelemetryData m_requestTelemetryData; ///< Detailed telemetry data of the current request /// \brief get a shared pointer to the session object virtual ptr sharedPtr() = 0; /// \brief perfrom an syncrhonous hand shake void asyncHandshake(); /// \brief process the results of asyncHandshake /// \param error result of async handshale void handleAsyncHandshake(boost::system::error_code const & error); /// \brief read some data from the connection asynchornously /// /// \param size buffer size to use for initial read of a request void asyncReadSome(int size = -1); /// \brief process data that was read by an async read void processReadData(); /// \brief process the results of asyncReadSome /// /// this is the mian place in the session where requests are handled void handleAsyncReadSome(boost::system::error_code const & error, ///< result of async read size_t bytesTransferred ///< bytes transferred by the async read ); /// \brief process the request void processRequest(char * buffer, ///< pointer to buffer holding data that still needs to be processed std::size_t bytesRemaining, ///< bytes in buffer left to be processed bool eof = false ///< indicates if an socket eof was reached true: yes, false: no ); /// \brief function that is called back when a request completes successfully /// /// resets things and then start another asyncReadSome to get the next request void handleRequestDone(); /// \brief set the timer to timeout after timeout duration void setTimeout(bool handshake); /// \brief handle a timeout /// /// \param error result of the async timeout wait void handleTimeout(std::string const& type, boost::system::error_code const& error); /// \brief handle asyncHndshake timeout /// /// \param error result of the async timeout wait void handleAsyncHandshakeTimeout(const boost::system::error_code& error) { handleTimeout(std::string("asyncHandShake"), error); } /// \brief handle a asyncReadSome timeout /// /// \param error result of the async timeout wait void handleAsyncReadSomeTimeout(const boost::system::error_code& error) { handleTimeout(std::string("asyncReadSome"), error); } /// \brief cancel a set timeout void cancelTimeout() { boost::system::error_code ec; m_timer.cancel(ec); } /// \brief logs the creation of a session to the monitor log void logCreateSession(char const* sessionType) { CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_1, "CREATE " << sessionType << '\t' << "(sid: " << sessionId() << ")\t" << m_connection->endpointInfoAsString()); } /// \brief logs the start of a session to the monitor log void logStartSession(char const* sessionType) { m_sessionStarted = true; CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_1, "START " << sessionType << '\t' << "(sid: " << sessionId() << ")\t" << "socket: " << nativeSocket() << '\t' << m_connection->endpointInfoAsString()); } /// \brief logs the end of a session to the monitor log void logEndSession(char const* sessionType) { CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_1, "END " << sessionType << '\t' << "(sid: " << sessionId() << ")\t" << m_connection->endpointInfoAsString() << (m_sessionStarted ? "" : "\t(not started)")); } /// \brief send error reply void sendError(ResponseCode::Codes result, ///< error result char const* data, ///< points to buffer holding error data to be sent (must be at least length bytes) std::size_t length ///< length of data ); /// \brief log xfer failed void logXferFailedError(boost::system::error_code const & error) { std::string errStr = boost::lexical_cast<std::string>(error); logXferFailed(errStr.c_str()); } /// \brief log xfer failed void logXferFailed(char const* reason) { m_requestHandler->logXferFailed(reason); } /// \brief gets the intial read size int initialReadSize() { return m_serverOptions->initialReadSize(); } /// \brief set the connection's peer ip address for internal tracking void setPeerIpAddress() { m_connection->setPeerIpAddress(); } /// \brief gets the connection /// /// \return ConnectionAbc::ptr holding the connection object virtual ConnectionAbc::ptr connection() { return m_connection; } /// \brief gets the request handler /// /// \return RequestHandler::ptr holding the requestHandler RequestHandler::ptr requestHandler() { return m_requestHandler; } private: ConnectionAbc::ptr m_connection; ///< connection to be used HttpTraits::protocolHandler_t m_protocolHandler; ///< protocol handler to use RequestHandler::ptr m_requestHandler; ///< request handler to use HttpTraits::reply_t::ptr m_reply; ///< reply to use std::size_t m_readBufferBytesRemaining; ///< number of bytes remaining in the read buffer to be processed std::size_t m_readBufferTotalBytes; ///< total bytes in the read buffer std::vector<char> m_readBuffer; ///< for reading data from the socket bool m_eof; //< indicates socket eof reached true: eof, false: not eof boost::asio::deadline_timer m_timer; ///< for detecting timeouts std::string m_sessionId; ///< session id bool m_sessionStarted; ///< tracks if the session was every started (i.e. client conected) serverOptionsPtr m_serverOptions; ///< server options from conf file }; /// \brief for managing sessions over non ssl connections class Session : public BasicSession, public boost::enable_shared_from_this<Session> { public: /// \brief constructor explicit Session(boost::asio::io_service& ioService, ///< io servie to use serverOptionsPtr serverOptions ///< server options from conf file ); /// \brief constructor /// /// used when creating a new session from the connect back session explicit Session(boost::asio::io_service& ioService, ///< io servie to be used ConnectionAbc::ptr connection, ///< connection to be used serverOptionsPtr serverOptions, ///< server options from conf file std::string const& sessionId, ///< session id to be used std::string const& snonce, ///< server nonce to be used boost::uint32_t reqId, ///< reqId to be used std::string const& cnonce, ///< client nonce to be used std::string const& hostId ///< host id to be used ); /// \brief destructor virtual ~Session() { logEndSession("SESSION"); } /// \brief start the session processing virtual void start() { setPeerIpAddress(); logStartSession("SESSION"); m_requestTelemetryData.StartingNwReadForNewRequest(); asyncReadSome(initialReadSize()); } /// \brief get a shared pointer for this session virtual BasicSession::ptr sharedPtr() { return shared_from_this(); } /// \brief get if ssl being used or not virtual bool usingSsl() { return false; } protected: private: }; /// \brief for managing sessions over an ssl connection class SslSession : public BasicSession, public boost::enable_shared_from_this<SslSession> { public: /// \brief constructor explicit SslSession(boost::asio::io_service& ioService, ///< io servie to use serverOptionsPtr serverOptions ///< server options from conf file ); // FIXME: is this correct for ssl? /// \brief constructor /// /// used when creating a new session from the connect back session explicit SslSession(boost::asio::io_service& ioService, ///< io servie to be used ConnectionAbc::ptr connection, ///< connection to be used serverOptionsPtr serverOptions, ///< server options from conf file std::string const& sessionId, ///< session id to be used std::string const& snonce, ///< server nonce to be used boost::uint32_t reqId, ///< reqId to be used std::string const& cnonce, ///< client nonce to be used std::string const& hostId ///< host id to be used ); /// \brief destructor virtual ~SslSession() { logEndSession("SSL SESSION"); } /// \brief start the session processing virtual void start() { setPeerIpAddress(); logStartSession("SSL SESSION"); asyncHandshake(); } /// \brief get a shared pointer to this session virtual BasicSession::ptr sharedPtr() { return shared_from_this(); } /// \brief get if ssl being used or not virtual bool usingSsl() { return true; } protected: private: }; #endif // SESSION_H