in libminifi/src/core/ProcessSession.cpp [838:965]
void ProcessSessionImpl::commit() {
const auto commit_start_time = std::chrono::steady_clock::now();
try {
std::unordered_map<std::string, TransferMetrics> transfers;
auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) {
++transfers[relationship.getName()].transfer_count;
transfers[relationship.getName()].transfer_size += record.getSize();
};
// First we clone the flow record based on the transferred relationship for updated flow record
for (auto && it : updated_flowfiles_) {
auto record = it.second.modified;
if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the updated flow " + record->getUUIDStr());
}
}
// Do the same thing for added flow file
for (const auto& it : added_flowfiles_) {
auto record = it.second.flow_file;
if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the added flow " + record->getUUIDStr());
}
}
std::map<Connectable*, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
Connectable* connection = nullptr;
// Complete process the added and update flow files for the session, send the flow file to its queue
for (const auto &it : updated_flowfiles_) {
auto record = it.second.modified;
logger_->log_trace("See {} in {}", record->getUUIDStr(), "updated_flowfiles_");
if (record->isDeleted()) {
continue;
}
connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
}
for (const auto &it : added_flowfiles_) {
auto record = it.second.flow_file;
logger_->log_trace("See {} in {}", record->getUUIDStr(), "added_flowfiles_");
if (record->isDeleted()) {
continue;
}
connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
}
// Process the clone flow files
for (const auto &record : cloned_flowfiles_) {
logger_->log_trace("See {} in {}", record->getUUIDStr(), "cloned_flowfiles_");
if (record->isDeleted()) {
continue;
}
connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
}
for (const auto& record : deleted_flowfiles_) {
if (!record->isDeleted()) {
continue;
}
if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
// mark for deletion in the flowFileRepository
record->setStoredToRepository(false);
}
}
ensureNonNullResourceClaim(connectionQueues);
content_session_->commit();
if (stateManager_ && !stateManager_->commit()) {
throw Exception(PROCESS_SESSION_EXCEPTION, "State manager commit failed.");
}
persistFlowFilesBeforeTransfer(connectionQueues, updated_flowfiles_);
for (auto& cq : connectionQueues) {
auto connection_from_queue = dynamic_cast<Connection*>(cq.first);
if (connection_from_queue) {
connection_from_queue->multiPut(cq.second);
} else {
for (auto& file : cq.second) {
cq.first->put(file);
}
}
}
if (metrics_) {
for (const auto& [relationship_name, transfer_metrics] : transfers) {
metrics_->transferredBytes() += transfer_metrics.transfer_size;
metrics_->transferredFlowFiles() += transfer_metrics.transfer_count;
metrics_->increaseRelationshipTransferCount(relationship_name, transfer_metrics.transfer_count);
}
}
// All done
updated_flowfiles_.clear();
added_flowfiles_.clear();
cloned_flowfiles_.clear();
deleted_flowfiles_.clear();
updated_relationships_.clear();
relationships_.clear();
// persistent the provenance report
this->provenance_report_->commit();
logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessor().getName());
if (metrics_) {
auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
metrics_->processingNanos() += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during process session commit, type: {}", getCurrentExceptionTypeName());
throw;
}
}