go/protocol/internal/topic.go (176 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package internal import ( "maps" "regexp" "strings" "github.com/Azure/iot-operations-sdks/go/protocol/errors" ) type ( // Structure to apply tokens to a named topic pattern. TopicPattern struct { name string pattern string tokens map[string]string } // Structure to provide a topic filter that can parse out its named tokens. TopicFilter struct { filter string regex *regexp.Regexp names []string tokens map[string]string } ) const ( topicLabel = `[^ "+#{}/]+` topicToken = `\{` + topicLabel + `\}` topicLevel = `(` + topicLabel + `|` + topicToken + `)` topicMatch = `(` + topicLabel + `)` ) var ( matchLabel = regexp.MustCompile( `^` + topicLabel + `$`, ) matchToken = regexp.MustCompile( topicToken, // Lacks anchors because it is used for replacements. ) matchTopic = regexp.MustCompile( `^` + topicLabel + `(/` + topicLabel + `)*$`, ) matchPattern = regexp.MustCompile( `^` + topicLevel + `(/` + topicLevel + `)*$`, ) ) // Perform initial validation of a topic pattern component. func ValidateTopicPatternComponent( name, msgOnErr, pattern string, ) error { if !matchPattern.MatchString(pattern) { return &errors.Client{ Message: msgOnErr, Kind: errors.ConfigurationInvalid{ PropertyName: name, PropertyValue: pattern, }, } } return nil } // Create a new topic pattern and perform initial validations. func NewTopicPattern( name, pattern string, tokens map[string]string, namespace string, ) (*TopicPattern, error) { if namespace != "" { if !ValidTopic(namespace) { return nil, &errors.Client{ Message: "invalid topic namespace", Kind: errors.ConfigurationInvalid{ PropertyName: "TopicNamespace", PropertyValue: namespace, }, } } pattern = namespace + `/` + pattern } if !matchPattern.MatchString(pattern) { return nil, &errors.Client{ Message: "invalid topic pattern", Kind: errors.ConfigurationInvalid{ PropertyName: name, PropertyValue: pattern, }, } } if err := validateTokens(tokens); err != nil { return nil, err } for token, value := range tokens { pattern = strings.ReplaceAll(pattern, `{`+token+`}`, value) } return &TopicPattern{name, pattern, tokens}, nil } // Fully resolve a topic pattern for publishing. func (tp *TopicPattern) Topic(tokens map[string]string) (string, error) { topic := tp.pattern if err := validateTokens(tokens); err != nil { return "", err } for token, value := range tokens { topic = strings.ReplaceAll(topic, `{`+token+`}`, value) } if !ValidTopic(topic) { missingToken := matchToken.FindString(topic) if missingToken != "" { return "", &errors.Client{ Message: "invalid topic", Kind: errors.ConfigurationInvalid{ PropertyName: missingToken[1 : len(missingToken)-1], }, } } return "", &errors.Client{ Message: "invalid topic", Kind: errors.ConfigurationInvalid{ PropertyName: tp.name, PropertyValue: topic, }, } } return topic, nil } // Generate a filter for subscribing. Unresolved tokens are treated as "+" // wildcards for this purpose. func (tp *TopicPattern) Filter() (*TopicFilter, error) { // Get the remaining token names. names := matchToken.FindAllString(tp.pattern, -1) for i, token := range names { names[i] = token[1 : len(token)-1] } // Build a regexp matching all remaining tokens. escaped := regexp.QuoteMeta(tp.pattern) for _, token := range names { escaped = strings.ReplaceAll(escaped, `\{`+token+`\}`, topicMatch) } regex, err := regexp.Compile(escaped) if err != nil { return nil, err } // Replace remaining tokens with "+". filter := matchToken.ReplaceAllString(tp.pattern, `+`) return &TopicFilter{filter, regex, names, tp.tokens}, nil } // Filter provides the MQTT topic filter string. func (tf *TopicFilter) Filter() string { return tf.filter } // Tokens indicates whether the topic matched and resolves its topic tokens. func (tf *TopicFilter) Tokens(topic string) (map[string]string, bool) { match := tf.regex.FindStringSubmatch(topic) if match == nil { return nil, false } tokens := make(map[string]string, len(tf.names)+len(tf.tokens)) for i, val := range match[1:] { tokens[tf.names[i]] = val } maps.Copy(tokens, tf.tokens) return tokens, true } // Return whether the provided string is a fully-resolved topic. func ValidTopic(topic string) bool { return matchTopic.MatchString(topic) } // Return whether the provided string is a valid share name. func ValidateShareName(shareName string) error { if shareName != "" && !matchLabel.MatchString(shareName) { return &errors.Client{ Message: "invalid share name", Kind: errors.ConfigurationInvalid{ PropertyName: "ShareName", PropertyValue: shareName, }, } } return nil } // Return whether all the topic tokens are valid (to provide more specific // errors compared to just testing the resulting topic). Takes the error kind as // an argument since it may vary between ConfigurationInvalid (tokens provided // in the constructor) and ArgumentInvalid (tokens provided at call time). func validateTokens(tokens map[string]string) error { for k, v := range tokens { // We don't check for the presence of token names in the pattern because // it's valid to provide token values that aren't in the pattern. We do, // however, check to make sure they're valid token names so that we can // warn the user in cases that will never actually be valid. if !matchLabel.MatchString(k) || !matchLabel.MatchString(v) { return &errors.Client{ Message: "invalid topic token", Kind: errors.ConfigurationInvalid{ PropertyName: k, PropertyValue: v, }, } } } return nil }