func()

in kafka/consumer.go [293:325]


// Close shuts the consumer down and, as its last step, closes the underlying
// `kgo.Client` via CloseAllowingRebalance.
//
// The first call marks the consumer as closed, stops polling and then waits up
// to c.cfg.ShutdownGracePeriod for the partition consumers to be closed. If
// that does not happen in time, c.forceClose is invoked with a timeout error
// and Close carries on without waiting any further for the goroutine that is
// closing the partition consumers. Later calls find c.closed already closed
// and return immediately.
//
// c.mu is held for the whole call, so concurrent callers block until the
// in-progress Close finishes. Close always returns nil.
func (c *Consumer) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.closed:
		// Already closed by an earlier call; nothing left to do.
		return nil
	default:
	}
	close(c.closed)
	// Deferred so that it runs after everything below, but still before
	// c.mu is released.
	defer c.client.CloseAllowingRebalance() // Last, close the `kgo.Client`

	// Cancel the context used in client.PollRecords, triggering graceful
	// cancellation.
	c.stopPoll()

	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		// Close all partition consumers first to ensure there aren't any
		// records being processed while the kgo.Client is being closed.
		// Also ensures that commits can be issued after the records are
		// processed when AtLeastOnceDelivery is configured.
		c.consumer.close()
	}()

	// Use an explicit timer rather than time.After so it can be stopped as
	// soon as the consumers finish, instead of lingering for the remainder of
	// the grace period.
	gracePeriod := time.NewTimer(c.cfg.ShutdownGracePeriod)
	defer gracePeriod.Stop()

	// Wait for the consumers to process any in-flight records, or cancel
	// the underlying processing context if they aren't stopped in time.
	select {
	case <-gracePeriod.C: // Timeout
		c.forceClose(fmt.Errorf(
			"consumer: close: timeout waiting for consumers to stop (%s)",
			c.cfg.ShutdownGracePeriod.String(),
		))
	case <-stopped: // Stopped within c.cfg.ShutdownGracePeriod.
	}
	return nil
}