int16_t SiteToSiteClient::send()

in libminifi/src/sitetosite/SiteToSiteClient.cpp [395:518]


int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacket *packet, const std::shared_ptr<core::FlowFile> &flowFile, const std::shared_ptr<core::ProcessSession> &session) {
  if (peer_state_ != READY) {
    bootstrap();
  }

  if (peer_state_ != READY) {
    return -1;
  }

  auto it = this->known_transactions_.find(transactionID);
  if (it == known_transactions_.end()) {
    return -1;
  }
  std::shared_ptr<Transaction> transaction = it->second;

  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
    logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID.to_string());
    return -1;
  }

  if (transaction->getDirection() != SEND) {
    logger_->log_warn("Site2Site transaction %s direction is wrong", transactionID.to_string());
    return -1;
  }

  if (transaction->current_transfers_ > 0) {
    const auto ret = writeResponse(transaction, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
    if (ret <= 0) {
      return -1;
    }
  }
  // start to read the packet
  {
    const auto numAttributes = gsl::narrow<uint32_t>(packet->_attributes.size());
    const auto ret = transaction->getStream().write(numAttributes);
    if (ret != 4) {
      return -1;
    }
  }

  for (const auto& attribute : packet->_attributes) {
    {
      const auto ret = transaction->getStream().write(attribute.first, true);
      if (ret == 0 || io::isError(ret)) {
        return -1;
      }
    }
    {
      const auto ret = transaction->getStream().write(attribute.second, true);
      if (ret == 0 || io::isError(ret)) {
        return -1;
      }
    }
    logger_->log_debug("Site2Site transaction %s send attribute key %s value %s", transactionID.to_string(), attribute.first, attribute.second);
  }

  bool flowfile_has_content = (flowFile != nullptr);

  if (flowFile && (flowFile->getResourceClaim() == nullptr || !flowFile->getResourceClaim()->exists())) {
    auto path = flowFile->getResourceClaim() != nullptr ? flowFile->getResourceClaim()->getContentFullPath() : "nullclaim";
    logger_->log_debug("Claim %s does not exist for FlowFile %s", path, flowFile->getUUIDStr());
    flowfile_has_content = false;
  }

  uint64_t len = 0;
  if (flowFile && flowfile_has_content) {
    len = flowFile->getSize();
    const auto ret = transaction->getStream().write(len);
    if (ret != 8) {
      logger_->log_debug("Failed to write content size!");
      return -1;
    }
    if (flowFile->getSize() > 0) {
      session->read(flowFile, [packet](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
        const auto result = internal::pipe(*input_stream, packet->transaction_->getStream());
        if (result == -1) return -1;
        packet->_size = gsl::narrow<size_t>(result);
        return result;
      });
      if (flowFile->getSize() != packet->_size) {
        logger_->log_debug("Mismatched sizes %llu %llu", flowFile->getSize(), packet->_size);
        return -2;
      }
    }
    if (packet->payload_.length() == 0 && len == 0) {
      if (flowFile->getResourceClaim() == nullptr)
        logger_->log_trace("no claim");
      else
        logger_->log_trace("Flowfile empty %s", flowFile->getResourceClaim()->getContentFullPath());
    }
  } else if (packet->payload_.length() > 0) {
    len = packet->payload_.length();
    {
      const auto ret = transaction->getStream().write(len);
      if (ret != 8) {
        return -1;
      }
    }
    {
      const auto ret = transaction->getStream().write(reinterpret_cast<const uint8_t*>(packet->payload_.c_str()), gsl::narrow<size_t>(len));
      if (ret != gsl::narrow<size_t>(len)) {
        logger_->log_debug("Failed to write payload size!");
        return -1;
      }
    }
    packet->_size += len;
  } else if (flowFile && !flowfile_has_content) {
    const auto ret = transaction->getStream().write(len);  // Indicate zero length
    if (ret != 8) {
      logger_->log_debug("Failed to write content size (0)!");
      return -1;
    }
  }

  transaction->current_transfers_++;
  transaction->total_transfers_++;
  transaction->_state = DATA_EXCHANGED;
  transaction->_bytes += len;

  core::logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID.to_string() << " sent flow " << transaction->total_transfers_
                             << "flow records, with total size " << transaction->_bytes;

  return 0;
}