in kafka/consumer.go [337:360]
func (c *Consumer) Run(ctx context.Context) error {
c.mu.Lock()
select {
case <-c.running:
c.mu.Unlock()
return apmqueue.ErrConsumerAlreadyRunning
default:
close(c.running)
}
// Create a new context from the passed context, used exclusively for
// kgo.Client.* calls. c.stopFetch is called by consumer.Close() to
// cancel this context as part of the graceful shutdown sequence.
var clientCtx context.Context
clientCtx, c.stopPoll = context.WithCancel(ctx)
c.mu.Unlock()
for {
if err := c.fetch(clientCtx); err != nil {
if errors.Is(err, context.Canceled) {
return nil // Return no error if err == context.Canceled.
}
return fmt.Errorf("cannot fetch records: %w", err)
}
}
}