in pulsar/consumer_partition.go [1800:1850]
func (pc *partitionConsumer) internalClose(req *closeRequest) {
defer close(req.doneCh)
state := pc.getConsumerState()
if state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
return
}
if state != consumerReady {
// this might be redundant but to ensure nack tracker is closed
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
return
}
pc.setConsumerState(consumerClosing)
pc.log.Infof("Closing consumer=%d", pc.consumerID)
requestID := pc.client.rpcClient.NewRequestID()
cmdClose := &pb.CommandCloseConsumer{
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
if err != nil {
pc.log.WithError(err).Warn("Failed to close consumer")
} else {
pc.log.Info("Closed consumer")
}
pc.compressionProviders.Range(func(_, v interface{}) bool {
if provider, ok := v.(compression.Provider); ok {
provider.Close()
} else {
err := fmt.Errorf("unexpected compression provider type: %T", v)
pc.log.WithError(err).Warn("Failed to close compression provider")
}
return true
})
pc.setConsumerState(consumerClosed)
pc._getConn().DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
close(pc.closeCh)
}