in nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java [140:555]
/**
 * Builds the status for the given Process Group by aggregating the statistics of its processors, child
 * Process Groups, Remote Process Groups, connections, input/output ports and funnels.
 *
 * <p>Component-level statuses are only added to the returned status when
 * {@code currentDepth <= recursiveStatusDepth}. Beyond that depth the per-component collections are left empty
 * and authorization checks are short-circuited with {@code AUTHORIZATION_DENIED}, but the numeric totals are
 * still aggregated across the whole hierarchy.</p>
 *
 * @param group the Process Group to report on; may be {@code null}
 * @param statusReport the repository status report that supplies the FlowFile events keyed by component identifier
 * @param checkAuthorization predicate deciding whether a component's name may be exposed; when it returns
 *                           {@code false} the component's identifier is used in place of its name
 * @param recursiveStatusDepth the maximum depth for which individual component statuses are populated
 * @param currentDepth the depth of {@code group} in the current traversal
 * @param includeConnectionDetails whether to compute total/max queued durations for connections; when
 *                                 {@code false} both are reported as {@code 0}
 * @return the aggregated status, or {@code null} if {@code group} is {@code null}
 */
ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> checkAuthorization,
                                  final int recursiveStatusDepth, final int currentDepth, final boolean includeConnectionDetails) {
    if (group == null) {
        return null;
    }

    final ProcessGroupStatus status = new ProcessGroupStatus();
    status.setId(group.getIdentifier());
    // Unauthorized callers see the identifier instead of the name
    status.setName(checkAuthorization.test(group) ? group.getName() : group.getIdentifier());

    // Running totals for this group, accumulated from every component below
    int activeGroupThreads = 0;
    int terminatedGroupThreads = 0;
    long bytesRead = 0L;
    long bytesWritten = 0L;
    int queuedCount = 0;
    long queuedContentSize = 0L;
    int flowFilesIn = 0;
    long bytesIn = 0L;
    int flowFilesOut = 0;
    long bytesOut = 0L;
    int flowFilesReceived = 0;
    long bytesReceived = 0L;
    int flowFilesSent = 0;
    long bytesSent = 0L;
    int flowFilesTransferred = 0;
    long bytesTransferred = 0;
    long processingNanos = 0;

    final ProcessingPerformanceStatus performanceStatus = new ProcessingPerformanceStatus();
    performanceStatus.setIdentifier(group.getIdentifier());

    final boolean populateChildStatuses = currentDepth <= recursiveStatusDepth;

    // Set Authorization predicate based on whether to populate child component status avoiding unnecessary calls to Authorizer
    final Predicate<Authorizable> isAuthorized;
    if (populateChildStatuses) {
        isAuthorized = checkAuthorization;
    } else {
        isAuthorized = AUTHORIZATION_DENIED;
    }

    // set status for processors
    final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>();
    status.setProcessorStatus(processorStatusCollection);
    for (final ProcessorNode procNode : group.getProcessors()) {
        final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode, isAuthorized);
        if (populateChildStatuses) {
            processorStatusCollection.add(procStat);
        }

        activeGroupThreads += procStat.getActiveThreadCount();
        terminatedGroupThreads += procStat.getTerminatedThreadCount();
        bytesRead += procStat.getBytesRead();
        bytesWritten += procStat.getBytesWritten();

        flowFilesReceived += procStat.getFlowFilesReceived();
        bytesReceived += procStat.getBytesReceived();
        flowFilesSent += procStat.getFlowFilesSent();
        bytesSent += procStat.getBytesSent();
        processingNanos += procStat.getProcessingNanos();

        final ProcessingPerformanceStatus processorPerformanceStatus = procStat.getProcessingPerformanceStatus();
        if (processorPerformanceStatus != null) {
            performanceStatus.setCpuDuration(performanceStatus.getCpuDuration() + processorPerformanceStatus.getCpuDuration());
            performanceStatus.setContentReadDuration(performanceStatus.getContentReadDuration() + processorPerformanceStatus.getContentReadDuration());
            performanceStatus.setContentWriteDuration(performanceStatus.getContentWriteDuration() + processorPerformanceStatus.getContentWriteDuration());
            performanceStatus.setSessionCommitDuration(performanceStatus.getSessionCommitDuration() + processorPerformanceStatus.getSessionCommitDuration());
            performanceStatus.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration() + processorPerformanceStatus.getGarbageCollectionDuration());
        }
    }

    // set status for local child groups
    final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>();
    status.setProcessGroupStatus(localChildGroupStatusCollection);
    for (final ProcessGroup childGroup : group.getProcessGroups()) {
        final ProcessGroupStatus childGroupStatus;
        if (populateChildStatuses) {
            childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails);
            localChildGroupStatusCollection.add(childGroupStatus);
        } else {
            // In this case, we don't want to include any of the recursive components' individual statuses. As a result, we can
            // avoid performing any sort of authorizations. Because we only care about the numbers that come back, we can just indicate
            // that the user is not authorized. This allows us to avoid the expense of both performing the authorization and calculating
            // things that we would otherwise need to calculate if the user were in fact authorized.
            childGroupStatus = getGroupStatus(childGroup, statusReport, AUTHORIZATION_DENIED, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails);
        }

        activeGroupThreads += childGroupStatus.getActiveThreadCount();
        terminatedGroupThreads += childGroupStatus.getTerminatedThreadCount();
        bytesRead += childGroupStatus.getBytesRead();
        bytesWritten += childGroupStatus.getBytesWritten();
        queuedCount += childGroupStatus.getQueuedCount();
        queuedContentSize += childGroupStatus.getQueuedContentSize();

        flowFilesReceived += childGroupStatus.getFlowFilesReceived();
        bytesReceived += childGroupStatus.getBytesReceived();
        flowFilesSent += childGroupStatus.getFlowFilesSent();
        bytesSent += childGroupStatus.getBytesSent();

        flowFilesTransferred += childGroupStatus.getFlowFilesTransferred();
        bytesTransferred += childGroupStatus.getBytesTransferred();

        processingNanos += childGroupStatus.getProcessingNanos();

        final ProcessingPerformanceStatus childGroupPerformanceStatus = childGroupStatus.getProcessingPerformanceStatus();
        if (childGroupPerformanceStatus != null) {
            performanceStatus.setCpuDuration(performanceStatus.getCpuDuration() + childGroupPerformanceStatus.getCpuDuration());
            performanceStatus.setContentReadDuration(performanceStatus.getContentReadDuration() + childGroupPerformanceStatus.getContentReadDuration());
            performanceStatus.setContentWriteDuration(performanceStatus.getContentWriteDuration() + childGroupPerformanceStatus.getContentWriteDuration());
            performanceStatus.setSessionCommitDuration(performanceStatus.getSessionCommitDuration() + childGroupPerformanceStatus.getSessionCommitDuration());
            performanceStatus.setGarbageCollectionDuration(performanceStatus.getGarbageCollectionDuration() + childGroupPerformanceStatus.getGarbageCollectionDuration());
        }
    }

    // set status for remote child groups
    final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatusCollection = new ArrayList<>();
    status.setRemoteProcessGroupStatus(remoteProcessGroupStatusCollection);
    for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
        final RemoteProcessGroupStatus remoteStatus = createRemoteGroupStatus(remoteGroup, statusReport, isAuthorized);
        if (remoteStatus != null) {
            if (populateChildStatuses) {
                remoteProcessGroupStatusCollection.add(remoteStatus);
            }

            flowFilesReceived += remoteStatus.getReceivedCount();
            bytesReceived += remoteStatus.getReceivedContentSize();
            flowFilesSent += remoteStatus.getSentCount();
            bytesSent += remoteStatus.getSentContentSize();
        }
    }

    // connection status
    final Collection<ConnectionStatus> connectionStatusCollection = new ArrayList<>();
    status.setConnectionStatus(connectionStatusCollection);

    // A single timestamp is used for every connection so queued durations are computed against the same instant
    long now = System.currentTimeMillis();

    // get the connection and remote port status
    for (final Connection conn : group.getConnections()) {
        final boolean isConnectionAuthorized = isAuthorized.test(conn);
        final boolean isSourceAuthorized = isAuthorized.test(conn.getSource());
        final boolean isDestinationAuthorized = isAuthorized.test(conn.getDestination());

        final ConnectionStatus connStatus = new ConnectionStatus();
        connStatus.setId(conn.getIdentifier());
        connStatus.setGroupId(conn.getProcessGroup().getIdentifier());
        connStatus.setSourceId(conn.getSource().getIdentifier());
        connStatus.setSourceName(isSourceAuthorized ? conn.getSource().getName() : conn.getSource().getIdentifier());
        connStatus.setDestinationId(conn.getDestination().getIdentifier());
        connStatus.setDestinationName(isDestinationAuthorized ? conn.getDestination().getName() : conn.getDestination().getIdentifier());
        connStatus.setBackPressureDataSizeThreshold(conn.getFlowFileQueue().getBackPressureDataSizeThreshold());
        connStatus.setBackPressureObjectThreshold(conn.getFlowFileQueue().getBackPressureObjectThreshold());

        if (includeConnectionDetails) {
            connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now));
            long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate();
            // A min last-queue date of 0 is reported as a max queued duration of 0 rather than "now - 0"
            connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate);
        } else {
            connStatus.setTotalQueuedDuration(0L);
            connStatus.setMaxQueuedDuration(0L);
        }

        connStatus.setFlowFileAvailability(conn.getFlowFileQueue().getFlowFileAvailability());

        final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier());
        if (connectionStatusReport != null) {
            connStatus.setInputBytes(connectionStatusReport.getContentSizeIn());
            connStatus.setInputCount(connectionStatusReport.getFlowFilesIn());
            connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
            connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());

            flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut();
            bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
        }

        if (statusAnalyticsEngine != null) {
            StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics(conn.getIdentifier());
            if (statusAnalytics != null) {
                // NOTE(review): the lookups below assume every prediction key is present in the map;
                // a missing key would fail on unboxing / intValue() - confirm against StatusAnalytics implementations
                Map<String, Long> predictionValues = statusAnalytics.getPredictions();
                ConnectionStatusPredictions predictions = new ConnectionStatusPredictions();
                connStatus.setPredictions(predictions);
                predictions.setPredictedTimeToBytesBackpressureMillis(predictionValues.get("timeToBytesBackpressureMillis"));
                predictions.setPredictedTimeToCountBackpressureMillis(predictionValues.get("timeToCountBackpressureMillis"));
                predictions.setNextPredictedQueuedBytes(predictionValues.get("nextIntervalBytes"));
                predictions.setNextPredictedQueuedCount(predictionValues.get("nextIntervalCount").intValue());
                predictions.setPredictedPercentCount(predictionValues.get("nextIntervalPercentageUseCount").intValue());
                predictions.setPredictedPercentBytes(predictionValues.get("nextIntervalPercentageUseBytes").intValue());
                predictions.setPredictionIntervalMillis(predictionValues.get("intervalTimeMillis"));
            }
        } else {
            connStatus.setPredictions(null);
        }

        if (isConnectionAuthorized) {
            // Prefer the explicit connection name; otherwise fall back to the joined relationship names
            if (StringUtils.isNotBlank(conn.getName())) {
                connStatus.setName(conn.getName());
            } else if (conn.getRelationships() != null && !conn.getRelationships().isEmpty()) {
                final Collection<String> relationships = new ArrayList<>(conn.getRelationships().size());
                for (final Relationship relationship : conn.getRelationships()) {
                    relationships.add(relationship.getName());
                }
                connStatus.setName(StringUtils.join(relationships, ", "));
            }
        } else {
            connStatus.setName(conn.getIdentifier());
        }

        final QueueSize queueSize = conn.getFlowFileQueue().size();
        final int connectionQueuedCount = queueSize.getObjectCount();
        final long connectionQueuedBytes = queueSize.getByteCount();
        if (connectionQueuedCount > 0) {
            connStatus.setQueuedBytes(connectionQueuedBytes);
            connStatus.setQueuedCount(connectionQueuedCount);
        }

        final FlowFileQueue flowFileQueue = conn.getFlowFileQueue();
        final LoadBalanceStrategy loadBalanceStrategy = flowFileQueue.getLoadBalanceStrategy();
        if (loadBalanceStrategy == LoadBalanceStrategy.DO_NOT_LOAD_BALANCE) {
            connStatus.setLoadBalanceStatus(LoadBalanceStatus.LOAD_BALANCE_NOT_CONFIGURED);
        } else if (flowFileQueue.isActivelyLoadBalancing()) {
            connStatus.setLoadBalanceStatus(LoadBalanceStatus.LOAD_BALANCE_ACTIVE);
        } else {
            connStatus.setLoadBalanceStatus(LoadBalanceStatus.LOAD_BALANCE_INACTIVE);
        }

        if (populateChildStatuses) {
            connectionStatusCollection.add(connStatus);
        }

        queuedCount += connectionQueuedCount;
        queuedContentSize += connectionQueuedBytes;

        // Threads of remote ports are counted through the connections that reference them
        final Connectable source = conn.getSource();
        if (ConnectableType.REMOTE_OUTPUT_PORT.equals(source.getConnectableType())) {
            final RemoteGroupPort remoteOutputPort = (RemoteGroupPort) source;
            activeGroupThreads += processScheduler.getActiveThreadCount(remoteOutputPort);
        }

        final Connectable destination = conn.getDestination();
        if (ConnectableType.REMOTE_INPUT_PORT.equals(destination.getConnectableType())) {
            final RemoteGroupPort remoteInputPort = (RemoteGroupPort) destination;
            activeGroupThreads += processScheduler.getActiveThreadCount(remoteInputPort);
        }
    }

    // status for input ports
    final Collection<PortStatus> inputPortStatusCollection = new ArrayList<>();
    status.setInputPortStatus(inputPortStatusCollection);

    final Set<Port> inputPorts = group.getInputPorts();
    for (final Port port : inputPorts) {
        final boolean isInputPortAuthorized = isAuthorized.test(port);

        final PortStatus portStatus = new PortStatus();
        portStatus.setId(port.getIdentifier());
        portStatus.setGroupId(port.getProcessGroup().getIdentifier());
        portStatus.setName(isInputPortAuthorized ? port.getName() : port.getIdentifier());
        portStatus.setActiveThreadCount(processScheduler.getActiveThreadCount(port));

        // determine the run status
        if (ScheduledState.RUNNING.equals(port.getScheduledState())) {
            portStatus.setRunStatus(RunStatus.Running);
        } else if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
            portStatus.setRunStatus(RunStatus.Disabled);
        } else if (!port.isValid()) {
            portStatus.setRunStatus(RunStatus.Invalid);
        } else {
            portStatus.setRunStatus(RunStatus.Stopped);
        }

        // special handling for public ports
        if (port instanceof PublicPort) {
            portStatus.setTransmitting(((PublicPort) port).isTransmitting());
        }

        final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier());
        if (entry == null) {
            portStatus.setInputBytes(0L);
            portStatus.setInputCount(0);
            portStatus.setOutputBytes(0L);
            portStatus.setOutputCount(0);
        } else {
            final int processedCount = entry.getFlowFilesOut();
            final long numProcessedBytes = entry.getContentSizeOut();
            portStatus.setOutputBytes(numProcessedBytes);
            portStatus.setOutputCount(processedCount);

            final int inputCount = entry.getFlowFilesIn();
            final long inputBytes = entry.getContentSizeIn();
            portStatus.setInputBytes(inputBytes);
            portStatus.setInputCount(inputCount);

            // Public ports contribute their received counts to the group input; other ports contribute FlowFiles in
            flowFilesIn += port instanceof PublicPort ? entry.getFlowFilesReceived() : inputCount;
            bytesIn += port instanceof PublicPort ? entry.getBytesReceived() : inputBytes;

            bytesWritten += entry.getBytesWritten();

            flowFilesReceived += entry.getFlowFilesReceived();
            bytesReceived += entry.getBytesReceived();
        }

        if (populateChildStatuses) {
            inputPortStatusCollection.add(portStatus);
        }

        activeGroupThreads += portStatus.getActiveThreadCount();
    }

    // status for output ports
    final Collection<PortStatus> outputPortStatusCollection = new ArrayList<>();
    status.setOutputPortStatus(outputPortStatusCollection);

    final Set<Port> outputPorts = group.getOutputPorts();
    for (final Port port : outputPorts) {
        final boolean isOutputPortAuthorized = isAuthorized.test(port);

        final PortStatus portStatus = new PortStatus();
        portStatus.setId(port.getIdentifier());
        portStatus.setGroupId(port.getProcessGroup().getIdentifier());
        portStatus.setName(isOutputPortAuthorized ? port.getName() : port.getIdentifier());
        portStatus.setActiveThreadCount(processScheduler.getActiveThreadCount(port));

        // determine the run status
        if (ScheduledState.RUNNING.equals(port.getScheduledState())) {
            portStatus.setRunStatus(RunStatus.Running);
        } else if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
            portStatus.setRunStatus(RunStatus.Disabled);
        } else if (!port.isValid()) {
            portStatus.setRunStatus(RunStatus.Invalid);
        } else {
            portStatus.setRunStatus(RunStatus.Stopped);
        }

        // special handling for public ports
        if (port instanceof PublicPort) {
            portStatus.setTransmitting(((PublicPort) port).isTransmitting());
        }

        final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier());
        if (entry == null) {
            portStatus.setInputBytes(0L);
            portStatus.setInputCount(0);
            portStatus.setOutputBytes(0L);
            portStatus.setOutputCount(0);
        } else {
            final int processedCount = entry.getFlowFilesOut();
            final long numProcessedBytes = entry.getContentSizeOut();
            portStatus.setOutputBytes(numProcessedBytes);
            portStatus.setOutputCount(processedCount);

            final int inputCount = entry.getFlowFilesIn();
            final long inputBytes = entry.getContentSizeIn();
            portStatus.setInputBytes(inputBytes);
            portStatus.setInputCount(inputCount);

            bytesRead += entry.getBytesRead();

            // Public ports contribute their sent counts to the group output; other ports contribute FlowFiles out
            flowFilesOut += port instanceof PublicPort ? entry.getFlowFilesSent() : entry.getFlowFilesOut();
            bytesOut += port instanceof PublicPort ? entry.getBytesSent() : entry.getContentSizeOut();

            // Accumulate (not assign) so the totals gathered from processors, child groups, remote groups
            // and previously visited output ports are not discarded; this mirrors bytesSent below
            flowFilesSent += entry.getFlowFilesSent();
            bytesSent += entry.getBytesSent();
        }

        if (populateChildStatuses) {
            outputPortStatusCollection.add(portStatus);
        }

        activeGroupThreads += portStatus.getActiveThreadCount();
    }

    for (final Funnel funnel : group.getFunnels()) {
        activeGroupThreads += processScheduler.getActiveThreadCount(funnel);
    }

    // For a group resolved to the stateless engine, the scheduler's count for the group itself
    // replaces the per-component sum computed above
    final int statelessActiveThreadCount;
    if (group.resolveExecutionEngine() == ExecutionEngine.STATELESS) {
        statelessActiveThreadCount = processScheduler.getActiveThreadCount(group);
        activeGroupThreads = statelessActiveThreadCount;
    } else {
        statelessActiveThreadCount = 0;
    }

    status.setActiveThreadCount(activeGroupThreads);
    status.setStatelessActiveThreadCount(statelessActiveThreadCount);
    status.setTerminatedThreadCount(terminatedGroupThreads);
    status.setBytesRead(bytesRead);
    status.setBytesWritten(bytesWritten);
    status.setQueuedCount(queuedCount);
    status.setQueuedContentSize(queuedContentSize);
    status.setInputContentSize(bytesIn);
    status.setInputCount(flowFilesIn);
    status.setOutputContentSize(bytesOut);
    status.setOutputCount(flowFilesOut);
    status.setFlowFilesReceived(flowFilesReceived);
    status.setBytesReceived(bytesReceived);
    status.setFlowFilesSent(flowFilesSent);
    status.setBytesSent(bytesSent);
    status.setFlowFilesTransferred(flowFilesTransferred);
    status.setBytesTransferred(bytesTransferred);
    status.setProcessingNanos(processingNanos);
    status.setProcessingPerformanceStatus(performanceStatus);

    final VersionControlInformation vci = group.getVersionControlInformation();
    if (vci != null) {
        try {
            final VersionedFlowStatus flowStatus = vci.getStatus();
            if (flowStatus != null && flowStatus.getState() != null) {
                status.setVersionedFlowState(flowStatus.getState());
            }
        } catch (final Exception e) {
            // Best effort: a failure to resolve the versioned state must not fail the whole status computation
            logger.warn("Failed to determine Version Control State for {}. Will consider state to be SYNC_FAILURE", group, e);
            status.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
        }
    }

    return status;
}