func baseCommand()

in pulsar/internal/commands.go [173:236]


// baseCommand builds a pb.BaseCommand envelope of the given cmdType and stores
// msg in the envelope field that corresponds to that type.
//
// msg is converted with an unchecked type assertion, so it must hold the
// concrete command type matching cmdType; a mismatched (or nil) msg triggers a
// run-time panic. A cmdType with no case below panics with a
// "Missing command type" message.
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
	envelope := new(pb.BaseCommand)
	// cmdType is this call's own copy of the argument, so taking its address
	// does not alias any caller-owned variable.
	envelope.Type = &cmdType

	// Exactly one payload field is populated, selected by cmdType.
	switch cmdType {
	case pb.BaseCommand_CONNECT:
		envelope.Connect = msg.(*pb.CommandConnect)
	case pb.BaseCommand_LOOKUP:
		envelope.LookupTopic = msg.(*pb.CommandLookupTopic)
	case pb.BaseCommand_PARTITIONED_METADATA:
		envelope.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
	case pb.BaseCommand_PRODUCER:
		envelope.Producer = msg.(*pb.CommandProducer)
	case pb.BaseCommand_SUBSCRIBE:
		envelope.Subscribe = msg.(*pb.CommandSubscribe)
	case pb.BaseCommand_FLOW:
		envelope.Flow = msg.(*pb.CommandFlow)
	case pb.BaseCommand_PING:
		envelope.Ping = msg.(*pb.CommandPing)
	case pb.BaseCommand_PONG:
		envelope.Pong = msg.(*pb.CommandPong)
	case pb.BaseCommand_SEND:
		envelope.Send = msg.(*pb.CommandSend)
	case pb.BaseCommand_SEND_ERROR:
		envelope.SendError = msg.(*pb.CommandSendError)
	case pb.BaseCommand_CLOSE_PRODUCER:
		envelope.CloseProducer = msg.(*pb.CommandCloseProducer)
	case pb.BaseCommand_CLOSE_CONSUMER:
		envelope.CloseConsumer = msg.(*pb.CommandCloseConsumer)
	case pb.BaseCommand_ACK:
		envelope.Ack = msg.(*pb.CommandAck)
	case pb.BaseCommand_SEEK:
		envelope.Seek = msg.(*pb.CommandSeek)
	case pb.BaseCommand_UNSUBSCRIBE:
		envelope.Unsubscribe = msg.(*pb.CommandUnsubscribe)
	case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES:
		envelope.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)
	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE:
		envelope.GetTopicsOfNamespace = msg.(*pb.CommandGetTopicsOfNamespace)
	case pb.BaseCommand_GET_LAST_MESSAGE_ID:
		envelope.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
	case pb.BaseCommand_AUTH_RESPONSE:
		envelope.AuthResponse = msg.(*pb.CommandAuthResponse)
	case pb.BaseCommand_GET_OR_CREATE_SCHEMA:
		envelope.GetOrCreateSchema = msg.(*pb.CommandGetOrCreateSchema)
	case pb.BaseCommand_GET_SCHEMA:
		envelope.GetSchema = msg.(*pb.CommandGetSchema)
	case pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST:
		envelope.TcClientConnectRequest = msg.(*pb.CommandTcClientConnectRequest)
	case pb.BaseCommand_NEW_TXN:
		envelope.NewTxn = msg.(*pb.CommandNewTxn)
	case pb.BaseCommand_ADD_PARTITION_TO_TXN:
		envelope.AddPartitionToTxn = msg.(*pb.CommandAddPartitionToTxn)
	case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN:
		envelope.AddSubscriptionToTxn = msg.(*pb.CommandAddSubscriptionToTxn)
	case pb.BaseCommand_END_TXN:
		envelope.EndTxn = msg.(*pb.CommandEndTxn)
	default:
		// A command type with no case above is a programming error in this
		// package, so fail loudly instead of returning a half-built command.
		panic(fmt.Sprintf("Missing command type: %v", cmdType))
	}

	return envelope
}