proxygen/httpserver/HTTPServer.cpp (244 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <proxygen/httpserver/HTTPServer.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/system/ThreadName.h>
#include <proxygen/httpserver/HTTPServerAcceptor.h>
#include <proxygen/httpserver/SignalHandler.h>
#include <proxygen/httpserver/filters/CompressionFilter.h>
#include <proxygen/httpserver/filters/RejectConnectFilter.h>
#include <wangle/ssl/SSLContextManager.h>
using folly::EventBaseManager;
using folly::IOThreadPoolExecutor;
using folly::ThreadPoolExecutor;
namespace proxygen {
class AcceptorFactory : public wangle::AcceptorFactory {
public:
AcceptorFactory(std::shared_ptr<HTTPServerOptions> options,
std::shared_ptr<HTTPCodecFactory> codecFactory,
AcceptorConfiguration config,
HTTPSession::InfoCallback* sessionInfoCb)
: options_(options),
codecFactory_(codecFactory),
config_(config),
sessionInfoCb_(sessionInfoCb) {
}
std::shared_ptr<wangle::Acceptor> newAcceptor(
folly::EventBase* eventBase) override {
auto acc = std::shared_ptr<HTTPServerAcceptor>(
HTTPServerAcceptor::make(config_, *options_, codecFactory_).release());
if (sessionInfoCb_) {
acc->setSessionInfoCallback(sessionInfoCb_);
}
acc->init(nullptr, eventBase);
return acc;
}
private:
std::shared_ptr<HTTPServerOptions> options_;
std::shared_ptr<HTTPCodecFactory> codecFactory_;
AcceptorConfiguration config_;
HTTPSession::InfoCallback* sessionInfoCb_;
};
HTTPServer::HTTPServer(HTTPServerOptions options)
: options_(std::make_shared<HTTPServerOptions>(std::move(options))) {
if (options_->threads == 0) {
options_->threads = std::thread::hardware_concurrency();
}
// Insert a filter to fail all the CONNECT request, if required
if (!options_->supportsConnect) {
options_->handlerFactories.insert(
options_->handlerFactories.begin(),
std::make_unique<RejectConnectFilterFactory>());
}
// Add Content Compression filter (gzip and maybe zstd), if needed. Should be
// final filter
if (options_->enableContentCompression) {
CompressionFilterFactory::Options opts;
opts.minimumCompressionSize = options_->contentCompressionMinimumSize;
opts.zlibCompressionLevel = options_->contentCompressionLevel;
opts.compressibleContentTypes = options_->contentCompressionTypes;
opts.enableGzip = options_->enableGzipCompression;
if (options_->enableZstdCompression) {
opts.enableZstd = options_->enableZstdCompression;
opts.independentChunks = options_->useZstdIndependentChunks;
opts.zstdCompressionLevel = options_->zstdContentCompressionLevel;
}
options_->handlerFactories.insert(
options_->handlerFactories.begin(),
std::make_unique<CompressionFilterFactory>(opts));
}
}
HTTPServer::~HTTPServer() {
CHECK(!mainEventBase_) << "Forgot to stop() server?";
}
void HTTPServer::bind(std::vector<IPConfig>&& addrs) {
addresses_ = std::move(addrs);
}
void HTTPServer::bind(std::vector<IPConfig> const& addrs) {
addresses_ = addrs;
}
class HandlerCallbacks : public ThreadPoolExecutor::Observer {
public:
explicit HandlerCallbacks(std::shared_ptr<HTTPServerOptions> options)
: options_(options) {
}
void threadStarted(ThreadPoolExecutor::ThreadHandle* h) override {
auto evb = IOThreadPoolExecutor::getEventBase(h);
CHECK(evb) << "Invariant violated - started thread must have an EventBase";
evb->runInEventBaseThread([=]() {
for (auto& factory : options_->handlerFactories) {
factory->onServerStart(evb);
}
});
}
void threadStopped(ThreadPoolExecutor::ThreadHandle* h) override {
IOThreadPoolExecutor::getEventBase(h)->runInEventBaseThread([&]() {
for (auto& factory : options_->handlerFactories) {
factory->onServerStop();
}
});
}
private:
std::shared_ptr<HTTPServerOptions> options_;
};
folly::Expected<folly::Unit, std::exception_ptr> HTTPServer::startTcpServer(
std::shared_ptr<wangle::AcceptorFactory> inputAcceptorFactory,
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor) {
auto accExe = std::make_shared<IOThreadPoolExecutor>(1);
if (!ioExecutor) {
ioExecutor = std::make_shared<IOThreadPoolExecutor>(
options_->threads,
std::make_shared<folly::NamedThreadFactory>("HTTPSrvExec"));
}
auto exeObserver = std::make_shared<HandlerCallbacks>(options_);
// Observer has to be set before bind(), so onServerStart() callbacks run
ioExecutor->addObserver(exeObserver);
try {
FOR_EACH_RANGE(i, 0, addresses_.size()) {
auto accConfig = HTTPServerAcceptor::makeConfig(addresses_[i], *options_);
// If user specified an acceptor factory to use, we will use it.
// Otherwise, we create one for each address.
auto acceptorFactory = inputAcceptorFactory;
if (!acceptorFactory) {
auto codecFactory = addresses_[i].codecFactory;
acceptorFactory = std::make_shared<AcceptorFactory>(
options_, codecFactory, accConfig, sessionInfoCb_);
}
bootstrap_.push_back(wangle::ServerBootstrap<wangle::DefaultPipeline>());
bootstrap_[i].childHandler(acceptorFactory);
if (accConfig.enableTCPFastOpen) {
// We need to do this because wangle's bootstrap has 2 acceptor configs
// and the socketConfig gets passed to the SocketFactory. The number of
// configs should really be one, and when that happens, we can remove
// this code path.
bootstrap_[i].socketConfig.enableTCPFastOpen = true;
bootstrap_[i].socketConfig.fastOpenQueueSize =
accConfig.fastOpenQueueSize;
}
bootstrap_[i].group(accExe, ioExecutor);
if (accConfig.reusePort) {
bootstrap_[i].setReusePort(true);
}
if (options_->preboundSockets_.size() > i) {
bootstrap_[i].bind(std::move(options_->preboundSockets_[i]));
} else {
bootstrap_[i].bind(addresses_[i].address);
}
}
} catch (const std::exception&) {
stop();
return folly::makeUnexpected(std::current_exception());
}
return folly::unit;
}
void HTTPServer::start(
std::function<void()> onSuccess,
std::function<void(std::exception_ptr)> onError,
std::shared_ptr<wangle::AcceptorFactory> acceptorFactory,
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor) {
mainEventBase_ = EventBaseManager::get()->getEventBase();
auto tcpStarted = startTcpServer(acceptorFactory, ioExecutor);
if (tcpStarted.hasError()) {
if (onError) {
onError(tcpStarted.error());
return;
}
std::rethrow_exception(tcpStarted.error());
}
// Install signal handler if required
if (!options_->shutdownOn.empty()) {
signalHandler_ = std::make_unique<SignalHandler>(this);
signalHandler_->install(options_->shutdownOn);
}
// Start the main event loop.
if (onSuccess) {
mainEventBase_->runInLoop([onSuccess(std::move(onSuccess))]() {
// IMPORTANT: Since we may be racing with stop(), we must assume that
// mainEventBase_ can become null the moment that onSuccess is called,
// so this **has** to be queued to run from inside loopForever().
onSuccess();
});
}
mainEventBase_->loopForever();
}
void HTTPServer::stopListening() {
for (auto& bootstrap : bootstrap_) {
bootstrap.stop();
}
}
void HTTPServer::stop() {
stopListening();
for (auto& bootstrap : bootstrap_) {
bootstrap.join();
}
if (signalHandler_) {
signalHandler_.reset();
}
if (mainEventBase_) {
// This HTTPServer object may be destoyed by the main thread once
// terminateLoopSoon() is called, so terminateLoopSoon() should be the last
// operation here.
std::exchange(mainEventBase_, nullptr)->terminateLoopSoon();
}
}
const std::vector<const folly::AsyncSocketBase*> HTTPServer::getSockets()
const {
std::vector<const folly::AsyncSocketBase*> sockets;
FOR_EACH_RANGE(i, 0, bootstrap_.size()) {
auto& bootstrapSockets = bootstrap_[i].getSockets();
FOR_EACH_RANGE(j, 0, bootstrapSockets.size()) {
sockets.push_back(bootstrapSockets[j].get());
}
}
return sockets;
}
int HTTPServer::getListenSocket() const {
if (bootstrap_.size() == 0) {
return -1;
}
auto& bootstrapSockets = bootstrap_[0].getSockets();
if (bootstrapSockets.size() == 0) {
return -1;
}
auto serverSocket =
std::dynamic_pointer_cast<folly::AsyncServerSocket>(bootstrapSockets[0]);
auto socketFds = serverSocket->getNetworkSockets();
if (socketFds.size() == 0) {
return -1;
}
return socketFds[0].toFd();
}
void HTTPServer::updateTLSCredentials() {
for (auto& bootstrap : bootstrap_) {
bootstrap.forEachWorker([&](wangle::Acceptor* acceptor) {
if (!acceptor || !acceptor->isSSL()) {
return;
}
auto evb = acceptor->getEventBase();
if (!evb) {
return;
}
evb->runInEventBaseThread(
[acceptor] { acceptor->resetSSLContextConfigs(); });
});
}
}
void HTTPServer::updateTicketSeeds(wangle::TLSTicketKeySeeds seeds) {
for (auto& bootstrap : bootstrap_) {
bootstrap.forEachWorker([&](wangle::Acceptor* acceptor) {
if (!acceptor || !acceptor->isSSL()) {
return;
}
auto evb = acceptor->getEventBase();
if (!evb) {
return;
}
evb->runInEventBaseThread([acceptor, seeds] {
acceptor->setTLSTicketSecrets(
seeds.oldSeeds, seeds.currentSeeds, seeds.newSeeds);
});
});
}
}
} // namespace proxygen