func()

in kafka/manager.go [189:328]


// MonitorConsumerLag registers a metrics callback that reports, for the
// supplied topicConsumers, two observable gauges:
//
//   - consumer_group_lag: the lag of each partition, with "group", "topic"
//     and "partition" attributes.
//   - consumer_group_assignment: the number of partitions observed per
//     group, topic and client ID, with "group", "topic" and "client_id"
//     attributes. Partitions without an assigned member are counted under
//     the client ID "nil".
//
// A topic is reported for a consumer group when it either equals an entry's
// Topic for that Consumer, or matches the Regex of an entry for that
// Consumer. Matching is done on the topic name with the namespace prefix
// removed; topics that do not carry the namespace prefix are ignored.
//
// The set of monitored consumers, topics and regular expressions is captured
// when MonitorConsumerLag is called.
//
// It returns the registration produced by the meter's RegisterCallback. An
// error is returned if any Regex fails to compile, if either gauge cannot be
// created, or if the callback cannot be registered.
func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error) {
	monitorTopicConsumers := make(map[apmqueue.TopicConsumer]struct{}, len(topicConsumers))
	consumerSet := make(map[string]struct{}, len(topicConsumers))
	var regex []regexConsumer
	for _, tc := range topicConsumers {
		monitorTopicConsumers[tc] = struct{}{}
		consumerSet[tc.Consumer] = struct{}{}
		if tc.Regex == "" {
			continue
		}
		re, err := regexp.Compile(tc.Regex)
		if err != nil {
			return nil, fmt.Errorf("failed to compile regex %s: %w",
				tc.Regex, err,
			)
		}
		regex = append(regex, regexConsumer{
			regex:    re,
			consumer: tc.Consumer,
		})
	}
	// De-duplicated consumer group names. Built once, alongside the other
	// lookup structures, so every collection queries the same set of groups
	// that the topic filters below were built for.
	consumers := make([]string, 0, len(consumerSet))
	for consumer := range consumerSet {
		consumers = append(consumers, consumer)
	}

	mp := m.cfg.meterProvider()
	meter := mp.Meter("github.com/elastic/apm-queue/kafka")
	consumerGroupLagMetric, err := meter.Int64ObservableGauge("consumer_group_lag")
	if err != nil {
		return nil, fmt.Errorf("kafka: failed to create consumer_group_lag metric: %w", err)
	}
	assignmentMetric, err := meter.Int64ObservableGauge("consumer_group_assignment")
	if err != nil {
		return nil, fmt.Errorf("kafka: failed to create consumer_group_assignment metric: %w", err)
	}

	// withTopicAttribute appends the attribute produced by the configured
	// TopicAttributeFunc, when one is set and it yields a non-zero value.
	withTopicAttribute := func(attrs []attribute.KeyValue, topic string) []attribute.KeyValue {
		if m.cfg.TopicAttributeFunc == nil {
			return attrs
		}
		if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
			attrs = append(attrs, kv)
		}
		return attrs
	}
	// isMonitored reports whether the (already de-namespaced) topic should be
	// reported for group, by explicit name or by regex.
	isMonitored := func(group, topic string) bool {
		if _, ok := monitorTopicConsumers[apmqueue.TopicConsumer{
			Topic:    apmqueue.Topic(topic),
			Consumer: group,
		}]; ok {
			return true
		}
		for _, re := range regex {
			if group == re.consumer && re.regex.MatchString(topic) {
				return true
			}
		}
		return false
	}

	namespacePrefix := m.cfg.namespacePrefix()
	gatherMetrics := func(ctx context.Context, o metric.Observer) error {
		ctx, span := m.tracer.Start(ctx, "GatherMetrics")
		defer span.End()

		m.cfg.Logger.Debug("reporting consumer lag", zap.Strings("consumers", consumers))

		lag, err := m.adminClient.Lag(ctx, consumers...)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			return fmt.Errorf("failed to calculate consumer lag: %w", err)
		}
		lag.Each(func(l kadm.DescribedGroupLag) {
			if err := l.Error(); err != nil {
				m.cfg.Logger.Warn("error calculating consumer lag",
					zap.String("group", l.Group),
					zap.Error(err),
				)
				return
			}
			// Number of partitions seen per (topic, client ID) in this group.
			memberAssignments := make(map[memberTopic]int64)
			for topic, partitions := range l.Lag {
				if !strings.HasPrefix(topic, namespacePrefix) {
					// Ignore topics outside the namespace.
					continue
				}
				topic = topic[len(namespacePrefix):]
				if !isMonitored(l.Group, topic) {
					// Skip when no topic matches explicit name or regex.
					continue
				}

				// Only build the per-topic logger for topics we report on.
				logger := m.cfg.Logger
				if m.cfg.TopicLogFieldFunc != nil {
					logger = logger.With(m.cfg.TopicLogFieldFunc(topic))
				}

				for partition, partitionLag := range partitions {
					if partitionLag.Err != nil {
						logger.Warn("error getting consumer group lag",
							zap.String("group", l.Group),
							zap.String("topic", topic),
							zap.Int32("partition", partition),
							zap.Error(partitionLag.Err),
						)
						continue
					}
					clientID := "nil"
					// If group is in state Empty the Member is nil.
					if partitionLag.Member != nil {
						clientID = partitionLag.Member.ClientID
					}
					memberAssignments[memberTopic{topic: topic, clientID: clientID}]++

					attrs := withTopicAttribute([]attribute.KeyValue{
						attribute.String("group", l.Group),
						attribute.String("topic", topic),
						attribute.Int("partition", int(partition)),
					}, topic)
					o.ObserveInt64(
						consumerGroupLagMetric, partitionLag.Lag,
						metric.WithAttributeSet(attribute.NewSet(attrs...)),
					)
				}
			}
			for key, count := range memberAssignments {
				attrs := withTopicAttribute([]attribute.KeyValue{
					attribute.String("group", l.Group),
					attribute.String("topic", key.topic),
					attribute.String("client_id", key.clientID),
				}, key.topic)
				o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet(
					attribute.NewSet(attrs...),
				))
			}
		})
		return nil
	}
	return meter.RegisterCallback(gatherMetrics, consumerGroupLagMetric,
		assignmentMetric,
	)
}