public static CollectorRegistry createNifiMetrics()

in nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/prometheusutil/PrometheusMetricsUtil.java [49:262]


    public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetricsRegistry, ProcessGroupStatus status,
                                                      String instId, String parentProcessGroupId, String compType, String metricsStrategy) {

        final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
        final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId;
        final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
        final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId();
        final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName();

        nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
        nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(), "AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);
        nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);

        nifiMetricsRegistry.setDataPoint(status.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
        nifiMetricsRegistry.setDataPoint(status.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId);
        nifiMetricsRegistry.setDataPoint(status.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId);
        nifiMetricsRegistry.setDataPoint(status.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);
        nifiMetricsRegistry.setDataPoint(status.getBytesTransferred(), "AMOUNT_BYTES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);

        nifiMetricsRegistry.setDataPoint(status.getOutputContentSize(), "SIZE_CONTENT_OUTPUT_TOTAL",
                instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
        nifiMetricsRegistry.setDataPoint(status.getInputContentSize(), "SIZE_CONTENT_INPUT_TOTAL",
                instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
        nifiMetricsRegistry.setDataPoint(status.getQueuedContentSize(), "SIZE_CONTENT_QUEUED_TOTAL",
                instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");

        nifiMetricsRegistry.setDataPoint(status.getOutputCount(), "AMOUNT_ITEMS_OUTPUT", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
        nifiMetricsRegistry.setDataPoint(status.getInputCount(), "AMOUNT_ITEMS_INPUT", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
        nifiMetricsRegistry.setDataPoint(status.getQueuedCount(), "AMOUNT_ITEMS_QUEUED", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");

        nifiMetricsRegistry.setDataPoint(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
                instanceId, componentType, componentName, componentId, parentPGId);
        nifiMetricsRegistry.setDataPoint(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
                instanceId, componentType, componentName, componentId, parentPGId);

        addProcessingPerformanceMetrics(nifiMetricsRegistry, status.getProcessingPerformanceStatus(),
                instanceId, componentType, componentName, componentId, parentPGId);

        nifiMetricsRegistry.setDataPoint(status.getProcessingNanos(), "TOTAL_TASK_DURATION",
                instanceId, componentType, componentName, componentId, parentPGId);

        // Report metrics for child process groups if specified
        if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
            status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(nifiMetricsRegistry, childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy));
        }

        if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
            // Report metrics for all components
            for (ProcessorStatus processorStatus : status.getProcessorStatus()) {
                Map<String, Long> counters = processorStatus.getCounters();

                if (counters != null) {
                    counters.entrySet().stream().forEach(entry -> nifiMetricsRegistry.setDataPoint(entry.getValue(), "PROCESSOR_COUNTERS",
                            processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId));
                }

                final String procComponentType = "Processor";
                final String procComponentId = StringUtils.isEmpty(processorStatus.getId()) ? DEFAULT_LABEL_STRING : processorStatus.getId();
                final String procComponentName = StringUtils.isEmpty(processorStatus.getName()) ? DEFAULT_LABEL_STRING : processorStatus.getName();
                final String parentId = StringUtils.isEmpty(processorStatus.getGroupId()) ? DEFAULT_LABEL_STRING : processorStatus.getGroupId();

                nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesRemoved(), "AMOUNT_FLOWFILES_REMOVED",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId);

                nifiMetricsRegistry.setDataPoint(processorStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(processorStatus.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(processorStatus.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(processorStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, procComponentType, procComponentName, procComponentId, parentId);

                nifiMetricsRegistry.setDataPoint(processorStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
                nifiMetricsRegistry.setDataPoint(processorStatus.getInputBytes(), "SIZE_CONTENT_INPUT_TOTAL",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");

                nifiMetricsRegistry.setDataPoint(processorStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
                nifiMetricsRegistry.setDataPoint(processorStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");

                nifiMetricsRegistry.setDataPoint(processorStatus.getAverageLineageDuration(), "AVERAGE_LINEAGE_DURATION",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");

                nifiMetricsRegistry.setDataPoint(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
                        instanceId, procComponentType, procComponentName, procComponentId, parentId);

                addProcessingPerformanceMetrics(nifiMetricsRegistry, processorStatus.getProcessingPerformanceStatus(),
                        instanceId, procComponentType, procComponentName, procComponentId, parentId);

            }

            for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
                final String connComponentId = StringUtils.isEmpty(connectionStatus.getId()) ? DEFAULT_LABEL_STRING : connectionStatus.getId();
                final String connComponentName = StringUtils.isEmpty(connectionStatus.getName()) ? DEFAULT_LABEL_STRING : connectionStatus.getName();
                final String sourceId = StringUtils.isEmpty(connectionStatus.getSourceId()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceId();
                final String sourceName = StringUtils.isEmpty(connectionStatus.getSourceName()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceName();
                final String destinationId = StringUtils.isEmpty(connectionStatus.getDestinationId()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationId();
                final String destinationName = StringUtils.isEmpty(connectionStatus.getDestinationName()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationName();
                final String parentId = StringUtils.isEmpty(connectionStatus.getGroupId()) ? DEFAULT_LABEL_STRING : connectionStatus.getGroupId();
                final String connComponentType = "Connection";
                nifiMetricsRegistry.setDataPoint(connectionStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
                nifiMetricsRegistry.setDataPoint(connectionStatus.getInputBytes(), "SIZE_CONTENT_INPUT_TOTAL",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
                nifiMetricsRegistry.setDataPoint(connectionStatus.getQueuedBytes(), "SIZE_CONTENT_QUEUED_TOTAL",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

                nifiMetricsRegistry.setDataPoint(connectionStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
                nifiMetricsRegistry.setDataPoint(connectionStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
                nifiMetricsRegistry.setDataPoint(connectionStatus.getQueuedCount(), "AMOUNT_ITEMS_QUEUED",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

                nifiMetricsRegistry.setDataPoint(connectionStatus.getBackPressureBytesThreshold(), "BACKPRESSURE_BYTES_THRESHOLD",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
                nifiMetricsRegistry.setDataPoint(connectionStatus.getBackPressureObjectThreshold(), "BACKPRESSURE_OBJECT_THRESHOLD",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

                nifiMetricsRegistry.setDataPoint(getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold()),
                        "PERCENT_USED_BYTES", instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
                nifiMetricsRegistry.setDataPoint(getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold()),
                        "PERCENT_USED_COUNT", instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

                boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
                        || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getQueuedBytes());
                nifiMetricsRegistry.setDataPoint(isBackpressureEnabled ? 1 : 0, "IS_BACKPRESSURE_ENABLED",
                        instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
            }

            for (PortStatus portStatus : status.getInputPortStatus()) {
                final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
                final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId();
                final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
                final String portComponentType = "InputPort";
                nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

                nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

                nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                        instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
                nifiMetricsRegistry.setDataPoint(portStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                        instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");

                final Boolean isTransmitting = portStatus.isTransmitting();
                nifiMetricsRegistry.setDataPoint(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0), "IS_TRANSMITTING",
                        instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name());

                nifiMetricsRegistry.setDataPoint(portStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            }
            for (PortStatus portStatus : status.getOutputPortStatus()) {
                final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
                final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName();
                final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId();
                final String portComponentType = "OutputPort";
                nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

                nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

                nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                        instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
                nifiMetricsRegistry.setDataPoint(portStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                        instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");

                final Boolean isTransmitting = portStatus.isTransmitting();
                nifiMetricsRegistry.setDataPoint(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0), "IS_TRANSMITTING",
                        instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name());

                nifiMetricsRegistry.setDataPoint(portStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            }
            for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
                final String rpgComponentId = StringUtils.isEmpty(remoteProcessGroupStatus.getId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getId();
                final String rpgComponentName = StringUtils.isEmpty(remoteProcessGroupStatus.getName()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getName();
                final String parentId = StringUtils.isEmpty(remoteProcessGroupStatus.getGroupId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getGroupId();
                final String rpgComponentType = "RemoteProcessGroup";

                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getSentContentSize(), "AMOUNT_BYTES_WRITTEN", instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);
                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getReceivedContentSize(), "AMOUNT_BYTES_RECEIVED", instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);

                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getSentCount(), "AMOUNT_ITEMS_OUTPUT",
                        instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getReceivedCount(), "AMOUNT_ITEMS_INPUT",
                        instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");

                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getActiveRemotePortCount(), "ACTIVE_REMOTE_PORT_COUNT",
                        instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getInactiveRemotePortCount(), "INACTIVE_REMOTE_PORT_COUNT",
                        instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");

                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getAverageLineageDuration(), "AVERAGE_LINEAGE_DURATION",
                        instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");

                nifiMetricsRegistry.setDataPoint(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0, "IS_TRANSMITTING",
                        instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name());

                nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
                        instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);
            }
        }

        return nifiMetricsRegistry.getRegistry();
    }