func()

in pulsar/consumer_partition.go [1565:1615]


func (pc *partitionConsumer) internalClose(req *closeRequest) {
	defer close(req.doneCh)
	state := pc.getConsumerState()
	if state != consumerReady {
		// this might be redundant but to ensure nack tracker is closed
		if pc.nackTracker != nil {
			pc.nackTracker.Close()
		}
		return
	}

	if state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
		if pc.nackTracker != nil {
			pc.nackTracker.Close()
		}
		return
	}

	pc.setConsumerState(consumerClosing)
	pc.log.Infof("Closing consumer=%d", pc.consumerID)

	requestID := pc.client.rpcClient.NewRequestID()
	cmdClose := &pb.CommandCloseConsumer{
		ConsumerId: proto.Uint64(pc.consumerID),
		RequestId:  proto.Uint64(requestID),
	}
	_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
	if err != nil {
		pc.log.WithError(err).Warn("Failed to close consumer")
	} else {
		pc.log.Info("Closed consumer")
	}

	pc.compressionProviders.Range(func(_, v interface{}) bool {
		if provider, ok := v.(compression.Provider); ok {
			provider.Close()
		} else {
			err := fmt.Errorf("unexpected compression provider type: %T", v)
			pc.log.WithError(err).Warn("Failed to close compression provider")
		}
		return true
	})

	pc.setConsumerState(consumerClosed)
	pc._getConn().DeleteConsumeHandler(pc.consumerID)
	if pc.nackTracker != nil {
		pc.nackTracker.Close()
	}
	close(pc.closeCh)
}