pulsaradmin/pkg/utils/topic_name.go (109 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package utils
import (
"fmt"
"net/url"
"strconv"
"strings"
"github.com/pkg/errors"
)
const (
PUBLICTENANT = "public"
DEFAULTNAMESPACE = "default"
PARTITIONEDTOPICSUFFIX = "-partition-"
)
type TopicName struct {
domain TopicDomain
tenant string
namespace string
topic string
partitionIndex int
namespaceName *NameSpaceName
}
// The topic name can be in two different forms, one is fully qualified topic name,
// the other one is short topic name
func GetTopicName(completeName string) (*TopicName, error) {
var topicName TopicName
// The short topic name can be:
// - <topic>
// - <tenant>/<namespace>/<topic>
if !strings.Contains(completeName, "://") {
parts := strings.Split(completeName, "/")
switch len(parts) {
case 3:
completeName = persistent.String() + "://" + completeName
case 1:
completeName = persistent.String() + "://" + PUBLICTENANT + "/" + DEFAULTNAMESPACE + "/" + parts[0]
default:
return nil, errors.Errorf("Invalid short topic name '%s', it should be "+
"in the format of <tenant>/<namespace>/<topic> or <topic>", completeName)
}
}
// The fully qualified topic name can be:
// <domain>://<tenant>/<namespace>/<topic>
parts := strings.SplitN(completeName, "://", 2)
domain, err := ParseTopicDomain(parts[0])
if err != nil {
return nil, err
}
topicName.domain = domain
rest := parts[1]
parts = strings.SplitN(rest, "/", 3)
if len(parts) == 3 {
topicName.tenant = parts[0]
topicName.namespace = parts[1]
topicName.topic = parts[2]
topicName.partitionIndex = getPartitionIndex(completeName)
} else {
return nil, errors.Errorf("invalid topic name '%s', it should be in the format of "+
"<tenant>/<namespace>/<topic>", rest)
}
if topicName.topic == "" {
return nil, errors.New("topic name can not be empty")
}
n, err := GetNameSpaceName(topicName.tenant, topicName.namespace)
if err != nil {
return nil, err
}
topicName.namespaceName = n
return &topicName, nil
}
func (t *TopicName) String() string {
return fmt.Sprintf("%s://%s/%s/%s", t.domain, t.tenant, t.namespace, t.topic)
}
func (t *TopicName) GetDomain() TopicDomain {
return t.domain
}
func (t *TopicName) GetTenant() string {
return t.tenant
}
func (t *TopicName) GetNamespace() string {
return t.namespace
}
func (t *TopicName) IsPersistent() bool {
return t.domain == persistent
}
func (t *TopicName) GetRestPath() string {
return fmt.Sprintf("%s/%s/%s/%s", t.domain, t.tenant, t.namespace, t.topic)
}
func (t *TopicName) GetEncodedTopic() string {
return url.QueryEscape(t.topic)
}
func (t *TopicName) GetLocalName() string {
return t.topic
}
func (t *TopicName) GetPartition(index int) (*TopicName, error) {
if index < 0 {
return nil, errors.New("invalid partition index number")
}
if strings.Contains(t.String(), PARTITIONEDTOPICSUFFIX) {
return t, nil
}
topicNameWithPartition := t.String() + PARTITIONEDTOPICSUFFIX + strconv.Itoa(index)
return GetTopicName(topicNameWithPartition)
}
func (t *TopicName) GetPartitionIndex() int {
return t.partitionIndex
}
func getPartitionIndex(topic string) int {
if strings.Contains(topic, PARTITIONEDTOPICSUFFIX) {
parts := strings.Split(topic, "-")
index, err := strconv.Atoi(parts[len(parts)-1])
if err == nil {
return index
}
}
return -1
}