glean/interprocess/cpp/worklist.cpp (197 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #include "common/hs/util/cpp/wrap.h" #include "glean/interprocess/cpp/worklist.h" #include "glean/interprocess/cpp/worklist_ffi.h" #include <atomic> #include <fstream> #include <boost/interprocess/file_mapping.hpp> #include <boost/interprocess/mapped_region.hpp> #ifdef OSS #include <glog/logging.h> #else #include "common/logging/logging.h" #endif // A stealing counter file has the following structure: // // uint64 uint32 uint32 // +--------+--------+--------+-------+------------+------------+ // | n | start0 | end0 | ..... | start(n-1) | end(n-1) | // +--------+--------+--------+-------+------------+------------+ // // That is, it starts with a 64 bit word which contains the number of workers // and then for each worker there is a pair of (start,end) - the current index // and the index at which to stop - which are both 32 bits. // // We manipulate each such pair as atomic 64-bit values which makes the logic // quite simple. In particular, we can atomically increment the 64 bit value // to get to the next element. // // With files, we don't need the size at the beginning, strictly speaking. We // will, though, if we switch to shmem objects. using namespace facebook::hs; using namespace facebook::glean; extern "C" { struct glean_interprocess_worklist_t { boost::interprocess::file_mapping mapping; boost::interprocess::mapped_region region; std::atomic<uint64_t> *counters; size_t size; static worklist::Counter::Value unpack(uint64_t n) { return {static_cast<uint32_t>(n), static_cast<uint32_t>(n >> 32)}; } static uint64_t pack(worklist::Counter::Value val) { return (uint64_t(val.end) << 32) | val.start; } explicit glean_interprocess_worklist_t(const char *path) { mapping = boost::interprocess::file_mapping( path, boost::interprocess::read_write); region = boost::interprocess::mapped_region( mapping, boost::interprocess::read_only, 0, sizeof(uint64_t)); size = *static_cast<const uint64_t *>(region.get_address()); region = boost::interprocess::mapped_region( mapping, boost::interprocess::read_write, 0, sizeof(uint64_t) * (size + 1)); counters = static_cast<std::atomic<uint64_t> *>(region.get_address()) + 1; assert(std::atomic_is_lock_free(counters)); } static void create( const char *path, const std::vector<worklist::Counter::Value>& values) { std::vector<uint64_t> words; words.reserve(values.size()+1); words.push_back(values.size()); for (const auto& value : values) { words.push_back(pack(value)); } std::ofstream stream(path, std::ios::out | std::ios::binary); stream.write( reinterpret_cast<const char *>(words.data()), words.size() * sizeof(uint64_t)); } worklist::Counter::Value get(size_t worker) const noexcept { return unpack(counters[worker].load()); } std::pair<worklist::Counter::Value, size_t> next(size_t worker) noexcept { auto victim = worker; auto value = unpack(counters[worker].fetch_add(1)); if (value.empty()) { // NOTE: We assume that other workers will only steal from us, never give // us things to do. bool done = false; // The basic idea is to find the worker with the most work and steal half // of it. We do this in a loop until success because we want a lock-free // algorithm. while (!done) { worklist::Counter::Value other = value; // Find the worker with the most work. for (size_t i = (worker+1) % size; i != worker; i = (i+1) % size) { auto x = get(i); if (x.size() > other.size()) { other = x; victim = i; } } if (!other.empty()) { // If we found a worker which has some work left, steal half of it. uint32_t split = other.start + other.size() / 2; auto expected = pack(other); // To steal, just update their start/end pair provided it hasn't // changed. If it has, we'll do the whole thing again. if (counters[victim].compare_exchange_strong( expected, pack({other.start, split}))) { // We've stolen work, now set our start/end pair. We know it hasn't // changed because we have no more work left so nobody else is going // to update it. value = {split, other.end}; counters[worker].store(pack({split+1, other.end})); done = true; } } else { // Nobody has any work left. We've already set the start/end outputs // when we were fetching our own counter. done = true; } } } return {value, victim}; } }; const char *glean_interprocess_worklist_create( const char *path, size_t count, const uint32_t *starts, const uint32_t *ends) { return ffi::wrap([=] { std::vector<worklist::Counter::Value> values; values.reserve(count); for (size_t i = 0; i < count; ++i) { values.push_back({starts[i], ends[i]}); } glean_interprocess_worklist_t::create(path, values); }); } const char *glean_interprocess_worklist_open( const char *path, glean_interprocess_worklist_t **worklist) { return ffi::wrap([=] { auto w = std::make_unique<glean_interprocess_worklist_t>(path); *worklist = w.release(); }); } void glean_interprocess_worklist_close( glean_interprocess_worklist_t *worklist) { ffi::free_(worklist); } void glean_interprocess_worklist_get( const glean_interprocess_worklist_t *worklist, size_t worker, uint32_t *start, uint32_t *end) { auto value = worklist->get(worker); *start = value.start; *end = value.end; } void glean_interprocess_worklist_next( glean_interprocess_worklist_t *worklist, size_t worker, uint32_t *start, uint32_t *end, size_t *victim) { auto r = worklist->next(worker); *start = r.first.start; *end = r.first.end; *victim = r.second; } } namespace facebook { namespace glean { namespace worklist { namespace { struct SerialCounter : public Counter { uint32_t start; uint32_t end; explicit SerialCounter(uint32_t i, uint32_t e) : start(i), end(e) {} folly::Optional<Value> next() override { if (start < end) { auto i = start; ++start; return Value{i,end}; } else { return folly::none; } } }; struct StealingCounter : public Counter { glean_interprocess_worklist_t worklist; size_t worker; StealingCounter( const std::string& path, size_t worker_index, size_t worker_count) : worklist(path.c_str()) { CHECK_EQ(worker_count, worklist.size); worker = worker_index; } folly::Optional<Value> next() override { auto r = worklist.next(worker); if (r.second != worker) { LOG(INFO) << worker << ": stole " << r.first.start+1 << "-" << r.first.end << " from " << r.second; } if (!r.first.empty()) { return r.first; } else { return folly::none; } } }; } std::unique_ptr<Counter> serialCounter(size_t start, size_t end) { return std::make_unique<SerialCounter>(start,end); } std::unique_ptr<Counter> stealingCounter( const std::string& path, size_t worker_index, size_t worker_count) { return std::make_unique<StealingCounter>(path, worker_index, worker_count); } void stealingCounterSetup( const std::string& path, const std::vector<Counter::Value>& values) { glean_interprocess_worklist_t::create(path.c_str(), values); } } } }