in extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp [677:791]
/**
 * Reads the processor configuration, computes the thumbprint of the configured
 * CA certificate, restores persisted state and starts the embedded HTTPS server.
 *
 * @param context         source of the StateManager and of all property values
 * @param sessionFactory  stored in session_factory_ for later use
 *
 * @throws Exception (PROCESSOR_EXCEPTION) when the StateManager is unavailable,
 *         a required property is missing or cannot be parsed, the CA certificate
 *         file cannot be opened/parsed/fingerprinted, or the server fails to start.
 */
void SourceInitiatedSubscriptionListener::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  std::string ssl_certificate_file;
  std::string ssl_ca_file;

  state_manager_ = context->getStateManager();
  if (state_manager_ == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }

  // Scratch buffer reused for every property that is read as a raw string.
  std::string value;

  context->getProperty(ListenHostname, listen_hostname_);

  if (!context->getProperty(ListenPort, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "Listen Port attribute is missing or invalid");
  } else if (!core::Property::StringToInt(value, listen_port_)) {
    // Reject an unparsable port instead of silently keeping the previous value of
    // listen_port_, mirroring the handling of the other integer properties below.
    throw Exception(PROCESSOR_EXCEPTION, "Listen Port attribute is invalid");
  }

  context->getProperty(SubscriptionManagerPath, subscription_manager_path_);
  context->getProperty(SubscriptionsBasePath, subscriptions_base_path_);

  if (!context->getProperty(SSLCertificate, ssl_certificate_file)) {
    throw Exception(PROCESSOR_EXCEPTION, "SSL Certificate attribute is missing");
  }
  if (!context->getProperty(SSLCertificateAuthority, ssl_ca_file)) {
    throw Exception(PROCESSOR_EXCEPTION, "SSL Certificate Authority attribute is missing");
  }
  if (!context->getProperty(SSLVerifyPeer, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "SSL Verify Peer attribute is missing");
  }
  // A value that cannot be interpreted as a boolean falls back to verifying the peer.
  bool verify_peer = utils::StringUtils::toBool(value).value_or(true);

  context->getProperty(XPathXmlQuery, xpath_xml_query_);

  if (!context->getProperty(InitialExistingEventsStrategy, initial_existing_events_strategy_)) {
    throw Exception(PROCESSOR_EXCEPTION, "Initial Existing Events Strategy attribute is missing or invalid");
  }

  // Time-period properties are stored using the value returned by getMilliseconds().
  if (auto subscription_expiration_interval = context->getProperty<core::TimePeriodValue>(SubscriptionExpirationInterval)) {
    subscription_expiration_interval_ = subscription_expiration_interval->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Subscription Expiration Interval attribute is missing or invalid");
  }
  if (auto heartbeat_interval = context->getProperty<core::TimePeriodValue>(HeartbeatInterval)) {
    heartbeat_interval_ = heartbeat_interval->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Heartbeat Interval attribute is missing or invalid");
  }

  if (!context->getProperty(MaxElements, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "Max Elements attribute is missing or invalid");
  } else if (!core::Property::StringToInt(value, max_elements_)) {
    throw Exception(PROCESSOR_EXCEPTION, "Max Elements attribute is invalid");
  }

  if (auto max_latency = context->getProperty<core::TimePeriodValue>(MaxLatency)) {
    max_latency_ = max_latency->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Max Latency attribute is missing or invalid");
  }
  if (auto connection_retry_interval = context->getProperty<core::TimePeriodValue>(ConnectionRetryInterval)) {
    connection_retry_interval_ = connection_retry_interval->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Interval attribute is missing or invalid");
  }

  if (!context->getProperty(ConnectionRetryCount, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Count attribute is missing or invalid");
  } else if (!core::Property::StringToInt(value, connection_retry_count_)) {
    throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Count attribute is invalid");
  }

  // Read the CA certificate from its PEM file. The FILE handle is closed before any
  // exception can be thrown, so it is never leaked.
  FILE* fp = fopen(ssl_ca_file.c_str(), "rb");
  if (fp == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to open file specified by SSL Certificate Authority attribute");
  }
  X509* ca = nullptr;
  PEM_read_X509(fp, &ca, nullptr, nullptr);
  fclose(fp);
  if (ca == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to parse file specified by SSL Certificate Authority attribute");
  }
  // Hand the certificate to an owning wrapper so it is released on every exit path.
  utils::tls::X509_unique_ptr ca_ptr{ca};

  // A SHA-1 digest is 20 bytes long, hence the fixed-size buffer.
  std::array<std::byte, 20U> hash_buf{};
  int ret = X509_digest(ca, EVP_sha1(), gsl::make_span(hash_buf).as_span<unsigned char>().data(), nullptr);
  if (ret != 1) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get fingerprint for CA specified by SSL Certificate Authority attribute");
  }
  ssl_ca_cert_thumbprint_ = utils::StringUtils::to_hex(hash_buf, true /*uppercase*/);
  logger_->log_debug("%s SHA-1 thumbprint is %s", ssl_ca_file.c_str(), ssl_ca_cert_thumbprint_.c_str());

  session_factory_ = sessionFactory;

  // Load state
  loadState();

  // Start server. The options are passed as a flat list of alternating names and values.
  std::vector<std::string> options;
  options.emplace_back("enable_keep_alive");
  options.emplace_back("yes");
  options.emplace_back("keep_alive_timeout_ms");
  options.emplace_back("15000");
  options.emplace_back("num_threads");
  options.emplace_back("1");
  options.emplace_back("listening_ports");
  // The "s" suffix marks the port as an SSL port in CivetWeb's listening_ports syntax.
  options.emplace_back(std::to_string(listen_port_) + "s");
  options.emplace_back("ssl_certificate");
  options.emplace_back(ssl_certificate_file);
  options.emplace_back("ssl_ca_file");
  options.emplace_back(ssl_ca_file);
  options.emplace_back("ssl_verify_peer");
  options.emplace_back(verify_peer ? "yes" : "no");

  // Convert any failure during server construction into a processor exception.
  try {
    server_ = std::make_unique<CivetServer>(options);
  } catch (const std::exception& e) {
    throw Exception(PROCESSOR_EXCEPTION, std::string("Failed to initialize server, error: ") + e.what());
  } catch (...) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to initialize server");
  }

  // Register a single handler under the "**" URI pattern.
  handler_ = std::make_unique<Handler>(*this);
  server_->addHandler("**", *handler_);
}