in pulsar/internal/commands.go [173:236]
// baseCommand wraps msg in a pb.BaseCommand envelope. The envelope's Type is
// set to cmdType and msg is stored in the one field that corresponds to that
// type.
//
// msg must hold the concrete pointer type that matches cmdType (for example
// *pb.CommandConnect for pb.BaseCommand_CONNECT). The type assertions below
// are unchecked, so a mismatched or nil msg panics. A cmdType that has no case
// in the switch also panics, with a "Missing command type" message.
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
	base := new(pb.BaseCommand)
	base.Type = &cmdType

	// The cases are grouped by name purely for readability; every case
	// constant is distinct, so their order does not affect which one runs.
	switch cmdType {
	// Connect, auth and ping/pong.
	case pb.BaseCommand_CONNECT:
		base.Connect = msg.(*pb.CommandConnect)
	case pb.BaseCommand_AUTH_RESPONSE:
		base.AuthResponse = msg.(*pb.CommandAuthResponse)
	case pb.BaseCommand_PING:
		base.Ping = msg.(*pb.CommandPing)
	case pb.BaseCommand_PONG:
		base.Pong = msg.(*pb.CommandPong)

	// Lookup, partition metadata and namespace topics.
	case pb.BaseCommand_LOOKUP:
		base.LookupTopic = msg.(*pb.CommandLookupTopic)
	case pb.BaseCommand_PARTITIONED_METADATA:
		base.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE:
		base.GetTopicsOfNamespace = msg.(*pb.CommandGetTopicsOfNamespace)

	// Producer and send.
	case pb.BaseCommand_PRODUCER:
		base.Producer = msg.(*pb.CommandProducer)
	case pb.BaseCommand_SEND:
		base.Send = msg.(*pb.CommandSend)
	case pb.BaseCommand_SEND_ERROR:
		base.SendError = msg.(*pb.CommandSendError)
	case pb.BaseCommand_CLOSE_PRODUCER:
		base.CloseProducer = msg.(*pb.CommandCloseProducer)

	// Subscribe, flow, ack and related commands.
	case pb.BaseCommand_SUBSCRIBE:
		base.Subscribe = msg.(*pb.CommandSubscribe)
	case pb.BaseCommand_FLOW:
		base.Flow = msg.(*pb.CommandFlow)
	case pb.BaseCommand_ACK:
		base.Ack = msg.(*pb.CommandAck)
	case pb.BaseCommand_SEEK:
		base.Seek = msg.(*pb.CommandSeek)
	case pb.BaseCommand_UNSUBSCRIBE:
		base.Unsubscribe = msg.(*pb.CommandUnsubscribe)
	case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES:
		base.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)
	case pb.BaseCommand_GET_LAST_MESSAGE_ID:
		base.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
	case pb.BaseCommand_CLOSE_CONSUMER:
		base.CloseConsumer = msg.(*pb.CommandCloseConsumer)

	// Schema.
	case pb.BaseCommand_GET_SCHEMA:
		base.GetSchema = msg.(*pb.CommandGetSchema)
	case pb.BaseCommand_GET_OR_CREATE_SCHEMA:
		base.GetOrCreateSchema = msg.(*pb.CommandGetOrCreateSchema)

	// Txn commands.
	case pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST:
		base.TcClientConnectRequest = msg.(*pb.CommandTcClientConnectRequest)
	case pb.BaseCommand_NEW_TXN:
		base.NewTxn = msg.(*pb.CommandNewTxn)
	case pb.BaseCommand_ADD_PARTITION_TO_TXN:
		base.AddPartitionToTxn = msg.(*pb.CommandAddPartitionToTxn)
	case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN:
		base.AddSubscriptionToTxn = msg.(*pb.CommandAddSubscriptionToTxn)
	case pb.BaseCommand_END_TXN:
		base.EndTxn = msg.(*pb.CommandEndTxn)

	default:
		// Reaching here means a caller passed a command type this helper
		// was never taught to wrap.
		panic(fmt.Sprintf("Missing command type: %v", cmdType))
	}

	return base
}