extensions/coap/protocols/CoapC2Protocol.cpp (248 lines of code) (raw):
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "CoapC2Protocol.h"
#include "c2/PayloadSerializer.h"
#include "c2/PayloadParser.h"
#include "coap_functions.h"
#include "coap_message.h"
#include "io/OutputStream.h"
#include "core/Resource.h"
#include "utils/gsl.h"
#include "utils/span.h"
namespace org::apache::nifi::minifi::coap::c2 {
// The eight bytes of the ASCII text "register" (no terminating NUL).
// NOTE(review): presumably the marker isRegistrationMessage() looks for in a server response — confirm in the header.
uint8_t CoapProtocol::REGISTRATION_MSG[8] = { 'r', 'e', 'g', 'i', 's', 't', 'e', 'r' };
CoapProtocol::CoapProtocol(std::string name, const utils::Identifier &uuid)
    : RESTSender(std::move(name), uuid),
      require_registration_(false) {
}
// Nothing is released by hand: the destructor is defaulted, but defined out of line in this translation unit.
CoapProtocol::~CoapProtocol() = default;
/**
 * Initializes the RESTSender base and then resolves the CoAP connector service used by serialize().
 *
 * When the nifi_c2_coap_connector_service property is set, the named controller service is looked up
 * through `controller`. Otherwise a default CoapConnectorService named "cs" is created from `configure`
 * and enabled right away.
 *
 * If the named service cannot be resolved (no provider, unknown name, or a service that is not a
 * CoapConnectorService) coap_service_ is left null; serialize() answers READ_ERROR in that state.
 *
 * @param controller provider used to look up the configured connector service; may be null
 * @param configure  agent configuration
 */
void CoapProtocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
  RESTSender::initialize(controller, configure);
  if (configure->get(minifi::Configuration::nifi_c2_coap_connector_service, controller_service_name_)) {
    if (controller == nullptr) {
      // Without a provider there is nothing to look the service up in.
      logger_->log_error("CoAP connector service %s is configured, but no controller service provider is available", controller_service_name_);
      coap_service_ = nullptr;
      return;
    }
    auto service = controller->getControllerService(controller_service_name_);
    // Checked downcast: the configured name may refer to a controller service of a different type,
    // in which case an unchecked static cast would be undefined behaviour.
    coap_service_ = std::dynamic_pointer_cast<coap::controllers::CoapConnectorService>(service);
    if (coap_service_ == nullptr) {
      logger_->log_error("Controller service %s is missing or is not a CoAP connector service", controller_service_name_);
    }
  } else {
    logger_->log_info("No CoAP connector configured, so using default service");
    coap_service_ = std::make_shared<coap::controllers::CoapConnectorService>("cs", configure);
    coap_service_->onEnable();
  }
}
/**
 * Delegates to RESTSender::consumePayload with the same url, payload and direction.
 *
 * The caller's async flag is ignored: the base implementation always receives `false` as its fourth argument.
 */
minifi::c2::C2Payload CoapProtocol::consumePayload(const std::string &url, const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool /*async*/) {
  constexpr bool async_flag = false;
  return RESTSender::consumePayload(url, payload, direction, async_flag);
}
/**
 * Writes an acknowledgement body: the payload identifier followed by a single status byte.
 *
 * Status byte values:
 *   0 - NESTED, INITIATE, FULLY_APPLIED, READ_COMPLETE, NO_OPERATION
 *   1 - NOT_APPLIED, PARTIALLY_APPLIED
 *   2 - READ_ERROR
 *   3 - SET_ERROR
 *
 * @param stream  destination of the serialized bytes
 * @param payload acknowledgement being serialized
 * @return always 0; the results of the stream writes are not inspected
 */
int CoapProtocol::writeAcknowledgement(io::OutputStream *stream, const minifi::c2::C2Payload &payload) {
  // Maps an update state onto the status byte; anything not listed falls back to 0.
  const auto to_status_byte = [](const auto update_state) -> uint8_t {
    switch (update_state) {
      case state::UpdateState::NOT_APPLIED:
      case state::UpdateState::PARTIALLY_APPLIED:
        return 1;
      case state::UpdateState::READ_ERROR:
        return 2;
      case state::UpdateState::SET_ERROR:
        return 3;
      case state::UpdateState::NESTED:
      case state::UpdateState::INITIATE:
      case state::UpdateState::FULLY_APPLIED:
      case state::UpdateState::READ_COMPLETE:
      case state::UpdateState::NO_OPERATION:
        return 0;
    }
    return 0;
  };

  auto identifier = payload.getIdentifier();
  uint8_t status_byte = to_status_byte(payload.getStatus().getState());

  stream->write(identifier);
  stream->write(&status_byte, 1);
  return 0;
}
/**
 * Writes a heartbeat body to `stream`.
 *
 * Layout, in write order:
 *   - device identifier, agent identifier
 *   - one bool: true when a flow-info section follows, false otherwise
 *   - when true: component count, then per component its label and running flag;
 *     queue count, then per queue its label, dataSize, dataSizeMax, size and sizeMax;
 *     bucketId and flowId of the versioned flow snapshot
 *
 * The flow-info section is optional: if any required part of it is missing from the payload, the
 * section is replaced by a single `false` flag. A component without "running" is written as not
 * running, and a queue with a missing size field is written with the values read so far (zero for the rest).
 *
 * @param stream  destination of the serialized bytes
 * @param payload heartbeat payload to serialize
 * @return 0 on success, -1 when the device/agent identifiers cannot be read or the agent identifier is empty
 */
int CoapProtocol::writeHeartbeat(io::OutputStream *stream, const minifi::c2::C2Payload &payload) {
  bool byte;
  uint16_t size = 0;
  logger_->log_trace("Writing heartbeat");
  try {
    const auto deviceIdent = minifi::c2::PayloadParser::getInstance(payload).in("deviceInfo").getAs<std::string>("identifier");
    const auto agentIdent = minifi::c2::PayloadParser::getInstance(payload).in("agentInfo").getAs<std::string>("identifier");
    stream->write(deviceIdent, false);
    logger_->log_trace("Writing heartbeat with device Ident %s and agent Ident %s", deviceIdent, agentIdent);
    if (agentIdent.empty()) {
      return -1;
    }
    stream->write(agentIdent, false);
    try {
      auto flowInfoParser = minifi::c2::PayloadParser::getInstance(payload).in("flowInfo");
      auto componentParser = flowInfoParser.in("components");
      auto queueParser = flowInfoParser.in("queues");
      auto vfsParser = flowInfoParser.in("versionedFlowSnapshotURI");
      // Resolve every mandatory flow-info value BEFORE the `true` flag is written. If one of these
      // lookups failed after the flag and the component/queue data were already on the stream, the
      // handler below would append `false` to a half-written section and corrupt the message.
      const auto bucketId = vfsParser.getAs<std::string>("bucketId");
      const auto flowId = vfsParser.getAs<std::string>("flowId");

      byte = true;
      stream->write(byte);

      size = componentParser.getSize();
      stream->write(size);
      componentParser.foreach([this, stream](const minifi::c2::C2Payload &component) {
        auto myParser = minifi::c2::PayloadParser::getInstance(component);
        bool running = false;
        stream->write(component.getLabel());
        try {
          running = myParser.getAs<bool>("running");
        }
        catch(const minifi::c2::PayloadParseException&) {
          // Keep the record well-formed: fall back to "not running".
          logger_->log_error("Could not find running in components");
        }
        stream->write(running);
      });

      size = queueParser.getSize();
      stream->write(size);
      queueParser.foreach([this, stream](const minifi::c2::C2Payload &component) {
        auto myParser = minifi::c2::PayloadParser::getInstance(component);
        stream->write(component.getLabel());
        uint64_t datasize = 0;
        uint64_t datasizemax = 0;
        uint64_t qsize = 0;
        uint64_t sizemax = 0;
        try {
          datasize = myParser.getAs<uint64_t>("dataSize");
          datasizemax = myParser.getAs<uint64_t>("dataSizeMax");
          qsize = myParser.getAs<uint64_t>("size");
          sizemax = myParser.getAs<uint64_t>("sizeMax");
        }
        catch(const minifi::c2::PayloadParseException&) {
          // All four values are still written below so the record keeps its fixed shape.
          logger_->log_error("Could not find queue sizes");
        }
        stream->write(datasize);
        stream->write(datasizemax);
        stream->write(qsize);
        stream->write(sizemax);
      });

      stream->write(bucketId);
      stream->write(flowId);
    } catch (const minifi::c2::PayloadParseException &pe) {
      logger_->log_error("Parser exception occurred, but is ignorable, reason %s", pe.what());
      // okay to ignore: the heartbeat is sent without a flow-info section
      byte = false;
      stream->write(byte);
    }
  } catch (const minifi::c2::PayloadParseException &e) {
    logger_->log_error("Parser exception occurred, reason %s", e.what());
    return -1;
  }
  return 0;
}
/**
 * Translates a numeric operation code into the corresponding C2 operation.
 *
 * @param type operation code, 0 through 9
 * @return the matching minifi::c2::Operation
 *
 * Any code outside 0..9 ends in gsl_FailFast().
 */
minifi::c2::Operation CoapProtocol::getOperation(int type) {
  // Position in the table is the operation code.
  static const minifi::c2::Operation operations_by_code[] = {
      minifi::c2::Operation::acknowledge,  // 0
      minifi::c2::Operation::heartbeat,    // 1
      minifi::c2::Operation::clear,        // 2
      minifi::c2::Operation::describe,     // 3
      minifi::c2::Operation::restart,      // 4
      minifi::c2::Operation::start,        // 5
      minifi::c2::Operation::update,       // 6
      minifi::c2::Operation::stop,         // 7
      minifi::c2::Operation::pause,        // 8
      minifi::c2::Operation::resume,       // 9
  };
  constexpr int known_codes = static_cast<int>(sizeof(operations_by_code) / sizeof(operations_by_code[0]));

  if (type < 0 || type >= known_codes) {
    gsl_FailFast();
  }
  return operations_by_code[type];
}
/**
 * Serializes `payload`, posts it through the CoAP connector service and decodes the response.
 *
 * Request layout: uint16 version (0), one payload-type byte (0 = acknowledge, 1 = heartbeat), then the
 * body produced by writeAcknowledgement() / writeHeartbeat(). Acknowledgements go to the "acknowledge"
 * endpoint, heartbeats to "heartbeat".
 *
 * Response layout: uint16 version, uint16 operation count, then per operation: one operation-code byte,
 * id string, operand string, uint16 argument count and that many key/value string pairs.
 *
 * Registration: when the response is recognised by isRegistrationMessage(), require_registration_ is
 * set. While the flag is set, the next call sends the payload through RESTSender::consumePayload to
 * rest_uri_ instead and returns READ_COMPLETE either way; the flag is cleared only if that REST call
 * did not end in READ_ERROR.
 *
 * @param payload acknowledgement or heartbeat to send
 * @return a NESTED payload holding one nested payload per received operation, or a payload in state
 *         READ_ERROR when no connector service is available, the operation is neither acknowledge nor
 *         heartbeat, the body cannot be written, the response is empty, is a registration request, or
 *         carries an operation code getOperation() does not know
 *
 * NOTE(review): REQUIRE_SIZE_IF and REQUIRE_VALID are defined outside this file; presumably they
 * return early from this function when a read fails — confirm in the header.
 */
minifi::c2::C2Payload CoapProtocol::serialize(const minifi::c2::C2Payload &payload) {
  // Highest operation code getOperation() maps (codes 0 through 9).
  constexpr uint8_t kMaxOperationType = 9;

  if (nullptr == coap_service_) {
    // return an error if we don't have a coap service
    logger_->log_error("CoAP service requested without any configured hostname or port");
    return {payload.getOperation(), state::UpdateState::READ_ERROR};
  }
  if (require_registration_) {
    logger_->log_debug("Server requested agent registration, so attempting");
    auto response = minifi::c2::RESTSender::consumePayload(rest_uri_, payload, minifi::c2::TRANSMIT, false);
    if (response.getStatus().getState() == state::UpdateState::READ_ERROR) {
      // The flag stays set, so registration is attempted again on the next call.
      logger_->log_trace("Could not register");
      return {payload.getOperation(), state::UpdateState::READ_COMPLETE};
    } else {
      logger_->log_trace("Registered agent.");
    }
    require_registration_ = false;
    return {payload.getOperation(), state::UpdateState::READ_COMPLETE};
  }

  // Build the request: version, payload type, body.
  uint16_t version = 0;
  uint8_t payload_type = 0;
  uint16_t size = 0;
  io::BufferStream stream;
  stream.write(version);
  std::string endpoint = "heartbeat";
  switch (payload.getOperation()) {
    case minifi::c2::Operation::acknowledge:
      endpoint = "acknowledge";
      payload_type = 0;
      stream.write(&payload_type, 1);
      if (writeAcknowledgement(&stream, payload) != 0) {
        return {payload.getOperation(), state::UpdateState::READ_ERROR};
      }
      break;
    case minifi::c2::Operation::heartbeat:
      payload_type = 1;
      stream.write(&payload_type, 1);
      if (writeHeartbeat(&stream, payload) != 0) {
        logger_->log_error("Could not write heartbeat");
        return {payload.getOperation(), state::UpdateState::READ_ERROR};
      }
      break;
    default:
      logger_->log_error("Could not identify operation");
      return {payload.getOperation(), state::UpdateState::READ_ERROR};
  }

  // Hand the serialized bytes to the connector service; msg only points into `stream`'s buffer.
  size_t bsize = stream.size();
  CoapMessage msg;
  msg.data_ = const_cast<uint8_t *>(utils::as_span<const uint8_t>(stream.getBuffer()).data());
  msg.size_ = bsize;
  coap::controllers::CoapResponse message = coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, &msg);
  const auto message_data = message.getData();
  if (isRegistrationMessage(message)) {
    // Picked up at the top of the next call.
    require_registration_ = true;
  } else if (!message_data.empty()) {
    io::BufferStream responseStream(message_data);
    responseStream.read(version);
    responseStream.read(size);
    logger_->log_trace("Received ack. version %d. number of operations %d", version, size);
    minifi::c2::C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED);
    for (int i = 0; i < size; i++) {
      uint8_t operationType;
      uint16_t argsize = 0;
      std::string operand;
      std::string id;
      REQUIRE_SIZE_IF(1, responseStream.read(operationType))
      REQUIRE_VALID(responseStream.read(id, false))
      REQUIRE_VALID(responseStream.read(operand, false))
      logger_->log_trace("Received op %d, with id %s and operand %s", operationType, id, operand);
      // The operation code comes from the server's response. getOperation() fails fast on a code it
      // does not know, so treat an unknown code as a malformed response instead of terminating the agent.
      if (operationType > kMaxOperationType) {
        logger_->log_error("Received unsupported operation type %d", operationType);
        return {payload.getOperation(), state::UpdateState::READ_ERROR};
      }
      auto newOp = getOperation(operationType);
      minifi::c2::C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE);
      nested_payload.setIdentifier(id);
      minifi::c2::C2ContentResponse new_command(newOp);
      new_command.delay = 0;
      new_command.required = true;
      new_command.ttl = -1;
      new_command.name = operand;
      new_command.ident = id;
      responseStream.read(argsize);
      for (int j = 0; j < argsize; j++) {
        std::string key;
        std::string value;
        REQUIRE_VALID(responseStream.read(key))
        REQUIRE_VALID(responseStream.read(value))
        new_command.operation_arguments[key] = value;
      }
      nested_payload.addContent(std::move(new_command));
      new_payload.addPayload(std::move(nested_payload));
    }
    return new_payload;
  }
  // Reached for an empty response and after a registration request from the server.
  return {payload.getOperation(), state::UpdateState::READ_ERROR};
}
// Registers CoapProtocol via the REGISTER_RESOURCE macro with the InternalResource tag.
// NOTE(review): the macro is defined outside this file (presumably core/Resource.h); presumably it makes the
// class instantiable by name at runtime — confirm there.
REGISTER_RESOURCE(CoapProtocol, InternalResource);
}  // namespace org::apache::nifi::minifi::coap::c2