host/cxpslib/cfsmanager.cpp (364 lines of code) (raw):

/// /// \file cfsmanager.cpp /// /// \brief /// #include <ctime> #include <boost/lexical_cast.hpp> #include "cfsmanager.h" #include "sessiontracker.h" #include "scopeguard.h" #include "csclient.h" #include "transportprotocols.h" CxpsCfsControlSession::CxpsCfsControlSession(std::string const& cxpsIpAddress, BasicSession::ptr session, serverOptionsPtr serverOptions) : m_ok(true), m_ready(false), m_ipAddress(cxpsIpAddress), m_session(session), m_serverOptions(serverOptions), m_quit(false) { } void CxpsCfsControlSession::start(bool delayCfsFwdConnect) { m_quit = false; m_workerThread.reset(new boost::thread(boost::bind(&CxpsCfsControlSession::worker, this, delayCfsFwdConnect))); waitUntilReady(); } void CxpsCfsControlSession::stop() { m_quit = true; m_workerCond.notify_one(); m_workerThread->join(); } void CxpsCfsControlSession::postFwdConnectRequest(bool secure, std::string const& cfsSessionId, CfsSession::ptr cfsSession) { { boost::mutex::scoped_lock guard(m_workerMutex); m_requests.push_back(Request(secure, cfsSessionId, cfsSession)); } m_workerCond.notify_one(); } void CxpsCfsControlSession::removeStaleCompleteFwdConnectRequests(bool ignoreRequestTime) { completeFwdConnectRequests_t::iterator iter(m_completeFwdConnectRequests.begin()); completeFwdConnectRequests_t::iterator iterEnd(m_completeFwdConnectRequests.end()); while (iter != iterEnd) { if (ignoreRequestTime || (std::time(0) - (*iter).second.m_requestTime > m_serverOptions->sessionTimeoutSeconds())) { CXPS_LOG_ERROR(CATCH_LOC << "SEND CFS CONNECT BACK TIMED OUT: " << "cfs session id: " << (*iter).first << ", session id: " << (*iter).second.m_sessionId << ", secure: " << (*iter).second.m_secure << ", request time: " << boost::posix_time::to_simple_string(boost::posix_time::from_time_t((*iter).second.m_requestTime)) << " utc"); m_completeFwdConnectRequests.erase(iter++); } else { ++iter; } } } bool CxpsCfsControlSession::completeFwdConnectRequest(std::string const& cfsSessionId, boost::asio::ip::tcp::socket::native_handle_type nativeSocket) { boost::mutex::scoped_lock guard(m_workerMutex); completeFwdConnectRequests_t::iterator findIter(m_completeFwdConnectRequests.find(cfsSessionId)); if (m_completeFwdConnectRequests.end() != findIter) { (*findIter).second.m_cfsSession->completeCfsConnectRequest(nativeSocket); m_completeFwdConnectRequests.erase(findIter); return true; } return false; } void CxpsCfsControlSession::fwdConnectRequestFailed(CfsFwdConnectInfo const& cfsFwdConnectInfo) { completeFwdConnectRequests_t::iterator findIter(m_completeFwdConnectRequests.find(cfsFwdConnectInfo.m_sessionId)); if (m_completeFwdConnectRequests.end() != findIter) { m_completeFwdConnectRequests.erase(findIter); } cfsFwdConnectInfo.m_cfsSession->completeCfsConnectRequestFailed(); } void CxpsCfsControlSession::worker(bool delayCfsFwdConnect) { try { int waitInterval = m_serverOptions->cfsGetWorkerIntervalSeconds(); boost::system_time waitUntilTime = boost::get_system_time() + boost::posix_time::seconds(waitInterval); boost::unique_lock<boost::mutex> lock(m_workerMutex); m_ready = true; while (!m_quit) { if (m_workerCond.timed_wait(lock, waitUntilTime)) { if (!m_quit) { while (!m_requests.empty()) { // NOTE: do not send replies for CFSFWDCONNECT requests, just process and be done Request request = m_requests.front(); if (REQUEST_CFSFWDCONNECT == request.m_request) { if (delayCfsFwdConnect) { boost::this_thread::sleep(boost::posix_time::seconds(1)); } if (m_session->sendCfsConnectBack(request.m_cfsFwdConnectInfo.m_sessionId, request.m_cfsFwdConnectInfo.m_secure)) { request.m_cfsFwdConnectInfo.m_requestTime = std::time(0); m_completeFwdConnectRequests.insert(std::make_pair(request.m_cfsFwdConnectInfo.m_sessionId, request.m_cfsFwdConnectInfo)); } else { fwdConnectRequestFailed(request.m_cfsFwdConnectInfo); m_ok = false; } m_requests.pop_front(); } } } } else if (!m_quit && 0 != m_session.get()) { if (!m_session->sendCfsHeartbeat()) { m_ok = false; } } removeStaleCompleteFwdConnectRequests(!m_ok); if (!m_ok) { g_sessionTracker->stopTracking(m_session->sessionId()); return; } waitUntilTime = boost::get_system_time() + boost::posix_time::seconds(waitInterval); } } catch (std::exception const& e) { CXPS_LOG_ERROR(CATCH_LOC << "failed: " << e.what()); } m_quit = true; } CfsManager::CfsManager(boost::asio::io_service& ioService, serverOptionsPtr serverOptions) : m_ioService(ioService), m_timer(ioService), m_serverOptions(serverOptions), m_quit(false) { } void CfsManager::run() { m_work.reset(new boost::asio::io_service::work(m_ioService)); setTimeout(1); // the very first time set timeout low so that it checks quickly, then use normal check threadPtr ioServiceThread(new boost::thread(boost::bind(&boost::asio::io_service::run, &m_ioService))); CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_1, "CFS MANAGER WORKER THREAD STARTED: " << ioServiceThread->get_id()); CXPS_LOG_ERROR_INFO("CFS MANAGER WORKER THREAD STARTED: " << ioServiceThread->get_id()); m_monitorThread.reset(new boost::thread(boost::bind(&CfsManager::monitor, this))); ioServiceThread->join(); m_quit = true; m_monitorCond.notify_one(); m_monitorThread->join(); boost::mutex::scoped_lock guard(m_mutex); controlSessions_t::iterator iter(m_controlSessions.begin()); controlSessions_t::iterator iterEnd(m_controlSessions.end()); for (/*empty*/; iter != iterEnd; ++iter) { (*iter).second->stop(); } } void CfsManager::stop() { m_ioService.stop(); } bool CfsManager::addControlSession(std::string const& sessionId) { BasicSession::ptr session = g_sessionTracker->findBySessionId(sessionId); if (!session) { return false; } boost::mutex::scoped_lock guard(m_mutex); if (!m_quit) { controlSessions_t::iterator findIter = m_controlSessions.find(session->peerHostId()); if (m_controlSessions.end() != findIter) { (*findIter).second->stop(); m_controlSessions.erase(findIter); } CxpsCfsControlSession::ptr cxpsCfsControlSession(new CxpsCfsControlSession(session->peerIpAddress(), session, m_serverOptions)); cxpsCfsControlSession->start(m_serverOptions->delayCfsFwdConnect()); m_controlSessions.insert(std::make_pair(session->peerHostId(), cxpsCfsControlSession)); return true; } return false; } bool CfsManager::postFwdConnectRequest(std::string const& psId, bool secure, std::string const& cfsSessionId, CfsSession::ptr cfsSession) { boost::mutex::scoped_lock guard(m_mutex); if (!m_quit) { controlSessions_t::iterator findIter(m_controlSessions.find(psId)); if (m_controlSessions.end() == findIter) { CXPS_LOG_ERROR(AT_LOC << psId << " not found in controlSessions"); } else { if ((*findIter).second->ok()) { (*findIter).second->postFwdConnectRequest(secure, cfsSessionId, cfsSession); return true; } else { (*findIter).second->stop(); m_controlSessions.erase(findIter); } } } return false; } void CfsManager::completeFwdConnectRequest(std::string const& cfsSessionId, boost::asio::ip::tcp::socket::native_handle_type nativeSocket) { boost::mutex::scoped_lock guard(m_mutex); if (!m_quit) { controlSessions_t::iterator iter(m_controlSessions.begin()); controlSessions_t::iterator iterEnd(m_controlSessions.end()); for (/* emtpy */; iter != iterEnd; ++iter) { if ((*iter).second->completeFwdConnectRequest(cfsSessionId, nativeSocket)) { return; } } CXPS_LOG_ERROR(AT_LOC << cfsSessionId << " not found, unable to complete fwd connect request"); } } void CfsManager::getCfsConnectInfo() { CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_2, "CFS GET CONNECTION INFO STARTED"); CsClient client(m_serverOptions->csUseSecure(), m_serverOptions->csIpAddress(), m_serverOptions->csUseSecure() ? m_serverOptions->csSslPort() : m_serverOptions->csPort(), m_serverOptions->id(), m_serverOptions->password(), m_serverOptions->csCertFile().string()); CsError csError; cfsConnectInfos_t cfsConnectInfos; if (!client.getCfsConnectInfo(m_serverOptions->csUrl(), csError, cfsConnectInfos)) { CXPS_LOG_ERROR(AT_LOC << "Failed: reason - " << csError.reason << ", data: " << (csError.data.empty() ? "" : csError.data.c_str())); CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_2, "CFS GET CONNECTION INFO FAILED loading cached cfs connect info"); loadCachedCfsConnectInfo(cfsConnectInfos); } else { cacheCfsConnectInfo(cfsConnectInfos); CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_2, "CFS GET CONNECTION INFO DONE"); } boost::unique_lock<boost::shared_mutex> guard(m_cfsConnectInfosMutex); // exclusive lock needed to update the m_cfsConnectInfos m_cfsConnectInfos = cfsConnectInfos; } void CfsManager::cacheCfsConnectInfo(cfsConnectInfos_t& cfsConnectInfos) { std::string cfsCache = m_serverOptions->cfsCacheFile().string(); std::ofstream file(cfsCache.c_str()); if (!file.good()) { CXPS_LOG_ERROR(AT_LOC << "failed to open " << cfsCache << ": " << errno); return; } cfsConnectInfos_t::const_iterator iter(cfsConnectInfos.begin()); cfsConnectInfos_t::const_iterator iterEnd(cfsConnectInfos.end()); for (/* empty*/; iter != iterEnd; ++iter) { file << (*iter).second.m_id << ' ' << (*iter).second.m_ipAddress << ' ' << (*iter).second.m_port << ' ' << (*iter).second.m_sslPort << '\n'; } } void CfsManager::loadCachedCfsConnectInfo(cfsConnectInfos_t& cfsConnectInfos) { std::string cfsCache = m_serverOptions->cfsCacheFile().string(); std::ifstream file(cfsCache.c_str()); if (!file.good()) { CXPS_LOG_ERROR(AT_LOC << "failed to open " << cfsCache << ": " << errno); return; } while (!file.eof()) { CfsConnectInfo connectInfo; file >> connectInfo.m_id >> connectInfo.m_ipAddress >> connectInfo.m_port >> connectInfo.m_sslPort; if (!connectInfo.m_id.empty()) { cfsConnectInfos.insert(std::make_pair(connectInfo.m_id, connectInfo)); } } } bool CfsManager::cfsControlLogin(CfsConnectInfo const& cfsConnectInfo, std::string& sessionId) { CfsControlClient::ptr cfsControlClient(new CfsControlClient(cfsConnectInfo, m_ioService, m_serverOptions)); return cfsControlClient->login(sessionId); } void CfsManager::addCfsId(std::string const& cfsId, CfsConnectInfo const& cfsConnectInfo, std::string const& sessionId) { boost::mutex::scoped_lock guard(m_cfsIdsMutex); m_cfsIds.insert(std::make_pair(cfsId, CfsConnectedControlSession(time(0), cfsConnectInfo, sessionId))); } bool CfsManager::cfsControlSessionNeedsReconnect(CfsConnectedControlSession const& cfsConnectedControlSession, CfsConnectInfo const& cfsConnectInfo) { // require a reconnect if the connection information changed or // there has been no activity for more then timeout + 90 seconds // 90 seconds it just in case it is in the process of timing out return !(cfsConnectedControlSession.m_cfsConnectInfo == cfsConnectInfo) || (time(0) - cfsConnectedControlSession.m_lastActivity) > (m_serverOptions->sessionTimeoutSeconds() + 90); } void CfsManager::removeCfsId(std::string const& cfsId) { boost::mutex::scoped_lock guard(m_cfsIdsMutex); m_cfsIds.erase(cfsId); CfsConnectInfo cfsConnectInfo; cfsConnectInfo.m_id = cfsId; g_cfsManager->logRequest(MONITOR_LOG_LEVEL_2, "CFS CONTROL CONNECTION CLOSED", __FUNCTION__, cfsConnectInfo, m_serverOptions->cfsSecureLogin()); } void CfsManager::updateLastActivity(std::string const& id) { boost::mutex::scoped_lock guard(m_cfsIdsMutex); cfsIds_t::iterator findIter(m_cfsIds.find(id)); if (m_cfsIds.end() != findIter) { (*findIter).second.m_lastActivity = time(0); } } void CfsManager::sendCfsLoginAsNeeded() { if (!m_quit) { getCfsConnectInfo(); cfsConnectInfos_t::iterator iter(m_cfsConnectInfos.begin()); cfsConnectInfos_t::iterator iterEnd(m_cfsConnectInfos.end()); for (/* empty*/; iter != iterEnd; ++iter) { if (!(boost::algorithm::equals((*iter).second.m_id, "na") || boost::algorithm::equals((*iter).second.m_ipAddress, "na"))) { cfsIds_t::iterator findIter(m_cfsIds.find((*iter).first)); if (m_cfsIds.end() == findIter || cfsControlSessionNeedsReconnect((*findIter).second, (*iter).second)) { if (m_cfsIds.end() != findIter) { g_sessionTracker->stopTracking((*findIter).second.m_sessionId); } std::string sessionId; if (cfsControlLogin((*iter).second, sessionId)) { addCfsId((*iter).first, (*iter).second, sessionId); } } } } setTimeout(m_serverOptions->cfsGetConnectInfoIntervalSeconds()); } } unsigned short CfsManager::getCfsConnectBackPort(std::string const& cfsId) { // when PS connects back to cxps:cfs it always needs to use the non ssl port // if secure is requested it will still do ssl hand shake and be secure boost::shared_lock<boost::shared_mutex> guard(m_cfsConnectInfosMutex); // shared lock to allow multiple readers cfsConnectInfos_t::iterator iter(m_cfsConnectInfos.find(cfsId)); if (m_cfsConnectInfos.end() != iter) { try { return boost::lexical_cast<unsigned short>((*iter).second.m_port); } catch (...) { CXPS_LOG_ERROR(AT_LOC << "Invalid port found for cfs host id " << cfsId); } } return 0; } void CfsManager::monitor() { try { bool cfsMode = m_serverOptions->cfsMode(); CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_1, "CFS MONITOR STARTED " << (cfsMode ? "CFS ENABLED" : "")); CXPS_LOG_ERROR_INFO("CFS MONITOR STARTED " << (cfsMode ? "CFS ENABLED" : "")); int m_monitorInterval = m_serverOptions->cfsMonitorIntervalSeconds(); boost::system_time waitUntilTime = boost::get_system_time(); waitUntilTime += boost::posix_time::seconds(m_monitorInterval); boost::unique_lock<boost::mutex> lock(m_monitorMutex); while (!m_quit) { if (!m_monitorCond.timed_wait(lock, waitUntilTime)) { waitUntilTime += boost::posix_time::seconds(m_monitorInterval); if (cfsMode) { try { sendCsCfsHeartbeat(); } catch (std::exception const& e) { CXPS_LOG_ERROR(AT_LOC << e.what()); } } } } CXPS_LOG_MONITOR(MONITOR_LOG_LEVEL_1, "CFS MONITOR STOPPED " << (cfsMode ? "CFS ENABLED" : "")); CXPS_LOG_ERROR_INFO("CFS MONITOR STOPPED " << (cfsMode ? "CFS ENABLED" : "")); } catch (...) { // nothing to do } } void CfsManager::sendCsCfsHeartbeat() { CsClient client(m_serverOptions->csUseSecure(), m_serverOptions->csIpAddress(), m_serverOptions->csUseSecure() ? m_serverOptions->csSslPort() : m_serverOptions->csPort(), m_serverOptions->id(), m_serverOptions->password(), m_serverOptions->csCertFile().string()); CsError csError; if (!client.sendCfsHeartbeat(m_serverOptions->csUrl(), csError)) { CXPS_LOG_ERROR(AT_LOC << "Failed: reason - " << csError.reason << ", data: " << (csError.data.empty() ? "" : csError.data.c_str())); } }