func()

in pulsar/internal/connection.go [532:621]


func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
	c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
	c.setLastDataReceived(time.Now())

	switch *cmd.Type {
	case pb.BaseCommand_SUCCESS:
		c.handleResponse(cmd.Success.GetRequestId(), cmd)

	case pb.BaseCommand_PRODUCER_SUCCESS:
		if !cmd.ProducerSuccess.GetProducerReady() {
			request, ok := c.findPendingRequest(cmd.ProducerSuccess.GetRequestId())
			if ok {
				request.callback(cmd, nil)
			}
		} else {
			c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
		}
	case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
		c.checkServerError(cmd.PartitionMetadataResponse.Error)
		c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)

	case pb.BaseCommand_LOOKUP_RESPONSE:
		lookupResult := cmd.LookupTopicResponse
		c.checkServerError(lookupResult.Error)
		c.handleResponse(lookupResult.GetRequestId(), cmd)

	case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
		c.handleResponse(cmd.ConsumerStatsResponse.GetRequestId(), cmd)

	case pb.BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE:
		c.handleResponse(cmd.GetLastMessageIdResponse.GetRequestId(), cmd)

	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE:
		c.handleResponse(cmd.GetTopicsOfNamespaceResponse.GetRequestId(), cmd)

	case pb.BaseCommand_GET_SCHEMA_RESPONSE:
		c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)

	case pb.BaseCommand_GET_OR_CREATE_SCHEMA_RESPONSE:
		c.handleResponse(cmd.GetOrCreateSchemaResponse.GetRequestId(), cmd)

	case pb.BaseCommand_ERROR:
		c.handleResponseError(cmd.GetError())

	case pb.BaseCommand_SEND_ERROR:
		c.handleSendError(cmd.GetSendError())

	case pb.BaseCommand_CLOSE_PRODUCER:
		c.handleCloseProducer(cmd.GetCloseProducer())

	case pb.BaseCommand_CLOSE_CONSUMER:
		c.handleCloseConsumer(cmd.GetCloseConsumer())

	case pb.BaseCommand_TOPIC_MIGRATED:
		c.handleTopicMigrated(cmd.GetTopicMigrated())

	case pb.BaseCommand_AUTH_CHALLENGE:
		c.handleAuthChallenge(cmd.GetAuthChallenge())

	case pb.BaseCommand_SEND_RECEIPT:
		c.handleSendReceipt(cmd.GetSendReceipt())

	case pb.BaseCommand_MESSAGE:
		c.handleMessage(cmd.GetMessage(), headersAndPayload)

	case pb.BaseCommand_ACK_RESPONSE:
		c.handleAckResponse(cmd.GetAckResponse())

	case pb.BaseCommand_PING:
		c.handlePing()
	case pb.BaseCommand_PONG:
		c.handlePong()
	case pb.BaseCommand_TC_CLIENT_CONNECT_RESPONSE:
		c.handleResponse(cmd.TcClientConnectResponse.GetRequestId(), cmd)
	case pb.BaseCommand_NEW_TXN_RESPONSE:
		c.handleResponse(cmd.NewTxnResponse.GetRequestId(), cmd)
	case pb.BaseCommand_ADD_PARTITION_TO_TXN_RESPONSE:
		c.handleResponse(cmd.AddPartitionToTxnResponse.GetRequestId(), cmd)
	case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
		c.handleResponse(cmd.AddSubscriptionToTxnResponse.GetRequestId(), cmd)
	case pb.BaseCommand_END_TXN_RESPONSE:
		c.handleResponse(cmd.EndTxnResponse.GetRequestId(), cmd)
	case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
		c.handleActiveConsumerChange(cmd.GetActiveConsumerChange())

	default:
		c.log.Errorf("Received invalid command type: %s", cmd.Type)
		c.Close()
	}
}