void ProcessSessionImpl::commit()

in libminifi/src/core/ProcessSession.cpp [838:965]


void ProcessSessionImpl::commit() {
  const auto commit_start_time = std::chrono::steady_clock::now();
  try {
    std::unordered_map<std::string, TransferMetrics> transfers;
    auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) {
      ++transfers[relationship.getName()].transfer_count;
      transfers[relationship.getName()].transfer_size += record.getSize();
    };
    // First we clone the flow record based on the transferred relationship for updated flow record
    for (auto && it : updated_flowfiles_) {
      auto record = it.second.modified;
      if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
        // Can not find relationship for the flow
        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the updated flow " + record->getUUIDStr());
      }
    }

    // Do the same thing for added flow file
    for (const auto& it : added_flowfiles_) {
      auto record = it.second.flow_file;
      if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
        // Can not find relationship for the flow
        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the added flow " + record->getUUIDStr());
      }
    }

    std::map<Connectable*, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;

    Connectable* connection = nullptr;
    // Complete process the added and update flow files for the session, send the flow file to its queue
    for (const auto &it : updated_flowfiles_) {
      auto record = it.second.modified;
      logger_->log_trace("See {} in {}", record->getUUIDStr(), "updated_flowfiles_");
      if (record->isDeleted()) {
        continue;
      }

      connection = record->getConnection();
      if ((connection) != nullptr) {
        connectionQueues[connection].push_back(record);
      }
    }
    for (const auto &it : added_flowfiles_) {
      auto record = it.second.flow_file;
      logger_->log_trace("See {} in {}", record->getUUIDStr(), "added_flowfiles_");
      if (record->isDeleted()) {
        continue;
      }
      connection = record->getConnection();
      if ((connection) != nullptr) {
        connectionQueues[connection].push_back(record);
      }
    }
    // Process the clone flow files
    for (const auto &record : cloned_flowfiles_) {
      logger_->log_trace("See {} in {}", record->getUUIDStr(), "cloned_flowfiles_");
      if (record->isDeleted()) {
        continue;
      }
      connection = record->getConnection();
      if ((connection) != nullptr) {
        connectionQueues[connection].push_back(record);
      }
    }

    for (const auto& record : deleted_flowfiles_) {
      if (!record->isDeleted()) {
        continue;
      }
      if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
        // mark for deletion in the flowFileRepository
        record->setStoredToRepository(false);
      }
    }

    ensureNonNullResourceClaim(connectionQueues);

    content_session_->commit();

    if (stateManager_ && !stateManager_->commit()) {
      throw Exception(PROCESS_SESSION_EXCEPTION, "State manager commit failed.");
    }

    persistFlowFilesBeforeTransfer(connectionQueues, updated_flowfiles_);

    for (auto& cq : connectionQueues) {
      auto connection_from_queue = dynamic_cast<Connection*>(cq.first);
      if (connection_from_queue) {
        connection_from_queue->multiPut(cq.second);
      } else {
        for (auto& file : cq.second) {
          cq.first->put(file);
        }
      }
    }

    if (metrics_) {
      for (const auto& [relationship_name, transfer_metrics] : transfers) {
        metrics_->transferredBytes() += transfer_metrics.transfer_size;
        metrics_->transferredFlowFiles() += transfer_metrics.transfer_count;
        metrics_->increaseRelationshipTransferCount(relationship_name, transfer_metrics.transfer_count);
      }
    }

    // All done
    updated_flowfiles_.clear();
    added_flowfiles_.clear();
    cloned_flowfiles_.clear();
    deleted_flowfiles_.clear();

    updated_relationships_.clear();
    relationships_.clear();
    // persistent the provenance report
    this->provenance_report_->commit();
    logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessor().getName());
    if (metrics_) {
      auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
      metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
      metrics_->processingNanos() += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
    }
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during process session commit, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}