lib/RetryableOperationCache.h (79 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #pragma once #include <chrono> #include <mutex> #include <unordered_map> #include "ExecutorService.h" #include "RetryableOperation.h" namespace pulsar { template <typename T> class RetryableOperationCache; template <typename T> using RetryableOperationCachePtr = std::shared_ptr<RetryableOperationCache<T>>; template <typename T> class RetryableOperationCache : public std::enable_shared_from_this<RetryableOperationCache<T>> { friend class LookupServiceTest; friend class RetryableOperationCacheTest; struct PassKey { explicit PassKey() {} }; RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, TimeDuration timeout) : executorProvider_(executorProvider), timeout_(timeout) {} using Self = RetryableOperationCache<T>; public: template <typename... Args> explicit RetryableOperationCache(PassKey, Args&&... args) : RetryableOperationCache(std::forward<Args>(args)...) {} template <typename... Args> static std::shared_ptr<Self> create(Args&&... args) { return std::make_shared<Self>(PassKey{}, std::forward<Args>(args)...); } Future<Result, T> run(const std::string& key, std::function<Future<Result, T>()>&& func) { std::unique_lock<std::mutex> lock{mutex_}; auto it = operations_.find(key); if (it == operations_.end()) { DeadlineTimerPtr timer; try { timer = executorProvider_->get()->createDeadlineTimer(); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what()); Promise<Result, T> promise; promise.setFailed(ResultConnectError); return promise.getFuture(); } auto operation = RetryableOperation<T>::create(key, std::move(func), timeout_, timer); auto future = operation->run(); operations_[key] = operation; lock.unlock(); std::weak_ptr<Self> weakSelf{this->shared_from_this()}; future.addListener([this, weakSelf, key, operation](Result, const T&) { auto self = weakSelf.lock(); if (!self) { return; } std::lock_guard<std::mutex> lock{mutex_}; operations_.erase(key); operation->cancel(); }); return future; } else { return it->second->run(); } } void clear() { decltype(operations_) operations; { std::lock_guard<std::mutex> lock{mutex_}; operations.swap(operations_); } // cancel() could trigger the listener to erase the key from operations, so we should use a swap way // to release the lock here for (auto&& kv : operations) { kv.second->cancel(); } } private: ExecutorServiceProviderPtr executorProvider_; const TimeDuration timeout_; std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> operations_; mutable std::mutex mutex_; DECLARE_LOG_OBJECT() }; } // namespace pulsar