in kafka/consumer.go [205:290]
// NewConsumer validates cfg, builds the underlying kgo.Client with the
// consumer-group options this package requires (manual commits, blocked
// rebalances while polling, partition lifecycle callbacks), and returns a
// ready-to-use *Consumer. The returned Consumer must be closed with Close.
func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
	if err := cfg.finalize(); err != nil {
		return nil, fmt.Errorf("kafka: invalid consumer config: %w", err)
	}
	// Apply defaults for knobs that finalize() leaves unset. These values
	// are only stored back into cfg (they are not kgo options), so they are
	// defaulted up front rather than interleaved with option building.
	if cfg.ShutdownGracePeriod <= 0 {
		cfg.ShutdownGracePeriod = 5 * time.Second
	}
	if cfg.MaxPollRecords <= 0 {
		cfg.MaxPollRecords = 500
	}
	// `forceClose` is called by `Consumer.Close()` if / when the
	// `cfg.ShutdownGracePeriod` is exceeded.
	processingCtx, forceClose := context.WithCancelCause(context.Background())
	namespacePrefix := cfg.namespacePrefix()
	consumer := &consumer{
		topicPrefix: namespacePrefix,
		logFieldFn:  cfg.TopicLogFieldFunc,
		assignments: make(map[topicPartition]*pc),
		processor:   cfg.Processor,
		logger:      cfg.Logger.Named("partition"),
		delivery:    cfg.Delivery,
		ctx:         processingCtx,
	}
	// Subscribe to the namespaced form of each configured topic.
	topics := make([]string, len(cfg.Topics))
	for i, topic := range cfg.Topics {
		topics[i] = consumer.topicPrefix + topic
	}
	opts := []kgo.Opt{
		// Injects the kgo.Client context as the record.Context.
		kgo.WithHooks(consumer),
		kgo.ConsumerGroup(cfg.GroupID),
		kgo.ConsumeTopics(topics...),
		// If a rebalance happens while the client is polling, the consumed
		// records may belong to a partition which has been reassigned to a
		// different consumer in the group. To avoid this scenario, Polls will
		// block rebalances of partitions which would be lost, and the consumer
		// MUST manually call `AllowRebalance`.
		kgo.BlockRebalanceOnPoll(),
		kgo.DisableAutoCommit(),
		// Assign concurrent consumer callbacks to ensure consuming starts
		// for newly assigned partitions, and consuming ceases from lost or
		// revoked partitions.
		kgo.OnPartitionsAssigned(consumer.assigned),
		kgo.OnPartitionsLost(consumer.lost),
		kgo.OnPartitionsRevoked(consumer.lost),
	}
	// Optional tuning: only forward settings the caller explicitly provided,
	// letting kgo use its own defaults otherwise.
	if cfg.ConsumeRegex {
		opts = append(opts, kgo.ConsumeRegex())
	}
	if cfg.MaxPollWait > 0 {
		opts = append(opts, kgo.FetchMaxWait(cfg.MaxPollWait))
	}
	if cfg.MaxPollBytes != 0 {
		opts = append(opts, kgo.FetchMaxBytes(cfg.MaxPollBytes))
	}
	if cfg.MaxPollPartitionBytes != 0 {
		opts = append(opts, kgo.FetchMaxPartitionBytes(cfg.MaxPollPartitionBytes))
	}
	if cfg.MaxConcurrentFetches > 0 {
		opts = append(opts, kgo.MaxConcurrentFetches(cfg.MaxConcurrentFetches))
	}
	if cfg.PreferLagFn != nil {
		opts = append(opts, kgo.ConsumePreferringLagFn(cfg.PreferLagFn))
	}
	if cfg.FetchMinBytes > 0 {
		opts = append(opts, kgo.FetchMinBytes(cfg.FetchMinBytes))
	}
	if cfg.BrokerMaxReadBytes > 0 {
		opts = append(opts, kgo.BrokerMaxReadBytes(cfg.BrokerMaxReadBytes))
	}
	client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating kafka consumer: %w", err)
	}
	return &Consumer{
		cfg:        cfg,
		client:     client,
		consumer:   consumer,
		closed:     make(chan struct{}),
		running:    make(chan struct{}),
		forceClose: forceClose,
		stopPoll:   func() {},
		tracer:     cfg.tracerProvider().Tracer("kafka"),
	}, nil
}