func NewConsumer()

in kafka/consumer.go [205:290]


// NewConsumer validates cfg and returns a Consumer backed by a newly created
// kgo client.
//
// Every entry in cfg.Topics is prefixed with cfg.namespacePrefix() before
// being passed to the client. The client is configured as a member of the
// cfg.GroupID consumer group with auto-commit disabled and rebalances blocked
// while polling. Optional fetch tuning options are only applied when the
// corresponding cfg field is set.
//
// Defaults applied here: cfg.ShutdownGracePeriod falls back to 5 seconds and
// cfg.MaxPollRecords to 500 when they are zero or negative.
//
// An error is returned when the configuration is invalid or when the client
// cannot be created.
func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
	if err := cfg.finalize(); err != nil {
		return nil, fmt.Errorf("kafka: invalid consumer config: %w", err)
	}
	// `forceClose` is called by `Consumer.Close()` if / when the
	// `cfg.ShutdownGracePeriod` is exceeded.
	processingCtx, forceClose := context.WithCancelCause(context.Background())
	namespacePrefix := cfg.namespacePrefix()
	consumer := &consumer{
		topicPrefix: namespacePrefix,
		logFieldFn:  cfg.TopicLogFieldFunc,
		assignments: make(map[topicPartition]*pc),
		processor:   cfg.Processor,
		logger:      cfg.Logger.Named("partition"),
		delivery:    cfg.Delivery,
		ctx:         processingCtx,
	}
	// Subscribe using the namespaced topic names.
	topics := make([]string, len(cfg.Topics))
	for i, topic := range cfg.Topics {
		topics[i] = fmt.Sprintf("%s%s", consumer.topicPrefix, topic)
	}
	opts := []kgo.Opt{
		// Injects the kgo.Client context as the record.Context.
		kgo.WithHooks(consumer),
		kgo.ConsumerGroup(cfg.GroupID),
		kgo.ConsumeTopics(topics...),
		// If a rebalance happens while the client is polling, the consumed
		// records may belong to a partition which has been reassigned to a
		// different consumer in the group. To avoid this scenario, Polls will
		// block rebalances of partitions which would be lost, and the consumer
		// MUST manually call `AllowRebalance`.
		kgo.BlockRebalanceOnPoll(),
		kgo.DisableAutoCommit(),
		// Assign concurrent consumer callbacks to ensure consuming starts
		// for newly assigned partitions, and consuming ceases from lost or
		// revoked partitions.
		kgo.OnPartitionsAssigned(consumer.assigned),
		kgo.OnPartitionsLost(consumer.lost),
		kgo.OnPartitionsRevoked(consumer.lost),
	}
	if cfg.ConsumeRegex {
		opts = append(opts, kgo.ConsumeRegex())
	}
	if cfg.MaxPollWait > 0 {
		opts = append(opts, kgo.FetchMaxWait(cfg.MaxPollWait))
	}
	if cfg.MaxPollBytes != 0 {
		opts = append(opts, kgo.FetchMaxBytes(cfg.MaxPollBytes))
	}
	if cfg.MaxPollPartitionBytes != 0 {
		opts = append(opts, kgo.FetchMaxPartitionBytes(cfg.MaxPollPartitionBytes))
	}
	if cfg.MaxConcurrentFetches > 0 {
		opts = append(opts, kgo.MaxConcurrentFetches(cfg.MaxConcurrentFetches))
	}
	if cfg.PreferLagFn != nil {
		opts = append(opts, kgo.ConsumePreferringLagFn(cfg.PreferLagFn))
	}
	if cfg.ShutdownGracePeriod <= 0 {
		cfg.ShutdownGracePeriod = 5 * time.Second
	}
	if cfg.FetchMinBytes > 0 {
		opts = append(opts, kgo.FetchMinBytes(cfg.FetchMinBytes))
	}
	if cfg.BrokerMaxReadBytes > 0 {
		opts = append(opts, kgo.BrokerMaxReadBytes(cfg.BrokerMaxReadBytes))
	}

	client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
	if err != nil {
		err = fmt.Errorf("kafka: failed creating kafka consumer: %w", err)
		// No Consumer is returned on this path, so nothing else would ever
		// cancel the processing context; cancel it here so it is not left
		// dangling.
		forceClose(err)
		return nil, err
	}
	if cfg.MaxPollRecords <= 0 {
		cfg.MaxPollRecords = 500
	}
	return &Consumer{
		cfg:        cfg,
		client:     client,
		consumer:   consumer,
		closed:     make(chan struct{}),
		running:    make(chan struct{}),
		forceClose: forceClose,
		stopPoll:   func() {},
		tracer:     cfg.tracerProvider().Tracer("kafka"),
	}, nil
}