in pulsar/internal/topic_name.go [45:107]
func ParseTopicName(topic string) (*TopicName, error) {
	// Accepted inputs:
	//   short:           <topic>
	//                    <tenant>/<namespace>/<topic>
	//                    <tenant>/<cluster>/<namespace>/<topic>
	//   fully qualified: <domain>://tenant/namespace/<localName>
	//                    <domain>://tenant/cluster/namespace/<localName>   (legacy)
	// where <domain> is "persistent" or "non-persistent".
	//
	// On success the returned TopicName carries the domain, tenant, namespace,
	// local topic name, the fully qualified name and the partition index
	// reported by getPartitionIndex. On any failure the result is nil.

	// Step 1: expand a short name into a fully qualified persistent one.
	if !strings.Contains(topic, "://") {
		// A name with k slashes has k+1 slash-separated components.
		switch strings.Count(topic, "/") {
		case 0:
			// Bare topic: place it under the default tenant and namespace.
			topic = "persistent://" + publicTenant + "/" + defaultNamespace + "/" + topic
		case 2, 3:
			// tenant/namespace/topic or tenant/cluster/namespace/topic.
			topic = "persistent://" + topic
		default:
			return nil, errors.New(
				"Invalid short topic name '" + topic +
					"', it should be in the format of <tenant>/<namespace>/<topic> or <topic>")
		}
	}

	// Step 2: separate the domain from the remainder at the first "://".
	// The separator is guaranteed to be present at this point.
	sep := strings.Index(topic, "://")
	domain := topic[:sep]
	remainder := topic[sep+len("://"):]
	if domain != "persistent" && domain != "non-persistent" {
		return nil, errors.New("Invalid topic domain: " + domain)
	}

	// Step 3: break the remainder into at most four segments. Everything past
	// the third slash stays in the last segment, so a local name may itself
	// contain slashes (e.g. "some/name/xyz//" or "/xyz-123/feeder-2").
	segments := strings.SplitN(remainder, "/", 4)
	var namespace, localName string
	switch len(segments) {
	case 3:
		// New layout: tenant/namespace/<localName>.
		namespace = segments[0] + "/" + segments[1]
		localName = segments[2]
	case 4:
		// Legacy layout with a cluster: tenant/cluster/namespace/<localName>.
		namespace = fmt.Sprintf("%s/%s/%s", segments[0], segments[1], segments[2])
		localName = segments[3]
	default:
		return nil, errors.New("Invalid topic name: " + topic)
	}

	// Step 4: the partition index is derived from the fully qualified name.
	partition, err := getPartitionIndex(topic)
	if err != nil {
		return nil, err
	}

	return &TopicName{
		Domain:    domain,
		Tenant:    segments[0],
		Namespace: namespace,
		Topic:     localName,
		Name:      topic,
		Partition: partition,
	}, nil
}