pulsar-function-go/pf/topicName.go (72 lines of code) (raw):

//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//

package pf

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// TopicName is the parsed form of a topic name, as produced by
// ParseTopicName.
type TopicName struct {
	// Domain is the topic domain: "persistent" or "non-persistent".
	Domain string
	// Namespace is "<tenant>/<namespace>", or
	// "<tenant>/<cluster>/<namespace>" for legacy names that include a
	// cluster.
	Namespace string
	// Name is the fully qualified topic name, including any partition
	// suffix.
	Name string
	// Partition is the partition index parsed from the name, or -1 when the
	// name does not contain the "-partition-" marker.
	Partition int
}

const (
	// publicTenant is the tenant assumed for a bare "<topic>" short name.
	publicTenant = "public"
	// defaultNamespace is the namespace assumed for a bare "<topic>" short
	// name.
	defaultNamespace = "default"
	// partitionedTopicSuffix separates a topic's base name from its
	// partition index.
	partitionedTopicSuffix = "-partition-"
)

// ParseTopicName parses the given topic name and returns the resulting
// TopicName.
//
// The topic may be a fully qualified name
// ("<domain>://<tenant>/<namespace>/<topic>", or the legacy
// "<domain>://<tenant>/<cluster>/<namespace>/<topic>") or a short name without
// a domain, which is treated as persistent; a bare "<topic>" is placed in the
// "public/default" namespace. An error is returned when the name matches none
// of these forms, when the domain is neither "persistent" nor
// "non-persistent", or when the partition index cannot be parsed.
func ParseTopicName(topic string) (*TopicName, error) { // The topic name can be in two different forms, one is fully qualified topic name, // the other one is short topic name if !strings.Contains(topic, "://") { // The short topic name can be: // - <topic> // - <tenant>/<namespace>/<topic> // - <tenant>/<cluster>/<namespace>/<topic> parts := strings.Split(topic, "/") if len(parts) == 3 || len(parts) == 4 { topic = "persistent://" + topic } else if len(parts) == 1 { topic = "persistent://" + publicTenant + "/" + defaultNamespace + "/" + parts[0] } else { return nil, errors.New( "Invalid short topic name '" + topic + "', it should be in the format of <tenant>/<namespace>/<topic> or <topic>") } } tn := &TopicName{} // The fully qualified topic name can be in two different forms: // new: persistent://tenant/namespace/topic // legacy: persistent://tenant/cluster/namespace/topic parts := strings.SplitN(topic, "://", 2) domain := parts[0] if domain != "persistent" && domain != "non-persistent" { return nil, errors.New("Invalid topic domain: " + domain) } tn.Domain = domain rest := parts[1] var err error // The rest of the name can be in different forms: // new: tenant/namespace/<localName> // legacy: tenant/cluster/namespace/<localName> // Examples of localName: // 1. some/name/xyz// // 2. 
/xyz-123/feeder-2 parts = strings.SplitN(rest, "/", 4) if len(parts) == 3 { // New topic name without cluster name tn.Namespace = parts[0] + "/" + parts[1] } else if len(parts) == 4 { // Legacy topic name that includes cluster name tn.Namespace = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[2]) } else { return nil, errors.New("Invalid topic name: " + topic) } tn.Name = topic tn.Partition, err = getPartitionIndex(topic) if err != nil { return nil, err } return tn, nil } // NameWithoutPartition returns the topic name, sans the partition portion func (tn *TopicName) NameWithoutPartition() string { if tn.Partition < 0 { return tn.Name } idx := strings.LastIndex(tn.Name, partitionedTopicSuffix) if idx > 0 { return tn.Name[:idx] } return tn.Name } func getPartitionIndex(topic string) (int, error) { if strings.Contains(topic, partitionedTopicSuffix) { idx := strings.LastIndex(topic, "-") + 1 return strconv.Atoi(topic[idx:]) } return -1, nil }