in pulsar/internal/connection.go [532:621]
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
c.setLastDataReceived(time.Now())
switch *cmd.Type {
case pb.BaseCommand_SUCCESS:
c.handleResponse(cmd.Success.GetRequestId(), cmd)
case pb.BaseCommand_PRODUCER_SUCCESS:
if !cmd.ProducerSuccess.GetProducerReady() {
request, ok := c.findPendingRequest(cmd.ProducerSuccess.GetRequestId())
if ok {
request.callback(cmd, nil)
}
} else {
c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
}
case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
c.checkServerError(cmd.PartitionMetadataResponse.Error)
c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)
case pb.BaseCommand_LOOKUP_RESPONSE:
lookupResult := cmd.LookupTopicResponse
c.checkServerError(lookupResult.Error)
c.handleResponse(lookupResult.GetRequestId(), cmd)
case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
c.handleResponse(cmd.ConsumerStatsResponse.GetRequestId(), cmd)
case pb.BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE:
c.handleResponse(cmd.GetLastMessageIdResponse.GetRequestId(), cmd)
case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE:
c.handleResponse(cmd.GetTopicsOfNamespaceResponse.GetRequestId(), cmd)
case pb.BaseCommand_GET_SCHEMA_RESPONSE:
c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
case pb.BaseCommand_GET_OR_CREATE_SCHEMA_RESPONSE:
c.handleResponse(cmd.GetOrCreateSchemaResponse.GetRequestId(), cmd)
case pb.BaseCommand_ERROR:
c.handleResponseError(cmd.GetError())
case pb.BaseCommand_SEND_ERROR:
c.handleSendError(cmd.GetSendError())
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
case pb.BaseCommand_CLOSE_CONSUMER:
c.handleCloseConsumer(cmd.GetCloseConsumer())
case pb.BaseCommand_TOPIC_MIGRATED:
c.handleTopicMigrated(cmd.GetTopicMigrated())
case pb.BaseCommand_AUTH_CHALLENGE:
c.handleAuthChallenge(cmd.GetAuthChallenge())
case pb.BaseCommand_SEND_RECEIPT:
c.handleSendReceipt(cmd.GetSendReceipt())
case pb.BaseCommand_MESSAGE:
c.handleMessage(cmd.GetMessage(), headersAndPayload)
case pb.BaseCommand_ACK_RESPONSE:
c.handleAckResponse(cmd.GetAckResponse())
case pb.BaseCommand_PING:
c.handlePing()
case pb.BaseCommand_PONG:
c.handlePong()
case pb.BaseCommand_TC_CLIENT_CONNECT_RESPONSE:
c.handleResponse(cmd.TcClientConnectResponse.GetRequestId(), cmd)
case pb.BaseCommand_NEW_TXN_RESPONSE:
c.handleResponse(cmd.NewTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ADD_PARTITION_TO_TXN_RESPONSE:
c.handleResponse(cmd.AddPartitionToTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
c.handleResponse(cmd.AddSubscriptionToTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_END_TXN_RESPONSE:
c.handleResponse(cmd.EndTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
c.handleActiveConsumerChange(cmd.GetActiveConsumerChange())
default:
c.log.Errorf("Received invalid command type: %s", cmd.Type)
c.Close()
}
}