in kafka/consumer.go [293:325]
// Close shuts the consumer down and always returns nil.
//
// The consumer mutex is held for the whole call. Only the first call performs
// the shutdown; once c.closed has been closed, later calls return immediately.
//
// Shutdown order:
//  1. Mark the consumer as closed.
//  2. Cancel the poll context via c.stopPoll.
//  3. Close the partition consumers in a background goroutine and wait for
//     them for up to c.cfg.ShutdownGracePeriod, calling c.forceClose if they
//     do not finish in time.
//  4. Close the kgo.Client (deferred, so it runs after the steps above and
//     before the mutex is released).
func (c *Consumer) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Guard: nothing to do when a previous call already closed the consumer.
	select {
	case <-c.closed:
		return nil
	default:
	}
	close(c.closed)

	// The kgo.Client must be closed last. This defer is registered after the
	// mutex unlock, so it executes before the lock is released.
	defer c.client.CloseAllowingRebalance()

	// Cancelling the context used by client.PollRecords triggers graceful
	// cancellation of polling.
	c.stopPoll()

	grace := c.cfg.ShutdownGracePeriod
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Partition consumers are closed before the kgo.Client so that no
		// records are still being processed while the client goes away. It
		// also lets commits be issued after processing when
		// AtLeastOnceDelivery is configured.
		c.consumer.close()
	}()

	// Give in-flight records up to the grace period to finish processing;
	// past that, force the close so the underlying processing context is
	// cancelled. Note that the goroutine above is not awaited after a timeout.
	select {
	case <-done:
		// All partition consumers stopped within the grace period.
	case <-time.After(grace):
		c.forceClose(fmt.Errorf(
			"consumer: close: timeout waiting for consumers to stop (%s)",
			grace,
		))
	}
	return nil
}