in nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/prometheusutil/PrometheusMetricsUtil.java [49:262]
/**
 * Records data points for the given process group status in the supplied metrics registry and,
 * depending on {@code metricsStrategy}, for its child process groups and contained components.
 *
 * <p>Label values derived from the arguments or from a component's id, name or group id fall back to
 * {@code DEFAULT_LABEL_STRING} when they are null or empty.</p>
 *
 * @param nifiMetricsRegistry  registry that receives every data point
 * @param status               status of the process group to report
 * @param instId               instance identifier label
 * @param parentProcessGroupId identifier of the parent process group, used as the parent label of {@code status}
 * @param compType             component type label to use for {@code status}
 * @param metricsStrategy      when equal to the value of {@code METRICS_STRATEGY_PG} or
 *                             {@code METRICS_STRATEGY_COMPONENTS}, child process groups are reported recursively;
 *                             when equal to the value of {@code METRICS_STRATEGY_COMPONENTS}, processors,
 *                             connections, input/output ports and remote process groups of each visited group
 *                             are reported as well; any other value reports only {@code status} itself
 * @return the registry returned by {@code nifiMetricsRegistry.getRegistry()}
 */
public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetricsRegistry, ProcessGroupStatus status,
                                                  String instId, String parentProcessGroupId, String compType, String metricsStrategy) {
    final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
    final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId;
    final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
    final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId();
    final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName();

    // Process group level metrics
    nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
    nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(), "AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);
    nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);

    nifiMetricsRegistry.setDataPoint(status.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
    nifiMetricsRegistry.setDataPoint(status.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId);
    nifiMetricsRegistry.setDataPoint(status.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId);
    nifiMetricsRegistry.setDataPoint(status.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);
    nifiMetricsRegistry.setDataPoint(status.getBytesTransferred(), "AMOUNT_BYTES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);

    // The four trailing empty strings occupy the label positions in which connection metrics
    // pass the source id, source name, destination id and destination name.
    nifiMetricsRegistry.setDataPoint(status.getOutputContentSize(), "SIZE_CONTENT_OUTPUT_TOTAL",
            instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
    nifiMetricsRegistry.setDataPoint(status.getInputContentSize(), "SIZE_CONTENT_INPUT_TOTAL",
            instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
    nifiMetricsRegistry.setDataPoint(status.getQueuedContentSize(), "SIZE_CONTENT_QUEUED_TOTAL",
            instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");

    nifiMetricsRegistry.setDataPoint(status.getOutputCount(), "AMOUNT_ITEMS_OUTPUT", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
    nifiMetricsRegistry.setDataPoint(status.getInputCount(), "AMOUNT_ITEMS_INPUT", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
    nifiMetricsRegistry.setDataPoint(status.getQueuedCount(), "AMOUNT_ITEMS_QUEUED", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");

    // A null thread count on the process group is reported as 0.
    nifiMetricsRegistry.setDataPoint(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
            instanceId, componentType, componentName, componentId, parentPGId);
    nifiMetricsRegistry.setDataPoint(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
            instanceId, componentType, componentName, componentId, parentPGId);

    addProcessingPerformanceMetrics(nifiMetricsRegistry, status.getProcessingPerformanceStatus(),
            instanceId, componentType, componentName, componentId, parentPGId);

    nifiMetricsRegistry.setDataPoint(status.getProcessingNanos(), "TOTAL_TASK_DURATION",
            instanceId, componentType, componentName, componentId, parentPGId);

    // Report metrics for child process groups if specified; this group's id becomes the child's parent label.
    if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
        status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(nifiMetricsRegistry, childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy));
    }

    if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
        // Report metrics for all components
        for (ProcessorStatus processorStatus : status.getProcessorStatus()) {
            final Map<String, Long> counters = processorStatus.getCounters();
            if (counters != null) {
                // Counter data points use the raw processor name and id (no DEFAULT_LABEL_STRING fallback).
                counters.forEach((counterName, counterValue) -> nifiMetricsRegistry.setDataPoint(counterValue, "PROCESSOR_COUNTERS",
                        processorStatus.getName(), counterName, processorStatus.getId(), instanceId));
            }

            final String procComponentType = "Processor";
            final String procComponentId = StringUtils.isEmpty(processorStatus.getId()) ? DEFAULT_LABEL_STRING : processorStatus.getId();
            final String procComponentName = StringUtils.isEmpty(processorStatus.getName()) ? DEFAULT_LABEL_STRING : processorStatus.getName();
            final String parentId = StringUtils.isEmpty(processorStatus.getGroupId()) ? DEFAULT_LABEL_STRING : processorStatus.getGroupId();

            nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesRemoved(), "AMOUNT_FLOWFILES_REMOVED",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId);

            nifiMetricsRegistry.setDataPoint(processorStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(processorStatus.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(processorStatus.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(processorStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, procComponentType, procComponentName, procComponentId, parentId);

            nifiMetricsRegistry.setDataPoint(processorStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
            nifiMetricsRegistry.setDataPoint(processorStatus.getInputBytes(), "SIZE_CONTENT_INPUT_TOTAL",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");

            nifiMetricsRegistry.setDataPoint(processorStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
            nifiMetricsRegistry.setDataPoint(processorStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");

            nifiMetricsRegistry.setDataPoint(processorStatus.getAverageLineageDuration(), "AVERAGE_LINEAGE_DURATION",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");

            // Thread counts are taken from the processor itself; reading them from the enclosing
            // process group would publish the group's totals under every processor's labels.
            nifiMetricsRegistry.setDataPoint(processorStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(processorStatus.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
                    instanceId, procComponentType, procComponentName, procComponentId, parentId);

            addProcessingPerformanceMetrics(nifiMetricsRegistry, processorStatus.getProcessingPerformanceStatus(),
                    instanceId, procComponentType, procComponentName, procComponentId, parentId);
        }

        for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
            final String connComponentId = StringUtils.isEmpty(connectionStatus.getId()) ? DEFAULT_LABEL_STRING : connectionStatus.getId();
            final String connComponentName = StringUtils.isEmpty(connectionStatus.getName()) ? DEFAULT_LABEL_STRING : connectionStatus.getName();
            final String sourceId = StringUtils.isEmpty(connectionStatus.getSourceId()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceId();
            final String sourceName = StringUtils.isEmpty(connectionStatus.getSourceName()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceName();
            final String destinationId = StringUtils.isEmpty(connectionStatus.getDestinationId()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationId();
            final String destinationName = StringUtils.isEmpty(connectionStatus.getDestinationName()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationName();
            final String parentId = StringUtils.isEmpty(connectionStatus.getGroupId()) ? DEFAULT_LABEL_STRING : connectionStatus.getGroupId();
            final String connComponentType = "Connection";

            nifiMetricsRegistry.setDataPoint(connectionStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
            nifiMetricsRegistry.setDataPoint(connectionStatus.getInputBytes(), "SIZE_CONTENT_INPUT_TOTAL",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
            nifiMetricsRegistry.setDataPoint(connectionStatus.getQueuedBytes(), "SIZE_CONTENT_QUEUED_TOTAL",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

            nifiMetricsRegistry.setDataPoint(connectionStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
            nifiMetricsRegistry.setDataPoint(connectionStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
            nifiMetricsRegistry.setDataPoint(connectionStatus.getQueuedCount(), "AMOUNT_ITEMS_QUEUED",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

            nifiMetricsRegistry.setDataPoint(connectionStatus.getBackPressureBytesThreshold(), "BACKPRESSURE_BYTES_THRESHOLD",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
            nifiMetricsRegistry.setDataPoint(connectionStatus.getBackPressureObjectThreshold(), "BACKPRESSURE_OBJECT_THRESHOLD",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

            nifiMetricsRegistry.setDataPoint(getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold()),
                    "PERCENT_USED_BYTES", instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
            nifiMetricsRegistry.setDataPoint(getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold()),
                    "PERCENT_USED_COUNT", instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);

            // Reported as 1 when either positive threshold (object count or bytes) has been reached
            // or exceeded by the queue; a threshold of 0 or less never triggers it.
            boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
                    || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getQueuedBytes());
            nifiMetricsRegistry.setDataPoint(isBackpressureEnabled ? 1 : 0, "IS_BACKPRESSURE_ENABLED",
                    instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
        }

        for (PortStatus portStatus : status.getInputPortStatus()) {
            final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
            // Name and parent labels come from the port's name and group id, matching the output port handling below.
            final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName();
            final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId();
            final String portComponentType = "InputPort";

            nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

            nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

            nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                    instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
            nifiMetricsRegistry.setDataPoint(portStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                    instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");

            // A null transmitting flag is reported as 0; the run status name is passed as an extra label.
            final Boolean isTransmitting = portStatus.isTransmitting();
            nifiMetricsRegistry.setDataPoint(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0), "IS_TRANSMITTING",
                    instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name());

            nifiMetricsRegistry.setDataPoint(portStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE", instanceId, portComponentType, portComponentName, portComponentId, parentId);
        }

        for (PortStatus portStatus : status.getOutputPortStatus()) {
            final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
            final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName();
            final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId();
            final String portComponentType = "OutputPort";

            nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

            nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);

            nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
                    instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
            nifiMetricsRegistry.setDataPoint(portStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
                    instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");

            // A null transmitting flag is reported as 0; the run status name is passed as an extra label.
            final Boolean isTransmitting = portStatus.isTransmitting();
            nifiMetricsRegistry.setDataPoint(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0), "IS_TRANSMITTING",
                    instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name());

            nifiMetricsRegistry.setDataPoint(portStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE", instanceId, portComponentType, portComponentName, portComponentId, parentId);
        }

        for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
            final String rpgComponentId = StringUtils.isEmpty(remoteProcessGroupStatus.getId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getId();
            final String rpgComponentName = StringUtils.isEmpty(remoteProcessGroupStatus.getName()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getName();
            final String parentId = StringUtils.isEmpty(remoteProcessGroupStatus.getGroupId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getGroupId();
            final String rpgComponentType = "RemoteProcessGroup";

            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getSentContentSize(), "AMOUNT_BYTES_WRITTEN", instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);
            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getReceivedContentSize(), "AMOUNT_BYTES_RECEIVED", instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);

            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getSentCount(), "AMOUNT_ITEMS_OUTPUT",
                    instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getReceivedCount(), "AMOUNT_ITEMS_INPUT",
                    instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");

            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getActiveRemotePortCount(), "ACTIVE_REMOTE_PORT_COUNT",
                    instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getInactiveRemotePortCount(), "INACTIVE_REMOTE_PORT_COUNT",
                    instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");

            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getAverageLineageDuration(), "AVERAGE_LINEAGE_DURATION",
                    instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");

            // 1 only when the transmission status is Transmitting; the status name is passed as an extra label.
            nifiMetricsRegistry.setDataPoint(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0, "IS_TRANSMITTING",
                    instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name());

            nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
                    instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);
        }
    }

    return nifiMetricsRegistry.getRegistry();
}