lib/HTTPLookupService.cc (353 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "HTTPLookupService.h"

#include <pulsar/Version.h>

#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <functional>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include "CurlWrapper.h"
#include "ExecutorService.h"
#include "Int64SerDes.h"
#include "JsonUtils.h"
#include "LogUtils.h"
#include "NamespaceName.h"
#include "SchemaUtils.h"
#include "ServiceNameResolver.h"
#include "TopicName.h"
namespace ptree = boost::property_tree;

DECLARE_LOG_OBJECT()

namespace pulsar {

// URL path prefixes appended to the resolved service URL.
const static std::string V1_PATH = "/lookup/v2/destination/";
const static std::string V2_PATH = "/lookup/v2/topic/";

const static std::string ADMIN_PATH_V1 = "/admin/";
const static std::string ADMIN_PATH_V2 = "/admin/v2/";

const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;

/**
 * Build a lookup service that talks HTTP(S) to `serviceUrl`.
 *
 * Timeout, redirect limit and all TLS settings are copied out of `clientConfiguration` at construction
 * time. `authData` is queried for credentials on every request (see sendHTTPRequest).
 */
HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
                                     const ClientConfiguration &clientConfiguration,
                                     const AuthenticationPtr &authData)
    : executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
      serviceNameResolver_(serviceUrl),
      authenticationPtr_(authData),
      lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
      maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),
      tlsPrivateFilePath_(clientConfiguration.getTlsPrivateKeyFilePath()),
      tlsCertificateFilePath_(clientConfiguration.getTlsCertificateFilePath()),
      tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
      isUseTls_(clientConfiguration.isUseTls()),
      tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
      tlsValidateHostname_(clientConfiguration.isValidateHostName()) {}

/**
 * Look up the broker that serves `topicName`.
 *
 * The request is executed on the lookup executor. The returned future completes with the broker address
 * (the TLS one when the service URL uses TLS), or fails with the request's error code. A response body
 * that cannot be parsed fails the future with ResultLookupError.
 */
auto HTTPLookupService::getBroker(const TopicName &topicName) -> LookupResultFuture {
    LookupResultPromise promise;

    const auto &url = serviceNameResolver_.resolveHost();
    std::stringstream completeUrlStream;
    if (topicName.isV2Topic()) {
        completeUrlStream << url << V2_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/'
                          << topicName.getNamespacePortion() << '/' << topicName.getEncodedLocalName();
    } else {
        completeUrlStream << url << V1_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/'
                          << topicName.getCluster() << '/' << topicName.getNamespacePortion() << '/'
                          << topicName.getEncodedLocalName();
    }

    const auto completeUrl = completeUrlStream.str();
    // `self` keeps this object alive until the posted task has run.
    auto self = shared_from_this();
    executorProvider_->get()->postWork([this, self, promise, completeUrl] {
        std::string responseData;
        Result result = sendHTTPRequest(completeUrl, responseData);

        if (result != ResultOk) {
            promise.setFailed(result);
            return;
        }

        // parseLookupData returns a null pointer when the body is not valid JSON or lacks the broker
        // URLs; it must not be dereferenced in that case.
        const auto lookupDataResultPtr = parseLookupData(responseData);
        if (!lookupDataResultPtr) {
            promise.setFailed(ResultLookupError);
            return;
        }

        const auto brokerAddress = (serviceNameResolver_.useTls() ? lookupDataResultPtr->getBrokerUrlTls()
                                                                  : lookupDataResultPtr->getBrokerUrl());
        promise.setValue({brokerAddress, brokerAddress});
    });
    return promise.getFuture();
}

/**
 * Fetch the partition metadata of `topicName` from the admin endpoint.
 *
 * The request carries `checkAllowAutoCreation=true` and is executed on the lookup executor through
 * handleLookupHTTPRequest.
 */
Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync(
    const TopicNamePtr &topicName) {
    LookupPromise promise;
    std::stringstream completeUrlStream;

    const auto &url = serviceNameResolver_.resolveHost();
    if (topicName->isV2Topic()) {
        completeUrlStream << url << ADMIN_PATH_V2 << topicName->getDomain() << '/' << topicName->getProperty()
                          << '/' << topicName->getNamespacePortion() << '/'
                          << topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME;
    } else {
        completeUrlStream << url << ADMIN_PATH_V1 << topicName->getDomain() << '/' << topicName->getProperty()
                          << '/' << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
                          << topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME;
    }

    completeUrlStream << "?checkAllowAutoCreation=true";
    executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
                                                 shared_from_this(), promise, completeUrlStream.str(),
                                                 PartitionMetaData));
    return promise.getFuture();
}

/**
 * List the topics of namespace `nsName`, filtered by `mode`.
 *
 * The request is executed on the lookup executor through handleNamespaceTopicsHTTPRequest.
 */
Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
    const NamespaceNamePtr &nsName, CommandGetTopicsOfNamespace_Mode mode) {
    NamespaceTopicsPromise promise;
    std::stringstream completeUrlStream;

    // Map the enum to the value of the `mode` query parameter; unknown values fall back to
    // "PERSISTENT".
    auto convertRegexSubMode = [](CommandGetTopicsOfNamespace_Mode mode) {
        switch (mode) {
            case CommandGetTopicsOfNamespace_Mode_PERSISTENT:
                return "PERSISTENT";
            case CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT:
                return "NON_PERSISTENT";
            case CommandGetTopicsOfNamespace_Mode_ALL:
                return "ALL";
            default:
                return "PERSISTENT";
        }
    };

    const auto &url = serviceNameResolver_.resolveHost();
    if (nsName->isV2()) {
        completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/'
                          << "topics?mode=" << convertRegexSubMode(mode);
    } else {
        completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/'
                          << "destinations?mode=" << convertRegexSubMode(mode);
    }

    executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
                                                 shared_from_this(), promise, completeUrlStream.str()));
    return promise.getFuture();
}

/**
 * Fetch the schema of `topicName`.
 *
 * @param version when non-empty, it is decoded with fromBigEndianBytes and appended to the URL as an
 * extra path segment; when empty, the URL ends at "/schema".
 */
Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr &topicName,
                                                        const std::string &version) {
    Promise<Result, SchemaInfo> promise;
    std::stringstream completeUrlStream;

    const auto &url = serviceNameResolver_.resolveHost();
    if (topicName->isV2Topic()) {
        completeUrlStream << url << ADMIN_PATH_V2 << "schemas/" << topicName->getProperty() << '/'
                          << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName()
                          << "/schema";
    } else {
        completeUrlStream << url << ADMIN_PATH_V1 << "schemas/" << topicName->getProperty() << '/'
                          << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
                          << topicName->getEncodedLocalName() << "/schema";
    }
    if (!version.empty()) {
        completeUrlStream << "/" << fromBigEndianBytes(version);
    }

    executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
                                                 shared_from_this(), promise, completeUrlStream.str()));
    return promise.getFuture();
}

/**
 * Executor task behind getTopicsOfNamespaceAsync: perform the request and complete `promise`.
 *
 * A response that cannot be parsed fails the promise with ResultLookupError instead of delivering a
 * null topic list as a successful value.
 */
void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise,
                                                         const std::string completeUrl) {
    std::string responseData;
    Result result = sendHTTPRequest(completeUrl, responseData);

    if (result != ResultOk) {
        promise.setFailed(result);
        return;
    }

    const auto topics = parseNamespaceTopicsData(responseData);
    if (!topics) {
        promise.setFailed(ResultLookupError);
        return;
    }
    promise.setValue(topics);
}

/**
 * Convenience overload of sendHTTPRequest for callers that do not need the HTTP response code.
 */
Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData) {
    long responseCode = -1;
    return sendHTTPRequest(completeUrl, responseData, responseCode);
}

/**
 * Perform a blocking HTTP GET of `completeUrl`.
 *
 * @param completeUrl the full request URL
 * @param responseData [out] receives the response body
 * @param responseCode [out] receives the HTTP response code
 * @return ResultOk on success; the authentication error if credentials cannot be obtained;
 * ResultLookupError if curl cannot be initialised; ResultConnectError if the curl wrapper reports an
 * error string; otherwise a Result mapped from the curl result code (see the switch below).
 *
 * The two out-parameters are only written once the wrapper has returned without an error string.
 */
Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData,
                                          long &responseCode) {
    // Authorization data
    AuthenticationDataPtr authDataContent;
    Result authResult = authenticationPtr_->getAuthData(authDataContent);
    if (authResult != ResultOk) {
        LOG_ERROR("Failed to getAuthData: " << authResult);
        return authResult;
    }

    CurlWrapper curl;
    if (!curl.init()) {
        LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
        return ResultLookupError;
    }

    // A TLS context is only built when TLS is enabled; otherwise a null pointer is handed to curl.get().
    std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
    if (isUseTls_) {
        tlsContext.reset(new CurlWrapper::TlsContext);
        tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_;
        tlsContext->validateHostname = tlsValidateHostname_;
        tlsContext->allowInsecure = tlsAllowInsecure_;
        // Certificates supplied by the authentication plugin take precedence over the ones from the
        // client configuration.
        if (authDataContent->hasDataForTls()) {
            tlsContext->certPath = authDataContent->getTlsCertificates();
            tlsContext->keyPath = authDataContent->getTlsPrivateKey();
        } else {
            tlsContext->certPath = tlsCertificateFilePath_;
            tlsContext->keyPath = tlsPrivateFilePath_;
        }
    }

    LOG_INFO("Curl Lookup Request sent for " << completeUrl);
    CurlWrapper::Options options;
    options.timeoutInSeconds = lookupTimeoutInSeconds_;
    options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
    options.maxLookupRedirects = maxLookupRedirects_;
    auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get());
    const auto &error = result.error;
    if (!error.empty()) {
        LOG_ERROR(completeUrl << " failed: " << error);
        return ResultConnectError;
    }

    responseData = result.responseData;
    responseCode = result.responseCode;
    auto res = result.code;
    if (res == CURLE_OK) {
        LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode);
    } else if (res == CURLE_TOO_MANY_REDIRECTS) {
        LOG_ERROR("Response received for url " << completeUrl << ": " << curl_easy_strerror(res)
                                               << ", curl error: " << result.serverError
                                               << ", redirect URL: " << result.redirectUrl);
    } else {
        LOG_ERROR("Response failed for url " << completeUrl << ": " << curl_easy_strerror(res)
                                             << ", curl error: " << result.serverError);
    }

    // Translate the curl result code into a client Result.
    switch (res) {
        case CURLE_OK:
            return ResultOk;
        case CURLE_COULDNT_CONNECT:
            return ResultRetryable;
        case CURLE_COULDNT_RESOLVE_PROXY:
        case CURLE_COULDNT_RESOLVE_HOST:
        case CURLE_HTTP_RETURNED_ERROR:
            return ResultConnectError;
        case CURLE_READ_ERROR:
            return ResultReadError;
        case CURLE_OPERATION_TIMEDOUT:
            return ResultTimeout;
        default:
            return ResultLookupError;
    }
}

/**
 * Parse a partition-metadata response.
 *
 * @return a result whose partition count is the "partitions" field (0 when the field is absent), or a
 * null pointer when `json` is not valid JSON.
 */
LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) {
    ptree::ptree root;
    std::stringstream stream;
    stream << json;
    try {
        ptree::read_json(stream, root);
    } catch (const ptree::json_parser_error &e) {
        LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() << "\nInput Json = " << json);
        return LookupDataResultPtr();
    }

    LookupDataResultPtr lookupDataResultPtr = std::make_shared<LookupDataResult>();
    lookupDataResultPtr->setPartitions(root.get<int>("partitions", 0));
    LOG_INFO("parsePartitionData = " << *lookupDataResultPtr);
    return lookupDataResultPtr;
}

/**
 * Parse a broker lookup response.
 *
 * @return a result carrying "brokerUrl" and "brokerUrlTls" (falling back to "brokerUrlSsl" when
 * "brokerUrlTls" is absent), or a null pointer when `json` is not valid JSON or either URL is missing.
 */
LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) {
    ptree::ptree root;
    std::stringstream stream;
    stream << json;
    try {
        ptree::read_json(stream, root);
    } catch (const ptree::json_parser_error &e) {
        LOG_ERROR("Failed to parse json : " << e.what() << "\nInput Json = " << json);
        return LookupDataResultPtr();
    }

    // Sentinel used as the default value to detect an absent field.
    const std::string defaultNotFoundString = "Url Not found";
    const std::string brokerUrl = root.get<std::string>("brokerUrl", defaultNotFoundString);
    if (brokerUrl == defaultNotFoundString) {
        LOG_ERROR("malformed json! - brokerUrl not present" << json);
        return LookupDataResultPtr();
    }

    std::string brokerUrlTls = root.get<std::string>("brokerUrlTls", defaultNotFoundString);
    if (brokerUrlTls == defaultNotFoundString) {
        brokerUrlTls = root.get<std::string>("brokerUrlSsl", defaultNotFoundString);
        if (brokerUrlTls == defaultNotFoundString) {
            LOG_ERROR("malformed json! - brokerUrlTls not present" << json);
            return LookupDataResultPtr();
        }
    }

    LookupDataResultPtr lookupDataResultPtr = std::make_shared<LookupDataResult>();
    lookupDataResultPtr->setBrokerUrl(brokerUrl);
    lookupDataResultPtr->setBrokerUrlTls(brokerUrlTls);

    LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
    return lookupDataResultPtr;
}

/**
 * Parse a namespace-topics response.
 *
 * @return the distinct topic names in sorted order, with any "-partition-" suffix (and everything
 * after it) removed, or a null pointer when `json` is not valid JSON.
 */
NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
    LOG_DEBUG("GetNamespaceTopics json = " << json);
    ptree::ptree root;
    std::stringstream stream;
    stream << json;
    try {
        ptree::read_json(stream, root);
    } catch (const ptree::json_parser_error &e) {
        LOG_ERROR("Failed to parse json of Topics of Namespace: " << e.what() << "\nInput Json = " << json);
        return NamespaceTopicsPtr();
    }

    // passed in json is like: ["topic1", "topic2"...]
    // root will be an array of topics
    std::set<std::string> topicSet;
    // get all topics
    for (const auto &item : root) {
        // remove partition part
        const std::string topicName = item.second.get_value<std::string>();
        // Keep the position as size_type: when the marker is absent, find() yields npos and
        // substr(0, npos) returns the whole name.
        const std::string::size_type pos = topicName.find("-partition-");
        // std::set ignores duplicates, which collapses the partitions of one topic into one entry.
        topicSet.insert(topicName.substr(0, pos));
    }

    NamespaceTopicsPtr topicsResultPtr =
        std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());

    return topicsResultPtr;
}

/**
 * Executor task behind getPartitionMetadataAsync: perform the request and complete `promise`.
 *
 * `requestType` selects the parser: parsePartitionData for PartitionMetaData, parseLookupData
 * otherwise. A response that cannot be parsed fails the promise with ResultLookupError instead of
 * delivering a null pointer as a successful value.
 */
void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl,
                                                RequestType requestType) {
    std::string responseData;
    Result result = sendHTTPRequest(completeUrl, responseData);

    if (result != ResultOk) {
        promise.setFailed(result);
        return;
    }

    const LookupDataResultPtr lookupData = (requestType == PartitionMetaData)
                                               ? parsePartitionData(responseData)
                                               : parseLookupData(responseData);
    if (!lookupData) {
        promise.setFailed(ResultLookupError);
        return;
    }
    promise.setValue(lookupData);
}

/**
 * Executor task behind getSchema: perform the request, parse the schema and complete `promise`.
 *
 * Outcomes:
 *  - HTTP 404 fails the promise with ResultTopicNotFound.
 *  - Any other request failure is forwarded unchanged.
 *  - A body that is not valid JSON, lacks "type" or "data", or (for KEY_VALUE schemas) whose "data"
 *    lacks "key" or "value" fails the promise with ResultInvalidMessage.
 *  - A missing "properties" object is treated as an empty property map.
 */
void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl) {
    std::string responseData;
    long responseCode = -1;
    Result result = sendHTTPRequest(completeUrl, responseData, responseCode);

    if (responseCode == 404) {
        promise.setFailed(ResultTopicNotFound);
        return;
    }
    if (result != ResultOk) {
        promise.setFailed(result);
        return;
    }

    ptree::ptree root;
    std::stringstream stream(responseData);
    try {
        ptree::read_json(stream, root);
    } catch (const ptree::json_parser_error &e) {
        LOG_ERROR("Failed to parse json of schema: " << e.what() << "\nInput Json = " << responseData);
        promise.setFailed(ResultInvalidMessage);
        return;
    }

    // Sentinel used as the default value to detect an absent field.
    const std::string defaultNotFoundString = "Not found";
    auto schemaTypeStr = root.get<std::string>("type", defaultNotFoundString);
    if (schemaTypeStr == defaultNotFoundString) {
        LOG_ERROR("malformed json! - type not present" << responseData);
        promise.setFailed(ResultInvalidMessage);
        return;
    }
    auto schemaData = root.get<std::string>("data", defaultNotFoundString);
    if (schemaData == defaultNotFoundString) {
        LOG_ERROR("malformed json! - data not present" << responseData);
        promise.setFailed(ResultInvalidMessage);
        return;
    }

    auto schemaType = enumSchemaType(schemaTypeStr);
    if (schemaType == KEY_VALUE) {
        // For key/value schemas "data" is itself a JSON document holding the two sub-schemas.
        ptree::ptree kvRoot;
        std::stringstream kvStream(schemaData);
        try {
            ptree::read_json(kvStream, kvRoot);
        } catch (const ptree::json_parser_error &e) {
            LOG_ERROR("Failed to parse json of KeyValue schema data: " << e.what()
                                                                       << "\nInput Json = " << schemaData);
            promise.setFailed(ResultInvalidMessage);
            return;
        }
        // get_child() would throw on a missing node; the optional form lets us fail the promise.
        const auto keyTree = kvRoot.get_child_optional("key");
        const auto valueTree = kvRoot.get_child_optional("value");
        if (!keyTree || !valueTree) {
            LOG_ERROR("malformed json! - key or value not present" << schemaData);
            promise.setFailed(ResultInvalidMessage);
            return;
        }
        const auto keyData = toJson(*keyTree);
        const auto valueData = toJson(*valueTree);
        schemaData = mergeKeyValueSchema(keyData, valueData);
    }

    StringMap properties;
    // Iterate the subtree in place (no copy); an absent "properties" node leaves the map empty.
    if (const auto propertiesTree = root.get_child_optional("properties")) {
        for (const auto &item : *propertiesTree) {
            properties[item.first] = item.second.get_value<std::string>();
        }
    }

    SchemaInfo schemaInfo = SchemaInfo(schemaType, "", schemaData, properties);
    promise.setValue(schemaInfo);
}

}  // namespace pulsar