in kafka/producer.go [224:284]
func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error {
if len(rs) == 0 {
return nil
}
// Take a read lock to prevent Close from closing the client
// while we're attempting to produce records.
p.mu.RLock()
defer p.mu.RUnlock()
var headers []kgo.RecordHeader
if m, ok := queuecontext.MetadataFromContext(ctx); ok {
headers = make([]kgo.RecordHeader, 0, len(m))
for k, v := range m {
headers = append(headers, kgo.RecordHeader{
Key: k, Value: []byte(v),
})
}
}
var wg sync.WaitGroup
wg.Add(len(rs))
if !p.cfg.Sync {
ctx = queuecontext.DetachedContext(ctx)
}
namespacePrefix := p.cfg.namespacePrefix()
for _, record := range rs {
kgoRecord := &kgo.Record{
Headers: headers,
Topic: fmt.Sprintf("%s%s", namespacePrefix, record.Topic),
Key: record.OrderingKey,
Value: record.Value,
}
p.client.Produce(ctx, kgoRecord, func(r *kgo.Record, err error) {
defer wg.Done()
// kotel already marks spans as errors. No need to handle it here.
if err != nil {
topicName := strings.TrimPrefix(r.Topic, namespacePrefix)
logger := p.cfg.Logger
if p.cfg.TopicLogFieldFunc != nil {
logger = logger.With(p.cfg.TopicLogFieldFunc(topicName))
}
logger.Error("failed producing message",
zap.Error(err),
zap.String("topic", topicName),
zap.Int64("offset", r.Offset),
zap.Int32("partition", r.Partition),
zap.Any("headers", headers),
)
}
if p.cfg.ProduceCallback != nil {
p.cfg.ProduceCallback(r, err)
}
})
}
if p.cfg.Sync {
wg.Wait()
}
return nil
}