mcrouter/FileObserver.cpp (113 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "FileObserver.h"
#include <memory>
#include <vector>
#include <glog/logging.h>
#include <folly/io/async/EventBase.h>
#include "mcrouter/FileDataProvider.h"
#include "mcrouter/McrouterLogFailure.h"
namespace facebook {
namespace memcache {
namespace mcrouter {
namespace {
struct FileObserverData {
FileObserverData(
const std::shared_ptr<folly::FunctionScheduler>& scheduler__,
std::string functionName__,
std::shared_ptr<FileDataProvider> provider__,
std::function<void(std::string)> onUpdate__,
std::chrono::milliseconds sleepBeforeUpdate__,
FileObserverHandle handle)
: scheduler(scheduler__),
functionName(functionName__),
provider(std::move(provider__)),
onUpdate(std::move(onUpdate__)),
sleepBeforeUpdate(sleepBeforeUpdate__),
token(handle) {}
std::weak_ptr<folly::FunctionScheduler> scheduler;
std::string functionName;
std::shared_ptr<FileDataProvider> provider;
std::function<void(std::string)> onUpdate;
std::chrono::milliseconds sleepBeforeUpdate;
std::weak_ptr<FileObserverToken> token;
};
/**
 * One polling step for an observed file. Intended to run as the periodic
 * scheduler job registered under data->functionName.
 *
 *  - If the observer token has expired, the periodic job cancels itself.
 *  - If the provider reports no update, nothing happens.
 *  - If checking for an update throws, the failure is logged and polling is
 *    stopped for good.
 *  - Otherwise a one-shot job is scheduled that, after
 *    data->sleepBeforeUpdate, loads the file and passes the contents to
 *    data->onUpdate (provided the token is still alive at that point).
 *
 * @param data  Shared observer state; must not be null.
 */
void checkForUpdate(const std::shared_ptr<FileObserverData>& data) {
  const auto scheduler = data->scheduler.lock();
  assert(scheduler);
  if (!scheduler) {
    // Should never happen since we're executed by the scheduler.
    LOG_FAILURE(
        "mcrouter",
        failure::Category::kSystemError,
        "Global function scheduler not available");
    return;
  }

  // Keep the token alive for the duration of this check; if nobody holds it
  // anymore, the observer was dropped and the periodic job removes itself.
  const auto tokenRef = data->token.lock();
  if (!tokenRef) {
    scheduler->cancelFunction(data->functionName);
    return;
  }

  try {
    if (!data->provider->hasUpdate()) {
      return;
    }
  } catch (...) {
    LOG_FAILURE(
        "mcrouter",
        failure::Category::kOther,
        "Error while observing file for update. Will stop polling for updates");
    // Drop our view of the token so that any one-shot update job that is
    // already queued (it locks the token before loading) becomes a no-op.
    data->token.reset();
    // Stop polling right away instead of waiting for the next poll to notice
    // the reset token. Cancelling the job from within itself is the same
    // operation the expired-token branch above performs.
    scheduler->cancelFunction(data->functionName);
    return;
  }

  // Each one-shot job needs its own name within the scheduler.
  static std::atomic<uint64_t> updateId(0);
  scheduler->addFunctionOnce(
      [data]() {
        try {
          // Skip the update if the observer went away while we were waiting.
          if (const auto ref = data->token.lock()) {
            data->onUpdate(data->provider->load());
          }
        } catch (...) {
          LOG_FAILURE(
              "mcrouter",
              failure::Category::kOther,
              "Error while observing file for update");
        }
      },
      folly::to<std::string>(
          "carbon-file-observer-update-", updateId.fetch_add(1)),
      data->sleepBeforeUpdate);
}
} // anonymous namespace
/**
 * Loads `filePath` once, hands its contents to `onUpdate`, and then registers
 * a periodic job with `scheduler` that polls the file for modifications.
 *
 * @param filePath           Path of the file to observe.
 * @param scheduler          Scheduler that will run the polling job. If it is
 *                           null, nothing is loaded and an empty handle is
 *                           returned.
 * @param pollPeriod         Passed to the scheduler both as the repeat
 *                           interval and as the initial delay of the polling
 *                           job.
 * @param sleepBeforeUpdate  Delay between detecting a modification and
 *                           invoking `onUpdate`.
 * @param onUpdate           Called with the file contents: once synchronously
 *                           from this function, and later from scheduler jobs
 *                           when a modification is detected.
 *
 * @return A handle to the observation token. The polling job cancels itself
 *         once the token has expired. A default-constructed (empty) handle is
 *         returned if the scheduler is null, or if creating the provider, the
 *         initial load, or the initial `onUpdate` call throws a
 *         std::exception.
 *
 * Note: exceptions thrown while registering the job with the scheduler are
 * not caught here.
 */
FileObserverHandle startObservingFile(
    const std::string& filePath,
    const std::shared_ptr<folly::FunctionScheduler>& scheduler,
    std::chrono::milliseconds pollPeriod,
    std::chrono::milliseconds sleepBeforeUpdate,
    std::function<void(std::string)> onUpdate) {
  // Fail early, before any side effect, rather than dereferencing a null
  // scheduler further down.
  if (!scheduler) {
    VLOG(0) << "Can not start watching " << filePath
            << " for modifications: function scheduler is not available";
    return FileObserverHandle();
  }

  std::shared_ptr<FileDataProvider> provider;
  try {
    provider = std::make_shared<FileDataProvider>(filePath);
    // Deliver the current contents right away.
    onUpdate(provider->load());
  } catch (const std::exception& e) {
    VLOG(0) << "Can not start watching " << filePath
            << " for modifications: " << e.what();
    return FileObserverHandle();
  }

  VLOG(0) << "Watching " << filePath << " for modifications.";

  FileObserverHandle handle = std::make_shared<FileObserverToken>();

  // Each observer registers its polling job under a unique name.
  static std::atomic<uint64_t> uniqueId(0);
  auto data = std::make_shared<FileObserverData>(
      scheduler,
      folly::to<std::string>("carbon-file-observer-", uniqueId.fetch_add(1)),
      std::move(provider),
      std::move(onUpdate),
      sleepBeforeUpdate,
      handle);

  // `data` is copied (not moved) into the callback because
  // data->functionName is read in the same call expression.
  scheduler->addFunction(
      [data]() { checkForUpdate(data); },
      pollPeriod,
      data->functionName,
      pollPeriod);
  return handle;
}
} // namespace mcrouter
} // namespace memcache
} // namespace facebook