bool SiteToSiteClient::receiveFlowFiles()

in libminifi/src/sitetosite/SiteToSiteClient.cpp [644:753]


bool SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::ProcessSession& session) {
  uint64_t bytes = 0;
  int transfers = 0;
  std::shared_ptr<Transaction> transaction = nullptr;

  if (peer_state_ != READY) {
    if (!bootstrap()) {
      return false;
    }
  }

  if (peer_state_ != READY) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
  }

  // Create the transaction
  transaction = createTransaction(RECEIVE);

  if (transaction == nullptr) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
  }

  utils::Identifier transactionID = transaction->getUUID();

  try {
    while (true) {
      std::map<std::string, std::string> empty;
      auto start_time = std::chrono::steady_clock::now();
      std::string payload;
      DataPacket packet(getLogger(), transaction, empty, payload);
      bool eof = false;

      if (!receive(transactionID, &packet, eof)) {
        throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transactionID.to_string());
      }
      if (eof) {
        // transaction done
        break;
      }
      auto flowFile = session.create();

      if (!flowFile) {
        throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
      }
      std::map<std::string, std::string>::iterator it;
      std::string sourceIdentifier;
      for (it = packet._attributes.begin(); it != packet._attributes.end(); it++) {
        if (it->first == core::SpecialFlowAttribute::UUID)
          sourceIdentifier = it->second;
        flowFile->addAttribute(it->first, it->second);
      }

      if (packet._size > 0) {
        session.write(flowFile, [&packet](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
          return internal::pipe(packet.transaction_->getStream(), *output_stream);
        });
        if (flowFile->getSize() != packet._size) {
          std::stringstream message;
          message << "Receive size not correct, expected to send " << flowFile->getSize() << " bytes, but actually sent " << packet._size;
          throw Exception(SITE2SITE_EXCEPTION, message.str());
        } else {
          logger_->log_debug("received {} with expected {}", flowFile->getSize(), packet._size);
        }
      }
      core::Relationship relation;  // undefined relationship
      auto end_time = std::chrono::steady_clock::now();
      std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
      std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName();
      session.getProvenanceReporter()->receive(*flowFile, transitUri, sourceIdentifier, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time));
      session.transfer(flowFile, relation);
      // receive the transfer for the flow record
      bytes += packet._size;
      transfers++;
    }  // while true

    if (transfers > 0 && !confirm(transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
    }
    if (!complete(context, transactionID)) {
      std::stringstream transaction_str;
      transaction_str << "Complete Transaction " << transactionID.to_string() << " Failed";
      throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());
    }
    logger_->log_info("Site to Site transaction {} received flow record {}, with content size {} bytes", transactionID.to_string(), transfers, bytes);
    // we yield the receive if we did not get anything
    if (transfers == 0)
      context.yield();
  } catch (std::exception &exception) {
    if (transaction)
      deleteTransaction(transactionID);
    context.yield();
    tearDown();
    logger_->log_warn("Caught Exception during RawSiteToSiteClient::receiveFlowFiles, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    if (transaction)
      deleteTransaction(transactionID);
    context.yield();
    tearDown();
    logger_->log_warn("Caught Exception during RawSiteToSiteClient::receiveFlowFiles, type: {}", getCurrentExceptionTypeName());
    throw;
  }

  deleteTransaction(transactionID);
  return true;
}