in controller/MiNiFiController.cpp [89:261]
int main(int argc, char **argv) {
const auto logger = minifi::core::logging::LoggerConfiguration::getConfiguration().getLogger("controller");
const auto minifi_home = determineMinifiHome(logger);
if (minifi_home.empty()) {
// determineMinifiHome already logged everything we need
return -1;
}
const auto configuration = std::make_shared<minifi::Configure>();
configuration->setHome(minifi_home);
configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
const auto log_properties = std::make_shared<minifi::core::logging::LoggerProperties>();
log_properties->setHome(minifi_home);
log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
std::shared_ptr<minifi::controllers::SSLContextService> secure_context;
try {
secure_context = getSSLContextService(configuration);
} catch(const minifi::Exception& ex) {
logger->log_error(ex.what());
exit(1);
}
auto stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
std::string host = "localhost";
std::string port_str;
std::string ca_cert;
int port = -1;
cxxopts::Options options("MiNiFiController", "MiNiFi local agent controller");
options.positional_help("[optional args]").show_positional_help();
options.add_options()
("h,help", "Shows Help")
("host", "Specifies connecting host name", cxxopts::value<std::string>())
("port", "Specifies connecting host port", cxxopts::value<int>())
("stop", "Shuts down the provided component", cxxopts::value<std::vector<std::string>>())
("start", "Starts provided component", cxxopts::value<std::vector<std::string>>())
("l,list", "Provides a list of connections or processors", cxxopts::value<std::string>())
("c,clear", "Clears the associated connection queue", cxxopts::value<std::vector<std::string>>())
("getsize", "Reports the size of the associated connection queue", cxxopts::value<std::vector<std::string>>())
("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value<std::string>())
("getfull", "Reports a list of full connections")
("jstack", "Returns backtraces from the agent")
("manifest", "Generates a manifest for the current binary")
("noheaders", "Removes headers from output streams");
bool show_headers = true;
try {
auto result = options.parse(argc, argv);
if (result.count("help")) {
std::cout << options.help({ "", "Group" }) << std::endl;
exit(0);
}
if (result.count("host")) {
host = result["host"].as<std::string>();
} else {
configuration->get(minifi::Configure::controller_socket_host, host);
}
if (result.count("port")) {
port = result["port"].as<int>();
} else {
if (port == -1 && configuration->get(minifi::Configure::controller_socket_port, port_str)) {
port = std::stoi(port_str);
}
}
if ((minifi::IsNullOrEmpty(host) && port == -1)) {
std::cout << "MiNiFi Controller is disabled" << std::endl;
exit(0);
}
if (result.count("noheaders")) {
show_headers = false;
}
if (result.count("stop") > 0) {
auto& components = result["stop"].as<std::vector<std::string>>();
for (const auto& component : components) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (minifi::controller::stopComponent(std::move(socket), component))
std::cout << component << " requested to stop" << std::endl;
else
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
}
if (result.count("start") > 0) {
auto& components = result["start"].as<std::vector<std::string>>();
for (const auto& component : components) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (minifi::controller::startComponent(std::move(socket), component))
std::cout << component << " requested to start" << std::endl;
else
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
}
if (result.count("c") > 0) {
auto& components = result["c"].as<std::vector<std::string>>();
for (const auto& connection : components) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context)
: stream_factory_->createSocket(host, port);
if (minifi::controller::clearConnection(std::move(socket), connection)) {
std::cout << "Sent clear command to " << connection << ". Size before clear operation sent: " << std::endl;
socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context)
: stream_factory_->createSocket(host, port);
if (minifi::controller::getConnectionSize(std::move(socket), std::cout, connection) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
} else {
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
}
}
if (result.count("getsize") > 0) {
auto& components = result["getsize"].as<std::vector<std::string>>();
for (const auto& component : components) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (minifi::controller::getConnectionSize(std::move(socket), std::cout, component) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
}
if (result.count("l") > 0) {
auto& option = result["l"].as<std::string>();
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (option == "components") {
if (minifi::controller::listComponents(std::move(socket), std::cout, show_headers) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
} else if (option == "connections") {
if (minifi::controller::listConnections(std::move(socket), std::cout, show_headers) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
}
if (result.count("getfull") > 0) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (minifi::controller::getFullConnections(std::move(socket), std::cout) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
if (result.count("updateflow") > 0) {
auto& flow_file = result["updateflow"].as<std::string>();
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (minifi::controller::updateFlow(std::move(socket), std::cout, flow_file) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
if (result.count("manifest") > 0) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (minifi::controller::printManifest(std::move(socket), std::cout) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
if (result.count("jstack") > 0) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (minifi::controller::getJstacks(std::move(socket), std::cout) < 0)
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
} catch (const std::exception &exc) {
// catch anything thrown within try block that derives from std::exception
std::cerr << exc.what() << std::endl;
} catch (...) {
std::cout << options.help({ "", "Group" }) << std::endl;
exit(0);
}
return 0;
}