extensions/coap/protocols/CoapC2Protocol.cpp (248 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "CoapC2Protocol.h" #include "c2/PayloadSerializer.h" #include "c2/PayloadParser.h" #include "coap_functions.h" #include "coap_message.h" #include "io/OutputStream.h" #include "core/Resource.h" #include "utils/gsl.h" #include "utils/span.h" namespace org::apache::nifi::minifi::coap::c2 { uint8_t CoapProtocol::REGISTRATION_MSG[8] = { 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72 }; CoapProtocol::CoapProtocol(std::string name, const utils::Identifier &uuid) : RESTSender(std::move(name), uuid), require_registration_(false) { } CoapProtocol::~CoapProtocol() = default; void CoapProtocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) { RESTSender::initialize(controller, configure); if (configure->get(minifi::Configuration::nifi_c2_coap_connector_service, controller_service_name_)) { auto service = controller->getControllerService(controller_service_name_); coap_service_ = std::static_pointer_cast<coap::controllers::CoapConnectorService>(service); } else { logger_->log_info("No CoAP connector configured, so using default service"); coap_service_ = std::make_shared<coap::controllers::CoapConnectorService>("cs", configure); coap_service_->onEnable(); } } minifi::c2::C2Payload CoapProtocol::consumePayload(const std::string &url, const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool /*async*/) { return RESTSender::consumePayload(url, payload, direction, false); } int CoapProtocol::writeAcknowledgement(io::OutputStream *stream, const minifi::c2::C2Payload &payload) { auto ident = payload.getIdentifier(); auto state = payload.getStatus().getState(); stream->write(ident); uint8_t payloadState = 0; switch (state) { case state::UpdateState::NESTED: case state::UpdateState::INITIATE: case state::UpdateState::FULLY_APPLIED: case state::UpdateState::READ_COMPLETE: case state::UpdateState::NO_OPERATION: payloadState = 0; break; case state::UpdateState::NOT_APPLIED: case state::UpdateState::PARTIALLY_APPLIED: payloadState = 1; break; case state::UpdateState::READ_ERROR: payloadState = 2; break; case state::UpdateState::SET_ERROR: payloadState = 3; break; } stream->write(&payloadState, 1); return 0; } int CoapProtocol::writeHeartbeat(io::OutputStream *stream, const minifi::c2::C2Payload &payload) { bool byte; uint16_t size = 0; logger_->log_trace("Writing heartbeat"); try { const auto deviceIdent = minifi::c2::PayloadParser::getInstance(payload).in("deviceInfo").getAs<std::string>("identifier"); const auto agentIdent = minifi::c2::PayloadParser::getInstance(payload).in("agentInfo").getAs<std::string>("identifier"); stream->write(deviceIdent, false); logger_->log_trace("Writing heartbeat with device Ident %s and agent Ident %s", deviceIdent, agentIdent); if (agentIdent.empty()) { return -1; } stream->write(agentIdent, false); try { auto flowInfoParser = minifi::c2::PayloadParser::getInstance(payload).in("flowInfo"); auto componentParser = flowInfoParser.in("components"); auto queueParser = flowInfoParser.in("queues"); auto vfsParser = flowInfoParser.in("versionedFlowSnapshotURI"); byte = true; stream->write(byte); size = componentParser.getSize(); stream->write(size); componentParser.foreach([this, stream](const minifi::c2::C2Payload &component) { auto myParser = minifi::c2::PayloadParser::getInstance(component); bool running = false; stream->write(component.getLabel()); try { running = myParser.getAs<bool>("running"); } catch(const minifi::c2::PayloadParseException &e) { logger_->log_error("Could not find running in components"); } stream->write(running); }); size = queueParser.getSize(); stream->write(size); queueParser.foreach([this, stream](const minifi::c2::C2Payload &component) { auto myParser = minifi::c2::PayloadParser::getInstance(component); stream->write(component.getLabel()); uint64_t datasize = 0; uint64_t datasizemax = 0; uint64_t qsize = 0; uint64_t sizemax = 0; try { datasize = myParser.getAs<uint64_t>("dataSize"); datasizemax = myParser.getAs<uint64_t>("dataSizeMax"); qsize = myParser.getAs<uint64_t>("size"); sizemax = myParser.getAs<uint64_t>("sizeMax"); } catch(const minifi::c2::PayloadParseException &e) { logger_->log_error("Could not find queue sizes"); } stream->write(datasize); stream->write(datasizemax); stream->write(qsize); stream->write(sizemax); }); auto bucketId = vfsParser.getAs<std::string>("bucketId"); auto flowId = vfsParser.getAs<std::string>("flowId"); stream->write(bucketId); stream->write(flowId); } catch (const minifi::c2::PayloadParseException &pe) { logger_->log_error("Parser exception occurred, but is ignorable, reason %s", pe.what()); // okay to ignore byte = false; stream->write(byte); } } catch (const minifi::c2::PayloadParseException &e) { logger_->log_error("Parser exception occurred, reason %s", e.what()); return -1; } return 0; } minifi::c2::Operation CoapProtocol::getOperation(int type) { switch (type) { case 0: return minifi::c2::Operation::acknowledge; case 1: return minifi::c2::Operation::heartbeat; case 2: return minifi::c2::Operation::clear; case 3: return minifi::c2::Operation::describe; case 4: return minifi::c2::Operation::restart; case 5: return minifi::c2::Operation::start; case 6: return minifi::c2::Operation::update; case 7: return minifi::c2::Operation::stop; case 8: return minifi::c2::Operation::pause; case 9: return minifi::c2::Operation::resume; default: gsl_FailFast(); } } minifi::c2::C2Payload CoapProtocol::serialize(const minifi::c2::C2Payload &payload) { if (nullptr == coap_service_) { // return an error if we don't have a coap service logger_->log_error("CoAP service requested without any configured hostname or port"); return {payload.getOperation(), state::UpdateState::READ_ERROR}; } if (require_registration_) { logger_->log_debug("Server requested agent registration, so attempting"); auto response = minifi::c2::RESTSender::consumePayload(rest_uri_, payload, minifi::c2::TRANSMIT, false); if (response.getStatus().getState() == state::UpdateState::READ_ERROR) { logger_->log_trace("Could not register"); return {payload.getOperation(), state::UpdateState::READ_COMPLETE}; } else { logger_->log_trace("Registered agent."); } require_registration_ = false; return {payload.getOperation(), state::UpdateState::READ_COMPLETE}; } uint16_t version = 0; uint8_t payload_type = 0; uint16_t size = 0; io::BufferStream stream; stream.write(version); std::string endpoint = "heartbeat"; switch (payload.getOperation()) { case minifi::c2::Operation::acknowledge: endpoint = "acknowledge"; payload_type = 0; stream.write(&payload_type, 1); if (writeAcknowledgement(&stream, payload) != 0) { return {payload.getOperation(), state::UpdateState::READ_ERROR}; } break; case minifi::c2::Operation::heartbeat: payload_type = 1; stream.write(&payload_type, 1); if (writeHeartbeat(&stream, payload) != 0) { logger_->log_error("Could not write heartbeat"); return {payload.getOperation(), state::UpdateState::READ_ERROR}; } break; default: logger_->log_error("Could not identify operation"); return {payload.getOperation(), state::UpdateState::READ_ERROR}; } size_t bsize = stream.size(); CoapMessage msg; msg.data_ = const_cast<uint8_t *>(utils::as_span<const uint8_t>(stream.getBuffer()).data()); msg.size_ = bsize; coap::controllers::CoapResponse message = coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, &msg); const auto message_data = message.getData(); if (isRegistrationMessage(message)) { require_registration_ = true; } else if (!message_data.empty()) { io::BufferStream responseStream(message_data); responseStream.read(version); responseStream.read(size); logger_->log_trace("Received ack. version %d. number of operations %d", version, size); minifi::c2::C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED); for (int i = 0; i < size; i++) { uint8_t operationType; uint16_t argsize = 0; std::string operand; std::string id; REQUIRE_SIZE_IF(1, responseStream.read(operationType)) REQUIRE_VALID(responseStream.read(id, false)) REQUIRE_VALID(responseStream.read(operand, false)) logger_->log_trace("Received op %d, with id %s and operand %s", operationType, id, operand); auto newOp = getOperation(operationType); minifi::c2::C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE); nested_payload.setIdentifier(id); minifi::c2::C2ContentResponse new_command(newOp); new_command.delay = 0; new_command.required = true; new_command.ttl = -1; new_command.name = operand; new_command.ident = id; responseStream.read(argsize); for (int j = 0; j < argsize; j++) { std::string key; std::string value; REQUIRE_VALID(responseStream.read(key)) REQUIRE_VALID(responseStream.read(value)) new_command.operation_arguments[key] = value; } nested_payload.addContent(std::move(new_command)); new_payload.addPayload(std::move(nested_payload)); } return new_payload; } return {payload.getOperation(), state::UpdateState::READ_ERROR}; } REGISTER_RESOURCE(CoapProtocol, InternalResource); } // namespace org::apache::nifi::minifi::coap::c2