Diagnostic/mdsd/mdsd/MdsdConfig.cc (822 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include "MdsdConfig.hh"
#include "CfgCtxRoot.hh"
#include "ConfigParser.hh"
#include "TableSchema.hh"
#include "Subscription.hh"
#include "Batch.hh"
#include "Credentials.hh"
#include "OmiTask.hh"
#include "MdsdExtension.hh"
#include "ITask.hh"
#include "Crypto.hh"
#include "Logger.hh"
#include "Utility.hh"
#include "Trace.hh"
#include "EventHubCmd.hh"
#include "ConfigUpdateCmd.hh"
#include "CmdXmlCommon.hh"
#include "EventHubUploaderId.hh"
#include "EventHubUploaderMgr.hh"
#include "EventHubType.hh"
#include "EventPubCfg.hh"
#include "MdsdEventCfg.hh"
#include "LocalSink.hh"
#include "EventType.hh"
#include <fstream>
#include <sstream>
#include <iomanip>
#include <algorithm>
#include <iterator>
#include <vector>
#include <utility>
#include <ctime>
#include <cpprest/pplx/threadpool.h>
extern "C" {
#include <unistd.h>
}
using std::string;
using std::vector;
using std::pair;
using std::make_pair;
// The set of batches that aren't associated with any particular config instance. (Thus the
// nullptr initializer.)
//
// This global static could be associated with the BatchSet class just as easily as the
// MdsdConfig class.
BatchSet MdsdConfig::_localBatches { nullptr };
MdsdConfig::MdsdConfig(string path, string autokeyConfigPath) :
configFilePath(path),
_autokeyConfigFilePath(autokeyConfigPath),
eventVersion(1), _isUseful(false),
_defaultCreds(nullptr), _batchSet(this), _batchFlushTimer(crossplat::threadpool::shared_instance().service()),
_agentIdentity(MdsdUtil::GetHostname()),
_autoKeyReloadTimer(crossplat::threadpool::shared_instance().service()), _monitoringManagementSeen(false),
_hasAutoKey(false),
_mdsdEventCfg(std::make_shared<mdsd::MdsdEventCfg>()),
_eventPubCfg(std::make_shared<mdsd::EventPubCfg>(_mdsdEventCfg))
{
LoadFromConfigFile(path);
}
void
MdsdConfig::Initialize()
{
Trace trace(Trace::ConfigLoad, "MdsdConfig Initialize");
InitEventHubPub();
FlushBatches(boost::system::error_code()); // Also schedules the next flush
}
// No autokey support.
bool
MdsdConfig::LoadAutokey(const boost::system::error_code &e)
{
Trace trace(Trace::Credentials, "LoadAutoKey");
return false;
}
// there could be multiple monikers pointing to different storage accounts
// pair: first=moniker, second=container SAS
std::vector<std::pair<std::string, std::string>>
MdsdConfig::ExtractCmdContainerAutoKeys()
{
Trace trace(Trace::Credentials, "GetContainerCred");
auto rootContainer = mdsd::CmdXmlCommon::GetRootContainerName();
std::vector<std::pair<std::string, std::string>> keylist;
std::unique_lock<std::mutex> lock(_ehMapMutex);
for (const auto & iter : _autoKeyMap) {
if (rootContainer == iter.first.second) {
keylist.push_back(std::make_pair(iter.first.first, iter.second));
}
}
lock.unlock();
// Get default account to use: either the default credential or the first credential
Credentials* cred = _defaultCreds;
if (!cred) {
cred = credentials.begin()->second;
}
if (!cred) {
TRACEWARN(trace, "No default account is found. No way to do config auto update.");
}
else {
for (const auto & iter : keylist) {
auto moniker = iter.first;
if (moniker == cred->Moniker()) {
cmdContainerSas = iter.second;
break;
}
}
if (!cmdContainerSas.empty()) {
TRACEINFO(trace, "Found container SAS to download config command blob: " << cmdContainerSas);
}
}
return keylist;
}
void
MdsdConfig::SetMappedMoniker(
const EventHubSasInfo_t & ehmap
)
{
Trace trace(Trace::Credentials, "SetMappedMoniker");
for (const auto & ehEntry : ehmap) {
auto & origMoniker = ehEntry.first;
auto & itemsMap = ehEntry.second;
for (const auto & item : (*itemsMap)) {
auto & eventName = item.first;
auto & newMoniker = item.second.moniker;
_mdsdEventCfg->UpdateMoniker(eventName, origMoniker, newMoniker);
}
}
}
void
MdsdConfig::LoadEventHubKeys(
const std::vector<std::pair<std::string, std::string>>& keylist
)
{
Trace trace(Trace::Credentials, "LoadEventHubKeys");
for (const auto & iter : keylist) {
auto & moniker = iter.first; // this is what's in mdsd.xml
auto & containerSas = iter.second;
trace.NOTE("Get EventHub cmd XML for moniker " + moniker + ", containerSas " + containerSas);
if(!_mdsdEventCfg->IsEventHubEnabled(moniker)) {
trace.NOTE("Moniker " + moniker + " does not have EventHub");
continue;
}
mdsd::EventHubCmd ehCmd(Namespace(), EventVersion(), containerSas);
ehCmd.ProcessCmdXml();
_ehNoticeItemsMap[moniker] = ehCmd.GetNoticeXmlItemsTable();
_ehPubItemsMap[moniker] = ehCmd.GetPublisherXmlItemsTable();
trace.NOTE("Successfully get EventHub cmd XML items (that include SAS keys) for moniker " + moniker);
DumpEventPublisherInfo();
}
SetMappedMoniker(_ehNoticeItemsMap);
SetMappedMoniker(_ehPubItemsMap);
}
mdsd::EhCmdXmlItems
MdsdConfig::GetEventNoticeCmdXmlItems(
const std::string & moniker,
const std::string & eventName
)
{
Trace trace(Trace::Credentials, "MdsdConfig::GetEventNoticeCmdXmlItems");
return GetEventHubCmdXmlItems(_ehNoticeItemsMap, moniker, eventName, "EventNotice");
}
mdsd::EhCmdXmlItems
MdsdConfig::GetEventPublishCmdXmlItems(
const std::string & moniker,
const std::string & eventName
)
{
Trace trace(Trace::Credentials, "MdsdConfig::GetEventPublishCmdXmlItems");
return GetEventHubCmdXmlItems(_ehPubItemsMap, moniker, eventName, "EventPublish");
}
mdsd::EhCmdXmlItems
MdsdConfig::GetEventHubCmdXmlItems(
EventHubItemsMap_t& ehmap,
const std::string & moniker,
const std::string & eventName,
const std::string & eventType
)
{
Trace trace(Trace::Credentials, "MdsdConfig::GetEventHubCmdXmlItems");
std::lock_guard<std::mutex> lock(_ehMapMutex);
auto iter = ehmap.find(moniker);
if (iter == ehmap.end()) {
std::ostringstream strm;
strm << "Failed to find " << eventType << " SAS & endpoint for moniker=" << moniker;
Logger::LogError(strm.str());
return mdsd::EhCmdXmlItems();
}
auto xmlItemsMap = iter->second;
auto xmlItemsIter = xmlItemsMap->find(eventName);
if (xmlItemsIter == xmlItemsMap->end()) {
std::ostringstream strm;
strm << "Failed to find " << eventType << " SAS & endpoint for event=" << eventName << " (moniker=" << moniker << ").";
Logger::LogError(strm.str());
return mdsd::EhCmdXmlItems();
}
TRACEINFO(trace, "Found " << eventType << " (SAS & endpoint) for moniker=" << moniker <<
", event=" << eventName << ": " << xmlItemsIter->second);
return xmlItemsIter->second;
}
// Flush the batch set and schedule the next flush. This should be explicitly called
// only once; the method is also the timer-pop handler and thus arranges for itself
// to be called again. The "cancel()" call is a safety measure in case the method is
// called explicitly after loading.
void
MdsdConfig::FlushBatches(const boost::system::error_code &e)
{
Trace trace(Trace::Scheduler, "MdsdConfig::FlushBatches");
if (e == boost::asio::error::operation_aborted) {
trace.NOTE("Timer cancelled");
} else {
_batchSet.FlushIfStale();
_batchFlushTimer.expires_from_now(boost::posix_time::minutes(1));
_batchFlushTimer.async_wait(boost::bind(&MdsdConfig::FlushBatches, this, boost::asio::placeholders::error));
}
}
// Stop timers that are not related to scheduled tasks:
// _batchFlushTimer, _autoKeyReloadTimer
void
MdsdConfig::StopAllTimers()
{
Trace trace(Trace::Scheduler, "MdsdConfig::StopAllTimers");
_batchFlushTimer.cancel();
_autoKeyReloadTimer.cancel();
}
MdsdConfig::~MdsdConfig()
{
Trace trace(Trace::ConfigLoad, "MdsdConfig Destructor");
StopAllTimers();
// Configuration load/parse messages
size_t count = 0;
for (Message* msgptr : messages) {
delete msgptr;
count++;
}
trace.NOTE("Removed " + std::to_string(count) + " messages");
messages.clear();
// Configured table schemas (distinct from cached MDS-ready forms of those schemas)
count = 0;
for (auto iter : schemas) {
count++;
std::ostringstream msg;
msg << "Deleting TableSchema \"" << iter.first << "\" at address " << iter.second;
trace.NOTE(msg.str());
delete iter.second;
}
trace.NOTE("Removed " + std::to_string(count) + " TableSchemas");
schemas.clear();
// Credentials
count = 0;
for (auto iter : credentials) {
count++;
std::ostringstream msg;
msg << "Deleting Credentials \"" << iter.first << "\" at address " << iter.second;
trace.NOTE(msg.str());
delete iter.second;
}
trace.NOTE("Removed " + std::to_string(count) + " Credentials");
credentials.clear();
// Event sources
// Just map source names to TableSchema*, and I've already deleted all the TableSchema objects.
trace.NOTE("Clearing all source entries");
sources.clear();
// OmiTask
count = 0;
for (OmiTask* taskptr : _omiTasks) {
count++;
std::ostringstream msg;
msg << "Deleting OmiTask at address " << taskptr;
trace.NOTE(msg.str());
taskptr->Cancel();
delete taskptr;
}
trace.NOTE("Removed " + std::to_string(count) + " OmiTask object(s)");
_omiTasks.clear();
// ITask
count = 0;
for (ITask* taskptr : _tasks) {
count++;
std::ostringstream msg;
msg << "Deleting ITask at address " << taskptr;
trace.NOTE(msg.str());
taskptr->cancel();
delete taskptr;
}
trace.NOTE("Removed " + std::to_string(count) + " ITask object(s)");
_tasks.clear();
// Mdsd Extensions
count = 0;
for (auto & iter : extensions) {
count++;
std::ostringstream msg;
msg << "Deleting MdsdExtension \"" << iter.first << "\" at address " << iter.second;
trace.NOTE(msg.str());
delete iter.second;
}
trace.NOTE("Removed " + std::to_string(count) + " MdsdExtension");
extensions.clear();
// BatchSet() - gets destroyed when this destructor completes
// No need to flush; the BatchSet destructor will do that
// Autokey map contains no pointers so it gets cleaned up correctly when this destructor completes
trace.NOTE("Clearing autokey map");
_autoKeyMap.clear();
_defaultCreds = 0; // Already deleted it while clearing the credentials vector
}
void
MdsdConfig::LoadFromConfigFile(string path)
{
// Create an appropriate root document context
CfgCtxRoot root(this);
// Instantiate a new parser with the context
ConfigParser parser(&root, this);
// Open the path
std::ifstream infile(path);
if (!infile) {
AddMessage(error, "Failed to open config file " + path + " for reading");
return;
}
// Remember where we were when we were asked to load this file
string previousPath(currentPath);
long previousLine(currentLine);
currentPath = path;
currentLine = 0;
// Read one line at a time, hand it to the parser's parse_chunk() method
string line;
while (std::getline(infile, line)) {
NextLine();
parser.ParseChunk(line);
}
if (!infile.eof()) {
if (infile.bad()) {
AddMessage(error, "Corrupted stream");
}
else if (infile.fail()) {
AddMessage(error, "IO operation failed");
}
else {
AddMessage(error, "std::getline returned 0 for unknown reason");
}
}
currentPath = previousPath;
currentLine = previousLine;
}
void
MdsdConfig::AddMessage(severity_t s, const std::string& msg)
{
Message* newmsg = new MdsdConfig::Message(currentPath, currentLine, s, msg);
messages.push_back(newmsg);
}
bool
MdsdConfig::GotMessages(int mask) const
{
for (const auto& msg : messages) {
if (msg->severity & mask) {
return true;
}
}
return false;
}
void
MdsdConfig::MessagesToStream(std::ostream& output, int mask) const {
for (const auto& msg : messages) {
if (msg->severity & mask) {
output << msg->filename << "(" << msg->line << ") " << SeverityToString(msg->severity)
<< ": " << msg->msg << "\n";
}
}
output << std::flush;
}
// File scope constants
static const std::string
_str_fatal = "Fatal",
_str_error = "Error",
_str_warning = "Warning",
_str_info = "Info",
_str_unknown = "?"
;
const std::string&
MdsdConfig::SeverityToString(MdsdConfig::severity_t severity) const
{
switch (severity) {
case MdsdConfig::info: return _str_info;
case MdsdConfig::warning: return _str_warning;
case MdsdConfig::error: return _str_error;
case MdsdConfig::fatal: return _str_fatal;
default: return _str_unknown; // Should never happen
}
}
void
MdsdConfig::AddSchema(TableSchema* schema)
{
if (schemas.count(schema->Name())) {
AddMessage(error, "Duplicate schema " + schema->Name() + " ignored");
delete schema;
}
else {
schemas[schema->Name()] = schema;
}
}
void
MdsdConfig::AddCredentials(Credentials* creds, bool makeDefault)
{
if (credentials.count(creds->Moniker())) {
AddMessage(error, "Duplicate creds " + creds->Moniker() + " ignored");
delete creds;
return;
}
credentials[creds->Moniker()] = creds;
if (makeDefault) {
if (_defaultCreds) {
AddMessage(error, "Cannot make " + creds->Moniker() + " default; another is already set");
} else {
_defaultCreds = creds;
}
}
}
void
MdsdConfig::AddSource(const string& source, const string& schema)
{
if (schema.length() > 0 && schemas.count(schema) == 0) {
AddMessage(error, "Undefined schema " + schema + " referenced");
}
else if (sources.count(source)) {
AddMessage(error, "Source " + source + " already mapped to a schema; ignored");
}
else {
sources[source] = schemas[schema];
}
}
void
MdsdConfig::AddDynamicSchemaSource(const string& source)
{
if (_dynamic_sources.count(source)) {
AddMessage(error, "Dynamic Schema Source " + source + " has already been configured; ignored");
}
else
{
_dynamic_sources.insert(source);
}
}
bool
MdsdConfig::AddIdentityColumn(const string& colname, const string& colval)
{
for (auto iter = identityColumns.begin(); iter != identityColumns.end(); ++iter) {
if (iter->first == colname) {
AddMessage(error, "Ignoring duplicate identity column " + colname);
return false;
}
}
identityColumns.push_back(make_pair(colname, colval));
return true;
}
void
MdsdConfig::GetIdentityColumnValues(std::back_insert_iterator<vector<pair<string, string> > > destination)
{
std::copy(identityColumns.begin(), identityColumns.end(), destination);
}
void
MdsdConfig::GetIdentityColumnTypes(std::back_insert_iterator<vector<pair<string, string> > > destination)
{
for (auto iter = identityColumns.begin(); iter != identityColumns.end(); ++iter) {
destination = make_pair(iter->first, "mt:wstr");
}
}
void
MdsdConfig::GetIdentityValues(std::string & tenant, std::string& role, std::string& roleInstance)
{
ident_vect_t identityColumns;
GetIdentityColumnValues(std::back_inserter(identityColumns));
for (const auto & col : identityColumns) {
if (col.first.compare(TenantAlias()) == 0) {
tenant = col.second;
}
else if (col.first.compare(RoleAlias()) == 0) {
role = col.second;
}
else if (col.first.compare(RoleInstanceAlias()) == 0) {
roleInstance = col.second;
}
}
}
void
MdsdConfig::AddEnvelopeColumn(std::string && name, std::string && value)
{
for (const EnvelopeColumn & column : _envelopeColumns) {
if (column.first == name) {
throw std::runtime_error("Column already in envelope");
}
}
_envelopeColumns.emplace_back(name, value);
}
void
MdsdConfig::ForeachEnvelopeColumn(const std::function<void(const EnvelopeColumn&)>& process)
{
for (const EnvelopeColumn & column : _envelopeColumns) {
process(column);
}
}
TableSchema*
MdsdConfig::GetSchema(const string& source) const
{
const auto &iter = sources.find(source);
if (iter == sources.end()) {
return 0;
}
return iter->second;
}
Credentials*
MdsdConfig::GetCredentials(const string& moniker) const
{
const auto &iter = credentials.find(moniker);
if (iter == credentials.end()) {
return 0;
}
return iter->second;
}
std::string
MdsdConfig::GetAutokey(const std::string& moniker, const std::string& fullTableName)
{
std::lock_guard<std::mutex> lock(_aKMmutex);
auto iter = _autoKeyMap.find(std::make_pair(moniker, fullTableName));
if (iter == _autoKeyMap.end()) {
return std::string();
}
return iter->second;
}
void
MdsdConfig::DumpAutokeyTable(std::ostream &os)
{
os << "Dump format: <MonikerName, ItemName>" << std::endl;
for (const auto & iter : _autoKeyMap) {
os << "<" << iter.first.first << "," << iter.first.second << ">" << std::endl;
}
}
bool
MdsdConfig::IsQuotaExceeded(const std::string &name, unsigned long current) const
{
Trace trace(Trace::ConfigUse, "MdsdConfig:IsQuotaExceeded");
auto iter = _quotas.find(name);
if (iter == _quotas.end()) {
trace.NOTE("Check against unset quota " + name);
return false;
}
return (current > iter->second);
}
void
MdsdConfig::AddOmiTask(OmiTask *task)
{
// Defer the creation of the batch; autokey data might not yet be loaded.
// The task will create the batch when an attempt is made to start it
_omiTasks.push_back(task);
_isUseful = true;
}
void
MdsdConfig::ForeachOmiTask(const std::function<void(OmiTask*)>& fn)
{
std::for_each(_omiTasks.begin(), _omiTasks.end(), fn);
}
void
MdsdConfig::AddTask(ITask *task)
{
Trace trace(Trace::Scheduler, "MdsdConfig::AddTask");
if (trace.IsActive()) {
std::ostringstream msg;
msg << "Adding task " << task;
trace.NOTE(msg.str());
}
_tasks.push_back(task);
_isUseful = true;
}
void
MdsdConfig::ForeachTask(const std::function<void(ITask*)>& fn)
{
Trace trace(Trace::Scheduler, "MdsdConfig::ForeachTask");
trace.NOTE("Invoking function on " + std::to_string(_tasks.size()) + " task(s)");
std::for_each(_tasks.begin(), _tasks.end(), fn);
}
void
MdsdConfig::AddExtension(MdsdExtension * extension)
{
Trace trace (Trace::ConfigUse, "MdsdConfig::AddExtension");
if (!extension) {
return;
}
const std::string& extname = extension->Name();
if (extensions.count(extname)) {
AddMessage(error, "Duplicate Extension " + extname + " ignored.");
delete extension;
extension = nullptr;
}
else {
extensions[extname] = extension;
_isUseful = true;
}
}
void
MdsdConfig::ForeachExtension(const std::function<void(MdsdExtension*)>& fn)
{
Trace trace (Trace::ConfigUse, "MdsdConfig::ForeachExtension");
for (const auto & kv : extensions) {
trace.NOTE("Walking MdsdExtension with name='" + kv.first + "'");
fn(kv.second);
}
}
void
MdsdConfig::StartScheduledTasks()
{
Trace trace(Trace::Scheduler, "MdsdConfig::StartScheduledTasks");
ForeachOmiTask([](OmiTask *job) { job->Start(); });
ForeachTask([](ITask *task) { task->start(); });
}
void
MdsdConfig::StopScheduledTasks()
{
Trace trace(Trace::Scheduler, "MdsdConfig::StopScheduledTasks");
ForeachOmiTask([](OmiTask *job) { job->Cancel(); });
ForeachTask([](ITask *task) { task->cancel(); });
}
// Tells this configuration to remove itself in the future. The config takes
// steps immediately to stop generating work for itself, then schedules the
// final cleanup action to take place after the requested delay.
void
MdsdConfig::SelfDestruct(int seconds)
{
Trace trace(Trace::ConfigUse, "MdsdConfig::SelfDestruct");
StopScheduledTasks();
StopAllTimers();
// Flush any data we're still holding on to. Don't use FlushBatches; that
// will restart the autoflush timer, and we just stopped that. One last
// flush will happen when the Destroyer calls delete.
_batchSet.Flush();
// Create a deadline_timer on the heap; when it expires, call our Destroyer helper
auto timer = new boost::asio::deadline_timer(crossplat::threadpool::shared_instance().service());
timer->expires_from_now(boost::posix_time::seconds(seconds));
timer->async_wait(boost::bind(MdsdConfig::Destroyer, this, timer));
}
// This static private method does the final delete. Also deletes the heap timer.
void
MdsdConfig::Destroyer(MdsdConfig *config, boost::asio::deadline_timer *timer)
{
Trace trace(Trace::ConfigUse, "MdsdConfig:Destroyer");
std::ostringstream msg;
msg << "Deleting MdsdConfig at " << config;
trace.NOTE(msg.str());
delete config;
delete timer;
}
// Create a batch for a given target. If one has already been created for that target,
// return the one we're already using.
Batch*
MdsdConfig::GetBatch(const MdsEntityName &target, int interval)
{
if (target.GetStoreType() == StoreType::Local) {
return _localBatches.GetBatch(target, interval);
} else {
return _batchSet.GetBatch(target, interval);
}
}
bool
MdsdConfig::ValidateConfig(
bool isStartupConfig
) const
{
Trace trace(Trace::ConfigUse, "MdsdConfig::ValidateConfig");
if (!IsUseful()) {
std::ostringstream msg;
msg << "No productive configuration resulted from loading config file(s): " << configFilePath << ".";
if (!isStartupConfig) {
msg << " New configuration ignored.\n";
}
msg << "Warnings detected:\n";
MessagesToStream(msg, MdsdConfig::warning);
Logger::LogWarn(msg);
}
if (GotMessages(MdsdConfig::fatal)) {
std::ostringstream msg;
msg << "Fatal errors while loading configuration " << configFilePath << ":" << std::endl;
MessagesToStream(msg, MdsdConfig::fatal);
if (!isStartupConfig) {
msg << "\nNew configuration ignored; using previous configuration";
}
Logger::LogError(msg);
return false;
}
if (GotMessages(MdsdConfig::error)) {
std::ostringstream msg;
msg << "Config file " << configFilePath << " parsing errors:\n";
MessagesToStream(msg, MdsdConfig::error);
Logger::LogError(msg);
return false;
}
if (GotMessages(MdsdConfig::warning)) {
std::ostringstream msg;
msg << "Config file " << configFilePath << "parsing warnings:\n";
MessagesToStream(msg, MdsdConfig::warning);
Logger::LogWarn(msg);
}
return true;
}
void
MdsdConfig::DumpEventPublisherInfo()
{
Trace trace(Trace::ConfigLoad, "MdsdConfig::DumpEventPublisherInfo");
if (!trace.IsActive()) {
return;
}
if (_ehPubItemsMap.empty()) {
TRACEINFO(trace, "EventPublisher map is empty");
}
else {
for (const auto & iter : _ehPubItemsMap) {
auto moniker = iter.first;
auto itemsmap = iter.second;
if (itemsmap->empty()) {
TRACEINFO(trace, "Moniker='" << moniker << "'; Event: N/A.");
}
else {
for (const auto& item : (*itemsmap)) {
auto eventname = item.first;
auto ehinfo = item.second;
TRACEINFO(trace, "Moniker='" << moniker << "'; EventName='"
<< eventname << "'; EHInfo: " << ehinfo);
}
}
}
}
}
std::string
MdsdConfig::GetDefaultMoniker() const
{
auto defaultCreds = GetDefaultCredentials();
if (!defaultCreds) {
throw std::runtime_error("No default credential is found.");
}
return defaultCreds->Moniker();
}
void
MdsdConfig::AddMonikerEventInfo(
const std::string & moniker,
const std::string & eventName,
StoreType::Type type,
const std::string & sourceName,
mdsd::EventType eventType
)
{
Trace trace(Trace::ConfigLoad, "AddMonikerEventInfo");
try {
auto monikerToUse = moniker.empty()? GetDefaultMoniker() : moniker;
_mdsdEventCfg->AddEventSinkCfgInfoItem({eventName, monikerToUse, type, sourceName, eventType });
TRACEINFO(trace, "Saved event=" << eventName << " moniker=" << monikerToUse);
}
catch(const std::exception& ex) {
AddMessage(fatal, std::string("AddMonikerEventInfo() failed: ") + ex.what());
}
}
void
MdsdConfig::SetOboDirectPartitionFieldNameValue(std::string&& name, std::string&& value)
{
_oboDirectPartitionFieldsMap.emplace(name, value);
if (name == "resourceId") {
_resourceId = value;
}
}
std::string
MdsdConfig::GetOboDirectPartitionFieldValue(const std::string& name) const
{
if (name.empty()) {
throw std::invalid_argument("MdsdConfig::GetOboDirectPartitionFieldValue(name): name cannot be empty");
}
std::string value;
auto it = _oboDirectPartitionFieldsMap.find(name);
if (it != _oboDirectPartitionFieldsMap.end()) {
value = it->second;
}
else {
Logger::LogWarn("OboDirectPartitionField with name='" + name
+ "' not found. Make sure the mdsd.xml includes the corresponding "
"Management/OboDirectPartitionField element. Returning an empty string "
"as the result value.");
}
return value;
}
void
MdsdConfig::ValidateEvents()
{
Trace trace(Trace::ConfigLoad, "MdsdConfig::ValidateEvents");
try {
ValidateAnnotations();
ValidateEventHubPubKeys();
ValidateEventHubPubSinks();
}
catch(const std::exception & ex) {
AddMessage(error, std::string("MdsdConfig::ValidateEvents() failed: ") + ex.what());
}
}
void
MdsdConfig::ValidateAnnotations()
{
for (const auto & name : _mdsdEventCfg->GetInvalidAnnotations()) {
AddMessage(MdsdConfig::error, "Unknown name '" + name + "' in EventStreamingAnnotation");
}
}
void
MdsdConfig::ValidateEventHubPubKeys()
{
for (const auto & publisherName : _eventPubCfg->CheckForInconsistencies(_hasAutoKey)) {
AddMessage(MdsdConfig::error,
"Failed to find event publisher SAS key for item '" + publisherName + "'");
}
}
void
MdsdConfig::ValidateEventHubPubSinks()
{
for (const auto & publisherName: _mdsdEventCfg->GetEventPublishers())
{
if (!LocalSink::Lookup(publisherName)) {
AddMessage(error, "failed to find LocalSink object for Event Publisher " + publisherName);
} else {
_isUseful = true; // Found a valid event publisher
}
}
}
void
MdsdConfig::InitEventHubPub()
{
Trace trace(Trace::ConfigUse, "MdsdConfig::InitEventHubPub");
SetEventHubPubForLocalSinks();
// create uploaders first before setting SAS key
mdsd::EventHubUploaderMgr::GetInstance().CreateUploaders(mdsd::EventHubType::Publish,
_eventPubCfg->GetNameMonikers());
SetupEventHubPubEmbeddedKeys();
}
void
MdsdConfig::SetupEventHubPubEmbeddedKeys()
{
Trace trace(Trace::ConfigUse, "MdsdConfig::SetupEventHubPubEmbeddedKeys");
auto& ehUploaderMgr = mdsd::EventHubUploaderMgr::GetInstance();
auto ehtype = mdsd::EventHubType::Publish;
for (const auto & item : _eventPubCfg->GetEmbeddedSasData()) {
auto & publisherName = item.first;
auto & monikerSasMap = item.second;
for (const auto & keyItem : monikerSasMap) {
auto & moniker = keyItem.first;
auto & saskey = keyItem.second;
ehUploaderMgr.SetSasAndStart(mdsd::EventHubUploaderId(ehtype, moniker, publisherName), saskey);
}
}
}
void
MdsdConfig::SetEventHubPubForLocalSinks()
{
Trace trace(Trace::ConfigUse, "MdsdConfig::SetEventHubPubForLocalSinks");
std::string tenant, role, roleInstance;
GetIdentityValues(tenant, role, roleInstance);
for (const auto & item : _eventPubCfg->GetNameMonikers()) {
auto & publisherName = item.first;
auto sinkObj = LocalSink::Lookup(publisherName);
if (!sinkObj) {
throw std::runtime_error("SetEventHubPubForLocalSinks(): failed to find LocalSink object for "
+ publisherName);
}
else {
std::string duration = GetDurationForEventName(publisherName);
auto & monikers = item.second;
sinkObj->SetEventPublishInfo(monikers, std::move(duration), tenant, role, roleInstance);
}
}
}
// vim: sw=8