// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: 2015/10/27 17:39:48
#ifndef BTHREAD_EXECUTION_QUEUE_INL_H
#define BTHREAD_EXECUTION_QUEUE_INL_H
#include "butil/atomicops.h" // butil::atomic
#include "butil/macros.h" // BAIDU_CACHELINE_ALIGNMENT
#include "butil/memory/scoped_ptr.h" // butil::scoped_ptr
#include "butil/logging.h" // LOG
#include "butil/time.h" // butil::cpuwide_time_ns
#include "butil/object_pool.h" // butil::get_object
#include "bvar/bvar.h" // bvar::Adder
#include "bthread/butex.h" // butex_construct
#include "butil/synchronization/condition_variable.h"
namespace bthread {
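// Typed 64-bit handle of an ExecutionQueue. The high 32 bits carry a version
// so that a stale id can be detected after the queue has been recycled.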
template <typename T>
struct ExecutionQueueId {
uint64_t value;
};
struct TaskNode;
class ExecutionQueueBase;
typedef void (*clear_task_mem)(TaskNode*);
struct BAIDU_CACHELINE_ALIGNMENT TaskNode {
enum TaskStatus {
UNEXECUTED = 0,
EXECUTING = 1,
EXECUTED = 2
};
TaskNode()
: version(0)
, status(UNEXECUTED)
, stop_task(false)
, iterated(false)
, high_priority(false)
, in_place(false)
, next(UNCONNECTED)
, q(NULL)
{}
~TaskNode() {}
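    // Cancel a task that has not started yet. Returns 0 if the task is marked
    // EXECUTED before running, 1 if it is currently executing, and -1 if it
    // was already executed or `expected_version' does not match (the node has
    // been recycled and reused).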
int cancel(int64_t expected_version) {
BAIDU_SCOPED_LOCK(mutex);
if (version != expected_version) {
return -1;
}
if (status == UNEXECUTED) {
status = EXECUTED;
return 0;
}
return status == EXECUTED ? -1 : 1;
}
void set_executed() {
BAIDU_SCOPED_LOCK(mutex);
status = EXECUTED;
}
bool peek_to_execute() {
BAIDU_SCOPED_LOCK(mutex);
if (status == UNEXECUTED) {
status = EXECUTING;
return true;
}
return false;
}
butil::Mutex mutex; // to guard version and status
int64_t version;
uint8_t status;
bool stop_task;
bool iterated;
bool high_priority;
bool in_place;
TaskNode* next;
ExecutionQueueBase* q;
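    // Task storage: tasks that fit into static_task_mem are constructed in
    // place; larger ones live in heap memory pointed to by dynamic_task_mem
    // (see TaskAllocatorBase below).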
union {
char static_task_mem[56]; // Make sizeof TaskNode exactly 128 bytes
char* dynamic_task_mem;
};
void clear_before_return(clear_task_mem clear_func) {
if (!stop_task) {
clear_func(this);
CHECK(iterated);
}
q = NULL;
std::unique_lock<butil::Mutex> lck(mutex);
++version;
const int saved_status = status;
status = UNEXECUTED;
lck.unlock();
CHECK_NE(saved_status, UNEXECUTED);
LOG_IF(WARNING, saved_status == EXECUTING)
<< "Return a executing node, did you return before "
"iterator reached the end?";
}
static TaskNode* const UNCONNECTED;
};
// Specialize TaskAllocatorBase for small and large task types; TaskAllocator
// below picks the proper base according to sizeof(T).
template <size_t size, bool small_object> struct TaskAllocatorBase {
};
template <size_t size>
struct TaskAllocatorBase<size, true> {
inline static void* allocate(TaskNode* node)
{ return node->static_task_mem; }
inline static void* get_allocated_mem(TaskNode* node)
{ return node->static_task_mem; }
inline static void deallocate(TaskNode*) {}
};
template<size_t size>
struct TaskAllocatorBase<size, false> {
inline static void* allocate(TaskNode* node) {
node->dynamic_task_mem = (char*)malloc(size);
return node->dynamic_task_mem;
}
inline static void* get_allocated_mem(TaskNode* node)
{ return node->dynamic_task_mem; }
inline static void deallocate(TaskNode* node) {
free(node->dynamic_task_mem);
}
};
template <typename T>
struct TaskAllocator : public TaskAllocatorBase<
sizeof(T), sizeof(T) <= sizeof(TaskNode().static_task_mem)>
{};
class TaskIteratorBase;
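// Type-erased core of the execution queue: it manages the versioned
// reference count, the atomic multi-producer task list and the consumer
// loop. ExecutionQueue<T> below layers type-specific task allocation and
// dispatch on top of it.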
class BAIDU_CACHELINE_ALIGNMENT ExecutionQueueBase {
DISALLOW_COPY_AND_ASSIGN(ExecutionQueueBase);
struct Forbidden {};
friend class TaskIteratorBase;
struct Dereferencer {
void operator()(ExecutionQueueBase* queue) {
if (queue != NULL) {
queue->dereference();
}
}
};
public:
    // Users cannot construct an ExecutionQueue directly.
ExecutionQueueBase(Forbidden)
: _head(NULL)
, _versioned_ref(0) // join() depends on even version
, _high_priority_tasks(0)
, _pthread_started(false)
, _cond(&_mutex)
, _current_head(NULL) {
_join_butex = butex_create_checked<butil::atomic<int> >();
_join_butex->store(0, butil::memory_order_relaxed);
}
~ExecutionQueueBase() {
butex_destroy(_join_butex);
}
bool stopped() const { return _stopped.load(butil::memory_order_acquire); }
int stop();
static int join(uint64_t id);
protected:
typedef int (*execute_func_t)(void*, void*, TaskIteratorBase&);
typedef scoped_ptr<ExecutionQueueBase, Dereferencer> scoped_ptr_t;
int dereference();
static int create(uint64_t* id, const ExecutionQueueOptions* options,
execute_func_t execute_func,
clear_task_mem clear_func,
void* meta, void* type_specific_function);
static scoped_ptr_t address(uint64_t id) WARN_UNUSED_RESULT;
void start_execute(TaskNode* node);
TaskNode* allocate_node();
void return_task_node(TaskNode* node);
private:
bool _more_tasks(TaskNode* old_head, TaskNode** new_tail,
bool has_uniterated);
void _release_additional_reference() {
dereference();
}
void _on_recycle();
int _execute(TaskNode* head, bool high_priority, int* niterated);
static void* _execute_tasks(void* arg);
static void* _execute_tasks_pthread(void* arg);
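    // _versioned_ref packs a 32-bit version into the high half and a 32-bit
    // reference count into the low half of one 64-bit word; the helpers below
    // split and compose that word.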
static inline uint32_t _version_of_id(uint64_t id) WARN_UNUSED_RESULT {
return (uint32_t)(id >> 32);
}
static inline uint32_t _version_of_vref(int64_t vref) WARN_UNUSED_RESULT {
return (uint32_t)(vref >> 32);
}
static inline uint32_t _ref_of_vref(int64_t vref) WARN_UNUSED_RESULT {
return (int32_t)(vref & 0xFFFFFFFFul);
}
static inline int64_t _make_vref(uint32_t version, int32_t ref) {
// 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF
return (((uint64_t)version) << 32) | (uint32_t/*1*/)ref;
}
    // Don't change the order of _head, _versioned_ref and _stopped unless
    // tests show a performance improvement.
BAIDU_CACHELINE_ALIGNMENT butil::atomic<TaskNode*> _head;
BAIDU_CACHELINE_ALIGNMENT butil::atomic<uint64_t> _versioned_ref;
BAIDU_CACHELINE_ALIGNMENT butil::atomic<bool> _stopped;
butil::atomic<int64_t> _high_priority_tasks;
uint64_t _this_id;
void* _meta;
void* _type_specific_function;
execute_func_t _execute_func;
clear_task_mem _clear_func;
ExecutionQueueOptions _options;
butil::atomic<int>* _join_butex;
// For pthread mode.
pthread_t _pid;
bool _pthread_started;
butil::Mutex _mutex;
butil::ConditionVariable _cond;
TaskNode* _current_head; // Current task head of each execution.
};
template <typename T>
class ExecutionQueue : public ExecutionQueueBase {
struct Forbidden {};
friend class TaskIterator<T>;
typedef ExecutionQueueBase Base;
ExecutionQueue();
public:
typedef ExecutionQueue<T> self_type;
struct Dereferencer {
void operator()(self_type* queue) {
if (queue != NULL) {
queue->dereference();
}
}
};
typedef scoped_ptr<self_type, Dereferencer> scoped_ptr_t;
typedef bthread::ExecutionQueueId<T> id_t;
typedef TaskIterator<T> iterator;
typedef int (*execute_func_t)(void*, iterator&);
typedef TaskAllocator<T> allocator;
BAIDU_CASSERT(sizeof(execute_func_t) == sizeof(void*),
sizeof_function_must_be_equal_to_sizeof_voidptr);
static void clear_task_mem(TaskNode* node) {
T* const task = (T*)allocator::get_allocated_mem(node);
task->~T();
allocator::deallocate(node);
}
static int execute_task(void* meta, void* specific_function,
TaskIteratorBase& it) {
execute_func_t f = (execute_func_t)specific_function;
return f(meta, static_cast<iterator&>(it));
}
inline static int create(id_t* id, const ExecutionQueueOptions* options,
execute_func_t execute_func, void* meta) {
return Base::create(&id->value, options, execute_task,
clear_task_mem, meta, (void*)execute_func);
}
inline static scoped_ptr_t address(id_t id) WARN_UNUSED_RESULT {
Base::scoped_ptr_t ptr = Base::address(id.value);
Base* b = ptr.release();
scoped_ptr_t ret((self_type*)b);
return ret.Pass();
}
int execute(typename butil::add_const_reference<T>::type task) {
return execute(task, NULL, NULL);
}
int execute(typename butil::add_const_reference<T>::type task,
const TaskOptions* options, TaskHandle* handle) {
return execute(std::forward<T>(const_cast<T&>(task)), options, handle);
}
int execute(T&& task) {
return execute(std::forward<T>(task), NULL, NULL);
}
int execute(T&& task,
const TaskOptions* options, TaskHandle* handle) {
if (stopped()) {
return EINVAL;
}
TaskNode* node = allocate_node();
if (BAIDU_UNLIKELY(node == NULL)) {
return ENOMEM;
}
void* const mem = allocator::allocate(node);
if (BAIDU_UNLIKELY(!mem)) {
return_task_node(node);
return ENOMEM;
}
new (mem) T(std::forward<T>(task));
node->stop_task = false;
TaskOptions opt;
if (options) {
opt = *options;
}
node->high_priority = opt.high_priority;
node->in_place = opt.in_place_if_possible;
if (handle) {
handle->node = node;
handle->version = node->version;
}
start_execute(node);
return 0;
}
};
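// By default tasks are consumed in a bthread created with `bthread_attr';
// setting `use_pthread' runs the consumer in a dedicated pthread instead.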
inline ExecutionQueueOptions::ExecutionQueueOptions()
: use_pthread(false)
, bthread_attr(BTHREAD_ATTR_NORMAL)
, executor(NULL)
{}
template <typename T>
inline int execution_queue_start(
ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>&),
void* meta) {
return ExecutionQueue<T>::create(id, options, execute, meta);
}
template <typename T>
typename ExecutionQueue<T>::scoped_ptr_t
execution_queue_address(ExecutionQueueId<T> id) {
return ExecutionQueue<T>::address(id);
}
template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task) {
return execution_queue_execute(id, task, NULL);
}
template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options) {
return execution_queue_execute(id, task, options, NULL);
}
template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle) {
typename ExecutionQueue<T>::scoped_ptr_t
ptr = ExecutionQueue<T>::address(id);
if (ptr != NULL) {
return ptr->execute(task, options, handle);
} else {
return EINVAL;
}
}
template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
T&& task) {
return execution_queue_execute(id, std::forward<T>(task), NULL);
}
template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options) {
return execution_queue_execute(id, std::forward<T>(task), options, NULL);
}
template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options,
TaskHandle* handle) {
typename ExecutionQueue<T>::scoped_ptr_t
ptr = ExecutionQueue<T>::address(id);
if (ptr != NULL) {
return ptr->execute(std::forward<T>(task), options, handle);
} else {
return EINVAL;
}
}
template <typename T>
inline int execution_queue_stop(ExecutionQueueId<T> id) {
typename ExecutionQueue<T>::scoped_ptr_t
ptr = ExecutionQueue<T>::address(id);
if (ptr != NULL) {
return ptr->stop();
} else {
return EINVAL;
}
}
template <typename T>
inline int execution_queue_join(ExecutionQueueId<T> id) {
return ExecutionQueue<T>::join(id.value);
}
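// A minimal usage sketch of the functions above (illustrative only; the
// names `demo_consume' and `qid' are not part of the library):
//
//   int demo_consume(void* meta, bthread::TaskIterator<int>& iter) {
//       for (; iter; ++iter) {   // false once the batch is exhausted,
//           LOG(INFO) << *iter;  // the queue is stopped, or a stop task
//       }                        // is reached
//       return 0;
//   }
//   ...
//   bthread::ExecutionQueueId<int> qid;
//   if (bthread::execution_queue_start(&qid, NULL, demo_consume, NULL) != 0) {
//       LOG(ERROR) << "Fail to start execution queue";
//   }
//   bthread::execution_queue_execute(qid, 42);  // enqueue a task
//   bthread::execution_queue_stop(qid);         // refuse new tasks
//   bthread::execution_queue_join(qid);         // wait until all tasks run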
inline TaskOptions::TaskOptions()
: high_priority(false)
, in_place_if_possible(false)
{}
inline TaskOptions::TaskOptions(bool high_priority, bool in_place_if_possible)
: high_priority(high_priority)
, in_place_if_possible(in_place_if_possible)
{}
//--------------------- TaskIterator ------------------------
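// The iterator evaluates to true while there is a task to consume; it turns
// false when the queue is stopped, iteration is broken off, the current
// batch is exhausted, or a stop task is reached.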
inline TaskIteratorBase::operator bool() const {
return !_is_stopped && !_should_break && _cur_node != NULL
&& !_cur_node->stop_task;
}
template <typename T>
inline typename TaskIterator<T>::reference
TaskIterator<T>::operator*() const {
T* const ptr = (T* const)TaskAllocator<T>::get_allocated_mem(cur_node());
return *ptr;
}
template <typename T>
TaskIterator<T>& TaskIterator<T>::operator++() {
TaskIteratorBase::operator++();
return *this;
}
template <typename T>
void TaskIterator<T>::operator++(int) {
operator++();
}
inline TaskHandle::TaskHandle()
: node(NULL)
, version(0)
{}
inline int execution_queue_cancel(const TaskHandle& h) {
if (h.node == NULL) {
return -1;
}
return h.node->cancel(h.version);
}
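// A sketch of submitting with TaskOptions and cancelling through a
// TaskHandle (assumes a started queue `qid' of type ExecutionQueueId<int>;
// names are illustrative only):
//
//   bthread::TaskOptions opt(true/*high_priority*/,
//                            false/*in_place_if_possible*/);
//   bthread::TaskHandle handle;
//   if (bthread::execution_queue_execute(qid, 7, &opt, &handle) == 0) {
//       const int rc = bthread::execution_queue_cancel(handle);
//       // rc == 0: cancelled before running; rc == 1: already executing;
//       // rc == -1: already executed or the node has been recycled.
//   }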
// ---------------------ExecutionQueueBase--------------------
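// Producers publish tasks by exchanging _head, so a detached chain is in
// LIFO order and a node's `next' stays UNCONNECTED until its producer links
// it. _more_tasks() tries to mark the queue idle with a CAS; if new tasks
// arrived in the meantime, it waits for their links, reverses the chain back
// to FIFO order and appends it after old_head, reporting the new tail via
// *new_tail.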
inline bool ExecutionQueueBase::_more_tasks(
TaskNode* old_head, TaskNode** new_tail,
bool has_uniterated) {
CHECK(old_head->next == NULL);
// Try to set _head to NULL to mark that the execute is done.
TaskNode* new_head = old_head;
TaskNode* desired = NULL;
bool return_when_no_more = false;
if (has_uniterated) {
desired = old_head;
return_when_no_more = true;
}
if (_head.compare_exchange_strong(
new_head, desired, butil::memory_order_acquire)) {
// No one added new tasks.
return return_when_no_more;
}
CHECK_NE(new_head, old_head);
    // The acquire fence above pairs with the release fence of the exchange in
    // start_execute() to make sure that we see all fields of the newly added
    // tasks.
    // Someone added new tasks.
// Reverse the list until old_head.
TaskNode* tail = NULL;
if (new_tail) {
*new_tail = new_head;
}
TaskNode* p = new_head;
do {
while (p->next == TaskNode::UNCONNECTED) {
// TODO(gejun): elaborate this
sched_yield();
}
TaskNode* const saved_next = p->next;
p->next = tail;
tail = p;
p = saved_next;
CHECK(p != NULL);
} while (p != old_head);
// Link old list with new list.
old_head->next = tail;
return true;
}
inline int ExecutionQueueBase::dereference() {
const uint64_t vref = _versioned_ref.fetch_sub(
1, butil::memory_order_release);
const int32_t nref = _ref_of_vref(vref);
    // We need to make the fast path as fast as possible; don't put any extra
    // code before this point.
if (nref > 1) {
return 0;
}
const uint64_t id = _this_id;
if (__builtin_expect(nref == 1, 1)) {
const uint32_t ver = _version_of_vref(vref);
const uint32_t id_ver = _version_of_id(id);
        // Besides the first successful stop() adding 1 to version, one of
        // the dereferences that sees nref go from 1 to 0 adds another 1 to
        // version. Notice "one of": the wait-free address() may make the ref
        // of a version-unmatched slot change from 1 to 0 multiple times, so
        // we have to use version as a guard variable to prevent returning
        // the executor to the pool more than once.
if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
// sees nref:1->0, try to set version=id_ver+2,--nref.
// No retry: if version changes, the slot is already returned by
// another one who sees nref:1->0 concurrently; if nref changes,
// which must be non-zero, the slot will be returned when
// nref changes from 1->0 again.
// Example:
// stop(): --nref, sees nref:1->0 (1)
// try to set version=id_ver+2 (2)
// address(): ++nref, unmatched version (3)
// --nref, sees nref:1->0 (4)
// try to set version=id_ver+2 (5)
// 1,2,3,4,5 or 1,3,4,2,5:
// stop() succeeds, address() fails at (5).
// 1,3,2,4,5: stop() fails with (2), the slot will be
// returned by (5) of address()
// 1,3,4,5,2: stop() fails with (2), the slot is already
// returned by (5) of address().
uint64_t expected_vref = vref - 1;
if (_versioned_ref.compare_exchange_strong(
expected_vref, _make_vref(id_ver + 2, 0),
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
_on_recycle();
                // We don't return the object immediately when the reference
                // count reaches 0, as there might still be tasks in flight.
                // Instead, _on_recycle() pushes a `stop_task'; only after it
                // is executed is the object finally returned and reset.
return 1;
}
return 0;
}
LOG(FATAL) << "Invalid id=" << id;
return -1;
}
LOG(FATAL) << "Over dereferenced id=" << id;
return -1;
}
} // namespace bthread
namespace butil {
// `TaskNode::cancel' may access a TaskNode that has already been returned to
// the ObjectPool<TaskNode>, so ObjectPool<TaskNode> must not poison the
// memory region of TaskNode.
template <>
struct ObjectPoolWithASanPoison<bthread::TaskNode> : false_type {};
} // namespace butil
#endif //BTHREAD_EXECUTION_QUEUE_INL_H