void RemoteProcessorGroupPort::onSchedule()

in libminifi/src/RemoteProcessorGroupPort.cpp [117:197]


/**
 * Reads this port's configuration from the process context and pre-creates
 * the site-to-site clients used later.
 *
 * Steps, in order:
 *  1. Store the port UUID property in protocol_uuid_ when it is set and non-empty.
 *  2. Resolve the SSL context service: look up the controller service named by
 *     the SSLContext property (falling back to RPG_SSL_CONTEXT_SERVICE_NAME);
 *     if none is registered and the nifi_remote_input_secure configuration
 *     entry evaluates to true, create and enable one from configure_.
 *  3. Read the idle timeout, falling back to the property's default value.
 *  4. While holding peer_mutex_: refresh the peer list when NiFi instances are
 *     known, fall back to the configured host/port when there are no peers and
 *     HTTP is disabled, then create one client per slot and hand each to
 *     returnProtocol().
 *
 * @param context process context the properties and controller services are read from
 * @throws Exception (SITE2SITE_EXCEPTION) when there are no peers, HTTP is
 *         disabled, and no usable host/port pair is configured
 */
void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
  std::string value;
  if (context->getProperty(portUUID, value) && !value.empty()) {
    protocol_uuid_ = value;
  }

  // Name of the SSL context controller service; use the well-known RPG name
  // when the property is missing or empty.
  std::string context_name;
  if (!context->getProperty(SSLContext, context_name) || IsNullOrEmpty(context_name)) {
    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
  }
  std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
  if (nullptr != service) {
    // NOTE(review): static_pointer_cast performs no runtime type check, so this
    // relies on the named service actually being an SSLContextService.
    ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
  } else {
    // No controller service registered under that name: build one from the
    // global configuration, but only when secure remote input is switched on.
    std::string secureStr;
    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && utils::StringUtils::toBool(secureStr).value_or(false)) {
      ssl_service = std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME, configure_);
      ssl_service->onEnable();
    }
  }

  if (auto idle_timeout = context->getProperty<core::TimePeriodValue>(idleTimeout)) {
    idle_timeout_ = idle_timeout->getMilliseconds();
  } else {
    // The fallback below dereferences the default, so require at compile time that one exists.
    static_assert(idleTimeout.default_value);
    logger_->log_debug("%s attribute is invalid, so default value of %s will be used", std::string(idleTimeout.name), std::string(*idleTimeout.default_value));
    idle_timeout_ = core::TimePeriodValue(std::string(*idleTimeout.default_value)).getMilliseconds();
  }

  // Everything below reads or modifies the peer state, so hold the peer lock until return.
  std::lock_guard<std::mutex> lock(peer_mutex_);
  if (!nifi_instances_.empty()) {
    refreshPeerList();
    if (!peers_.empty()) {
      peer_index_ = 0;
    }
  }

  /**
   * If at this point we have no peers and HTTP support is disabled this means
   * we must rely on the configured host/port
   */
  if (peers_.empty() && is_http_disabled()) {
    std::string host;
    std::string portStr;
    int configured_port = -1;
    // Return values are intentionally ignored: unset properties leave the
    // strings empty, which is caught by the check below and shown in the log message.
    context->getProperty(hostName, host);
    context->getProperty(port, portStr);
    if (!host.empty() && !portStr.empty() && core::Property::StringToInt(portStr, configured_port)) {
      nifi_instances_.push_back({ host, configured_port, "" });
      bypass_rest_api_ = true;
    } else {
      // we cannot proceed, so log error and throw an exception
      logger_->log_error("%s/%s/%d -- configuration values after eval of configuration options", host, portStr, configured_port);
      throw Exception(SITE2SITE_EXCEPTION, "HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule");
    }
  }

  // populate the site2site protocol for load balancing between them
  if (!peers_.empty()) {
    // Create at least one client per peer, and at least one per concurrent task.
    auto count = peers_.size();
    if (max_concurrent_tasks_ > count) {
      count = max_concurrent_tasks_;
    }
    // Index uses the same type as count so the comparison never mixes widths.
    for (decltype(count) i = 0; i < count; ++i) {
      sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), this->getInterface(), client_type_);
      config.setSecurityContext(ssl_service);
      // Advance round-robin over the peers, wrapping at the end of the list.
      peer_index_++;
      if (peer_index_ >= static_cast<int>(peers_.size())) {
        peer_index_ = 0;
      }
      logger_->log_trace("Creating client");
      config.setHTTPProxy(this->proxy_);
      config.setIdleTimeout(idle_timeout_);
      std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = sitetosite::createClient(config);
      logger_->log_trace("Created client, moving into available protocols");
      returnProtocol(std::move(nextProtocol));
    }
  } else {
    // we don't have any peers
    // Note: the host/port fallback above does not add to peers_, so this is
    // also logged when that fallback was taken.
    logger_->log_error("No peers selected during scheduling");
  }
}