host/cxpslib/DiffResyncThrottlingHelper.cpp (459 lines of code) (raw):

#include "DiffResyncThrottlingHelper.h"
#include "DirectorySizeCalculator.h"
#include "cxps.h"

#include <climits>
#include <stdexcept>

// resync folder name for legacy stack
#define RESYNC_FOLDER_PATH "resync"

namespace
{
    // Returns value - amount, clamped at zero. Plain unsigned subtraction wraps
    // around to a huge number when amount > value, which would make a pair look
    // as if it had an enormous amount of pending data.
    unsigned long long saturatingSubtract(unsigned long long value, unsigned long long amount)
    {
        return (value > amount) ? (value - amount) : 0ull;
    }

    // Returns the parent directory of the given file path as a string,
    // or an empty string when the path has no parent component.
    std::string parentFolderOf(const boost::filesystem::path & fullPathName)
    {
        return (fullPathName.has_parent_path()) ? fullPathName.parent_path().string() : "";
    }

    // Marks the cached pair as not protected and erases it from cachedDevices
    // when its last update is older than pruneTimeout.
    // deviceItr is invalid after this call if the pair was erased.
    void markUnprotectedAndPruneIfStale(std::map<std::string, ReplicationPairInfo> & cachedDevices,
        std::map<std::string, ReplicationPairInfo>::iterator deviceItr,
        const boost::chrono::seconds & pruneTimeout)
    {
        deviceItr->second.isProtected = false;

        // if last update time for cache is greater than prune interval, then prune the pair from cache.
        if (boost::chrono::steady_clock::now() - deviceItr->second.lastUpdateTime > pruneTimeout)
        {
            cachedDevices.erase(deviceItr);
        }
    }
}

// Binds the helper to the io_service that will drive its refresh timer and
// stores the server options that supply the timeouts and default thresholds.
DiffResyncThrottlingHelper::DiffResyncThrottlingHelper(serverOptionsPtr serverOptions, boost::asio::io_service& ioservice)
    :m_diffResyncThrottleTimer(ioservice),
    m_serverOptions(serverOptions),
    m_quitRequested(false)
{}

// Clears the pair cache and schedules the first run() on the timer.
// Throws std::logic_error when called after stop().
void DiffResyncThrottlingHelper::start()
{
    CXPS_LOG_ERROR_INFO("Starting DiffResync Throttling Helper");

    m_cachedProtectedPairs.clear();

    BOOST_ASSERT(!m_quitRequested);
    if (m_quitRequested)
    {
        throw std::logic_error("DiffResyncThrottlingHelper is started again after stopping");
    }

    // according to boost documentation the handler to this timer will never be called from inside this function
    // so the service start will never execute the handler function. So we initialize the initial timer value to
    // zero to ensure that handler function runs immediately for the first time as soon as ioservice::run is
    // called on a thread.
    m_diffResyncThrottleTimer.expires_from_now(boost::asio::chrono::milliseconds::zero());
    m_diffResyncThrottleTimer.async_wait(boost::bind(&DiffResyncThrottlingHelper::run, this, boost::asio::placeholders::error));
}

// Timer completion handler: refreshes the pair cache from the current settings
// and, unless the timer was cancelled or a stop was requested, re-arms the timer.
void DiffResyncThrottlingHelper::run(const boost::system::error_code& error)
{
    // The guard is expected to re-arm the timer when it goes out of scope;
    // it is dismissed on every path that must not schedule another run.
    SCOPE_GUARD updateTimerGuard = MAKE_SCOPE_GUARD(boost::bind(&DiffResyncThrottlingHelper::updateDiffResyncThrottleTimeoutAndWait, this));

    // When the timer is cancelled, this handler will be called with
    // operation_aborted error code
    if (error == boost::asio::error::operation_aborted)
    {
        CXPS_LOG_ERROR_INFO("Stopping DiffResync Throttling Helper");
        updateTimerGuard.dismiss();
        // Do nothing
        return;
    }

    if (m_quitRequested)
    {
        updateTimerGuard.dismiss();
        return;
    }

    if (!error)
    {
        updateProtectedPairInfo();
    }
    else
    {
        CXPS_LOG_ERROR(AT_LOC << "Failed to update pairsettings due to error " << error);
    }
}

// Requests shutdown and cancels the pending timer wait.
void DiffResyncThrottlingHelper::stop()
{
    m_quitRequested = true;
    m_diffResyncThrottleTimer.cancel();
}

// Moves the timer expiry forward by one interval and waits on it again.
void DiffResyncThrottlingHelper::updateDiffResyncThrottleTimeoutAndWait()
{
    updateDiffResyncThrottleTimeout();
    m_diffResyncThrottleTimer.async_wait(boost::bind(&DiffResyncThrottlingHelper::run, this, boost::asio::placeholders::error));
}

// Advances the timer expiry by diffResyncThrottleTimeoutInSec, relative to the
// previous expiry (not to "now").
void DiffResyncThrottlingHelper::updateDiffResyncThrottleTimeout()
{
    boost::unique_lock<boost::shared_mutex> wrlock(m_diffResyncThrottleTimerMutex);
    m_diffResyncThrottleTimer.expires_at(m_diffResyncThrottleTimer.expiry() +
        boost::asio::chrono::seconds(m_serverOptions->diffResyncThrottleTimeoutInSec()));
    // No need to unlock as unique lock is scoped
}

// Reconciles every cached pair with the latest settings:
//  - pairs still present in the settings get their paths, limits, state and
//    pending data sizes refreshed,
//  - pairs missing from the settings are marked unprotected and erased once
//    they have not been updated for nonProtectedPairPruneTimeoutInSec,
//  - hosts left without any pair are erased.
// Stops early when a quit has been requested.
void DiffResyncThrottlingHelper::updateProtectedPairInfo()
{
    pairsPtr protectedPairs = createReplicationPairInfo();

    // for updating all the pairs we use unique lock at top level
    // change it to use 2 level locking
    boost::unique_lock<boost::shared_mutex> wrlock(m_diffResyncThrottleMutex);

    // We delete the required pairs during traversal to avoid 2 loops
    // Should we traverse the loop first and then delete required pairs?
    // The "next" iterators are advanced before the current element can be
    // erased, so erasing never invalidates the iterator used to continue.
    for (protectedPairInfo_t::iterator hostitr = m_cachedProtectedPairs.begin(), nextHostItr = hostitr;
        hostitr != m_cachedProtectedPairs.end();
        hostitr = nextHostItr)
    {
        if (m_quitRequested)
            break;

        ++nextHostItr;

        pairs_t::iterator protectedPairHostItr = protectedPairs->find(hostitr->first);
        const bool hostHasSettings = (protectedPairHostItr != protectedPairs->end());

        for (std::map<std::string, ReplicationPairInfo>::iterator deviceitr = hostitr->second.begin(), nextDeviceItr = deviceitr;
            deviceitr != hostitr->second.end();
            deviceitr = nextDeviceItr)
        {
            if (m_quitRequested)
                break;

            ++nextDeviceItr;

            if (!hostHasSettings)
            {
                // host is present in cache but not in protected pair list
                markUnprotectedAndPruneIfStale(hostitr->second, deviceitr,
                    boost::chrono::seconds(m_serverOptions->nonProtectedPairPruneTimeoutInSec()));
                continue;
            }

            std::map<std::string, ReplicationPairPathAndThrottleLimitInfo>::iterator protectedPairDeviceItr =
                protectedPairHostItr->second.find(deviceitr->first);

            if (protectedPairDeviceItr == protectedPairHostItr->second.end())
            {
                markUnprotectedAndPruneIfStale(hostitr->second, deviceitr,
                    boost::chrono::seconds(m_serverOptions->nonProtectedPairPruneTimeoutInSec()));
                continue;
            }

            // pair is protected
            // To do: change this value based on replication state
            ReplicationPairInfo & cachedPair = deviceitr->second;
            const ReplicationPairPathAndThrottleLimitInfo & latestSettings = protectedPairDeviceItr->second;

            cachedPair.isProtected = true;
            cachedPair.IRFolderPath = latestSettings.IRFolderPath;
            cachedPair.DRSourceFolderPath = latestSettings.DRSourceFolderPath;
            cachedPair.DRTargetFolderPath = latestSettings.DRTargetFolderPath;
            cachedPair.IRPendingData = GetPendingResyncData(latestSettings.IRFolderPath);
            cachedPair.DRPendingData = GetPendingDiffData(latestSettings.DRSourceFolderPath, latestSettings.DRTargetFolderPath);
            cachedPair.lastUpdateTime = boost::chrono::steady_clock::now();
            cachedPair.ReplicationState = latestSettings.ReplicationState;
            cachedPair.DiffThrottleLimit = latestSettings.DiffThrottleLimit;
            cachedPair.ResyncThrottleLimit = latestSettings.ResyncThrottleLimit;
        }

        // Todo: sadewang : currently we are not updating the cache with the pairs that are present in protectedPairs but not in m_cachedProtectedPairs
        // It will get updated once they receive any putfile request. Should it be updated here?

        // if there are no disks left for this host id , then prune the host
        if (hostitr->second.empty())
        {
            m_cachedProtectedPairs.erase(hostitr);
        }
    }
}

// Returns the size reported by CalculateDirectorySize for folderPath, honouring
// exclusionList. Returns 0 for an empty path and LLONG_MAX when the size
// calculation throws, so that a failed scan is treated as "over any limit".
unsigned long long DiffResyncThrottlingHelper::GetPendingData(const std::string & folderPath, const std::list<boost::filesystem::path> &exclusionList)
{
    if (folderPath.empty())
        return 0;

    try
    {
        return CalculateDirectorySize(folderPath, "*", true, exclusionList);
    }
    catch (const std::exception & ex)
    {
        CXPS_LOG_ERROR(AT_LOC << "Failed to calculate directory size for directory " << folderPath << " with exception " << ex.what());
        return LLONG_MAX;
    }
    catch (const std::string &exStr)
    {
        CXPS_LOG_ERROR(AT_LOC << "Failed to calculate directory size for directory " << folderPath << " with string exception " << exStr);
        return LLONG_MAX;
    }
    catch (const char *exLiteral)
    {
        CXPS_LOG_ERROR(AT_LOC << "Failed to calculate directory size for directory " << folderPath << " with literal exception " << exLiteral);
        return LLONG_MAX;
    }
    catch (...)
    {
        CXPS_LOG_ERROR(AT_LOC << "Failed to calculate directory size for directory " << folderPath << " due to generic exception.");
        return LLONG_MAX;
    }
}

// Returns the pending diff data size for a pair.
//  - legacy CS mode: source folder size plus target folder size, with the
//    "resync" folder excluded from the target scan,
//  - RCM mode: target folder size only,
//  - any other mode: asserts and returns LLONG_MAX.
// LLONG_MAX is returned whenever a folder size could not be calculated.
unsigned long long DiffResyncThrottlingHelper::GetPendingDiffData(const std::string & sourceDiffDirectory, const std::string & targetDiffDirectory)
{
    if (GetCSMode() == CS_MODE_LEGACY_CS)
    {
        std::list<boost::filesystem::path> exclusionList;
        exclusionList.push_back(boost::filesystem::path(RESYNC_FOLDER_PATH));

        const unsigned long long sourceDiffPendingData = GetPendingData(sourceDiffDirectory);
        const unsigned long long targetDiffPendingData = GetPendingData(targetDiffDirectory, exclusionList);

        if (sourceDiffPendingData == LLONG_MAX || targetDiffPendingData == LLONG_MAX)
        {
            return LLONG_MAX;
        }

        return sourceDiffPendingData + targetDiffPendingData;
    }
    else if (GetCSMode() == CS_MODE_RCM)
    {
        // To do: sadewang - change to model where we monitor list of directories
        return GetPendingData(targetDiffDirectory);
    }
    else
    {
        BOOST_ASSERT(false);
        return LLONG_MAX;
    }
}

// Returns the pending resync data size, or 0 when the resync directory does
// not exist (or its existence cannot be determined).
unsigned long long DiffResyncThrottlingHelper::GetPendingResyncData(const std::string & resyncDirectory)
{
    boost::system::error_code ec;
    if (!boost::filesystem::exists(resyncDirectory, ec))
    {
        return 0;
    }

    return GetPendingData(resyncDirectory);
}

// Returns the time left until the refresh timer expires, in milliseconds;
// 0 when the expiry is already in the past.
unsigned long long DiffResyncThrottlingHelper::getRemainingTimeInMs()
{
    unsigned long long remainingTime;
    {
        boost::shared_lock<boost::shared_mutex> rdlock(m_diffResyncThrottleTimerMutex);
        remainingTime = getAbsoluteDurationInMs(boost::asio::chrono::duration_cast<boost::asio::chrono::milliseconds>(
            m_diffResyncThrottleTimer.expiry() - boost::asio::chrono::steady_clock::now()));
        // No need to unlock as shared lock is scoped
    }
    return remainingTime;
}

// Returns the duration in milliseconds, with zero and negative durations mapped to 0.
unsigned long long DiffResyncThrottlingHelper::getAbsoluteDurationInMs(const boost::asio::chrono::milliseconds &duration)
{
    return (duration > boost::asio::chrono::milliseconds::zero()) ? duration.count() : 0 ;
}

// Returns true when the (hostId, diskId) pair is present in the cache.
// NOTE(review): the cached isProtected flag is not consulted here, so pairs
// cached as unprotected also return true - confirm this is what callers expect.
bool DiffResyncThrottlingHelper::isProtectedPair(const std::string & hostId, const std::string & diskId)
{
    boost::shared_lock<boost::shared_mutex> rdlock(m_diffResyncThrottleMutex);

    protectedPairInfo_t::iterator hostitr = m_cachedProtectedPairs.find(hostId);
    if (hostitr == m_cachedProtectedPairs.end())
        return false;

    return hostitr->second.find(diskId) != hostitr->second.end();
}

// Diff-sync entry point; forwards to checkForThrottle.
bool DiffResyncThrottlingHelper::checkForDiffThrottle(const std::string & hostId, const std::string & diskId, CxpsTelemetry::FileType filetype, unsigned long long filesize, const boost::filesystem::path & fullPathName)
{
    return checkForThrottle(hostId, diskId, filetype, filesize, fullPathName);
}

// Resync entry point; forwards to checkForThrottle.
bool DiffResyncThrottlingHelper::checkForResyncThrottle(const std::string & hostId, const std::string & diskId, CxpsTelemetry::FileType filetype, unsigned long long filesize, const boost::filesystem::path & fullPathName)
{
    return checkForThrottle(hostId, diskId, filetype, filesize, fullPathName);
}

// Recomputes both pending data sizes of the pair from its folders and stamps
// the update time, holding the pair's own mutex exclusively.
void DiffResyncThrottlingHelper::updatePairCacheData(ReplicationPairInfo & rpi)
{
    boost::unique_lock<boost::shared_mutex> wrlock(*(rpi.mutexPtr));
    rpi.IRPendingData = GetPendingResyncData(rpi.IRFolderPath);
    rpi.DRPendingData = GetPendingDiffData(rpi.DRSourceFolderPath, rpi.DRTargetFolderPath);
    rpi.lastUpdateTime = boost::chrono::steady_clock::now();
}

// Decides whether an incoming diff/resync file for (hostId, diskId) must be
// throttled, adding the pair to the cache when it is not there yet.
//
// Returns true when the pair's pending data already exceeds its limit.
// Otherwise the cached pending size is increased by filesize and false is
// returned. Protected pairs use their own limits; all others use
// defaultThrottleThresholdInBytes.
bool DiffResyncThrottlingHelper::checkForThrottle(const std::string & hostId, const std::string & diskId, CxpsTelemetry::FileType filetype, unsigned long long filesize, const boost::filesystem::path & fullPathName)
{
    BOOST_ASSERT(filetype == CxpsTelemetry::FileType_DiffSync || filetype == CxpsTelemetry::FileType_Resync);
    // filesize is unsigned, so there is no negative value to guard against.

    // Upgrade ownership lets lookups run alongside plain readers while still
    // allowing an upgrade to exclusive ownership for the insert paths below.
    boost::upgrade_lock<boost::shared_mutex> uplock(m_diffResyncThrottleMutex);

    protectedPairInfo_t::iterator hostitr = m_cachedProtectedPairs.find(hostId);

    if (hostitr == m_cachedProtectedPairs.end())
    {
        // Neither the host nor the pair is cached yet.
        std::map<std::string, ReplicationPairInfo> hostinfo;
        ReplicationPairInfo newPairInfo = PrepareToAddPairToCache(hostId, diskId, filetype, filesize, fullPathName);
        updatePairCacheData(newPairInfo);
        hostinfo.insert(std::make_pair(diskId, newPairInfo));

        boost::upgrade_to_unique_lock<boost::shared_mutex> upLockObj(uplock);
        m_cachedProtectedPairs.insert(std::make_pair(hostId, hostinfo));

        // Account against the entry stored in the cache, not the local copy,
        // otherwise the added filesize is lost when this function returns.
        ReplicationPairInfo & cachedPairInfo = m_cachedProtectedPairs.find(hostId)->second.find(diskId)->second;
        return checkForThrottle(filetype, cachedPairInfo.DRPendingData, cachedPairInfo.IRPendingData,
            m_serverOptions->defaultThrottleThresholdInBytes(), m_serverOptions->defaultThrottleThresholdInBytes(),
            filesize, cachedPairInfo.mutexPtr);
    }

    std::map<std::string, ReplicationPairInfo>::iterator deviceitr = hostitr->second.find(diskId);

    if (deviceitr == hostitr->second.end())
    {
        // The host is cached but this disk is not.
        ReplicationPairInfo newPairInfo = PrepareToAddPairToCache(hostId, diskId, filetype, filesize, fullPathName);
        updatePairCacheData(newPairInfo);

        boost::upgrade_to_unique_lock<boost::shared_mutex> upLockObj(uplock);

        // Account against the entry stored in the cache, not the local copy,
        // otherwise the added filesize is lost when this function returns.
        ReplicationPairInfo & cachedPairInfo = hostitr->second.insert(std::make_pair(diskId, newPairInfo)).first->second;
        return checkForThrottle(filetype, cachedPairInfo.DRPendingData, cachedPairInfo.IRPendingData,
            m_serverOptions->defaultThrottleThresholdInBytes(), m_serverOptions->defaultThrottleThresholdInBytes(),
            filesize, cachedPairInfo.mutexPtr);
    }

    // The pair is already cached.
    ReplicationPairInfo & cachedPair = deviceitr->second;
    unsigned long long diffThrottleThresholdInBytes, resyncThrottleThresholdInBytes;

    if (cachedPair.isProtected)
    {
        diffThrottleThresholdInBytes = cachedPair.DiffThrottleLimit;
        resyncThrottleThresholdInBytes = cachedPair.ResyncThrottleLimit;
    }
    else
    {
        diffThrottleThresholdInBytes = m_serverOptions->defaultThrottleThresholdInBytes();
        resyncThrottleThresholdInBytes = m_serverOptions->defaultThrottleThresholdInBytes();

        // An unprotected pair only knows the folder of the file type it was
        // first seen with; learn the other folder from the incoming file.
        if (filetype == CxpsTelemetry::FileType_DiffSync && cachedPair.DRTargetFolderPath.empty())
        {
            BOOST_ASSERT(boost::filesystem::is_regular_file(fullPathName));
            cachedPair.DRTargetFolderPath = parentFolderOf(fullPathName);
            updatePairCacheData(cachedPair);
        }

        if (filetype == CxpsTelemetry::FileType_Resync && cachedPair.IRFolderPath.empty())
        {
            BOOST_ASSERT(boost::filesystem::is_regular_file(fullPathName));
            cachedPair.IRFolderPath = parentFolderOf(fullPathName);
            updatePairCacheData(cachedPair);
        }
    }

    // Rescan the folders when the cached sizes are older than the expiry interval.
    if (boost::chrono::steady_clock::now() - cachedPair.lastUpdateTime > boost::chrono::seconds(m_serverOptions->diffResyncThrottleCacheExpiryIntervalInSec()))
    {
        updatePairCacheData(cachedPair);
    }

    return checkForThrottle(filetype, cachedPair.DRPendingData, cachedPair.IRPendingData,
        diffThrottleThresholdInBytes, resyncThrottleThresholdInBytes, filesize, cachedPair.mutexPtr);
}

// Compares the pending size matching filetype against its limit while holding
// the pair mutex exclusively. Returns true (throttle) when the pending size is
// above the limit; otherwise adds filesize to that pending size and returns
// false. File types other than diff sync and resync are never throttled.
bool DiffResyncThrottlingHelper::checkForThrottle(CxpsTelemetry::FileType filetype, unsigned long long &pendingDiffData, unsigned long long &pendingResyncData,
    const unsigned long long &diffThrottleLimit, const unsigned long long &resyncThrottleLimit, const unsigned long long &filesize, sharedMutexPtr mutexptr)
{
    // can be optimised by taking read lock initially and then upgrading to write lock
    // but upgrading the lock may have more overhead then the code itself
    boost::unique_lock<boost::shared_mutex> wrlock(*mutexptr);

    if (filetype == CxpsTelemetry::FileType_DiffSync)
    {
        if (pendingDiffData > diffThrottleLimit)
            return true;

        pendingDiffData += filesize;
    }

    if (filetype == CxpsTelemetry::FileType_Resync)
    {
        if (pendingResyncData > resyncThrottleLimit)
            return true;

        pendingResyncData += filesize;
    }

    return false;
}

// Builds the cache entry for a pair seen for the first time: unprotected, with
// the folder for the incoming file type taken from the file's parent directory,
// that type's pending size seeded with filesize, and a fresh per-pair mutex.
// NOTE(review): DiffThrottleLimit and ResyncThrottleLimit are not assigned
// here; they are only read for protected pairs.
ReplicationPairInfo DiffResyncThrottlingHelper::PrepareToAddPairToCache(const std::string & hostId, const std::string & diskId, CxpsTelemetry::FileType filetype, unsigned long long filesize, const boost::filesystem::path & fullPathName)
{
    BOOST_ASSERT(boost::filesystem::is_regular_file(fullPathName));

    ReplicationPairInfo rpi;

    if (filetype == CxpsTelemetry::FileType_DiffSync)
    {
        rpi.DRPendingData = filesize;
        rpi.IRPendingData = 0;
        rpi.IRFolderPath.clear();
        rpi.DRSourceFolderPath.clear();
        // for legacy stack ps, diff files will never move to target folder, since there are no settings for this pair
        rpi.DRTargetFolderPath = parentFolderOf(fullPathName);
    }

    if (filetype == CxpsTelemetry::FileType_Resync)
    {
        rpi.IRPendingData = filesize;
        rpi.DRPendingData = 0;
        rpi.IRFolderPath = parentFolderOf(fullPathName);
        rpi.DRSourceFolderPath.clear();
        rpi.DRTargetFolderPath.clear();
    }

    rpi.ReplicationState.clear();
    rpi.isProtected = false;
    rpi.lastUpdateTime = boost::chrono::steady_clock::now();
    rpi.mutexPtr = boost::make_shared<boost::shared_mutex>();
    return rpi;
}

// Subtracts filesize from the cached pending size matching filetype for the
// given pair. The result is clamped at zero. No-op when the pair is not cached
// or the file type is neither diff sync nor resync.
void DiffResyncThrottlingHelper::reduceCachedPendingDataSize(const std::string & hostId, const std::string & diskId, CxpsTelemetry::FileType filetype, unsigned long long filesize)
{
    // cache is maintained only for diff and resync files
    // no-op for other files
    if (filetype != CxpsTelemetry::FileType_DiffSync && filetype != CxpsTelemetry::FileType_Resync)
        return;

    boost::shared_lock<boost::shared_mutex> rdlock(m_diffResyncThrottleMutex);

    protectedPairInfo_t::iterator hostitr = m_cachedProtectedPairs.find(hostId);
    if (hostitr == m_cachedProtectedPairs.end())
        return;

    std::map<std::string, ReplicationPairInfo>::iterator deviceitr = hostitr->second.find(diskId);
    if (deviceitr == hostitr->second.end())
        return;

    boost::unique_lock<boost::shared_mutex> wrlock(*(deviceitr->second.mutexPtr));

    // The cached size can be smaller than filesize (for example after a
    // rescan), so the subtraction must saturate instead of wrapping around.
    if (filetype == CxpsTelemetry::FileType_DiffSync)
    {
        deviceitr->second.DRPendingData = saturatingSubtract(deviceitr->second.DRPendingData, filesize);
    }
    else
    {
        deviceitr->second.IRPendingData = saturatingSubtract(deviceitr->second.IRPendingData, filesize);
    }
}

// Returns the cached pending diff data size of the pair, or 0 when the pair is not cached.
unsigned long long DiffResyncThrottlingHelper::getDiffsyncFolderSize(const std::string & hostId, const std::string & diskId)
{
    boost::shared_lock<boost::shared_mutex> rdlock(m_diffResyncThrottleMutex);

    protectedPairInfo_t::iterator hostitr = m_cachedProtectedPairs.find(hostId);
    if (hostitr != m_cachedProtectedPairs.end())
    {
        std::map<std::string, ReplicationPairInfo>::iterator deviceitr = hostitr->second.find(diskId);
        if (deviceitr != hostitr->second.end())
        {
            boost::shared_lock<boost::shared_mutex> rdLock(*(deviceitr->second.mutexPtr));
            return deviceitr->second.DRPendingData;
        }
    }

    // return 0 if the pair is not found in cache
    return 0;
}

// Returns the cached pending resync data size of the pair, or 0 when the pair is not cached.
unsigned long long DiffResyncThrottlingHelper::getResyncFolderSize(const std::string & hostId, const std::string & diskId)
{
    boost::shared_lock<boost::shared_mutex> rdlock(m_diffResyncThrottleMutex);

    protectedPairInfo_t::iterator hostitr = m_cachedProtectedPairs.find(hostId);
    if (hostitr != m_cachedProtectedPairs.end())
    {
        std::map<std::string, ReplicationPairInfo>::iterator deviceitr = hostitr->second.find(diskId);
        if (deviceitr != hostitr->second.end())
        {
            boost::shared_lock<boost::shared_mutex> rdLock(*(deviceitr->second.mutexPtr));
            return deviceitr->second.IRPendingData;
        }
    }

    //return 0 if the pair is not found in cache
    return 0;
}

// Builds a host -> device -> (folder paths, replication state, throttle limits)
// map from the current PS settings. Hosts without any usable pair settings are
// left out. Returns an empty map (and logs) when the settings are unavailable.
pairsPtr DiffResyncThrottlingHelper::createReplicationPairInfo()
{
    pairsPtr pairinfo = boost::make_shared<pairs_t>();

    PSSettings::PSSettingsPtr psSettingsPtr = PSSettings::PSSettingsConfigurator::GetInstance().GetPSSettings();
    if (psSettingsPtr != NULL)
    {
        boost::shared_ptr<PSSettings::PSSettingsHostwisePtrsMap> hostwisePtrsMap = psSettingsPtr->HostwiseSettings;
        if (hostwisePtrsMap != NULL)
        {
            for (PSSettings::PSSettingsHostwisePtrsMap::iterator hostitr = hostwisePtrsMap->begin(); hostitr != hostwisePtrsMap->end(); ++hostitr)
            {
                // hostitr->first holds the hostid
                // hostitr->second holds all the details for that hostid
                PSSettings::PSSettingsHostwisePtr hostwisePtr = hostitr->second;
                if (hostwisePtr != NULL)
                {
                    // only if there is some information about that host, it should be included in the cache.
                    std::map<std::string, ReplicationPairPathAndThrottleLimitInfo> hostwiseinfo;

                    boost::shared_ptr<PSSettings::PSSettingsPairwisePtrsMap> pairwisePtrsMap = hostwisePtr->PairwiseSettings;
                    if (pairwisePtrsMap != NULL)
                    {
                        for (PSSettings::PSSettingsPairwisePtrsMap::iterator pairitr = pairwisePtrsMap->begin(); pairitr != pairwisePtrsMap->end(); ++pairitr)
                        {
                            // pairitr->first holds the deviceid
                            // pairitr->second holds the device settings
                            PSSettings::PSSettingsPairwisePtr pairwisePtr = pairitr->second;
                            if (pairwisePtr != NULL)
                            {
                                ReplicationPairPathAndThrottleLimitInfo repPairInfo;
                                repPairInfo.IRFolderPath = pairwisePtr->ResyncFolder;
                                repPairInfo.DRSourceFolderPath = pairwisePtr->SourceDiffFolder;
                                repPairInfo.DRTargetFolderPath = pairwisePtr->TargetDiffFolder;
                                repPairInfo.ReplicationState = pairwisePtr->ReplicationState;
                                repPairInfo.DiffThrottleLimit = pairwisePtr->DiffThrottleLimit;
                                repPairInfo.ResyncThrottleLimit = pairwisePtr->ResyncThrottleLimit;
                                hostwiseinfo.insert(std::make_pair(pairitr->first, repPairInfo));
                            }
                        }
                    }

                    // To do: sadewang: Replace by sharedptr
                    if (!hostwiseinfo.empty())
                    {
                        pairinfo->insert(std::make_pair(hostitr->first, hostwiseinfo));
                    }
                }
            }
        }
    }
    else
    {
        CXPS_LOG_ERROR("Failed to get latest settings");
    }

    return pairinfo;
}