source/timed_single_thread_context.cpp (80 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unifex/timed_single_thread_context.hpp>
namespace unifex {
timed_single_thread_context::timed_single_thread_context()
: thread_([this] { this->run(); })
{}
timed_single_thread_context::~timed_single_thread_context() {
{
std::lock_guard lock{mutex_};
stop_ = true;
cv_.notify_one();
}
thread_.join();
UNIFEX_ASSERT(head_ == nullptr);
}
void timed_single_thread_context::enqueue(task_base* task) noexcept {
std::lock_guard lock{mutex_};
if (head_ == nullptr || task->dueTime_ < head_->dueTime_) {
// Insert at the head of the queue.
task->next_ = head_;
task->prevNextPtr_ = &head_;
if (head_ != nullptr) {
head_->prevNextPtr_ = &task->next_;
}
head_ = task;
// New minimum due-time has changed, wake the thread.
cv_.notify_one();
} else {
auto* queuedTask = head_;
while (queuedTask->next_ != nullptr &&
queuedTask->next_->dueTime_ <= task->dueTime_) {
queuedTask = queuedTask->next_;
}
// Insert after queuedTask
task->prevNextPtr_ = &queuedTask->next_;
task->next_ = queuedTask->next_;
if (task->next_ != nullptr) {
task->next_->prevNextPtr_ = &task->next_;
}
queuedTask->next_ = task;
}
}
void timed_single_thread_context::run() {
std::unique_lock lock{mutex_};
while (!stop_) {
if (head_ != nullptr) {
auto now = clock_t::now();
auto nextDueTime = head_->dueTime_;
if (nextDueTime <= now) {
// Ready to run
// Dequeue item
auto* task = head_;
head_ = task->next_;
if (head_ != nullptr) {
head_->prevNextPtr_ = &head_;
}
// Flag the task as dequeued.
task->prevNextPtr_ = nullptr;
lock.unlock();
task->execute();
lock.lock();
} else {
// Not yet ready to run. Sleep until it's ready.
cv_.wait_until(lock, nextDueTime);
}
} else {
// Queue is empty.
cv_.wait(lock);
}
}
}
void _timed_single_thread_context::cancel_callback::operator()() noexcept {
std::unique_lock lock{task_->context_->mutex_};
auto now = clock_t::now();
if (now < task_->dueTime_) {
task_->dueTime_ = now;
if (task_->prevNextPtr_ != nullptr) {
// Task is still in the queue, dequeue and requeue it.
// Remove from the queue.
*task_->prevNextPtr_ = task_->next_;
if (task_->next_ != nullptr) {
task_->next_->prevNextPtr_ = task_->prevNextPtr_;
}
task_->prevNextPtr_ = nullptr;
lock.unlock();
// And requeue with an updated time.
task_->dueTime_ = now;
task_->context_->enqueue(task_);
}
}
}
} // namespace unifex