bool RawSiteToSiteClient::handShake()

in libminifi/src/sitetosite/RawSocketProtocol.cpp [243:338]


bool RawSiteToSiteClient::handShake() {
  if (peer_state_ != ESTABLISHED) {
    logger_->log_error("Site2Site peer state is not established while handshake");
    return false;
  }
  logger_->log_debug("Site2Site Protocol Perform hand shake with destination port {}", port_id_.to_string());
  _commsIdentifier = id_generator_->generate();

  {
    const auto ret = peer_->write(_commsIdentifier);
    if (ret == 0 || io::isError(ret)) {
      return false;
    }
  }

  std::map<std::string, std::string> properties;
  properties[HandShakePropertyStr[GZIP]] = "false";
  properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_.to_string();
  properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeout.load().count());
  if (_currentVersion >= 5) {
    if (_batchCount > 0)
      properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(_batchCount);
    if (_batchSize > 0)
      properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(_batchSize);
    if (_batchDuration.load() > 0ms)
      properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(_batchDuration.load().count());
  }

  if (_currentVersion >= 3) {
    const auto ret = peer_->write(peer_->getURL());
    if (ret == 0 || io::isError(ret)) {
      return false;
    }
  }

  {
    const auto size = gsl::narrow<uint32_t>(properties.size());
    const auto ret = peer_->write(size);
    if (ret == 0 || io::isError(ret)) {
      return false;
    }
  }

  std::map<std::string, std::string>::iterator it;
  for (it = properties.begin(); it != properties.end(); it++) {
    {
      const auto ret = peer_->write(it->first);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    {
      const auto ret = peer_->write(it->second);
      if (ret == 0 || io::isError(ret)) {
        return false;
      }
    }
    logger_->log_debug("Site2Site Protocol Send handshake properties {} {}", it->first, it->second);
  }

  RespondCode code = RESERVED;
  std::string message;

  {
    const auto ret = readRespond(nullptr, code, message);
    if (ret <= 0) {
      return false;
    }
  }

  auto logPortStateError = [this](const std::string& error) {
    logger_->log_error("Site2Site HandShake Failed because destination port, {}, is {}", port_id_.to_string(), error);
  };

  switch (code) {
    case PROPERTIES_OK:
      logger_->log_debug("Site2Site HandShake Completed");
      peer_state_ = HANDSHAKED;
      return true;
    case PORT_NOT_IN_VALID_STATE:
      logPortStateError("in invalid state");
      return false;
    case UNKNOWN_PORT:
      logPortStateError("an unknown port");
      return false;
    case PORTS_DESTINATION_FULL:
      logPortStateError("full");
      return false;
    case UNAUTHORIZED:
      logger_->log_error("Site2Site HandShake on port {} failed: UNAUTHORIZED", port_id_.to_string());
      return false;
    default:
      logger_->log_error("Site2Site HandShake on port {} failed: unknown respond code {}", port_id_.to_string(), magic_enum::enum_underlying(code));
      return false;
  }
}