in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java [121:327]
public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
    /*
     * Builds the metric names used for the per-protocol / per-state group counts, registers the
     * gauges, and then creates one sensor per recorded event type (offset commits, expirations,
     * deletions and the rebalance flavours), each backed by a Meter with a rate and a count metric.
     *
     * Both arguments are mandatory: a null registry or a null metrics instance fails fast with a
     * NullPointerException raised by Objects.requireNonNull.
     */
    this.registry = Objects.requireNonNull(registry);
    this.metrics = Objects.requireNonNull(metrics);

    // Tag values identifying each group type; resolved once and reused below.
    final String classicProtocol = Group.GroupType.CLASSIC.toString();
    final String consumerProtocol = Group.GroupType.CONSUMER.toString();
    final String shareProtocol = Group.GroupType.SHARE.toString();
    final String streamsProtocol = Group.GroupType.STREAMS.toString();

    // ---- Classic groups: total count, tagged by protocol. ----
    this.classicGroupCountMetricName = metrics.metricName(
        GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The total number of groups using the classic rebalance protocol.",
        Map.of(GROUP_COUNT_PROTOCOL_TAG, classicProtocol));

    // ---- Consumer groups: total count, then one metric name per group state. ----
    this.consumerGroupCountMetricName = metrics.metricName(
        GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The total number of groups using the consumer rebalance protocol.",
        Map.of(GROUP_COUNT_PROTOCOL_TAG, consumerProtocol));
    this.consumerGroupCountEmptyMetricName = metrics.metricName(
        CONSUMER_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of consumer groups in empty state.",
        Map.of(CONSUMER_GROUP_COUNT_STATE_TAG, ConsumerGroupState.EMPTY.toString()));
    this.consumerGroupCountAssigningMetricName = metrics.metricName(
        CONSUMER_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of consumer groups in assigning state.",
        Map.of(CONSUMER_GROUP_COUNT_STATE_TAG, ConsumerGroupState.ASSIGNING.toString()));
    this.consumerGroupCountReconcilingMetricName = metrics.metricName(
        CONSUMER_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of consumer groups in reconciling state.",
        Map.of(CONSUMER_GROUP_COUNT_STATE_TAG, ConsumerGroupState.RECONCILING.toString()));
    this.consumerGroupCountStableMetricName = metrics.metricName(
        CONSUMER_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of consumer groups in stable state.",
        Map.of(CONSUMER_GROUP_COUNT_STATE_TAG, ConsumerGroupState.STABLE.toString()));
    this.consumerGroupCountDeadMetricName = metrics.metricName(
        CONSUMER_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of consumer groups in dead state.",
        Map.of(CONSUMER_GROUP_COUNT_STATE_TAG, ConsumerGroupState.DEAD.toString()));

    // ---- Share groups: the total reuses GROUP_COUNT_METRIC_NAME but is keyed by SHARE_GROUP_PROTOCOL_TAG. ----
    this.shareGroupCountMetricName = metrics.metricName(
        GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The total number of share groups.",
        Map.of(SHARE_GROUP_PROTOCOL_TAG, shareProtocol));
    this.shareGroupCountEmptyMetricName = metrics.metricName(
        SHARE_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of share groups in empty state.",
        Map.of(SHARE_GROUP_COUNT_STATE_TAG, ShareGroup.ShareGroupState.EMPTY.toString()));
    this.shareGroupCountStableMetricName = metrics.metricName(
        SHARE_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of share groups in stable state.",
        Map.of(SHARE_GROUP_COUNT_STATE_TAG, ShareGroup.ShareGroupState.STABLE.toString()));
    this.shareGroupCountDeadMetricName = metrics.metricName(
        SHARE_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of share groups in dead state.",
        Map.of(SHARE_GROUP_COUNT_STATE_TAG, ShareGroup.ShareGroupState.DEAD.toString()));

    // ---- Streams groups: total count, then one metric name per group state. ----
    this.streamsGroupCountMetricName = metrics.metricName(
        GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The total number of groups using the streams rebalance protocol.",
        Map.of(GROUP_COUNT_PROTOCOL_TAG, streamsProtocol));
    this.streamsGroupCountEmptyMetricName = metrics.metricName(
        STREAMS_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of streams groups in empty state.",
        Map.of(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.EMPTY.toString()));
    this.streamsGroupCountAssigningMetricName = metrics.metricName(
        STREAMS_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of streams groups in assigning state.",
        Map.of(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.ASSIGNING.toString()));
    this.streamsGroupCountReconcilingMetricName = metrics.metricName(
        STREAMS_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of streams groups in reconciling state.",
        Map.of(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.RECONCILING.toString()));
    this.streamsGroupCountStableMetricName = metrics.metricName(
        STREAMS_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of streams groups in stable state.",
        Map.of(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.STABLE.toString()));
    this.streamsGroupCountDeadMetricName = metrics.metricName(
        STREAMS_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of streams groups in dead state.",
        Map.of(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.DEAD.toString()));
    this.streamsGroupCountNotReadyMetricName = metrics.metricName(
        STREAMS_GROUP_COUNT_METRIC_NAME, METRICS_GROUP,
        "The number of streams groups in not ready state.",
        Map.of(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.NOT_READY.toString()));

    // Must stay after every metric name assignment above.
    // NOTE(review): registerGauges() presumably reads those fields - confirm before reordering.
    registerGauges();

    // ---- Offset sensors. ----
    final Sensor commitsSensor = metrics.sensor(OFFSET_COMMITS_SENSOR_NAME);
    final Meter commitsMeter = new Meter(
        metrics.metricName("offset-commit-rate", METRICS_GROUP, "The rate of committed offsets"),
        metrics.metricName("offset-commit-count", METRICS_GROUP, "The total number of committed offsets"));
    commitsSensor.add(commitsMeter);

    final Sensor expiredSensor = metrics.sensor(OFFSET_EXPIRED_SENSOR_NAME);
    final Meter expiredMeter = new Meter(
        metrics.metricName("offset-expiration-rate", METRICS_GROUP, "The rate of expired offsets"),
        metrics.metricName("offset-expiration-count", METRICS_GROUP, "The total number of expired offsets"));
    expiredSensor.add(expiredMeter);

    final Sensor deletionsSensor = metrics.sensor(OFFSET_DELETIONS_SENSOR_NAME);
    final Meter deletionsMeter = new Meter(
        metrics.metricName("offset-deletion-rate", METRICS_GROUP, "The rate of administrative deleted offsets"),
        metrics.metricName("offset-deletion-count", METRICS_GROUP,
            "The total number of administrative deleted offsets"));
    deletionsSensor.add(deletionsMeter);

    // ---- Rebalance sensors, one per group type. ----
    final Sensor classicRebalancesSensor = metrics.sensor(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME);
    final Meter classicRebalancesMeter = new Meter(
        metrics.metricName("group-completed-rebalance-rate", METRICS_GROUP,
            "The rate of classic group completed rebalances"),
        metrics.metricName("group-completed-rebalance-count", METRICS_GROUP,
            "The total number of classic group completed rebalances"));
    classicRebalancesSensor.add(classicRebalancesMeter);

    final Sensor consumerRebalancesSensor = metrics.sensor(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
    final Meter consumerRebalancesMeter = new Meter(
        metrics.metricName("consumer-group-rebalance-rate", METRICS_GROUP,
            "The rate of consumer group rebalances"),
        metrics.metricName("consumer-group-rebalance-count", METRICS_GROUP,
            "The total number of consumer group rebalances"));
    consumerRebalancesSensor.add(consumerRebalancesMeter);

    // Unlike the other rebalance meters, the share group ones use generic metric names and are
    // distinguished by a protocol tag instead of a name prefix.
    final Sensor shareRebalancesSensor = metrics.sensor(SHARE_GROUP_REBALANCES_SENSOR_NAME);
    final Meter shareRebalancesMeter = new Meter(
        metrics.metricName("rebalance-rate", METRICS_GROUP,
            "The rate of share group rebalances",
            SHARE_GROUP_PROTOCOL_TAG, shareProtocol),
        metrics.metricName("rebalance-count", METRICS_GROUP,
            "The total number of share group rebalances",
            SHARE_GROUP_PROTOCOL_TAG, shareProtocol));
    shareRebalancesSensor.add(shareRebalancesMeter);

    final Sensor streamsRebalancesSensor = metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
    final Meter streamsRebalancesMeter = new Meter(
        metrics.metricName("streams-group-rebalance-rate", METRICS_GROUP,
            "The rate of streams group rebalances"),
        metrics.metricName("streams-group-rebalance-count", METRICS_GROUP,
            "The total number of streams group rebalances"));
    streamsRebalancesSensor.add(streamsRebalancesMeter);

    // Read-only lookup from sensor name to sensor, in the same order the sensors were created.
    this.globalSensors = Collections.unmodifiableMap(Utils.mkMap(
        Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, commitsSensor),
        Utils.mkEntry(OFFSET_EXPIRED_SENSOR_NAME, expiredSensor),
        Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, deletionsSensor),
        Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, classicRebalancesSensor),
        Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerRebalancesSensor),
        Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME, shareRebalancesSensor),
        Utils.mkEntry(STREAMS_GROUP_REBALANCES_SENSOR_NAME, streamsRebalancesSensor)
    ));
}