bistro/server/main.cpp (112 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <glog/logging.h>
#include <folly/experimental/ThreadedRepeatingFunctionRunner.h>
#include <folly/init/Init.h>
#include <folly/ScopeGuard.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "bistro/bistro/Bistro.h"
#include "bistro/bistro/config/FileConfigLoader.h"
#include "bistro/bistro/monitor/Monitor.h"
#include "bistro/bistro/nodes/NodesLoader.h"
#include "bistro/bistro/runners/NoOpRunner.h"
#include "bistro/bistro/runners/BenchmarkRunner.h"
#include "bistro/bistro/runners/LocalRunner.h"
#include "bistro/bistro/runners/RemoteWorkerRunner.h"
#include "bistro/bistro/scheduler/SchedulerPolicies.h"
#include "bistro/bistro/server/HTTPMonitorServer.h"
#include "bistro/bistro/server/ThriftMonitor.h"
#include "bistro/bistro/statuses/SQLiteTaskStore.h"
#include "bistro/bistro/statuses/TaskStatuses.h"
// Command-line flags (gflags) read by main() in this file.

DECLARE_int32(server_port);  // from server_socket.cpp

DEFINE_string(config_file, "", "File to use for resource and job config");
DEFINE_int32(config_update_ms, 10000, "How often to re-read the config");
DEFINE_int32(nodes_update_ms, 300000, "How often to re-read the nodes");
DEFINE_int32(
  nodes_retry_ms, 30000,
  "How often to re-try reading the nodes after failures"
);

// Any of --clean_statuses, --dry_run or --benchmark_run makes main() use a
// NoOpTaskStore, in which case --status_table is not required.
DEFINE_bool(clean_statuses, false, "If true, don't read/write statuses");
DEFINE_string(status_table, "", "Name of DB table that stores statuses.");
DEFINE_bool(
  dry_run,
  false,
  "If true, don't actually run any tasks. Also implies clean_statuses"
);
DEFINE_bool(
  benchmark_run,
  false,
  // Adjacent string literals are concatenated verbatim, so the first piece
  // has to end with a space to keep "implies" and "clean_statuses" apart.
  "Runs sleep tasks with configurable duration and failure, Also implies "
  "clean_statuses"
);

DEFINE_string(worker_command, "", "If set, use local task mode");
DEFINE_string(data_dir, "/data/bistro", "Data for local tasks is stored here");

// Bring std and Bistro names into scope for the rest of this file.
using namespace std;
using namespace facebook::bistro;
int main(int argc, char* argv[]) {
FLAGS_logtostderr = 1;
folly::init(&argc, &argv);
registerDefaultSchedulerPolicies();
boost::filesystem::path config_file(FLAGS_config_file);
auto config_loader = make_shared<FileConfigLoader>(
std::chrono::milliseconds(FLAGS_config_update_ms),
boost::filesystem::absolute(config_file)
);
auto nodes_loader = make_shared<NodesLoader>(
config_loader,
std::chrono::milliseconds(FLAGS_nodes_update_ms),
std::chrono::milliseconds(FLAGS_nodes_retry_ms)
);
shared_ptr<TaskStore> task_store;
if (FLAGS_clean_statuses || FLAGS_dry_run || FLAGS_benchmark_run) {
task_store.reset(new NoOpTaskStore());
} else {
CHECK(!FLAGS_status_table.empty()) << "You must pass --status_table";
task_store.reset(new SQLiteTaskStore(FLAGS_data_dir, FLAGS_status_table));
}
auto task_statuses = make_shared<TaskStatuses>(task_store);
auto monitor = make_shared<Monitor>(
config_loader,
nodes_loader,
task_statuses
);
shared_ptr<TaskRunner> task_runner;
if (FLAGS_dry_run) {
task_runner.reset(new NoOpRunner());
} else if (FLAGS_benchmark_run) {
task_runner.reset(new BenchmarkRunner());
} else if (!FLAGS_worker_command.empty()) {
task_runner.reset(new LocalRunner(FLAGS_worker_command, FLAGS_data_dir));
} else {
task_runner.reset(new RemoteWorkerRunner(task_statuses, monitor));
}
// Initialize the scheduler thread itself
Bistro bistro(
config_loader,
nodes_loader,
task_statuses,
task_runner,
monitor
);
folly::ThreadedRepeatingFunctionRunner bistro_thread;
bistro_thread.add(
"BistroSchedule", bind(&Bistro::scheduleOnceSystemTime, &bistro)
);
SCOPE_EXIT { bistro_thread.stop(); };
auto http_monitor = make_shared<HTTPMonitor>(
config_loader,
nodes_loader,
task_statuses,
task_runner,
monitor
);
HTTPMonitorServer http_monitor_server(http_monitor);
// Initialize the thrift monitor
auto handler = std::make_unique<ThriftMonitor>(
config_loader,
nodes_loader,
task_statuses,
task_runner,
monitor
);
auto server = make_shared<apache::thrift::ThriftServer>();
server->setPort(FLAGS_server_port);
server->setInterface(std::move(handler));
server->serve();
}