void SourceInitiatedSubscriptionListener::onSchedule()

in extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp [677:791]


void SourceInitiatedSubscriptionListener::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  std::string ssl_certificate_file;
  std::string ssl_ca_file;

  state_manager_ = context->getStateManager();
  if (state_manager_ == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }

  std::string value;
  context->getProperty(ListenHostname, listen_hostname_);
  if (!context->getProperty(ListenPort, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "Listen Port attribute is missing or invalid");
  } else {
    core::Property::StringToInt(value, listen_port_);
  }
  context->getProperty(SubscriptionManagerPath, subscription_manager_path_);
  context->getProperty(SubscriptionsBasePath, subscriptions_base_path_);
  if (!context->getProperty(SSLCertificate, ssl_certificate_file)) {
    throw Exception(PROCESSOR_EXCEPTION, "SSL Certificate attribute is missing");
  }
  if (!context->getProperty(SSLCertificateAuthority, ssl_ca_file)) {
    throw Exception(PROCESSOR_EXCEPTION, "SSL Certificate Authority attribute is missing");
  }
  if (!context->getProperty(SSLVerifyPeer, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "SSL Verify Peer attribute is missing");
  }
  bool verify_peer = utils::StringUtils::toBool(value).value_or(true);
  context->getProperty(XPathXmlQuery, xpath_xml_query_);
  if (!context->getProperty(InitialExistingEventsStrategy, initial_existing_events_strategy_)) {
    throw Exception(PROCESSOR_EXCEPTION, "Initial Existing Events Strategy attribute is missing or invalid");
  }
  if (auto subscription_expiration_interval = context->getProperty<core::TimePeriodValue>(SubscriptionExpirationInterval)) {
    subscription_expiration_interval_ = subscription_expiration_interval->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Subscription Expiration Interval attribute is missing or invalid");
  }
  if (auto heartbeat_interval = context->getProperty<core::TimePeriodValue>(HeartbeatInterval)) {
    heartbeat_interval_ = heartbeat_interval->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Heartbeat Interval attribute is missing or invalid");
  }
  if (!context->getProperty(MaxElements, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "Max Elements attribute is missing or invalid");
  } else if (!core::Property::StringToInt(value, max_elements_)) {
    throw Exception(PROCESSOR_EXCEPTION, "Max Elements attribute is invalid");
  }
  if (auto max_latency = context->getProperty<core::TimePeriodValue>(MaxLatency)) {
    max_latency_ = max_latency->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Max Latency attribute is missing or invalid");
  }
  if (auto connection_retry_interval = context->getProperty<core::TimePeriodValue>(ConnectionRetryInterval)) {
    connection_retry_interval_ = connection_retry_interval->getMilliseconds();
  } else {
    throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Interval attribute is missing or invalid");
  }
  if (!context->getProperty(ConnectionRetryCount, value)) {
    throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Count attribute is missing or invalid");
  } else if (!core::Property::StringToInt(value, connection_retry_count_)) {
    throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Count attribute is invalid");
  }

  FILE* fp = fopen(ssl_ca_file.c_str(), "rb");
  if (fp == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to open file specified by SSL Certificate Authority attribute");
  }
  X509* ca = nullptr;
  PEM_read_X509(fp, &ca, nullptr, nullptr);
  fclose(fp);
  if (ca == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to parse file specified by SSL Certificate Authority attribute");
  }
  utils::tls::X509_unique_ptr ca_ptr{ca};

  std::array<std::byte, 20U> hash_buf{};
  int ret = X509_digest(ca, EVP_sha1(), gsl::make_span(hash_buf).as_span<unsigned char>().data(), nullptr);
  if (ret != 1) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get fingerprint for CA specified by SSL Certificate Authority attribute");
  }
  ssl_ca_cert_thumbprint_ = utils::StringUtils::to_hex(hash_buf, true /*uppercase*/);
  logger_->log_debug("%s SHA-1 thumbprint is %s", ssl_ca_file.c_str(), ssl_ca_cert_thumbprint_.c_str());

  session_factory_ = sessionFactory;

  // Load state
  loadState();

  // Start server
  std::vector<std::string> options;
  options.emplace_back("enable_keep_alive");
  options.emplace_back("yes");
  options.emplace_back("keep_alive_timeout_ms");
  options.emplace_back("15000");
  options.emplace_back("num_threads");
  options.emplace_back("1");
  options.emplace_back("listening_ports");
  options.emplace_back(std::to_string(listen_port_) + "s");
  options.emplace_back("ssl_certificate");
  options.emplace_back(ssl_certificate_file);
  options.emplace_back("ssl_ca_file");
  options.emplace_back(ssl_ca_file);
  options.emplace_back("ssl_verify_peer");
  options.emplace_back(verify_peer ? "yes" : "no");

  try {
    server_ = std::make_unique<CivetServer>(options);
  } catch (const std::exception& e) {
    throw Exception(PROCESSOR_EXCEPTION, std::string("Failed to initialize server, error: ") + e.what());
  } catch (...) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to initialize server");
  }
  handler_ = std::make_unique<Handler>(*this);
  server_->addHandler("**", *handler_);
}