in kafka/consumer.go [442:462]
func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[string][]int32) {
c.mu.Lock()
defer c.mu.Unlock()
for topic, partitions := range assigned {
for _, partition := range partitions {
t := strings.TrimPrefix(topic, c.topicPrefix)
logger := c.logger.With(
zap.String("topic", t),
zap.Int32("partition", partition),
)
if c.logFieldFn != nil {
logger = logger.With(c.logFieldFn(t))
}
pc := newPartitionConsumer(c.ctx, client, c.processor,
c.delivery, t, logger,
)
c.assignments[topicPartition{topic: topic, partition: partition}] = pc
}
}
}