lib/Future.h (100 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_FUTURE_H_
#define LIB_FUTURE_H_
#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
namespace pulsar {
template <typename Result, typename Type>
class InternalState {
public:
using Listener = std::function<void(Result, const Type &)>;
using Pair = std::pair<Result, Type>;
using Lock = std::unique_lock<std::mutex>;
// NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
InternalState() {}
void addListener(Listener listener) {
Lock lock{mutex_};
listeners_.emplace_back(listener);
lock.unlock();
if (completed()) {
Type value;
Result result = get(value);
triggerListeners(result, value);
}
}
bool complete(Result result, const Type &value) {
bool expected = false;
if (!completed_.compare_exchange_strong(expected, true)) {
return false;
}
triggerListeners(result, value);
promise_.set_value(std::make_pair(result, value));
return true;
}
bool completed() const noexcept { return completed_; }
Result get(Type &result) {
const auto &pair = future_.get();
result = pair.second;
return pair.first;
}
// Only public for test
void triggerListeners(Result result, const Type &value) {
while (true) {
Lock lock{mutex_};
if (listeners_.empty()) {
return;
}
bool expected = false;
if (!listenerRunning_.compare_exchange_strong(expected, true)) {
// There is another thread that polled a listener that is running, skip polling and release
// the lock. Here we wait for some time to avoid busy waiting.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
auto listener = std::move(listeners_.front());
listeners_.pop_front();
lock.unlock();
listener(result, value);
listenerRunning_ = false;
}
}
private:
std::atomic_bool completed_{false};
std::promise<Pair> promise_;
std::shared_future<Pair> future_{promise_.get_future()};
std::list<Listener> listeners_;
mutable std::mutex mutex_;
std::atomic_bool listenerRunning_{false};
};
template <typename Result, typename Type>
using InternalStatePtr = std::shared_ptr<InternalState<Result, Type>>;
template <typename Result, typename Type>
class Future {
public:
using Listener = typename InternalState<Result, Type>::Listener;
Future &addListener(Listener listener) {
state_->addListener(listener);
return *this;
}
Result get(Type &result) { return state_->get(result); }
private:
InternalStatePtr<Result, Type> state_;
Future(InternalStatePtr<Result, Type> state) : state_(state) {}
template <typename U, typename V>
friend class Promise;
};
template <typename Result, typename Type>
class Promise {
public:
Promise() : state_(std::make_shared<InternalState<Result, Type>>()) {}
bool setValue(const Type &value) const { return state_->complete({}, value); }
bool setFailed(Result result) const { return state_->complete(result, {}); }
bool isComplete() const { return state_->completed(); }
Future<Result, Type> getFuture() const { return Future<Result, Type>{state_}; }
private:
const InternalStatePtr<Result, Type> state_;
};
} // namespace pulsar
#endif