host/cxpslib/pssettingsconfigurator.h (386 lines of code) (raw):
#ifndef PS_SETTINGS_CONFIGURATOR_H
#define PS_SETTINGS_CONFIGURATOR_H
#include <boost/atomic.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/chrono.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <json_reader.h>
#include <map>
#include <inm_md5.h>
#include <securityutils.h>
// Forward declaration: this header only refers to ServerOptions through the
// shared pointer typedef below, so the full definition is not needed here.
class ServerOptions;
// Shared-ownership handle to a ServerOptions instance.
typedef boost::shared_ptr<ServerOptions> serverOptionsPtr;
namespace PSSettings
{
/// \brief Converts the date-time string stored under \p keyName in \p node
///        into a boost::posix_time::ptime.
///
/// The string is fetched with JSON_P_KEYNAME. A trailing 'z' (matched case
/// insensitively) is removed before any further processing.
///
/// \param node    property tree node holding the value
/// \param keyName name of the key whose value has to be converted
/// \return 1400-01-01 if the value is "0001-01-01T00:00:00", otherwise the
///         result of boost::posix_time::from_iso_extended_string() on the
///         (suffix-stripped) value.
static boost::posix_time::ptime ConvertDotNetDateTime(ptree& node, const std::string& keyName)
{
    std::string timeUtcStr;
    JSON_P_KEYNAME(node, keyName, timeUtcStr);

    // iends_with() can only succeed on a non-empty string, so shrinking the
    // string by one character is safe here.
    const bool hasZuluSuffix = boost::iends_with(timeUtcStr, "z");
    if (hasZuluSuffix)
    {
        timeUtcStr.resize(timeUtcStr.size() - 1);
    }

    if (timeUtcStr != "0001-01-01T00:00:00")
    {
        return boost::posix_time::from_iso_extended_string(timeUtcStr);
    }

    // An unset DateTime object starts from 01.01.0001, whereas the valid
    // gregorian date range starts from year 1400. Map the unset value to the
    // first representable day.
    return boost::posix_time::ptime(boost::gregorian::date(1400, 1, 1));
}
/// \brief Settings of a single replication pair (source disk <-> target disk)
///        handled by the process server.
///
/// Instances are populated from a property tree through serialize(ptree&).
/// Host and device identifiers are stored in lower case after population.
struct PSSettingsPairwise
{
    /// \brief host ID
    std::string HostId;

    /// \brief disk ID
    std::string DeviceId;

    /// \brief Target host ID
    std::string TargetHostId;

    /// \brief Target disk ID
    std::string TargetDeviceId;

    /// \brief direction of protection
    std::string ProtectionDirection;

    /// \brief folder under which diff files go in from source
    ///        (valid if IsSourceFolderSupported is set in PSSettings)
    std::string SourceDiffFolder;

    /// \brief folder under which diff files go in from source
    ///        (if IsSourceFolderSupported is not set in PSSettings)
    ///        or diff files go in from SourceDiffFolder (if
    ///        IsSourceFolderSupported is set in PSSettings)
    std::string TargetDiffFolder;

    /// \brief folder under which resync files go in from source
    std::string ResyncFolder;

    /// \brief folder under which diff files that can create
    ///        recovery point(s) are put by MT'
    std::string AzurePendingUploadRecoverableFolder;

    /// \brief folder under which diff files that can't create
    ///        recovery point(s) are put by MT'
    std::string AzurePendingUploadNonRecoverableFolder;

    /// \brief maximum bytes of diff data allowed for the pair on PS
    long long DiffThrottleLimit;

    /// \brief maximum bytes of resync data allowed for the pair on PS
    long long ResyncThrottleLimit;

    /// \brief path of the internal file used to monitor incoming data
    ///        outage of the pair
    std::string RPOFilePath;

    /// \brief folders to monitor for diffsync related health and statistics
    std::vector<std::string> DiffFoldersToMonitor;

    /// \brief folder to monitor for resync related health and statistics
    std::string ResyncFolderToMonitor;

    // TODO-SanKumar-2001:
    std::string ReplicationSessionId;

    // TODO-SanKumar-2001:
    std::string ReplicationState;

    /// \brief last IR progress updation time by source to the server
    boost::posix_time::ptime IrProgressUpdateTimeUtc;

    /// \brief internal folder used to monitor RPO of the pair
    std::string TagFolder;

    /// \brief context to be used on communication about the pair
    std::string MessageContext;

    /// \brief protection state for the pair
    int ProtectionState;

    /// \brief Adapter based serialization is not supported for this type.
    /// \throws std::logic_error always
    void serialize(JSON::Adapter& adapter)
    {
        throw std::logic_error("Not implemented");
    }

    /// \brief Populates all the members from the given property tree node.
    /// \param node property tree node describing one pair
    void serialize(ptree& node)
    {
        JSON_P(node, HostId);
        JSON_P(node, DeviceId);
        JSON_P(node, TargetHostId);
        JSON_P(node, TargetDeviceId);
        JSON_P(node, ProtectionDirection);
        JSON_P(node, SourceDiffFolder);
        JSON_P(node, TargetDiffFolder);
        JSON_P(node, ResyncFolder);
        JSON_P(node, AzurePendingUploadRecoverableFolder);
        JSON_P(node, AzurePendingUploadNonRecoverableFolder);
        JSON_P(node, DiffThrottleLimit);
        JSON_P(node, ResyncThrottleLimit);
        JSON_P(node, RPOFilePath);
        JSON_VP(node, DiffFoldersToMonitor);
        JSON_P(node, ResyncFolderToMonitor);
        JSON_P(node, ReplicationSessionId);
        JSON_P(node, ReplicationState);
        IrProgressUpdateTimeUtc = ConvertDotNetDateTime(node, "IrProgressUpdateTimeUtc");
        JSON_P(node, TagFolder);
        JSON_P(node, MessageContext);
        JSON_P(node, ProtectionState);

        // Always store host id and disk id in lower case for easier comparison of keys.
        boost::algorithm::to_lower(HostId);
        boost::algorithm::to_lower(DeviceId);
        boost::algorithm::to_lower(TargetHostId);
        boost::algorithm::to_lower(TargetDeviceId);
    }

    /// \brief Member-wise equality over every field of the pair settings.
    ///
    /// Declared const so that const objects and const references (for example
    /// objects reached through a pointer-to-const) can be compared as well.
    bool operator==(const PSSettingsPairwise& rhs) const
    {
        return
            HostId == rhs.HostId &&
            DeviceId == rhs.DeviceId &&
            TargetHostId == rhs.TargetHostId &&
            TargetDeviceId == rhs.TargetDeviceId &&
            ProtectionDirection == rhs.ProtectionDirection &&
            SourceDiffFolder == rhs.SourceDiffFolder &&
            TargetDiffFolder == rhs.TargetDiffFolder &&
            ResyncFolder == rhs.ResyncFolder &&
            AzurePendingUploadRecoverableFolder == rhs.AzurePendingUploadRecoverableFolder &&
            AzurePendingUploadNonRecoverableFolder == rhs.AzurePendingUploadNonRecoverableFolder &&
            DiffThrottleLimit == rhs.DiffThrottleLimit &&
            ResyncThrottleLimit == rhs.ResyncThrottleLimit &&
            RPOFilePath == rhs.RPOFilePath &&
            DiffFoldersToMonitor == rhs.DiffFoldersToMonitor &&
            ResyncFolderToMonitor == rhs.ResyncFolderToMonitor &&
            ReplicationSessionId == rhs.ReplicationSessionId &&
            ReplicationState == rhs.ReplicationState &&
            IrProgressUpdateTimeUtc == rhs.IrProgressUpdateTimeUtc &&
            TagFolder == rhs.TagFolder &&
            MessageContext == rhs.MessageContext &&
            ProtectionState == rhs.ProtectionState;
    }

    /// \brief Negation of operator==.
    bool operator!=(const PSSettingsPairwise& rhs) const
    {
        return !(*this == rhs);
    }
};
// Shared-ownership handle to the settings of one pair.
typedef boost::shared_ptr<PSSettingsPairwise> PSSettingsPairwisePtr;
// Pair settings indexed by a string key. PSSettingsHostwise::serialize() fills
// such a map using DeviceId (and TargetDeviceId, when present) as the keys.
typedef std::map<std::string, PSSettingsPairwisePtr> PSSettingsPairwisePtrsMap;
/// \brief Settings of a single protected host along with the settings of all
///        of its replication pairs.
///
/// Instances are populated from a property tree through serialize(ptree&).
struct PSSettingsHostwise
{
    /// \brief host ID
    std::string HostId;

    /// \brief Target host ID. Not read directly from the property tree; it is
    ///        taken from the pairs that carry both a target host ID and a
    ///        target device ID (the last such pair wins).
    std::string TargetHostId;

    // TODO-SanKumar-2001:
    std::string ProtectionState;

    /// \brief folder under which all the resync and diff data
    ///        will go into for the host.
    std::string LogRootFolder;

    /// \brief uri to report critical health and events for the host
    std::string CriticalChannelUri;

    /// \brief renewal time of CriticalChannelUri
    boost::posix_time::ptime CriticalChannelUriRenewalTimeUtc;

    /// \brief uri to report informational health and events
    std::string InformationalChannelUri;

    /// \brief renewal time of InformationalChannelUri
    boost::posix_time::ptime InformationalChannelUriRenewalTimeUtc;

    /// \brief context to be used, while reporting critical and
    ///        informational health and events for the host
    std::string MessageContext;

    /// \brief SourceMachine BiosId
    std::string BiosId;

    /// \brief pair settings of the host, keyed by DeviceId and, for pairs
    ///        that have a target, additionally by TargetDeviceId
    boost::shared_ptr<PSSettingsPairwisePtrsMap> PairwiseSettings;

    /// \brief Adapter based serialization is not supported for this type.
    /// \throws std::logic_error always
    void serialize(JSON::Adapter& adapter)
    {
        throw std::logic_error("Not implemented");
    }

    /// \brief Populates the host level members and rebuilds PairwiseSettings
    ///        from the "Pairs" list of the given property tree node.
    /// \param node property tree node describing one host
    void serialize(ptree& node)
    {
        std::vector<PSSettingsPairwise> pairSettings;

        JSON_P(node, HostId);
        JSON_P(node, ProtectionState);
        JSON_P(node, LogRootFolder);
        JSON_P(node, CriticalChannelUri);
        CriticalChannelUriRenewalTimeUtc = ConvertDotNetDateTime(node, "CriticalChannelUriRenewalTimeUtc");
        JSON_P(node, InformationalChannelUri);
        InformationalChannelUriRenewalTimeUtc = ConvertDotNetDateTime(node, "InformationalChannelUriRenewalTimeUtc");
        JSON_P(node, MessageContext);
        JSON_P(node, BiosId);
        JSON_VCL_KEYNAME(node, "Pairs", pairSettings);

        // Always store host id in lower case for easier comparison of keys.
        boost::algorithm::to_lower(HostId);

        PairwiseSettings = boost::make_shared<PSSettingsPairwisePtrsMap>();

        for (std::vector<PSSettingsPairwise>::iterator itr = pairSettings.begin(); itr != pairSettings.end(); ++itr)
        {
            (*PairwiseSettings)[itr->DeviceId] = boost::make_shared<PSSettingsPairwise>(*itr); // std::move() candidate

            // A pair that knows its target is made reachable through the
            // target device id as well; each key owns its own copy.
            if (!itr->TargetDeviceId.empty() && !itr->TargetHostId.empty())
            {
                TargetHostId = itr->TargetHostId;
                (*PairwiseSettings)[itr->TargetDeviceId] = boost::make_shared<PSSettingsPairwise>(*itr);
            }
        }
    }

    /// \brief Compares only the host level members read from the property
    ///        tree. TargetHostId and PairwiseSettings are not compared.
    ///
    /// Declared const so that const objects and const references can be
    /// compared as well.
    bool leanEquals(const PSSettingsHostwise& rhs) const
    {
        return
            HostId == rhs.HostId &&
            ProtectionState == rhs.ProtectionState &&
            LogRootFolder == rhs.LogRootFolder &&
            CriticalChannelUri == rhs.CriticalChannelUri &&
            CriticalChannelUriRenewalTimeUtc == rhs.CriticalChannelUriRenewalTimeUtc &&
            InformationalChannelUri == rhs.InformationalChannelUri &&
            InformationalChannelUriRenewalTimeUtc == rhs.InformationalChannelUriRenewalTimeUtc &&
            MessageContext == rhs.MessageContext &&
            BiosId == rhs.BiosId;
    }

    /// \brief Negation of leanEquals().
    bool leanNotEquals(const PSSettingsHostwise& rhs) const
    {
        return !leanEquals(rhs);
    }
};
// Shared-ownership handle to the settings of one host.
typedef boost::shared_ptr<PSSettingsHostwise> PSSettingsHostwisePtr;
// Host settings indexed by a string key. PSSettings::serialize() fills such a
// map using HostId (and TargetHostId, when present) as the keys.
typedef std::map<std::string, PSSettingsHostwisePtr> PSSettingsHostwisePtrsMap;
/// \brief Telemetry related settings of a single protected machine.
///
/// Instances are populated from a property tree through serialize(ptree&).
struct PSProtectedMachineTelemetrySettings
{
    /// \brief host ID
    // NOTE(review): unlike PSSettingsPairwise and PSSettingsHostwise, the
    // host id is not converted to lower case after being read here, although
    // it is used as a map key by PSSettings::serialize(). Confirm whether
    // the lookups on that map expect the original casing.
    std::string HostId;

    /// \brief folder under which the logs and telemetry of
    ///        the protected machine would be put in to be
    ///        uploaded to Kusto.
    std::string TelemetryFolderPath;

    /// \brief SourceMachine BiosId
    std::string BiosId;

    /// \brief Adapter based serialization is not supported for this type.
    /// \throws std::logic_error always
    void serialize(JSON::Adapter& adapter)
    {
        throw std::logic_error("Not implemented");
    }

    /// \brief Populates all the members from the given property tree node.
    /// \param node property tree node describing one protected machine
    void serialize(ptree& node)
    {
        JSON_P(node, HostId);
        JSON_P(node, TelemetryFolderPath);
        JSON_P(node, BiosId);
    }

    /// \brief Member-wise equality over every field.
    ///
    /// Declared const so that const objects and const references can be
    /// compared as well.
    bool operator==(const PSProtectedMachineTelemetrySettings& rhs) const
    {
        return
            HostId == rhs.HostId &&
            TelemetryFolderPath == rhs.TelemetryFolderPath &&
            BiosId == rhs.BiosId;
    }

    /// \brief Negation of operator==.
    bool operator!=(const PSProtectedMachineTelemetrySettings& rhs) const
    {
        return !(*this == rhs);
    }
};
// Shared-ownership handle to the telemetry settings of one protected machine.
typedef boost::shared_ptr<PSProtectedMachineTelemetrySettings> PSProtMacTelSettingsPtr;
// Telemetry settings indexed by a string key. PSSettings::serialize() fills
// such a map using HostId as the key.
typedef std::map<std::string, PSProtMacTelSettingsPtr> PSProtMacTelSettingsPtrsMap;
// Generic string-to-string map; used in this header for PSSettings::Tunables.
typedef std::map<std::string, std::string> StringMap;
// Shared-ownership handle to a StringMap.
typedef boost::shared_ptr<StringMap> StringMapPtr;
struct PSSettings
{
/// \brief percentage of used cache volume size beyond which the incoming
/// data must be throttled for the PS
double CumulativeThrottleLimit;
/// \brief does the process server type support source folder concept?
bool IsSourceFolderSupported;
/// \brief app insights key to upload logs and telemetry for the PS
/// Alternate path to main path - TelemetryChannelUri
std::string ApplicationInsightsInstrumentationKey;
/// \brief uri to report critical health and events for the PS
std::string CriticalChannelUri;
/// \brief renewal time of CriticalChannelUri
boost::posix_time::ptime CriticalChannelUriRenewalTimeUtc;
/// \brief uri to report informational health and events for the PS
std::string InformationalChannelUri;
/// \brief renewal time of InformationalChannelUri
boost::posix_time::ptime InformationalChannelUriRenewalTimeUtc;
/// \brief uri to upload logs and telemetry of all the connected components
/// Source(s), PS, MT', MARS
std::string TelemetryChannelUri;
/// \brief renewal time of <c>TelemetryChannelUri</c>
boost::posix_time::ptime TelemetryChannelUriRenewalTimeUtc;
/// \brief message context to use for the communication about the PS
std::string MessageContext;
/// \brief flag to detect if private end points are enabled or not
bool IsPrivateEndpointEnabled;
/// \brief flag to detect if access control feature is enabled or not
bool IsAccessControlEnabled;
/// \brief server provided values to override hardcoded settings in
/// the PS components. Note that, these values don't supersede
/// a value explicitly set in the corresponding conf file
/// referred by the component(s).
StringMapPtr Tunables;
boost::shared_ptr<PSSettingsHostwisePtrsMap> HostwiseSettings;
boost::shared_ptr<PSProtMacTelSettingsPtrsMap> TelemetrySettings;
void serialize(JSON::Adapter& adapter)
{
throw std::logic_error("Not implemented");
}
void serialize(ptree& node)
{
std::string timeUtcStr;
JSON_P(node, CumulativeThrottleLimit);
JSON_P(node, IsSourceFolderSupported);
JSON_P(node, ApplicationInsightsInstrumentationKey);
JSON_P(node, CriticalChannelUri);
CriticalChannelUriRenewalTimeUtc = ConvertDotNetDateTime(node, "CriticalChannelUriRenewalTimeUtc");
JSON_P(node, InformationalChannelUri);
InformationalChannelUriRenewalTimeUtc = ConvertDotNetDateTime(node, "InformationalChannelUriRenewalTimeUtc");
JSON_P(node, TelemetryChannelUri);
TelemetryChannelUriRenewalTimeUtc = ConvertDotNetDateTime(node, "TelemetryChannelUriRenewalTimeUtc");
JSON_P(node, MessageContext);
JSON_P(node, IsPrivateEndpointEnabled);
JSON_P(node, IsAccessControlEnabled);
StringMap tunables;
JSON_KV_P_KEYNAME(node, "Tunables", tunables);
Tunables = tunables.empty()? StringMapPtr() : boost::make_shared<StringMap>(tunables); // std::move() candidate
std::vector<PSSettingsHostwise> hostSettings;
JSON_VCL_KEYNAME(node, "Hosts", hostSettings);
HostwiseSettings = boost::make_shared<PSSettingsHostwisePtrsMap>();
for (std::vector<PSSettingsHostwise>::iterator itr = hostSettings.begin(); itr != hostSettings.end(); itr++)
{
(*HostwiseSettings)[itr->HostId] = boost::make_shared<PSSettingsHostwise>(*itr); // std::move() candidate
if (!(itr->TargetHostId.empty()))
{
(*HostwiseSettings)[itr->TargetHostId] = boost::make_shared<PSSettingsHostwise>(*itr);
}
}
std::vector<PSProtectedMachineTelemetrySettings> telSettings;
JSON_VCL_KEYNAME(node, "ProtectedMachineTelemetrySettings", telSettings);
TelemetrySettings = boost::make_shared<PSProtMacTelSettingsPtrsMap>();
for (std::vector<PSProtectedMachineTelemetrySettings>::iterator itr = telSettings.begin(); itr != telSettings.end(); itr++)
(*TelemetrySettings)[itr->HostId] = boost::make_shared<PSProtectedMachineTelemetrySettings>(*itr); // std::move() candidate
}
bool leanEquals(const PSSettings& rhs)
{
const PSSettings& lhs = *this;
return
std::abs(lhs.CumulativeThrottleLimit - rhs.CumulativeThrottleLimit) < std::numeric_limits<double>::epsilon() &&
lhs.IsSourceFolderSupported &&
lhs.ApplicationInsightsInstrumentationKey == rhs.ApplicationInsightsInstrumentationKey &&
lhs.CriticalChannelUri == rhs.CriticalChannelUri &&
lhs.CriticalChannelUriRenewalTimeUtc == rhs.CriticalChannelUriRenewalTimeUtc &&
lhs.InformationalChannelUri == rhs.InformationalChannelUri &&
lhs.InformationalChannelUriRenewalTimeUtc == rhs.InformationalChannelUriRenewalTimeUtc &&
lhs.TelemetryChannelUri == rhs.TelemetryChannelUri &&
lhs.TelemetryChannelUriRenewalTimeUtc == rhs.TelemetryChannelUriRenewalTimeUtc &&
lhs.MessageContext == rhs.MessageContext &&
lhs.IsPrivateEndpointEnabled == rhs.IsPrivateEndpointEnabled &&
lhs.IsAccessControlEnabled == rhs.IsAccessControlEnabled;
}
bool leanNotEquals(const PSSettings& rhs)
{
return !leanEquals(rhs);
}
};
// Shared-ownership handle to a PSSettings object.
typedef boost::shared_ptr<PSSettings> PSSettingsPtr;
/// \brief Header describing a piece of cached content: a version string plus
///        a checksum of the content and the type of that checksum.
struct CacheDataHeader
{
    /// \brief version string of the cached data
    std::string Version;

    /// \brief base64 encoded checksum of the content
    std::string Checksum;

    /// \brief name of the algorithm used to compute Checksum
    std::string ChecksumType;

    // Declared here, defined outside this header.
    static const char* CHECKSUM_TYPE_MD5;
    static const char* CURRENT_CACHED_DATA_VERSION;
    static const int CURRENT_CACHED_DATA_MAJOR_VERSION;
    static const int CURRENT_CACHED_DATA_MINOR_VERSION;

    /// \brief Adapter based serialization is not supported for this type.
    /// \throws std::logic_error always
    void serialize(JSON::Adapter& adapter)
    {
        throw std::logic_error("Not implemented");
    }

    /// \brief Populates all the members from the given property tree node.
    /// \param node property tree node describing the header
    void serialize(ptree& node)
    {
        JSON_P(node, Version);
        JSON_P(node, Checksum);
        JSON_P(node, ChecksumType);
    }

    /// \brief Builds the header that corresponds to the given content.
    /// \param content data for which the header has to be generated
    /// \return header carrying CURRENT_CACHED_DATA_VERSION,
    ///         CHECKSUM_TYPE_MD5 and the base64 encoded MD5 digest of
    ///         \p content
    static CacheDataHeader BuildCacheDataHeader(const std::string& content)
    {
        CacheDataHeader toRet;
        toRet.Version = CURRENT_CACHED_DATA_VERSION;
        toRet.ChecksumType = CHECKSUM_TYPE_MD5;

        const int MD5_HASH_LENGTH = 16;
        std::vector<unsigned char> currhash(MD5_HASH_LENGTH, 0);

        INM_MD5_CTX ctx;
        INM_MD5Init(&ctx);
        // The const_cast accommodates an update routine that takes a
        // non-const buffer pointer; the content is only passed as input.
        INM_MD5Update(
            &ctx,
            reinterpret_cast<unsigned char*>(const_cast<char*>(content.c_str())),
            content.size());
        INM_MD5Final(&currhash[0], &ctx);

        toRet.Checksum = securitylib::base64Encode(
            reinterpret_cast<const char*>(&currhash[0]), currhash.size());

        return toRet;
    }

    /// \brief Member-wise equality over Version, Checksum and ChecksumType.
    ///
    /// Compares the members in place (no temporary copy of this object) and
    /// is const so that const headers can be compared as well.
    bool operator==(const CacheDataHeader& rhs) const
    {
        return
            Version == rhs.Version &&
            Checksum == rhs.Checksum &&
            ChecksumType == rhs.ChecksumType;
    }

    /// \brief Negation of operator==.
    bool operator!=(const CacheDataHeader& rhs) const
    {
        return !(*this == rhs);
    }

    /// \brief Checks whether this header matches the header that would be
    ///        built for \p content. The version is currently not compared.
    /// \param content data to validate against this header
    /// \return true if both Checksum and ChecksumType match
    bool IsMatchingContent(const std::string& content) const
    {
        const CacheDataHeader comp = BuildCacheDataHeader(content);

        // TODO-SanKumar-2002: Implement the minor and major version
        // restrictions/relaxations as per settings here too.
        return
            //this->Version == comp.Version &&
            this->Checksum == comp.Checksum &&
            this->ChecksumType == comp.ChecksumType;
    }
};
/// \brief Singleton that holds the current PSSettings and lets callers
///        subscribe for changes in the tunables.
///
/// Only GetPSSettings() and GetInstance() are defined in this header; every
/// other member function is defined elsewhere.
class PSSettingsConfigurator
{
public:
/// \brief Supplies the settings file path, the settings lock file path and
///        the server options to the configurator.
// NOTE(review): presumably must be called before Start(); confirm against the implementation.
void Initialize(
const boost::filesystem::path& settingsFilePath,
const boost::filesystem::path& settingsLckFilePath,
serverOptionsPtr serverOptions);
// NOTE(review): Start()/Stop() presumably launch and stop the worker that
// executes Run() on m_thread; confirm against the implementation.
void Start();
void Stop();
/// \brief Returns the currently held settings pointer (may be null if
///        nothing has been assigned to it yet).
// NOTE(review): m_psSettings is returned without any visible synchronization;
// verify how the implementation publishes new settings if this is called
// concurrently with an update.
PSSettingsPtr GetPSSettings() { return m_psSettings; }
// Singleton pattern
/// \brief Returns the single static instance of the configurator.
static PSSettingsConfigurator& GetInstance()
{
return s_instance;
}
private:
// Singleton instance
static PSSettingsConfigurator s_instance;
// Settings handed out by GetPSSettings().
PSSettingsPtr m_psSettings;
// NOTE(review): presumably the raw content and modification time of the
// last settings file that was read; confirm against the implementation.
std::string m_settingsFileContent;
time_t m_MTime;
boost::atomic<bool> m_isInitialized;
// NOTE(review): boost::unique_ptr is not provided by any header included
// directly here; verify which header brings it into scope.
boost::unique_ptr<boost::thread> m_thread;
// Values received through Initialize() are presumably stored in the three
// members below - TODO confirm.
boost::filesystem::path m_settingsFilePath;
boost::filesystem::path m_settingsLckFilePath;
serverOptionsPtr m_serverOptions;
void Run();
PSSettingsPtr ReadSettingsFromFile(
bool& fileUnavailable,
std::string& knownToFailCacheDataHeader,
std::string& knownToFailContent);
void ProcessNewSettings(PSSettingsPtr newSettings, bool fileUnavailable);
public:
/// \brief Signature of the callback invoked with the tunables map.
typedef boost::function1<void, StringMapPtr> TunablesChangeNotificationCallback;
/// \brief Identifier returned by SubscribeForTunablesChange() and accepted
///        by UnsubscribeForTunablesChange().
typedef long long SubscriptionNum_t;
SubscriptionNum_t SubscribeForTunablesChange(TunablesChangeNotificationCallback callback);
void UnsubscribeForTunablesChange(SubscriptionNum_t subscriptionNumber);
private:
// Registered callbacks indexed by their subscription number.
typedef std::map<SubscriptionNum_t, TunablesChangeNotificationCallback> TunablesCallbackMap;
TunablesCallbackMap m_tunablesChangeCallbackMap;
// NOTE(review): presumably guards m_tunablesChangeCallbackMap; confirm against the implementation.
boost::shared_mutex m_tunablesCallbackMapMutex;
void NotifyTunablesChange(StringMapPtr tunables);
};
//enum PSSettingsChangeType
//{
// PSSettingsChangeType_Added,
// PSSettingsChangeType_Modified,
// PSSettingsChangeType_Removed
//};
}
#endif // !PS_SETTINGS_CONFIGURATOR_H