in kafka/manager.go [189:328]
// MonitorConsumerLag registers a metric callback that reports, for the
// consumer group/topic pairs described by topicConsumers:
//
//   - consumer_group_lag: the lag of every partition, with "group", "topic"
//     and "partition" attributes.
//   - consumer_group_assignment: the number of partitions of a topic assigned
//     to each group member, with "group", "topic" and "client_id" attributes.
//     Partitions without an assigned member (lag.Member == nil) are counted
//     under client_id "nil".
//
// Topics are reported with the configured namespace prefix stripped; topics
// outside the namespace are ignored. A TopicConsumer with a non-empty Regex
// selects every prefix-stripped topic of that consumer group for which
// regexp.MatchString reports a match (the pattern is unanchored unless it
// anchors itself). When TopicAttributeFunc is set and returns a non-zero
// attribute, that attribute is added to both metrics.
//
// An error is returned if a Regex fails to compile or an instrument cannot be
// created. The returned Registration can be used to unregister the callback.
func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error) {
	// Everything derived from topicConsumers is computed once here rather than
	// on every collection: the exact-name lookup set, the compiled regex
	// matchers and the distinct consumer group names (kept in first-seen
	// order so the debug log below is deterministic).
	monitorTopicConsumers := make(map[apmqueue.TopicConsumer]struct{}, len(topicConsumers))
	var regex []regexConsumer
	consumers := make([]string, 0, len(topicConsumers))
	seenConsumers := make(map[string]struct{}, len(topicConsumers))
	for _, tc := range topicConsumers {
		if _, ok := seenConsumers[tc.Consumer]; !ok {
			seenConsumers[tc.Consumer] = struct{}{}
			consumers = append(consumers, tc.Consumer)
		}
		monitorTopicConsumers[tc] = struct{}{}
		if tc.Regex == "" {
			continue
		}
		re, err := regexp.Compile(tc.Regex)
		if err != nil {
			return nil, fmt.Errorf("failed to compile regex %s: %w",
				tc.Regex, err,
			)
		}
		regex = append(regex, regexConsumer{
			regex:    re,
			consumer: tc.Consumer,
		})
	}

	mp := m.cfg.meterProvider()
	meter := mp.Meter("github.com/elastic/apm-queue/kafka")
	consumerGroupLagMetric, err := meter.Int64ObservableGauge("consumer_group_lag")
	if err != nil {
		return nil, fmt.Errorf("kafka: failed to create consumer_group_lag metric: %w", err)
	}
	assignmentMetric, err := meter.Int64ObservableGauge("consumer_group_assignment")
	if err != nil {
		return nil, fmt.Errorf("kafka: failed to create consumer_group_assignment metric: %w", err)
	}

	// isMonitored reports whether the (group, topic) pair was requested,
	// either by exact topic name or by one of the group's regexes. The map
	// lookup is tried first because it is cheaper than regex matching.
	isMonitored := func(group, topic string) bool {
		if _, ok := monitorTopicConsumers[apmqueue.TopicConsumer{
			Topic:    apmqueue.Topic(topic),
			Consumer: group,
		}]; ok {
			return true
		}
		for _, rc := range regex {
			if rc.consumer == group && rc.regex.MatchString(topic) {
				return true
			}
		}
		return false
	}

	// topicAttribute returns the optional custom attribute for topic and
	// whether it should be attached. A nil TopicAttributeFunc is tolerated,
	// matching the nil check applied to TopicLogFieldFunc below.
	topicAttribute := func(topic string) (attribute.KeyValue, bool) {
		if m.cfg.TopicAttributeFunc == nil {
			return attribute.KeyValue{}, false
		}
		kv := m.cfg.TopicAttributeFunc(topic)
		return kv, kv != (attribute.KeyValue{})
	}

	namespacePrefix := m.cfg.namespacePrefix()
	gatherMetrics := func(ctx context.Context, o metric.Observer) error {
		if len(consumers) == 0 {
			// No group/topic pair can ever pass isMonitored, so there is
			// nothing to report; skip the admin round trip entirely.
			return nil
		}
		ctx, span := m.tracer.Start(ctx, "GatherMetrics")
		defer span.End()

		m.cfg.Logger.Debug("reporting consumer lag", zap.Strings("consumers", consumers))
		lag, err := m.adminClient.Lag(ctx, consumers...)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			return fmt.Errorf("failed to calculate consumer lag: %w", err)
		}
		lag.Each(func(l kadm.DescribedGroupLag) {
			if err := l.Error(); err != nil {
				m.cfg.Logger.Warn("error calculating consumer lag",
					zap.String("group", l.Group),
					zap.Error(err),
				)
				return
			}
			// Number of partitions of each topic assigned to each member.
			memberAssignments := make(map[memberTopic]int64)
			for topic, partitions := range l.Lag {
				if !strings.HasPrefix(topic, namespacePrefix) {
					// Ignore topics outside the namespace.
					continue
				}
				topic = topic[len(namespacePrefix):]
				if !isMonitored(l.Group, topic) {
					// Skip when no topic matches explicit name or regex.
					continue
				}
				logger := m.cfg.Logger
				if m.cfg.TopicLogFieldFunc != nil {
					logger = logger.With(m.cfg.TopicLogFieldFunc(topic))
				}
				// The custom attribute depends only on the topic, so resolve
				// it once rather than once per partition.
				extraAttr, hasExtraAttr := topicAttribute(topic)
				for partition, memberLag := range partitions {
					if memberLag.Err != nil {
						logger.Warn("error getting consumer group lag",
							zap.String("group", l.Group),
							zap.String("topic", topic),
							zap.Int32("partition", partition),
							zap.Error(memberLag.Err),
						)
						continue
					}
					clientID := "nil"
					// If group is in state Empty the lag.Member is nil
					if memberLag.Member != nil {
						clientID = memberLag.Member.ClientID
					}
					memberAssignments[memberTopic{topic: topic, clientID: clientID}]++

					attrs := []attribute.KeyValue{
						attribute.String("group", l.Group),
						attribute.String("topic", topic),
						attribute.Int("partition", int(partition)),
					}
					if hasExtraAttr {
						attrs = append(attrs, extraAttr)
					}
					o.ObserveInt64(
						consumerGroupLagMetric, memberLag.Lag,
						metric.WithAttributeSet(attribute.NewSet(attrs...)),
					)
				}
			}
			for key, count := range memberAssignments {
				attrs := []attribute.KeyValue{
					attribute.String("group", l.Group),
					attribute.String("topic", key.topic),
					attribute.String("client_id", key.clientID),
				}
				if kv, ok := topicAttribute(key.topic); ok {
					attrs = append(attrs, kv)
				}
				o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet(
					attribute.NewSet(attrs...),
				))
			}
		})
		return nil
	}
	return meter.RegisterCallback(gatherMetrics, consumerGroupLagMetric,
		assignmentMetric,
	)
}