lib/RetryableLookupService.h (64 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #pragma once #include "LookupDataResult.h" #include "LookupService.h" #include "NamespaceName.h" #include "RetryableOperationCache.h" #include "TopicName.h" namespace pulsar { class RetryableLookupService : public LookupService { private: friend class LookupServiceTest; struct PassKey { explicit PassKey() {} }; public: template <typename... Args> explicit RetryableLookupService(PassKey, Args&&... args) : RetryableLookupService(std::forward<Args>(args)...) {} void close() override { lookupCache_->clear(); partitionLookupCache_->clear(); namespaceLookupCache_->clear(); getSchemaCache_->clear(); } template <typename... Args> static std::shared_ptr<RetryableLookupService> create(Args&&... args) { return std::make_shared<RetryableLookupService>(PassKey{}, std::forward<Args>(args)...); } LookupResultFuture getBroker(const TopicName& topicName) override { return lookupCache_->run("get-broker-" + topicName.toString(), [this, topicName] { return lookupService_->getBroker(topicName); }); } Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override { return partitionLookupCache_->run( "get-partition-metadata-" + topicName->toString(), [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); }); } Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override { return namespaceLookupCache_->run( "get-topics-of-namespace-" + nsName->toString(), [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override { return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] { return lookupService_->getSchema(topicName, version); }); } private: const std::shared_ptr<LookupService> lookupService_; RetryableOperationCachePtr<LookupResult> lookupCache_; RetryableOperationCachePtr<LookupDataResultPtr> partitionLookupCache_; RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_; RetryableOperationCachePtr<SchemaInfo> getSchemaCache_; RetryableLookupService(std::shared_ptr<LookupService> lookupService, int timeoutSeconds, ExecutorServiceProviderPtr executorProvider) : lookupService_(lookupService), lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeoutSeconds)), partitionLookupCache_( RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeoutSeconds)), namespaceLookupCache_( RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeoutSeconds)), getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, timeoutSeconds)) {} }; } // namespace pulsar