in src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java [437:649]
public static void merge(final ProcessGroupStatus target, final ProcessGroupStatus toMerge) {
if (target == null || toMerge == null) {
return;
}
target.setInputCount(target.getInputCount() + toMerge.getInputCount());
target.setInputContentSize(target.getInputContentSize() + toMerge.getInputContentSize());
target.setOutputCount(target.getOutputCount() + toMerge.getOutputCount());
target.setOutputContentSize(target.getOutputContentSize() + toMerge.getOutputContentSize());
target.setQueuedCount(target.getQueuedCount() + toMerge.getQueuedCount());
target.setQueuedContentSize(target.getQueuedContentSize() + toMerge.getQueuedContentSize());
target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
target.setStatelessActiveThreadCount(target.getStatelessActiveThreadCount() + toMerge.getStatelessActiveThreadCount());
target.setTerminatedThreadCount(target.getTerminatedThreadCount() + toMerge.getTerminatedThreadCount());
target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
target.setProcessingNanos(target.getProcessingNanos() + toMerge.getProcessingNanos());
// if the versioned flow state to merge is sync failure allow it to take precedence.
if (VersionedFlowState.SYNC_FAILURE.equals(toMerge.getVersionedFlowState())) {
target.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
}
// connection status
// sort by id
final Map<String, ConnectionStatus> mergedConnectionMap = new HashMap<>();
for (final ConnectionStatus status : target.getConnectionStatus()) {
mergedConnectionMap.put(status.getId(), status);
}
for (final ConnectionStatus statusToMerge : toMerge.getConnectionStatus()) {
ConnectionStatus merged = mergedConnectionMap.get(statusToMerge.getId());
if (merged == null) {
mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone());
continue;
}
merged.setQueuedCount(merged.getQueuedCount() + statusToMerge.getQueuedCount());
merged.setQueuedBytes(merged.getQueuedBytes() + statusToMerge.getQueuedBytes());
merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
merged.setFlowFileAvailability(mergeFlowFileAvailability(merged.getFlowFileAvailability(), statusToMerge.getFlowFileAvailability()));
merged.setLoadBalanceStatus(mergeLoadBalanceStatus(merged.getLoadBalanceStatus(), statusToMerge.getLoadBalanceStatus()));
}
target.setConnectionStatus(mergedConnectionMap.values());
// processor status
final Map<String, ProcessorStatus> mergedProcessorMap = new HashMap<>();
for (final ProcessorStatus status : target.getProcessorStatus()) {
mergedProcessorMap.put(status.getId(), status);
}
for (final ProcessorStatus statusToMerge : toMerge.getProcessorStatus()) {
ProcessorStatus merged = mergedProcessorMap.get(statusToMerge.getId());
if (merged == null) {
mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone());
continue;
}
merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
merged.setTerminatedThreadCount(merged.getTerminatedThreadCount() + statusToMerge.getTerminatedThreadCount());
merged.setBytesRead(merged.getBytesRead() + statusToMerge.getBytesRead());
merged.setBytesWritten(merged.getBytesWritten() + statusToMerge.getBytesWritten());
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
merged.setInvocations(merged.getInvocations() + statusToMerge.getInvocations());
merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
merged.setProcessingNanos(merged.getProcessingNanos() + statusToMerge.getProcessingNanos());
merged.setFlowFilesRemoved(merged.getFlowFilesRemoved() + statusToMerge.getFlowFilesRemoved());
// if the status to merge is invalid allow it to take precedence. whether the
// processor run status is disabled/stopped/running is part of the flow configuration
// and should not differ amongst nodes. however, whether a processor is invalid
// can be driven by environmental conditions. this check allows any of those to
// take precedence over the configured run status.
if (RunStatus.Validating.equals(statusToMerge.getRunStatus())) {
merged.setRunStatus(RunStatus.Validating);
} else if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
merged.setRunStatus(RunStatus.Invalid);
}
}
target.setProcessorStatus(mergedProcessorMap.values());
// input ports
final Map<String, PortStatus> mergedInputPortMap = new HashMap<>();
for (final PortStatus status : target.getInputPortStatus()) {
mergedInputPortMap.put(status.getId(), status);
}
for (final PortStatus statusToMerge : toMerge.getInputPortStatus()) {
PortStatus merged = mergedInputPortMap.get(statusToMerge.getId());
if (merged == null) {
mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
continue;
}
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) {
merged.setTransmitting(true);
}
// should be unnecessary here since ports run status should not be affected by
// environmental conditions but doing so in case that changes
if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
merged.setRunStatus(RunStatus.Invalid);
}
}
target.setInputPortStatus(mergedInputPortMap.values());
// output ports
final Map<String, PortStatus> mergedOutputPortMap = new HashMap<>();
for (final PortStatus status : target.getOutputPortStatus()) {
mergedOutputPortMap.put(status.getId(), status);
}
for (final PortStatus statusToMerge : toMerge.getOutputPortStatus()) {
PortStatus merged = mergedOutputPortMap.get(statusToMerge.getId());
if (merged == null) {
mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
continue;
}
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) {
merged.setTransmitting(true);
}
// should be unnecessary here since ports run status not should be affected by
// environmental conditions but doing so in case that changes
if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
merged.setRunStatus(RunStatus.Invalid);
}
}
target.setOutputPortStatus(mergedOutputPortMap.values());
// child groups
final Map<String, ProcessGroupStatus> mergedGroupMap = new HashMap<>();
for (final ProcessGroupStatus status : target.getProcessGroupStatus()) {
mergedGroupMap.put(status.getId(), status);
}
for (final ProcessGroupStatus statusToMerge : toMerge.getProcessGroupStatus()) {
ProcessGroupStatus merged = mergedGroupMap.get(statusToMerge.getId());
if (merged == null) {
mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
continue;
}
merge(merged, statusToMerge);
}
target.setOutputPortStatus(mergedOutputPortMap.values());
// remote groups
final Map<String, RemoteProcessGroupStatus> mergedRemoteGroupMap = new HashMap<>();
for (final RemoteProcessGroupStatus status : target.getRemoteProcessGroupStatus()) {
mergedRemoteGroupMap.put(status.getId(), status);
}
for (final RemoteProcessGroupStatus statusToMerge : toMerge.getRemoteProcessGroupStatus()) {
RemoteProcessGroupStatus merged = mergedRemoteGroupMap.get(statusToMerge.getId());
if (merged == null) {
mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
continue;
}
// NOTE - active/inactive port counts are not merged since that state is considered part of the flow (like runStatus)
merged.setReceivedContentSize(merged.getReceivedContentSize() + statusToMerge.getReceivedContentSize());
merged.setReceivedCount(merged.getReceivedCount() + statusToMerge.getReceivedCount());
merged.setSentContentSize(merged.getSentContentSize() + statusToMerge.getSentContentSize());
merged.setSentCount(merged.getSentCount() + statusToMerge.getSentCount());
merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
// Take the earliest last refresh time
final Date mergedLastRefreshTime = merged.getLastRefreshTime();
final Date toMergeLastRefreshTime = statusToMerge.getLastRefreshTime();
if (mergedLastRefreshTime == null || (toMergeLastRefreshTime != null && toMergeLastRefreshTime.before(mergedLastRefreshTime))) {
merged.setLastRefreshTime(toMergeLastRefreshTime);
}
}
target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());
final ProcessingPerformanceStatus targetPerformanceStatus = target.getProcessingPerformanceStatus();
final ProcessingPerformanceStatus toMergePerformanceStatus = toMerge.getProcessingPerformanceStatus();
if (targetPerformanceStatus != null && toMergePerformanceStatus != null) {
targetPerformanceStatus.setIdentifier(toMergePerformanceStatus.getIdentifier());
targetPerformanceStatus.setCpuDuration(targetPerformanceStatus.getCpuDuration() + toMergePerformanceStatus.getCpuDuration());
targetPerformanceStatus.setContentReadDuration(targetPerformanceStatus.getContentReadDuration() + toMergePerformanceStatus.getContentReadDuration());
targetPerformanceStatus.setContentWriteDuration(targetPerformanceStatus.getContentWriteDuration() + toMergePerformanceStatus.getContentWriteDuration());
targetPerformanceStatus.setSessionCommitDuration(targetPerformanceStatus.getSessionCommitDuration() + toMergePerformanceStatus.getSessionCommitDuration());
targetPerformanceStatus.setGarbageCollectionDuration(targetPerformanceStatus.getGarbageCollectionDuration() + toMergePerformanceStatus.getGarbageCollectionDuration());
} else {
target.setProcessingPerformanceStatus(targetPerformanceStatus);
}
}