in libminifi/src/sitetosite/SiteToSiteClient.cpp [644:753]
/**
 * Pulls flow files from the remote peer inside a single RECEIVE transaction.
 *
 * The peer is bootstrapped first when it is not READY. Data packets are then read until
 * the peer reports end-of-transaction; every packet is turned into a new flow file in
 * `session` (attributes copied, content written when the packet announces a non-zero size),
 * a provenance RECEIVE event is reported and the flow file is transferred. When at least one
 * flow file was received the transaction is confirmed, and it is always completed afterwards.
 *
 * @param context process context; yielded on every failure path and when nothing was received
 * @param session process session used to create, fill and transfer the received flow files
 * @return false when bootstrap() fails, true after the transaction has been completed
 * @throws Exception (SITE2SITE_EXCEPTION) when the peer is still not READY after bootstrap,
 *         the transaction cannot be created, a packet cannot be received, a flow file cannot
 *         be created, the written size differs from the announced size, or confirm/complete
 *         fails. Any exception raised inside the receive loop is rethrown after the
 *         transaction has been deleted, the context yielded and the connection torn down.
 */
bool SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::ProcessSession& session) {
  uint64_t bytes = 0;
  int transfers = 0;

  if (peer_state_ != READY && !bootstrap()) {
    return false;
  }

  // bootstrap() may report success without leaving the peer in READY state, so re-check
  if (peer_state_ != READY) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
  }

  // Create the transaction
  std::shared_ptr<Transaction> transaction = createTransaction(RECEIVE);
  if (transaction == nullptr) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
  }

  utils::Identifier transactionID = transaction->getUUID();

  // Shared failure path of both catch handlers below: drop the transaction, back off, disconnect
  const auto abort_transaction = [&] {
    if (transaction) {
      deleteTransaction(transactionID);
    }
    context.yield();
    tearDown();
  };

  try {
    while (true) {
      // NOTE(review): `empty` and `payload` are declared before `packet` on purpose; DataPacket
      // may keep references to its constructor arguments - confirm before reordering.
      std::map<std::string, std::string> empty;
      const auto start_time = std::chrono::steady_clock::now();
      std::string payload;
      DataPacket packet(getLogger(), transaction, empty, payload);
      bool eof = false;

      if (!receive(transactionID, &packet, eof)) {
        throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transactionID.to_string());
      }
      if (eof) {
        // transaction done
        break;
      }

      auto flowFile = session.create();
      if (!flowFile) {
        throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
      }

      // Copy every received attribute; remember the sender-side UUID for provenance reporting
      std::string sourceIdentifier;
      for (const auto& [key, value] : packet._attributes) {
        if (key == core::SpecialFlowAttribute::UUID) {
          sourceIdentifier = value;
        }
        flowFile->addAttribute(key, value);
      }

      if (packet._size > 0) {
        session.write(flowFile, [&packet](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
          return internal::pipe(packet.transaction_->getStream(), *output_stream);
        });
        // packet._size is the size announced for this packet, flowFile->getSize() is what was actually written
        if (flowFile->getSize() != packet._size) {
          std::stringstream message;
          message << "Receive size not correct, expected to receive " << packet._size << " bytes, but actually received " << flowFile->getSize();
          throw Exception(SITE2SITE_EXCEPTION, message.str());
        }
        logger_->log_debug("received {} with expected {}", flowFile->getSize(), packet._size);
      }

      core::Relationship relation;  // undefined relationship
      const auto end_time = std::chrono::steady_clock::now();
      std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
      std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName();
      session.getProvenanceReporter()->receive(*flowFile, transitUri, sourceIdentifier, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time));
      session.transfer(flowFile, relation);

      // account for the flow record that was just received
      bytes += packet._size;
      ++transfers;
    }  // while true

    // confirmation is only attempted when at least one flow file was received
    if (transfers > 0 && !confirm(transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
    }
    if (!complete(context, transactionID)) {
      std::stringstream transaction_str;
      transaction_str << "Complete Transaction " << transactionID.to_string() << " Failed";
      throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());
    }
    logger_->log_info("Site to Site transaction {} received flow record {}, with content size {} bytes", transactionID.to_string(), transfers, bytes);

    // we yield the receive if we did not get anything
    if (transfers == 0) {
      context.yield();
    }
  } catch (const std::exception& exception) {
    abort_transaction();
    logger_->log_warn("Caught Exception during RawSiteToSiteClient::receiveFlowFiles, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    abort_transaction();
    logger_->log_warn("Caught Exception during RawSiteToSiteClient::receiveFlowFiles, type: {}", getCurrentExceptionTypeName());
    throw;
  }

  deleteTransaction(transactionID);
  return true;
}