func (p *Producer) Produce

in kafka/producer.go [224:284]

Produce sends one or more records to Kafka. Per-record failures are logged
and passed to the optional ProduceCallback rather than returned; the call
blocks until all records are acknowledged only when cfg.Sync is set.


func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error {
	if len(rs) == 0 {
		return nil
	}

	// Take a read lock to prevent Close from closing the client
	// while we're attempting to produce records.
	p.mu.RLock()
	defer p.mu.RUnlock()

	// Propagate metadata from the caller's context as record headers.
	var headers []kgo.RecordHeader
	if m, ok := queuecontext.MetadataFromContext(ctx); ok {
		headers = make([]kgo.RecordHeader, 0, len(m))
		for k, v := range m {
			headers = append(headers, kgo.RecordHeader{
				Key: k, Value: []byte(v),
			})
		}
	}

	var wg sync.WaitGroup
	wg.Add(len(rs))
	if !p.cfg.Sync {
		// In async mode, detach the context so in-flight produce
		// requests aren't canceled when the caller's context is.
		ctx = queuecontext.DetachedContext(ctx)
	}
	namespacePrefix := p.cfg.namespacePrefix()
	for _, record := range rs {
		kgoRecord := &kgo.Record{
			Headers: headers,
			Topic:   fmt.Sprintf("%s%s", namespacePrefix, record.Topic),
			Key:     record.OrderingKey,
			Value:   record.Value,
		}
		// kgo.Client.Produce is asynchronous; the callback runs once the
		// record is acknowledged by Kafka or fails permanently.
		p.client.Produce(ctx, kgoRecord, func(r *kgo.Record, err error) {
			defer wg.Done()
			// kotel already marks spans as errors, so no extra tracing
			// handling is needed here.
			if err != nil {
				topicName := strings.TrimPrefix(r.Topic, namespacePrefix)
				logger := p.cfg.Logger
				if p.cfg.TopicLogFieldFunc != nil {
					logger = logger.With(p.cfg.TopicLogFieldFunc(topicName))
				}

				logger.Error("failed producing message",
					zap.Error(err),
					zap.String("topic", topicName),
					zap.Int64("offset", r.Offset),
					zap.Int32("partition", r.Partition),
					zap.Any("headers", headers),
				)
			}
			if p.cfg.ProduceCallback != nil {
				p.cfg.ProduceCallback(r, err)
			}
		})
	}
	if p.cfg.Sync {
		// In sync mode, block until every record's callback has run.
		wg.Wait()
	}
	return nil
}
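
A minimal usage sketch follows. It assumes an already-constructed
*kafka.Producer, and that queuecontext.WithMetadata is the setter
counterpart of the MetadataFromContext call above; the module paths, the
"events" topic, and the produceExample helper are illustrative, not part
of the source.

package main

import (
	"context"
	"log"

	apmqueue "github.com/elastic/apm-queue/v2"     // assumed module path
	"github.com/elastic/apm-queue/v2/kafka"        // assumed module path
	"github.com/elastic/apm-queue/v2/queuecontext" // assumed module path
)

// produceExample is a hypothetical caller; producer is assumed to be an
// already-constructed kafka.Producer.
func produceExample(ctx context.Context, producer *kafka.Producer) {
	// Metadata attached to the context is copied into record headers by
	// Produce. WithMetadata is assumed to be the setter counterpart of
	// the MetadataFromContext call used in Produce.
	ctx = queuecontext.WithMetadata(ctx, map[string]string{
		"trace_id": "abc123",
	})

	// Produce returns nil even when individual records fail; failures
	// surface through the configured Logger and optional ProduceCallback.
	if err := producer.Produce(ctx, apmqueue.Record{
		Topic:       apmqueue.Topic("events"),
		OrderingKey: []byte("tenant-1"), // records sharing a key keep order
		Value:       []byte(`{"event":"created"}`),
	}); err != nil {
		log.Fatal(err)
	}
}

In sync mode the call only returns after every callback has fired, so the
logger output is complete by then; in async mode it returns immediately,
which makes cfg.ProduceCallback the only way for callers to observe
per-record failures.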