host/cxpslib/pssettingsconfigurator.cpp (274 lines of code) (raw):
#include <boost/filesystem/string_file.hpp>
#include <boost/make_shared.hpp>
#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/interprocess/sync/sharable_lock.hpp>
#include "Telemetry/TelemetryCommon.h"
#include "pssettingsconfigurator.h"
#include "serveroptions.h"
using namespace PSSettings;
PSSettingsConfigurator PSSettingsConfigurator::s_instance;
const char* CacheDataHeader::CHECKSUM_TYPE_MD5 = "MD5";
const char* CacheDataHeader::CURRENT_CACHED_DATA_VERSION = "1.10";
const int CacheDataHeader::CURRENT_CACHED_DATA_MAJOR_VERSION = 1;
const int CacheDataHeader::CURRENT_CACHED_DATA_MINOR_VERSION = 10;
PSSettingsPtr PSSettingsConfigurator::ReadSettingsFromFile(
bool& fileUnavailable,
std::string& knownToFailHeader,
std::string& knownToFailContent)
{
fileUnavailable = true;
if (!boost::filesystem::exists(m_settingsLckFilePath) ||
!boost::filesystem::exists(m_settingsFilePath))
{
return PSSettingsPtr();
}
fileUnavailable = false;
boost::system::error_code ec;
time_t t = boost::filesystem::last_write_time(m_settingsFilePath, ec);
if (ec)
{
t = 0;
}
if (!t && m_MTime == t)
{
return PSSettingsPtr();
}
std::string headerStr;
std::string settingsFileContent;
// TODO-SanKumar-2002: This constructor throws, if the lock file is not
// present. Instead only expect the actual setting file to present, while
// creating this file, if not present.
boost::interprocess::file_lock flock(m_settingsLckFilePath.string().c_str());
{
boost::interprocess::sharable_lock<boost::interprocess::file_lock> sharableFlock(flock);
// TODO-SanKumar-2002: This lock is not cancellable, which could be
// replaced with Timed sharable_lock. The problem is that the lock seems
// to be spinning repeatedly instead of blocking. Gotta check before usage.
std::ifstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
file.open(m_settingsFilePath.string().c_str(), std::ios_base::binary);
if (m_serverOptions->psSettingsIncludesHeader())
{
std::getline(file, headerStr);
}
std::size_t sz = static_cast<std::size_t>(boost::filesystem::file_size(m_settingsFilePath));
sz -= file.tellg();
settingsFileContent.resize(sz, '\0');
file.read(&settingsFileContent[0], sz);
}
if (headerStr == knownToFailHeader &&
settingsFileContent == knownToFailContent)
{
// By doing this precheck, we avoid repeated logging for a known
// bad file, which will keep on failing.
return PSSettingsPtr();
// TODO-SanKumar-2002: Rate controlled log (say once in 10/25 times) in
// both +ve and -ve cases.
}
if ((m_serverOptions->psSettingsIncludesHeader() && headerStr.empty()) ||
(settingsFileContent.empty()))
{
CXPS_LOG_ERROR(AT_LOC << "Header line or content missing in the cache settings file");
knownToFailHeader = headerStr;
knownToFailContent = settingsFileContent;
return PSSettingsPtr();
}
if ((m_serverOptions->psSettingsIncludesHeader()) &&
(m_serverOptions->psSettingsEnforceMajorVersionCheck() ||
m_serverOptions->psSettingsVerifyHeader()))
{
CacheDataHeader parsedHeader;
try
{
parsedHeader =
JSON::consumer<CacheDataHeader>::convert(headerStr, true); // std::move() candidate
}
catch (const std::exception & ex)
{
CXPS_LOG_ERROR(AT_LOC <<
"Json serialization failed for the header in the cached settings file - " << ex.what());
knownToFailHeader = headerStr;
knownToFailContent = settingsFileContent;
return PSSettingsPtr();
}
if (m_serverOptions->psSettingsEnforceMajorVersionCheck())
{
if (parsedHeader.Version.empty())
{
CXPS_LOG_ERROR(AT_LOC << "Empty version found in the cached settings file");
knownToFailHeader = headerStr;
knownToFailContent = settingsFileContent;
return PSSettingsPtr();
}
std::vector<std::string> versionParts;
boost::split(versionParts, parsedHeader.Version, boost::is_any_of("."));
int majorVersion = boost::lexical_cast<int>(versionParts[0]);
int minorVersion = 0;
if (versionParts.size() != 1)
{
minorVersion = boost::lexical_cast<int>(versionParts[1]);
}
if (majorVersion != CacheDataHeader::CURRENT_CACHED_DATA_MAJOR_VERSION)
{
CXPS_LOG_ERROR(AT_LOC <<
"Major version of the cached settings file - " << majorVersion <<
" doesn't match the expected major version - " <<
CacheDataHeader::CURRENT_CACHED_DATA_MAJOR_VERSION);
knownToFailHeader = headerStr;
knownToFailContent = settingsFileContent;
return PSSettingsPtr();
}
if (m_serverOptions->psSettingsEnforceMinorVersionCheck() &&
minorVersion != CacheDataHeader::CURRENT_CACHED_DATA_MINOR_VERSION)
{
CXPS_LOG_ERROR(AT_LOC <<
"Minor version of the cached settings file - " << minorVersion <<
" doesn't match the expected minor version - " <<
CacheDataHeader::CURRENT_CACHED_DATA_MINOR_VERSION);
knownToFailHeader = headerStr;
knownToFailContent = settingsFileContent;
return PSSettingsPtr();
}
}
if (m_serverOptions->psSettingsVerifyHeader() &&
!parsedHeader.IsMatchingContent(settingsFileContent))
{
CXPS_LOG_ERROR(AT_LOC <<
"Checksum validation failed for the cached settings file - " <<
parsedHeader.Checksum << " (" << parsedHeader.ChecksumType << ")");
knownToFailHeader = headerStr;
knownToFailContent = settingsFileContent;
return PSSettingsPtr();
}
}
// the contents of the file are good, so update the cached time and content
m_MTime = t;
if (m_settingsFileContent == settingsFileContent)
{
return PSSettingsPtr();
}
PSSettingsPtr parsedSettingsPtr;
try
{
parsedSettingsPtr = boost::make_shared<PSSettings>(
JSON::consumer<PSSettings>::convert(settingsFileContent, true)); // std::move() candidate
m_settingsFileContent = settingsFileContent;
}
catch (const std::exception &ex)
{
CXPS_LOG_ERROR(AT_LOC <<
"Json serialization failed for the content in the cached settings file - " << ex.what());
knownToFailHeader = headerStr;
knownToFailContent = settingsFileContent;
return PSSettingsPtr();
}
// TODO-SanKumar-2002: By setting these values even in +ve case, we could
// avoid unnecessary JSON parsings as well as callbacks at the caller. Adverse
// effect would be that if any failed callbacks / other intermittent logic
// errors wouldn't be retried.
knownToFailHeader.clear();
knownToFailContent.clear();
return parsedSettingsPtr;
}
void PSSettingsConfigurator::ProcessNewSettings(PSSettingsPtr newSettings, bool fileUnavailable)
{
if (!newSettings && !fileUnavailable)
return; // Failure in reading the latest settings file; ignore.
PSSettingsPtr oldSettings = boost::atomic_exchange(&m_psSettings, newSettings);
NotifyTunablesChange(!newSettings? StringMapPtr() : newSettings->Tunables);
}
void PSSettingsConfigurator::Initialize(
const boost::filesystem::path& settingsFilePath,
const boost::filesystem::path& settingsLckFilePath,
serverOptionsPtr serverOptions)
{
m_settingsFilePath = settingsFilePath;
m_settingsLckFilePath = settingsLckFilePath;
m_serverOptions = serverOptions;
m_MTime = 0;
try
{
bool fileUnavailable;
std::string ignore1, ignore2;
PSSettingsPtr retrievedSettings = ReadSettingsFromFile(fileUnavailable, ignore1, ignore2);
ProcessNewSettings(retrievedSettings, fileUnavailable);
}
GENERIC_CATCH_LOG_IGNORE("Initial load of PS Settings from file failed")
m_isInitialized = true;
}
void PSSettingsConfigurator::Start()
{
if (!m_isInitialized)
{
BOOST_ASSERT(false);
throw std::logic_error("PSSettingsConfigurator is started without initialization");
}
BOOST_ASSERT(!m_thread);
// On thread spawn failure, the exception would stop the server. So, it's
// guaranteed that this thread would always be running, as long as the
// server is running.
m_thread.reset(
new boost::thread(boost::bind(&PSSettingsConfigurator::Run, this)));
}
void PSSettingsConfigurator::Stop()
{
// If the thread was created, signal completion and wait for it to finish.
if (m_thread)
{
BOOST_ASSERT(m_isInitialized);
m_thread->interrupt();
m_thread->join();
m_thread.reset();
}
}
void PSSettingsConfigurator::Run()
{
size_t dbgIgnoredExceptionCnt = 0;
CXPS_LOG_ERROR_INFO("Starting PSSettingsConfigurator thread");
boost::chrono::steady_clock::time_point lastItrStartTime;
std::string knownToFailHeader;
std::string knownToFailContent;
for(;;)
{
lastItrStartTime = boost::chrono::steady_clock::now();
try
{
bool fileUnavailable;
PSSettingsPtr retrievedSettings =
ReadSettingsFromFile(fileUnavailable, knownToFailHeader, knownToFailContent);
ProcessNewSettings(retrievedSettings, fileUnavailable);
}
GENERIC_CATCH_LOG_ACTION("PSSettingsConfigurator run loop", dbgIgnoredExceptionCnt++)
try
{
boost::chrono::steady_clock::duration durationToWait =
boost::chrono::seconds(m_serverOptions->psSettingsPollIntervalSecs());
durationToWait -= boost::chrono::steady_clock::now() - lastItrStartTime; // Adjust for already spent time
if (durationToWait < boost::chrono::steady_clock::duration::zero())
durationToWait = boost::chrono::steady_clock::duration::zero();
boost::this_thread::sleep_for(durationToWait);
}
catch (boost::thread_interrupted)
{
CXPS_LOG_ERROR_INFO("Stopping PSSettingsConfigurator thread");
return;
}
}
}
PSSettingsConfigurator::SubscriptionNum_t PSSettingsConfigurator::SubscribeForTunablesChange(TunablesChangeNotificationCallback callback)
{
if (!callback)
{
throw std::logic_error("Invalid callback used for Tunables subscription");
}
boost::unique_lock<boost::shared_mutex> wlock(m_tunablesCallbackMapMutex);
SubscriptionNum_t subscriptionNumber = boost::chrono::steady_clock::now().time_since_epoch().count();
std::pair<TunablesCallbackMap::iterator, bool> result =
m_tunablesChangeCallbackMap.insert(std::make_pair(subscriptionNumber, callback));
if (!result.second)
{
BOOST_ASSERT(false);
throw std::logic_error("Tunables subscription internal error");
}
return subscriptionNumber;
}
void PSSettingsConfigurator::UnsubscribeForTunablesChange(SubscriptionNum_t subscriptionNumber)
{
boost::unique_lock<boost::shared_mutex> wlock(m_tunablesCallbackMapMutex);
BOOST_VERIFY(m_tunablesChangeCallbackMap.erase(subscriptionNumber) == 1);
}
void PSSettingsConfigurator::NotifyTunablesChange(StringMapPtr tunables)
{
boost::shared_lock<boost::shared_mutex> rlock(m_tunablesCallbackMapMutex);
for (TunablesCallbackMap::iterator cbItr = m_tunablesChangeCallbackMap.begin();
cbItr != m_tunablesChangeCallbackMap.end();
cbItr++)
{
try
{
cbItr->second(tunables);
}
GENERIC_CATCH_LOG_IGNORE(<< "Notifying tunables change for subscription number : " << cbItr->first <<);
}
}