glean/cpp/sender.cpp (174 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "glean/cpp/sender.h"
#include "glean/if/gen-cpp2/GleanServiceAsyncClient.h"

#include <cerrno>
#include <system_error>

#include <folly/FileUtil.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/futures/Retrying.h>
#include <thrift/lib/cpp2/protocol/Serializer.h>
#if FACEBOOK
#include "glean/facebook/cpp/service.h"
#endif
namespace facebook {
namespace glean {
using namespace facebook::glean::cpp;
namespace {
class ThriftSender : public Sender {
public:
struct Config {
std::string repo_name;
std::string repo_hash;
double min_retry_delay;
size_t max_errors;
};
ThriftSender(
std::unique_ptr<thrift::GleanServiceAsyncClient> cli,
const Config& cfg)
: client(std::move(cli)), config(cfg) {}
void rebaseAndSend(BatchBase& batch, bool wait = false) override {
if (future && (wait || future->isReady())) {
// We've already sent a batch and received back a substitution.
batch.rebase(std::move(*future).get());
future.reset();
}
if (!future) {
// We aren't waiting on a substitution (either we haven't sent anything
// yet or we just rebased). Send the next piece.
thrift::Repo repo;
repo.name() = config.repo_name;
repo.hash() = config.repo_hash;
thrift::ComputedBatch cbatch;
cbatch.repo() = std::move(repo);
cbatch.remember() = true;
cbatch.batch() = batch.serialize();
future = std::make_unique<folly::Future<thrift::Subst>>(
send(
std::make_shared<thrift::ComputedBatch>(std::move(cbatch)))
.via(folly::getIOExecutor().get()));
}
}
void flush(BatchBase& batch) override {
rebaseAndSend(batch, true);
if (future) {
future->wait();
future.reset();
}
}
private:
// Communicate with the server, retrying if necessary.
template<typename F>
folly::invoke_result_t<F, thrift::GleanServiceAsyncClient *>
communicate(F f) const {
// I *think* ...Backoff mutates its closure so we need to create a new one
// for each communication request.
auto backoff =
// hardcode all the things for now
folly::futures::retryingPolicyCappedJitteredExponentialBackoff(
config.max_errors,
std::chrono::seconds(1), // minimum wait
std::chrono::seconds(30), // maximum wait
0.2); // jitter
return folly::futures::retryingUnsafe(
// We can't use backoff directly because we only want to retry on
// TTransportException, not on any other exception.
[backoff = std::move(backoff)]
(size_t n, const folly::exception_wrapper& ew) {
if (auto *e = ew.get_exception<
apache::thrift::transport::TTransportException>()) {
LOG(ERROR) << "communication error (" << n << "): " << e->what();
return backoff(n, ew);
} else {
return folly::makeFuture(false);
}
},
[f=std::move(f), client=client.get()](size_t) { return f(client); }
);
}
// Retry a communication request after a delay.
template<typename F>
auto retry(thrift::BatchRetry&& retry, F f) const {
const auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::duration<double>(
std::max(retry.get_seconds(), config.min_retry_delay)));
return folly::futures::sleep(duration).deferValue(
[f = std::move(f)](auto&&) { return std::move(f)(); });
}
[[noreturn]] static void abort(const char *what) {
LOG(FATAL) << what;
}
// Send the batch and then wait for the substitution
folly::Future<thrift::Subst> send(
const std::shared_ptr<thrift::ComputedBatch>& batch) const {
return communicate([batch](auto client) {
return client->future_sendBatch(*batch);
})
.thenValue([batch, this](thrift::SendResponse&& response) {
switch (response.getType()) {
case thrift::SendResponse::handle:
// Server accepted the batch, now wait
return finish(response.get_handle(), batch).semi();
case thrift::SendResponse::retry:
// Server asked to retry after a delay
return retry(
response.move_retry(), [batch, this] { return send(batch); });
default:
abort("invalid SendResponse");
}
})
.thenError([](const folly::exception_wrapper& error) -> folly::Future<thrift::Subst> {
LOG(FATAL) << "unexpected error: " << error.what();
});
}
// Wait for the substitution for the given handle. Resend the batch if the
// server forgot the handle.
folly::Future<thrift::Subst> finish(
const std::string& handle,
const std::shared_ptr<thrift::ComputedBatch>& batch) const {
return communicate([handle](auto client) {
return client->future_finishBatch(handle);
})
.thenValue([handle, batch, this](thrift::FinishResponse&& response) {
switch (response.getType()) {
case thrift::FinishResponse::subst:
// We're done
return folly::makeSemiFuture(response.move_subst());
case thrift::FinishResponse::retry:
// Server asked to retry after a delay
return retry(response.move_retry(), [handle, batch, this] {
return finish(handle, batch);
});
default:
abort("invalid FinishResponse");
}
})
.thenError([handle, batch, this](const folly::exception_wrapper& error) -> folly::SemiFuture<thrift::Subst> {
if (error.is_compatible_with<facebook::glean::thrift::UnknownBatchHandle>()) {
// Server forgot the handle, resend the batch
LOG(ERROR) << "server reports unknown handle " << handle;
return this->send(batch).semi();
}
LOG(FATAL) << "unexpected error:" << error.what();
});
}
const std::unique_ptr<thrift::GleanServiceAsyncClient> client;
const Config config;
std::unique_ptr<folly::Future<thrift::Subst>> future;
};
}
#if FACEBOOK
std::unique_ptr<Sender> thriftSender(
const std::string& service,
const std::string& repo_name,
const std::string& repo_hash,
double min_retry_delay,
size_t max_errors) {
return std::make_unique<ThriftSender>(
cpp::service(service),
ThriftSender::Config{
repo_name, repo_hash, min_retry_delay, max_errors
}
);
}
#endif
namespace {
/// Sender that never talks to a server: intermediate sends are no-ops and the
/// whole batch is written to a single file when flush() is called.
class FileWriter final : public Sender {
 public:
  /// @param p  Path of the file that flush() writes to.
  explicit FileWriter(std::string p) : path(std::move(p)) {}

  void rebaseAndSend(BatchBase&, bool) override {
    // don't do anything
    // NOTE: we ignore the 'wait' flag for now
  }

  /// Serialize `batch` with the Thrift compact protocol and write the bytes
  /// to `path`.
  ///
  /// @throws std::system_error if the file cannot be written; the error code
  ///         is the errno value left by the failed write.
  void flush(BatchBase& batch) override {
    auto serialized = batch.serialize();
    const auto bytes =
        apache::thrift::CompactSerializer::serialize<std::string>(serialized);
    if (!folly::writeFile(bytes, path.c_str())) {
      // Capture errno before anything else (e.g. building the message) has a
      // chance to overwrite it.
      const int err = errno;
      throw std::system_error(
          err, std::generic_category(), "failed to write batch to " + path);
    }
  }

 private:
  /// Destination file for flush().
  const std::string path;
};
}
/// Create a Sender that writes the flushed batch to the file at `path`.
std::unique_ptr<Sender> fileWriter(std::string path) {
  std::unique_ptr<Sender> writer =
      std::make_unique<FileWriter>(std::move(path));
  return writer;
}
}
}