bool SiteToSiteClient::transferFlowFiles()

in libminifi/src/sitetosite/SiteToSiteClient.cpp [101:190]


/**
 * Sends flow files pulled from the session to the remote peer inside a single SEND transaction.
 *
 * Flow files are taken from the session one at a time and sent until either the session has
 * no more flow files or the time spent in the transaction exceeds _batchSendNanos. The
 * transaction is then confirmed and completed.
 *
 * @param context process context; yielded whenever the transfer is aborted with an exception
 * @param session process session the flow files are taken from (and removed from once sent)
 * @return false if the session has no flow file or bootstrap() fails;
 *         true once the transaction was confirmed and completed
 * @throws Exception (SITE2SITE_EXCEPTION) if the peer is not READY after bootstrapping, the
 *         transaction cannot be created, a send reports -1, or confirm/complete fails.
 *         Anything thrown while the transaction is open is rethrown after the transaction
 *         was deleted, the context yielded and the connection torn down.
 */
bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::ProcessSession& session) {
  auto flow = session.get();
  if (!flow) {
    return false;
  }

  if (peer_state_ != READY && !bootstrap()) {
    return false;
  }

  // bootstrap() may report success while the peer still is not READY
  if (peer_state_ != READY) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
  }

  // Create the transaction
  std::shared_ptr<Transaction> transaction = createTransaction(SEND);
  if (!transaction) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
  }
  utils::Identifier transactionID = transaction->getUUID();

  // Shared cleanup for every failure after the transaction has been created.
  const auto abort_transaction = [&] {
    deleteTransaction(transactionID);
    context.yield();
    tearDown();
  };

  // steady_clock is monotonic, so the batch time limit is not affected by wall-clock adjustments
  const auto transaction_started_at = std::chrono::steady_clock::now();

  try {
    do {
      const auto start_time = std::chrono::steady_clock::now();
      std::string payload;
      DataPacket packet(getLogger(), transaction, flow->getAttributes(), payload);

      const int16_t resp = send(transactionID, &packet, flow, &session);
      if (resp == -1) {
        throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
      }

      logger_->log_debug("Site2Site transaction {} send flow record {}", transactionID.to_string(), flow->getUUIDStr());
      // Provenance is only reported for resp == 0; for any other non-failure code the flow file
      // is still removed from the session below, just without a SEND event.
      if (resp == 0) {
        const auto end_time = std::chrono::steady_clock::now();
        std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
        // NOTE(review): there is no separator between the UUID and "Remote Host=" - confirm whether that is intended
        std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
        session.getProvenanceReporter()->send(*flow, transitUri, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time), false);
      }
      session.remove(flow);

      // Stop batching once the transaction has been open for longer than the configured limit
      const std::chrono::nanoseconds transfer_duration = std::chrono::steady_clock::now() - transaction_started_at;
      if (transfer_duration > _batchSendNanos) {
        break;
      }

      flow = session.get();
    } while (flow);

    if (!confirm(transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transactionID.to_string());
    }
    if (!complete(context, transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID.to_string());
    }
    logger_->log_debug("Site2Site transaction {} successfully sent flow record {}, content bytes {}", transactionID.to_string(), transaction->total_transfers_, transaction->_bytes);
  } catch (const std::exception& exception) {
    abort_transaction();
    logger_->log_debug("Caught Exception during SiteToSiteClient::transferFlowFiles, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    abort_transaction();
    logger_->log_debug("Caught Exception during SiteToSiteClient::transferFlowFiles, type: {}", getCurrentExceptionTypeName());
    throw;
  }

  deleteTransaction(transactionID);

  return true;
}