func()

in kafka/consumer.go [442:462]


// assigned creates a partition consumer for every topic/partition pair in
// the assigned map and stores it in c.assignments.
//
// NOTE(review): the signature matches franz-go's partition-assignment
// callback (presumably wired up via kgo.OnPartitionsAssigned); confirm at
// the client construction site.
//
// The context argument is ignored; each partition consumer is built with
// c.ctx instead. c.mu is held for the entire call, so c.assignments is only
// written under the lock.
func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[string][]int32) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for fullTopic, parts := range assigned {
		// The logger and the partition consumer receive the topic name with
		// c.topicPrefix stripped; the assignments key keeps the full name.
		name := strings.TrimPrefix(fullTopic, c.topicPrefix)

		for _, p := range parts {
			key := topicPartition{topic: fullTopic, partition: p}

			plog := c.logger.With(zap.String("topic", name), zap.Int32("partition", p))
			if c.logFieldFn != nil {
				// Optional extra log field computed from the unprefixed topic.
				plog = plog.With(c.logFieldFn(name))
			}

			c.assignments[key] = newPartitionConsumer(
				c.ctx, client, c.processor, c.delivery, name, plog,
			)
		}
	}
}