public static void merge()

in src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java [437:649]


    /**
     * Merges the values of {@code toMerge} into {@code target}, modifying {@code target} in place.
     *
     * <p>Group-level counters are summed. The connection, processor, input port, output port,
     * child process group and remote process group statuses are combined by component id: when both
     * sides contain a component its values are merged into the target's instance, and when only
     * {@code toMerge} contains it a {@code clone()} of that status is added to the target. Child
     * process groups present on both sides are merged recursively.</p>
     *
     * <p>No setters are invoked on {@code toMerge} or on the statuses it contains.</p>
     *
     * @param target the status that receives the merged values; the call does nothing if {@code null}
     * @param toMerge the status whose values are folded into {@code target}; the call does nothing if {@code null}
     */
    public static void merge(final ProcessGroupStatus target, final ProcessGroupStatus toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        // group-level counters are simple sums of both sides
        target.setInputCount(target.getInputCount() + toMerge.getInputCount());
        target.setInputContentSize(target.getInputContentSize() + toMerge.getInputContentSize());
        target.setOutputCount(target.getOutputCount() + toMerge.getOutputCount());
        target.setOutputContentSize(target.getOutputContentSize() + toMerge.getOutputContentSize());
        target.setQueuedCount(target.getQueuedCount() + toMerge.getQueuedCount());
        target.setQueuedContentSize(target.getQueuedContentSize() + toMerge.getQueuedContentSize());
        target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
        target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
        target.setStatelessActiveThreadCount(target.getStatelessActiveThreadCount() + toMerge.getStatelessActiveThreadCount());
        target.setTerminatedThreadCount(target.getTerminatedThreadCount() + toMerge.getTerminatedThreadCount());
        target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
        target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
        target.setProcessingNanos(target.getProcessingNanos() + toMerge.getProcessingNanos());

        // if the versioned flow state to merge is sync failure allow it to take precedence.
        if (VersionedFlowState.SYNC_FAILURE.equals(toMerge.getVersionedFlowState())) {
            target.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
        }

        // connection status
        // index the target's connections by id so matching entries can be looked up while merging
        final Map<String, ConnectionStatus> mergedConnectionMap = new HashMap<>();
        for (final ConnectionStatus status : target.getConnectionStatus()) {
            mergedConnectionMap.put(status.getId(), status);
        }

        for (final ConnectionStatus statusToMerge : toMerge.getConnectionStatus()) {
            ConnectionStatus merged = mergedConnectionMap.get(statusToMerge.getId());
            if (merged == null) {
                // only present in toMerge: add a clone rather than the toMerge instance itself
                mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merged.setQueuedCount(merged.getQueuedCount() + statusToMerge.getQueuedCount());
            merged.setQueuedBytes(merged.getQueuedBytes() + statusToMerge.getQueuedBytes());
            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
            merged.setFlowFileAvailability(mergeFlowFileAvailability(merged.getFlowFileAvailability(), statusToMerge.getFlowFileAvailability()));
            merged.setLoadBalanceStatus(mergeLoadBalanceStatus(merged.getLoadBalanceStatus(), statusToMerge.getLoadBalanceStatus()));
        }
        target.setConnectionStatus(mergedConnectionMap.values());

        // processor status
        final Map<String, ProcessorStatus> mergedProcessorMap = new HashMap<>();
        for (final ProcessorStatus status : target.getProcessorStatus()) {
            mergedProcessorMap.put(status.getId(), status);
        }

        for (final ProcessorStatus statusToMerge : toMerge.getProcessorStatus()) {
            ProcessorStatus merged = mergedProcessorMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
            merged.setTerminatedThreadCount(merged.getTerminatedThreadCount() + statusToMerge.getTerminatedThreadCount());
            merged.setBytesRead(merged.getBytesRead() + statusToMerge.getBytesRead());
            merged.setBytesWritten(merged.getBytesWritten() + statusToMerge.getBytesWritten());
            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
            merged.setInvocations(merged.getInvocations() + statusToMerge.getInvocations());
            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
            merged.setProcessingNanos(merged.getProcessingNanos() + statusToMerge.getProcessingNanos());
            merged.setFlowFilesRemoved(merged.getFlowFilesRemoved() + statusToMerge.getFlowFilesRemoved());

            // if the status to merge is validating or invalid allow it to take precedence. whether the
            // processor run status is disabled/stopped/running is part of the flow configuration
            // and should not differ amongst nodes. however, whether a processor is invalid
            // can be driven by environmental conditions. this check allows any of those to
            // take precedence over the configured run status.
            if (RunStatus.Validating.equals(statusToMerge.getRunStatus())) {
                merged.setRunStatus(RunStatus.Validating);
            } else if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
                merged.setRunStatus(RunStatus.Invalid);
            }
        }
        target.setProcessorStatus(mergedProcessorMap.values());

        // input ports
        final Map<String, PortStatus> mergedInputPortMap = new HashMap<>();
        for (final PortStatus status : target.getInputPortStatus()) {
            mergedInputPortMap.put(status.getId(), status);
        }

        for (final PortStatus statusToMerge : toMerge.getInputPortStatus()) {
            PortStatus merged = mergedInputPortMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
            // the merged port is marked transmitting if the status being merged is transmitting;
            // a null or false value leaves the target's flag untouched
            if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) {
                merged.setTransmitting(true);
            }

            // should be unnecessary here since ports run status should not be affected by
            // environmental conditions but doing so in case that changes
            if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
                merged.setRunStatus(RunStatus.Invalid);
            }
        }
        target.setInputPortStatus(mergedInputPortMap.values());

        // output ports
        final Map<String, PortStatus> mergedOutputPortMap = new HashMap<>();
        for (final PortStatus status : target.getOutputPortStatus()) {
            mergedOutputPortMap.put(status.getId(), status);
        }

        for (final PortStatus statusToMerge : toMerge.getOutputPortStatus()) {
            PortStatus merged = mergedOutputPortMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
            if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) {
                merged.setTransmitting(true);
            }

            // should be unnecessary here since ports run status should not be affected by
            // environmental conditions but doing so in case that changes
            if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
                merged.setRunStatus(RunStatus.Invalid);
            }
        }
        target.setOutputPortStatus(mergedOutputPortMap.values());

        // child groups
        final Map<String, ProcessGroupStatus> mergedGroupMap = new HashMap<>();
        for (final ProcessGroupStatus status : target.getProcessGroupStatus()) {
            mergedGroupMap.put(status.getId(), status);
        }

        for (final ProcessGroupStatus statusToMerge : toMerge.getProcessGroupStatus()) {
            ProcessGroupStatus merged = mergedGroupMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            // present on both sides: merge the nested group recursively
            merge(merged, statusToMerge);
        }
        // store the merged child groups on the target so that groups which exist only in
        // toMerge (added to the map as clones above) are retained
        target.setProcessGroupStatus(mergedGroupMap.values());

        // remote groups
        final Map<String, RemoteProcessGroupStatus> mergedRemoteGroupMap = new HashMap<>();
        for (final RemoteProcessGroupStatus status : target.getRemoteProcessGroupStatus()) {
            mergedRemoteGroupMap.put(status.getId(), status);
        }

        for (final RemoteProcessGroupStatus statusToMerge : toMerge.getRemoteProcessGroupStatus()) {
            RemoteProcessGroupStatus merged = mergedRemoteGroupMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            // NOTE - active/inactive port counts are not merged since that state is considered part of the flow (like runStatus)
            merged.setReceivedContentSize(merged.getReceivedContentSize() + statusToMerge.getReceivedContentSize());
            merged.setReceivedCount(merged.getReceivedCount() + statusToMerge.getReceivedCount());
            merged.setSentContentSize(merged.getSentContentSize() + statusToMerge.getSentContentSize());
            merged.setSentCount(merged.getSentCount() + statusToMerge.getSentCount());
            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());

            // Take the earliest last refresh time; a missing time on the target is replaced
            // by whatever toMerge has (which may itself be null)
            final Date mergedLastRefreshTime = merged.getLastRefreshTime();
            final Date toMergeLastRefreshTime = statusToMerge.getLastRefreshTime();
            if (mergedLastRefreshTime == null || (toMergeLastRefreshTime != null && toMergeLastRefreshTime.before(mergedLastRefreshTime))) {
                merged.setLastRefreshTime(toMergeLastRefreshTime);
            }
        }

        target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());

        // processing performance: durations are summed only when both sides report a status
        final ProcessingPerformanceStatus targetPerformanceStatus = target.getProcessingPerformanceStatus();
        final ProcessingPerformanceStatus toMergePerformanceStatus = toMerge.getProcessingPerformanceStatus();

        if (targetPerformanceStatus != null && toMergePerformanceStatus != null) {
            targetPerformanceStatus.setIdentifier(toMergePerformanceStatus.getIdentifier());
            targetPerformanceStatus.setCpuDuration(targetPerformanceStatus.getCpuDuration() + toMergePerformanceStatus.getCpuDuration());
            targetPerformanceStatus.setContentReadDuration(targetPerformanceStatus.getContentReadDuration() + toMergePerformanceStatus.getContentReadDuration());
            targetPerformanceStatus.setContentWriteDuration(targetPerformanceStatus.getContentWriteDuration() + toMergePerformanceStatus.getContentWriteDuration());
            targetPerformanceStatus.setSessionCommitDuration(targetPerformanceStatus.getSessionCommitDuration() + toMergePerformanceStatus.getSessionCommitDuration());
            targetPerformanceStatus.setGarbageCollectionDuration(targetPerformanceStatus.getGarbageCollectionDuration() + toMergePerformanceStatus.getGarbageCollectionDuration());
        } else {
            // NOTE(review): this re-assigns the value just read from the target, so the target keeps
            // whatever it already had. When the target has no performance status but toMerge does,
            // toMerge's values are not adopted - confirm whether that is intended.
            target.setProcessingPerformanceStatus(targetPerformanceStatus);
        }
    }