lib/TopicName.cc (209 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "TopicName.h"

#include <boost/algorithm/string.hpp>
#include <exception>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "LogUtils.h"
#include "NamedEntity.h"
#include "NamespaceName.h"

DECLARE_LOG_OBJECT()

namespace pulsar {

const std::string TopicDomain::Persistent = "persistent";
const std::string TopicDomain::NonPersistent = "non-persistent";

// Marker that separates a partitioned topic's base name from its partition index.
static const std::string PARTITION_NAME_SUFFIX = "-partition-";

using Lock = std::unique_lock<std::mutex>;

// static members
CURL* TopicName::curl = nullptr;
std::mutex TopicName::curlHandleMutex;

/**
 * Returns the shared CURL easy handle, creating it on first use.
 *
 * The handle is a single static instance; getEncodedName() holds curlHandleMutex
 * while it calls this function and uses the handle.
 *
 * @return the handle, or nullptr if curl_easy_init() failed.
 */
CURL* TopicName::getCurlHandle() {
    if (curl == nullptr) {
        curl = curl_easy_init();
    }
    return curl;
}

//********************************************************************
TopicName::TopicName() {}

/**
 * Populates this object from a topic name.
 *
 * A name without "://" is treated as a short name: "<topic>" is expanded to
 * "persistent://public/default/<topic>" and "<property>/<namespace>/<topic>" to
 * "persistent://<property>/<namespace>/<topic>". The (possibly expanded) name is
 * then split into its components by parse().
 *
 * @param topicName short or fully qualified topic name
 * @return true on success; false (after logging the reason) if the name is malformed
 */
bool TopicName::init(const std::string& topicName) {
    topicName_ = topicName;
    if (topicName.find("://") == std::string::npos) {
        std::vector<std::string> pathTokens;
        boost::algorithm::split(pathTokens, topicName, boost::algorithm::is_any_of("/"));
        if (pathTokens.size() == 3) {
            topicName_ =
                TopicDomain::Persistent + "://" + pathTokens[0] + "/" + pathTokens[1] + "/" + pathTokens[2];
        } else if (pathTokens.size() == 1) {
            topicName_ = TopicDomain::Persistent + "://public/default/" + pathTokens[0];
        } else {
            LOG_ERROR(
                "Topic name is not valid, short topic name should be in the format of '<topic>' or "
                "'<property>/<namespace>/<topic>' - "
                << topicName);
            return false;
        }
    }

    isV2Topic_ = parse(topicName_, domain_, property_, cluster_, namespacePortion_, localName_);
    if (isV2Topic_ && !cluster_.empty()) {
        LOG_ERROR("V2 Topic name is not valid, cluster is not empty - " << topicName_ << " : cluster "
                                                                        << cluster_);
        return false;
    } else if (!isV2Topic_ && cluster_.empty()) {
        LOG_ERROR("V1 Topic name is not valid, cluster is empty - " << topicName_);
        return false;
    }
    if (localName_.empty()) {
        LOG_ERROR("Topic name is not valid, topic name is empty - " << topicName_);
        return false;
    }

    // The checks above guarantee that a V2 topic has an empty cluster here.
    if (isV2Topic_) {
        namespaceName_ = NamespaceName::get(property_, namespacePortion_);
    } else {
        namespaceName_ = NamespaceName::get(property_, cluster_, namespacePortion_);
    }
    partition_ = TopicName::getPartitionIndex(localName_);
    return true;
}

/**
 * Splits a fully qualified topic name into its components.
 *
 * "domain://property/namespace/local" (exactly four parts) is the V2 format;
 * "domain://property/cluster/namespace/local..." (five or more parts) is the legacy
 * V1 format, where everything after the fourth '/' is the local name.
 *
 * @param topicName fully qualified topic name
 * @param[out] domain, property, cluster, namespacePortion, localName parsed components;
 *             left untouched when the name has fewer than four parts
 * @return true for a V2 name; false for a V1 name or when the name has too few parts
 */
bool TopicName::parse(const std::string& topicName, std::string& domain, std::string& property,
                      std::string& cluster, std::string& namespacePortion, std::string& localName) {
    std::string topicNameCopy = topicName;
    boost::replace_first(topicNameCopy, "://", "/");
    std::vector<std::string> pathTokens;
    boost::algorithm::split(pathTokens, topicNameCopy, boost::algorithm::is_any_of("/"));
    if (pathTokens.size() < 4) {
        LOG_ERROR("Topic name is not valid, does not have enough parts - " << topicName);
        return false;
    }

    const bool isV2Topic = (pathTokens.size() == 4);
    domain = pathTokens[0];
    property = pathTokens[1];
    if (isV2Topic) {
        // New topic name without cluster name
        cluster = "";
        namespacePortion = pathTokens[2];
    } else {
        // Legacy topic name that includes cluster name
        cluster = pathTokens[2];
        namespacePortion = pathTokens[3];
    }

    // Skip past `numSlashIndexes` '/' characters; whatever is left is the topic local name.
    // The token count checked above guarantees that many separators exist, so find()
    // cannot return npos here.
    const size_t numSlashIndexes = isV2Topic ? 3 : 4;
    size_t localNameStart = 0;
    for (size_t i = 0; i < numSlashIndexes; ++i) {
        localNameStart = topicNameCopy.find('/', localNameStart) + 1;
    }
    localName = topicNameCopy.substr(localNameStart);
    return isV2Topic;
}

/**
 * URL-encodes a name with curl_easy_escape().
 *
 * @param nameBeforeEncoding the raw name
 * @return the encoded name, or an empty string (after logging) if no CURL handle is
 *         available or the encoding fails
 */
std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) {
    Lock lock(curlHandleMutex);
    std::string nameAfterEncoding;

    CURL* handle = getCurlHandle();
    if (handle == nullptr) {
        LOG_ERROR("Unable to get CURL handle to encode the name - " << nameBeforeEncoding);
        return nameAfterEncoding;
    }

    char* encodedName =
        curl_easy_escape(handle, nameBeforeEncoding.c_str(), static_cast<int>(nameBeforeEncoding.size()));
    if (encodedName == nullptr) {
        LOG_ERROR("Unable to encode the name using curl_easy_escape, name - " << nameBeforeEncoding);
        return nameAfterEncoding;
    }

    nameAfterEncoding.assign(encodedName);
    curl_free(encodedName);
    return nameAfterEncoding;
}

bool TopicName::isV2Topic() const { return isV2Topic_; }

std::string TopicName::getDomain() const { return domain_; }

std::string TopicName::getProperty() const { return property_; }

std::string TopicName::getCluster() const { return cluster_; }

std::string TopicName::getNamespacePortion() const { return namespacePortion_; }

std::string TopicName::getLocalName() { return localName_; }

std::string TopicName::getEncodedLocalName() const { return getEncodedName(localName_); }

// Two topic names are equal when their stored topic name strings are equal.
bool TopicName::operator==(const TopicName& other) const { return topicName_ == other.topicName_; }

/**
 * Checks the parsed components.
 *
 * The domain must be "persistent" or "non-persistent"; property, namespace and local
 * name must be non-empty; a V1 name additionally needs a non-empty cluster. The
 * property, cluster (V1 only) and namespace must each pass NamedEntity::checkName().
 *
 * @return true if the topic name is valid
 */
bool TopicName::validate() {
    // Check if domain matches with TopicDomain::Persistent, in future check "memory" when server is
    // ready.
    if (domain_ != TopicDomain::Persistent && domain_ != TopicDomain::NonPersistent) {
        return false;
    }
    // Required by both topic formats.
    if (property_.empty() || namespacePortion_.empty() || localName_.empty()) {
        return false;
    }
    if (isV2Topic_) {
        // v2 topic format: cluster_ can be empty
        return NamedEntity::checkName(property_) && NamedEntity::checkName(namespacePortion_);
    }
    // v1 topic format
    if (cluster_.empty()) {
        return false;
    }
    return NamedEntity::checkName(property_) && NamedEntity::checkName(cluster_) &&
           NamedEntity::checkName(namespacePortion_);
}

/**
 * Creates a TopicName from a short or fully qualified topic name.
 *
 * @return the new object, or an empty pointer (after logging) if init() or validate() fails
 */
std::shared_ptr<TopicName> TopicName::get(const std::string& topicName) {
    std::shared_ptr<TopicName> ptr(new TopicName());
    if (!ptr->init(topicName)) {
        LOG_ERROR("Topic name initialization failed");
        return std::shared_ptr<TopicName>();
    }
    if (!ptr->validate()) {
        LOG_ERROR("Topic name validation Failed - " << topicName);
        return std::shared_ptr<TopicName>();
    }
    return ptr;
}

// Builds the '/'-separated lookup path: domain/property[/cluster]/namespace/<encoded local name>.
// TODO - for now return empty string if there's any error in format, later think about better error handling
std::string TopicName::getLookupName() {
    std::stringstream ss;
    const std::string separator("/");
    ss << domain_ << separator << property_ << separator;
    if (!(isV2Topic_ && cluster_.empty())) {
        ss << cluster_ << separator;
    }
    ss << namespacePortion_ << separator << getEncodedLocalName();
    return ss.str();
}

// Renders the fully qualified name: domain://property[/cluster]/namespace/localName.
std::string TopicName::toString() const {
    std::stringstream ss;
    const std::string separator("/");
    ss << domain_ << "://" << property_ << separator;
    if (!(isV2Topic_ && cluster_.empty())) {
        ss << cluster_ << separator;
    }
    ss << namespacePortion_ << separator << localName_;
    return ss.str();
}

bool TopicName::isPersistent() const { return domain_ == TopicDomain::Persistent; }

// Returns toString() followed by "-partition-<partition>".
std::string TopicName::getTopicPartitionName(unsigned int partition) const {
    std::stringstream topicPartitionName;
    // make this topic name as well
    topicPartitionName << toString() << PARTITION_NAME_SUFFIX << partition;
    return topicPartitionName.str();
}

/**
 * Extracts the partition index from a topic name.
 *
 * @param topic topic name, e.g. "my-topic-partition-3"
 * @return the number parsed from the text after the last '-', or -1 if the name does not
 *         contain "-partition-" or that text does not start with a number
 */
int TopicName::getPartitionIndex(const std::string& topic) {
    if (topic.rfind(PARTITION_NAME_SUFFIX) == std::string::npos) {
        return -1;
    }
    try {
        // TODO: A topic name like "xxx-partition-00" should arguably yield -1, but it
        // yields 0 here, which is consistent with the Java client's behavior.
        // Another corner case: "xxx-partition--2" => 2 (not -1)
        return std::stoi(topic.substr(topic.rfind('-') + 1));
    } catch (const std::exception&) {
        return -1;
    }
}

NamespaceNamePtr TopicName::getNamespaceName() { return namespaceName_; }

// Returns the part of topicName after the first "://", or topicName itself if there is none.
std::string TopicName::removeDomain(const std::string& topicName) {
    const auto index = topicName.find("://");
    if (index != std::string::npos) {
        return topicName.substr(index + 3);
    }
    return topicName;
}

// Returns true if topicName contains the "://" domain separator.
bool TopicName::containsDomain(const std::string& topicName) {
    return topicName.find("://") != std::string::npos;
}

}  // namespace pulsar