src/bthread/semaphore.cpp (122 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "butil/memory/scope_guard.h" #include "bvar/collector.h" #include "bthread/bthread.h" #include "bthread/butex.h" namespace bthread { // Define in bthread/mutex.cpp class ContentionProfiler; extern ContentionProfiler* g_cp; extern bvar::CollectorSpeedLimit g_cp_sl; extern bool is_contention_site_valid(const bthread_contention_site_t& cs); extern void make_contention_site_invalid(bthread_contention_site_t* cs); extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns); static inline int bthread_sem_trywait(bthread_sem_t* sema) { auto whole = (butil::atomic<unsigned>*)sema->butex; while (true) { unsigned num = whole->load(butil::memory_order_relaxed); if (num == 0) { return EAGAIN; } if (whole->compare_exchange_weak(num, num - 1, butil::memory_order_acquire, butil::memory_order_relaxed)) { return 0; } } } static int bthread_sem_wait_impl(bthread_sem_t* sem, const struct timespec* abstime) { bool queue_lifo = false; bool first_wait = true; size_t sampling_range = bvar::INVALID_SAMPLING_RANGE; // -1: don't sample. // 0: default value. // > 0: Start time of sampling. int64_t start_ns = 0; auto whole = (butil::atomic<unsigned>*)sem->butex; while (true) { unsigned num = whole->load(butil::memory_order_relaxed); if (num > 0) { if (whole->compare_exchange_weak(num, num - 1, butil::memory_order_acquire, butil::memory_order_relaxed)) { if (start_ns > 0) { const int64_t end_ns = butil::cpuwide_time_ns(); const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; bthread::submit_contention(csite, end_ns); } return 0; } } // Don't sample when contention profiler is off. if (NULL != bthread::g_cp && start_ns == 0 && sem->enable_csite && !bvar::is_sampling_range_valid(sampling_range)) { // Ask Collector if this (contended) sem waiting should be sampled. sampling_range = bvar::is_collectable(&bthread::g_cp_sl); start_ns = bvar::is_sampling_range_valid(sampling_range) ? butil::cpuwide_time_ns() : -1; } else { start_ns = -1; } if (bthread::butex_wait(sem->butex, 0, abstime, queue_lifo) < 0 && errno != EWOULDBLOCK && errno != EINTR) { // A sema should ignore interruptions in general since // user code is unlikely to check the return value. if (ETIMEDOUT == errno && start_ns > 0) { // Failed to lock due to ETIMEDOUT, submit the elapse directly. const int64_t end_ns = butil::cpuwide_time_ns(); const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; bthread::submit_contention(csite, end_ns); } return errno; } // Ignore EWOULDBLOCK and EINTR. if (first_wait && 0 == errno) { first_wait = false; } if (!first_wait) { // Normally, bthreads are queued in FIFO order. But competing with new // arriving bthreads over sema, a woken up bthread has good chances of // losing. Because new arriving bthreads are already running on CPU and // there can be lots of them. In such case, for fairness, to avoid // starvation, it is queued at the head of the waiter queue. queue_lifo = true; } } } static inline int bthread_sem_post(bthread_sem_t* sem, size_t num) { if (num > 0) { unsigned n = ((butil::atomic<unsigned>*)sem->butex) ->fetch_add(num, butil::memory_order_relaxed); const size_t sampling_range = NULL != bthread::g_cp && sem->enable_csite ? bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE; const int64_t start_ns = bvar::is_sampling_range_valid(sampling_range) ? butil::cpuwide_time_ns() : -1; bthread::butex_wake_n(sem->butex, n); if (start_ns > 0) { const int64_t end_ns = butil::cpuwide_time_ns(); const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; bthread::submit_contention(csite, end_ns); } } return 0; } } // namespace bthread __BEGIN_DECLS int bthread_sem_init(bthread_sem_t* sem, unsigned value) { sem->butex = bthread::butex_create_checked<unsigned>(); if (!sem->butex) { return ENOMEM; } *sem->butex = value; sem->enable_csite = true; return 0; } int bthread_sem_disable_csite(bthread_sem_t* sema) { sema->enable_csite = false; return 0; } int bthread_sem_destroy(bthread_sem_t* semaphore) { bthread::butex_destroy(semaphore->butex); return 0; } int bthread_sem_trywait(bthread_sem_t* sem) { return bthread::bthread_sem_trywait(sem); } int bthread_sem_wait(bthread_sem_t* sem) { return bthread::bthread_sem_wait_impl(sem, NULL); } int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime) { return bthread::bthread_sem_wait_impl(sem, abstime); } int bthread_sem_post(bthread_sem_t* sem) { return bthread::bthread_sem_post(sem, 1); } int bthread_sem_post_n(bthread_sem_t* sem, size_t n) { return bthread::bthread_sem_post(sem, n); } __END_DECLS