func()

in kafka/consumer.go [337:360]


// Run marks the consumer as running and then fetches records in a loop until
// a fetch fails.
//
// It returns apmqueue.ErrConsumerAlreadyRunning when c.running has already
// been closed by an earlier call. A fetch error matching context.Canceled is
// treated as a clean stop and yields a nil error; any other fetch error is
// returned wrapped with "cannot fetch records".
func (c *Consumer) Run(ctx context.Context) error {
	c.mu.Lock()
	select {
	case <-c.running:
		// c.running is already closed: a previous Run call got here first.
		c.mu.Unlock()
		return apmqueue.ErrConsumerAlreadyRunning
	default:
	}
	// Closing the channel is what flags the consumer as running; it happens
	// under c.mu so that the check above and the close are atomic.
	close(c.running)

	// Derive a dedicated context from ctx for the kgo.Client.* calls made
	// while fetching. Its cancel function is kept in c.stopPoll, which
	// consumer.Close() calls to cancel this context as part of the graceful
	// shutdown sequence.
	pollCtx, cancel := context.WithCancel(ctx)
	c.stopPoll = cancel
	c.mu.Unlock()

	for {
		switch err := c.fetch(pollCtx); {
		case err == nil:
			// Successful fetch: keep looping.
		case errors.Is(err, context.Canceled):
			// Cancellation is the expected way to stop, not a failure.
			return nil
		default:
			return fmt.Errorf("cannot fetch records: %w", err)
		}
	}
}