in libminifi/src/sitetosite/SiteToSiteClient.cpp [101:190]
bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::ProcessSession& session) {
auto flow = session.get();
std::shared_ptr<Transaction> transaction = nullptr;
if (!flow) {
return false;
}
if (peer_state_ != READY) {
if (!bootstrap())
return false;
}
if (peer_state_ != READY) {
context.yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
}
// Create the transaction
transaction = createTransaction(SEND);
if (transaction == nullptr) {
context.yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
}
utils::Identifier transactionID = transaction->getUUID();
bool continueTransaction = true;
std::chrono::high_resolution_clock::time_point transaction_started_at = std::chrono::high_resolution_clock::now();
try {
while (continueTransaction) {
auto start_time = std::chrono::steady_clock::now();
std::string payload;
DataPacket packet(getLogger(), transaction, flow->getAttributes(), payload);
int16_t resp = send(transactionID, &packet, flow, &session);
if (resp == -1) {
throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
}
logger_->log_debug("Site2Site transaction {} send flow record {}", transactionID.to_string(), flow->getUUIDStr());
if (resp == 0) {
auto end_time = std::chrono::steady_clock::now();
std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
session.getProvenanceReporter()->send(*flow, transitUri, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time), false);
}
session.remove(flow);
std::chrono::nanoseconds transfer_duration = std::chrono::high_resolution_clock::now() - transaction_started_at;
if (transfer_duration > _batchSendNanos)
break;
flow = session.get();
if (!flow) {
continueTransaction = false;
}
} // while true
if (!confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transactionID.to_string());
}
if (!complete(context, transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID.to_string());
}
logger_->log_debug("Site2Site transaction {} successfully sent flow record {}, content bytes {}", transactionID.to_string(), transaction->total_transfers_, transaction->_bytes);
} catch (std::exception &exception) {
if (transaction)
deleteTransaction(transactionID);
context.yield();
tearDown();
logger_->log_debug("Caught Exception during SiteToSiteClient::transferFlowFiles, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
} catch (...) {
if (transaction)
deleteTransaction(transactionID);
context.yield();
tearDown();
logger_->log_debug("Caught Exception during SiteToSiteClient::transferFlowFiles, type: {}", getCurrentExceptionTypeName());
throw;
}
deleteTransaction(transactionID);
return true;
}