in libminifi/src/RemoteProcessorGroupPort.cpp [117:197]
void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
std::string value;
if (context->getProperty(portUUID, value) && !value.empty()) {
protocol_uuid_ = value;
}
std::string context_name;
if (!context->getProperty(SSLContext, context_name) || IsNullOrEmpty(context_name)) {
context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
}
std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
if (nullptr != service) {
ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
} else {
std::string secureStr;
if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && utils::StringUtils::toBool(secureStr).value_or(false)) {
ssl_service = std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME, configure_);
ssl_service->onEnable();
}
}
{
if (auto idle_timeout = context->getProperty<core::TimePeriodValue>(idleTimeout)) {
idle_timeout_ = idle_timeout->getMilliseconds();
} else {
static_assert(idleTimeout.default_value);
logger_->log_debug("%s attribute is invalid, so default value of %s will be used", std::string(idleTimeout.name), std::string(*idleTimeout.default_value));
idle_timeout_ = core::TimePeriodValue(std::string(*idleTimeout.default_value)).getMilliseconds();
}
}
std::lock_guard<std::mutex> lock(peer_mutex_);
if (!nifi_instances_.empty()) {
refreshPeerList();
if (!peers_.empty())
peer_index_ = 0;
}
/**
* If at this point we have no peers and HTTP support is disabled this means
* we must rely on the configured host/port
*/
if (peers_.empty() && is_http_disabled()) {
std::string host;
std::string portStr;
int configured_port = -1;
// place hostname/port into the log message if we have it
context->getProperty(hostName, host);
context->getProperty(port, portStr);
if (!host.empty() && !portStr.empty() && !portStr.empty() && core::Property::StringToInt(portStr, configured_port)) {
nifi_instances_.push_back({ host, configured_port, "" });
bypass_rest_api_ = true;
} else {
// we cannot proceed, so log error and throw an exception
logger_->log_error("%s/%s/%d -- configuration values after eval of configuration options", host, portStr, configured_port);
throw(Exception(SITE2SITE_EXCEPTION, "HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule"));
}
}
// populate the site2site protocol for load balancing between them
if (!peers_.empty()) {
auto count = peers_.size();
if (max_concurrent_tasks_ > count)
count = max_concurrent_tasks_;
for (uint32_t i = 0; i < count; i++) {
std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr;
sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), this->getInterface(), client_type_);
config.setSecurityContext(ssl_service);
peer_index_++;
if (peer_index_ >= static_cast<int>(peers_.size())) {
peer_index_ = 0;
}
logger_->log_trace("Creating client");
config.setHTTPProxy(this->proxy_);
config.setIdleTimeout(idle_timeout_);
nextProtocol = sitetosite::createClient(config);
logger_->log_trace("Created client, moving into available protocols");
returnProtocol(std::move(nextProtocol));
}
} else {
// we don't have any peers
logger_->log_error("No peers selected during scheduling");
}
}