controller/Controller.cpp (178 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "Controller.h" #include <utility> #include "io/BufferStream.h" #include "c2/C2Payload.h" namespace org::apache::nifi::minifi::controller { bool sendSingleCommand(std::unique_ptr<io::Socket> socket, uint8_t op, const std::string& value) { if (socket->initialize() < 0) { return false; } io::BufferStream stream; stream.write(&op, 1); stream.write(value); return socket->write(stream.getBuffer()) == stream.size(); } bool stopComponent(std::unique_ptr<io::Socket> socket, const std::string& component) { return sendSingleCommand(std::move(socket), static_cast<uint8_t>(c2::Operation::stop), component); } bool startComponent(std::unique_ptr<io::Socket> socket, const std::string& component) { return sendSingleCommand(std::move(socket), static_cast<uint8_t>(c2::Operation::start), component); } bool clearConnection(std::unique_ptr<io::Socket> socket, const std::string& connection) { return sendSingleCommand(std::move(socket), static_cast<uint8_t>(c2::Operation::clear), connection); } int updateFlow(std::unique_ptr<io::Socket> socket, std::ostream &out, const std::string& file) { if (socket->initialize() < 0) { return -1; } auto op = static_cast<uint8_t>(c2::Operation::update); io::BufferStream stream; stream.write(&op, 1); stream.write("flow"); stream.write(file); if (io::isError(socket->write(stream.getBuffer()))) { return -1; } // read the response uint8_t resp = 0; socket->read(resp); if (resp == static_cast<uint8_t>(c2::Operation::describe)) { uint16_t connections = 0; socket->read(connections); out << connections << " are full" << std::endl; for (int i = 0; i < connections; i++) { std::string fullcomponent; socket->read(fullcomponent); out << fullcomponent << " is full" << std::endl; } } return 0; } int getFullConnections(std::unique_ptr<io::Socket> socket, std::ostream &out) { if (socket->initialize() < 0) { return -1; } auto op = static_cast<uint8_t>(c2::Operation::describe); io::BufferStream stream; stream.write(&op, 1); stream.write("getfull"); if (io::isError(socket->write(stream.getBuffer()))) { return -1; } // read the response uint8_t resp = 0; socket->read(resp); if (resp == static_cast<uint8_t>(c2::Operation::describe)) { uint16_t connections = 0; socket->read(connections); out << connections << " are full" << std::endl; for (int i = 0; i < connections; i++) { std::string fullcomponent; socket->read(fullcomponent); out << fullcomponent << " is full" << std::endl; } } return 0; } int getConnectionSize(std::unique_ptr<io::Socket> socket, std::ostream &out, const std::string& connection) { if (socket->initialize() < 0) { return -1; } auto op = static_cast<uint8_t>(c2::Operation::describe); io::BufferStream stream; stream.write(&op, 1); stream.write("queue"); stream.write(connection); if (io::isError(socket->write(stream.getBuffer()))) { return -1; } // read the response uint8_t resp = 0; socket->read(resp); if (resp == static_cast<uint8_t>(c2::Operation::describe)) { std::string size; socket->read(size); out << "Size/Max of " << connection << " " << size << std::endl; } return 0; } int listComponents(std::unique_ptr<io::Socket> socket, std::ostream &out, bool show_header) { if (socket->initialize() < 0) { return -1; } io::BufferStream stream; auto op = static_cast<uint8_t>(c2::Operation::describe); stream.write(&op, 1); stream.write("components"); if (io::isError(socket->write(stream.getBuffer()))) { return -1; } uint16_t responses = 0; socket->read(op); socket->read(responses); if (show_header) out << "Components:" << std::endl; for (int i = 0; i < responses; i++) { std::string name; socket->read(name, false); std::string status; socket->read(status, false); out << name << ", running: " << status << std::endl; } return 0; } int listConnections(std::unique_ptr<io::Socket> socket, std::ostream &out, bool show_header) { if (socket->initialize() < 0) { return -1; } io::BufferStream stream; auto op = static_cast<uint8_t>(c2::Operation::describe); stream.write(&op, 1); stream.write("connections"); if (io::isError(socket->write(stream.getBuffer()))) { return -1; } uint16_t responses = 0; socket->read(op); socket->read(responses); if (show_header) out << "Connection Names:" << std::endl; for (int i = 0; i < responses; i++) { std::string name; socket->read(name, false); out << name << std::endl; } return 0; } int printManifest(std::unique_ptr<io::Socket> socket, std::ostream &out) { if (socket->initialize() < 0) { return -1; } io::BufferStream stream; auto op = static_cast<uint8_t>(c2::Operation::describe); stream.write(&op, 1); stream.write("manifest"); if (io::isError(socket->write(stream.getBuffer()))) { return -1; } socket->read(op); std::string manifest; socket->read(manifest, true); out << manifest << std::endl; return 0; } int getJstacks(std::unique_ptr<io::Socket> socket, std::ostream &out) { if (socket->initialize() < 0) { return -1; } io::BufferStream stream; auto op = static_cast<uint8_t>(c2::Operation::describe); stream.write(&op, 1); stream.write("jstack"); if (io::isError(socket->write(stream.getBuffer()))) { return -1; } socket->read(op); std::string manifest; socket->read(manifest, true); out << manifest << std::endl; return 0; } } // namespace org::apache::nifi::minifi::controller