cpp/logger/lfrb/Futex.cpp (231 lines of code) (raw):
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <profilo/logger/lfrb/Futex.h>
#include <stdint.h>
#include <string.h>
#include <cerrno>
#ifdef __linux__
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#else
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#endif
using namespace std::chrono;
namespace facebook {
namespace profilo {
namespace logger {
namespace lfrb {
namespace {
////////////////////////////////////////////////////
// native implementation using the futex() syscall
#ifdef __linux__
/// Certain toolchains (like Android's) don't include the full futex API in
/// their headers even though they support it. Make sure we have our constants
/// even if the headers don't have them.
// Fallback definitions: each macro is only defined here when the system
// headers included above did not already provide it.

// Wait operation code; combined into `op` by nativeFutexWaitImpl.
#ifndef FUTEX_WAIT_BITSET
#define FUTEX_WAIT_BITSET 9
#endif
// Wake operation code; combined into `op` by nativeFutexWake.
#ifndef FUTEX_WAKE_BITSET
#define FUTEX_WAKE_BITSET 10
#endif
// Flag OR-ed into the op of both the wake and the wait syscalls.
#ifndef FUTEX_PRIVATE_FLAG
#define FUTEX_PRIVATE_FLAG 128
#endif
// Flag OR-ed into the wait op only when the deadline is expressed as a
// system_clock time point (see nativeFutexWaitImpl).
#ifndef FUTEX_CLOCK_REALTIME
#define FUTEX_CLOCK_REALTIME 256
#endif
/// Issues a FUTEX_WAKE_BITSET (process-private) futex syscall on `addr`.
///
/// @param addr      address of the futex word
/// @param count     passed to the kernel as `val` (max number to wake)
/// @param wakeMask  passed to the kernel as `val3` (the wake bitset)
/// @return the non-negative syscall result, or 0 if the syscall failed
int nativeFutexWake(void* addr, int count, uint32_t wakeMask) {
  const int wakeOp = FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG;

  int woken = syscall(
      __NR_futex,
      addr, /* addr1 */
      wakeOp, /* op */
      count, /* val */
      nullptr, /* timeout */
      nullptr, /* addr2 */
      wakeMask); /* val3 */

  // Errors on wake are deliberately swallowed: a futex may be guarding its
  // own destruction, similar to this glibc bug with sem_post/sem_wait:
  // https://sourceware.org/bugzilla/show_bug.cgi?id=12674
  return woken < 0 ? 0 : woken;
}
/// Converts an absolute time point of `Clock` into a `struct timespec`
/// holding the whole seconds and leftover nanoseconds since the clock's
/// epoch. Time points before the epoch are clamped to {0, 0}.
template <class Clock>
struct timespec timeSpecFromTimePoint(time_point<Clock> absTime) {
  // chrono::{nano,}seconds are `long long int` whereas timespec uses
  // smaller types, so convert through durations with timespec-safe reps.
  using TimeTSeconds = duration<std::time_t, seconds::period>;
  using LongNanos = duration<long int, nanoseconds::period>;

  // kernel timespec_valid requires non-negative seconds and nanos in [0,1G)
  const auto sinceEpoch = absTime.time_since_epoch().count() < 0
      ? Clock::duration::zero()
      : absTime.time_since_epoch();

  const auto wholeSeconds = duration_cast<TimeTSeconds>(sinceEpoch);
  const auto leftover = duration_cast<LongNanos>(sinceEpoch - wholeSeconds);

  struct timespec result = {wholeSeconds.count(), leftover.count()};
  return result;
}
/// Blocks on the futex word at `addr` via a FUTEX_WAIT_BITSET
/// (process-private) syscall.
///
/// @param addr           address of the futex word
/// @param expected       passed to the kernel as `val`
/// @param absSystemTime  optional absolute deadline on system_clock
/// @param absSteadyTime  optional absolute deadline on steady_clock
/// @param waitMask       passed to the kernel as `val3` (the wait bitset)
/// @return AWOKEN when the syscall returns 0; otherwise TIMEDOUT,
///         INTERRUPTED or VALUE_CHANGED depending on errno
///
/// At most one of the two deadlines may be non-null; with both null the
/// syscall is issued without a timeout.
FutexResult nativeFutexWaitImpl(
    void* addr,
    uint32_t expected,
    time_point<system_clock>* absSystemTime,
    time_point<steady_clock>* absSteadyTime,
    uint32_t waitMask) {
  assert(absSystemTime == nullptr || absSteadyTime == nullptr);

  struct timespec ts;
  struct timespec* timeout = nullptr;
  int op = FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG;

  if (absSystemTime != nullptr) {
    // a system_clock deadline additionally needs the realtime-clock flag
    ts = timeSpecFromTimePoint(*absSystemTime);
    op |= FUTEX_CLOCK_REALTIME;
    timeout = &ts;
  } else if (absSteadyTime != nullptr) {
    ts = timeSpecFromTimePoint(*absSteadyTime);
    timeout = &ts;
  }

  // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
  // value - http://locklessinc.com/articles/futex_cheat_sheet/
  int rv = syscall(
      __NR_futex,
      addr, /* addr1 */
      op, /* op */
      expected, /* val */
      timeout, /* timeout */
      nullptr, /* addr2 */
      waitMask); /* val3 */

  if (rv == 0) {
    return FutexResult::AWOKEN;
  }

  // Capture errno right away, before anything else can overwrite it.
  const int err = errno;
  if (err == ETIMEDOUT) {
    assert(timeout != nullptr);
    return FutexResult::TIMEDOUT;
  }
  if (err == EINTR) {
    return FutexResult::INTERRUPTED;
  }
  if (err == EWOULDBLOCK) {
    return FutexResult::VALUE_CHANGED;
  }

  // EINVAL, EACCES, or EFAULT. EINVAL means there was an invalid op (should
  // be impossible) or an invalid timeout (should have been sanitized by
  // timeSpecFromTimePoint). EACCES or EFAULT means *addr points to invalid
  // memory, which is unlikely because the caller should have segfaulted
  // already. We can either crash, or return a value that lets the process
  // continue for a bit. We choose the latter (in builds where assert is
  // compiled out). VALUE_CHANGED probably turns the caller into a spin lock.
  assert(false);
  return FutexResult::VALUE_CHANGED;
}
#endif // __linux__
#if !defined(__linux__)
///////////////////////////////////////////////////////
// compatibility implementation using standard C++ API
// Our emulated futex uses 4096 lists of wait nodes. There are two levels
// of locking: a per-list mutex that controls access to the list and a
// per-node mutex, condvar, and bool that are used for the actual wakeups.
// The per-node mutex allows us to do precise wakeups without thundering
// herds.
/// Per-waiter record of the emulated futex: the address and mask being
/// waited on, plus the mutex / condition variable / flag used to deliver
/// the wakeup to exactly this waiter.
struct EmulatedFutexWaitNode {
  /// Address of the futex word this waiter is blocked on.
  void* const addr_;
  /// Mask compared (bitwise AND) against the waker's mask.
  const uint32_t waitMask_;

  // tricky: hold both bucket and node mutex to write, either to read
  bool signaled_{false};
  std::mutex mutex_;
  std::condition_variable cond_;

  /// Creates a node in the not-yet-signaled state.
  EmulatedFutexWaitNode(void* addr, uint32_t waitMask)
      : addr_{addr}, waitMask_{waitMask} {}
};
struct EmulatedFutexBucket {
std::mutex mutex_;
std::list<std::unique_ptr<EmulatedFutexWaitNode>> waiters_;
static const size_t kNumBuckets = 4096;
static EmulatedFutexBucket& bucketFor(void* addr) {
static auto gBuckets = new EmulatedFutexBucket[kNumBuckets];
uint64_t mixedBits =
std::hash<uintptr_t>{}(reinterpret_cast<uintptr_t>(addr));
return gBuckets[mixedBits % kNumBuckets];
}
};
int emulatedFutexWake(void* addr, int count, uint32_t waitMask) {
auto& bucket = EmulatedFutexBucket::bucketFor(addr);
std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
int numAwoken = 0;
for (auto iter = bucket.waiters_.begin();
numAwoken < count && iter != bucket.waiters_.end();) {
auto current = iter;
auto& node = *iter++;
if (node->addr_ == addr && (node->waitMask_ & waitMask) != 0) {
++numAwoken;
// we unlink, but waiter destroys the node
bucket.waiters_.erase(current);
std::unique_lock<std::mutex> nodeLock(node->mutex_);
node->signaled_ = true;
node->cond_.notify_one();
}
}
return numAwoken;
}
template <typename F>
FutexResult emulatedFutexWaitImpl(
F* futex,
uint32_t expected,
time_point<system_clock>* absSystemTime,
time_point<steady_clock>* absSteadyTime,
uint32_t waitMask) {
static_assert(
std::is_same<F, Futex<std::atomic>>::value ||
std::is_same<F, Futex<EmulatedFutexAtomic>>::value,
"Type F must be either Futex<std::atomic> or Futex<EmulatedFutexAtomic>");
void* addr = static_cast<void*>(futex);
auto& bucket = EmulatedFutexBucket::bucketFor(addr);
auto node = std::make_unique<EmulatedFutexWaitNode>(addr, waitMask);
{
std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
if (futex->load(std::memory_order_relaxed) != expected) {
return FutexResult::VALUE_CHANGED;
}
bucket.waiters_.push_back(std::move(node));
} // bucketLock scope
std::cv_status status = std::cv_status::no_timeout;
{
std::unique_lock<std::mutex> nodeLock(node->mutex_);
while (!node->signaled_ && status != std::cv_status::timeout) {
if (absSystemTime != nullptr) {
status = node->cond_.wait_until(nodeLock, *absSystemTime);
} else if (absSteadyTime != nullptr) {
status = node->cond_.wait_until(nodeLock, *absSteadyTime);
} else {
node->cond_.wait(nodeLock);
}
}
} // nodeLock scope
if (status == std::cv_status::timeout) {
// it's not really a timeout until we unlink the unsignaled node
std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
if (!node->signaled_) {
bucket.waiters_.remove_if([&](auto& item) { return item == node; });
return FutexResult::TIMEDOUT;
}
}
return FutexResult::AWOKEN;
}
#endif
} // namespace
/////////////////////////////////
// Futex<> specializations
/// Wake entry point for the std::atomic-backed futex: forwards to the
/// emulated implementation when not building for Linux, and to the native
/// futex syscall wrapper otherwise.
template <>
int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
#if !defined(__linux__)
  return emulatedFutexWake(this, count, wakeMask);
#else
  return nativeFutexWake(this, count, wakeMask);
#endif
}
#if !defined(__linux__)
/// Wake entry point for the explicitly emulated futex: always forwards to
/// emulatedFutexWake, using this object's address as the futex address.
template <>
int Futex<EmulatedFutexAtomic>::futexWake(int count, uint32_t wakeMask) {
  const int numAwoken = emulatedFutexWake(this, count, wakeMask);
  return numAwoken;
}
#endif
/// Wait entry point for the std::atomic-backed futex: forwards all
/// arguments to the emulated implementation when not building for Linux,
/// and to the native futex syscall wrapper otherwise.
template <>
FutexResult Futex<std::atomic>::futexWaitImpl(
    uint32_t expected,
    time_point<system_clock>* absSystemTime,
    time_point<steady_clock>* absSteadyTime,
    uint32_t waitMask) {
#if !defined(__linux__)
  return emulatedFutexWaitImpl(
      this, expected, absSystemTime, absSteadyTime, waitMask);
#else
  return nativeFutexWaitImpl(
      this, expected, absSystemTime, absSteadyTime, waitMask);
#endif
}
#if !defined(__linux__)
/// Wait entry point for the explicitly emulated futex: always forwards to
/// emulatedFutexWaitImpl with this object as the futex.
template <>
FutexResult Futex<EmulatedFutexAtomic>::futexWaitImpl(
    uint32_t expected,
    time_point<system_clock>* absSystemTime,
    time_point<steady_clock>* absSteadyTime,
    uint32_t waitMask) {
  const FutexResult outcome = emulatedFutexWaitImpl(
      this, expected, absSystemTime, absSteadyTime, waitMask);
  return outcome;
}
#endif
} // namespace lfrb
} // namespace logger
} // namespace profilo
} // namespace facebook