in be/src/common/init.cc [483:692]
// Performs the one-time, process-wide runtime initialization: gathers machine
// information, applies flag defaults, parses and validates command-line flags, sets up
// logging, spawns the background helper threads, optionally initializes the JVM/JNI
// support, and installs signal dispositions.
//
// The statement order is significant (e.g. flag defaults are assigned before the flags
// are parsed, and logging is initialized before anything that logs or spawns threads).
//
// Parameters:
//   argc, argv   - the process command line. Handed to google::ParseCommandLineFlags(),
//                  which may modify them. argv[0] is additionally used for logging,
//                  minidump registration, auth initialization and the Java process name.
//   init_jvm     - if true, the JNI/JVM related initialization at the end of this
//                  function is performed.
//   test_mode    - forwarded to TestInfo::Init(); TestInfo::FE_TEST additionally causes
//                  the timezone database to be loaded here.
//   external_fe  - true when loaded as part of an external frontend. Changes the logging
//                  flag defaults, skips minidump registration and the Java weigher /
//                  process name / libhdfs / JVM pause monitor setup, is forwarded to
//                  LibCache::Init(), and causes the timezone database to be loaded here.
//
// There is no return value. Failures are reported through the ABORT_IF_ERROR and
// CLEAN_EXIT_WITH_ERROR macros, which are defined outside this function.
// NOTE(review): both macros presumably terminate the process - confirm against their
// definitions.
void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
TestInfo::Mode test_mode, bool external_fe) {
// Seed the C library random number generator with the current time.
srand(time(NULL));
BlockImpalaShutdownSignal();
// Collect machine information. It is logged further below, once logging is initialized.
CpuInfo::Init();
DiskInfo::Init();
MemInfo::Init();
OsInfo::Init();
TestInfo::Init(test_mode);
// Set the default hostname. The user can override this with the hostname flag.
ABORT_IF_ERROR(GetHostname(&FLAGS_hostname));
// Symbolize stacktraces by default only in debug builds (NDEBUG not defined). Like the
// hostname above, this is assigned before the command line is parsed below.
#ifdef NDEBUG
FLAGS_symbolize_stacktrace = false;
# else
FLAGS_symbolize_stacktrace = true;
#endif
if (external_fe) {
// Change defaults for flags when loaded as part of external frontend.
// Write logs to stderr by default (otherwise logs get written to
// FeSupport.INFO/ERROR).
FLAGS_logtostderr = true;
// Do not redirect stdout/stderr by default.
FLAGS_redirect_stdout_stderr = false;
}
google::SetVersionString(impala::GetBuildVersion());
google::ParseCommandLineFlags(&argc, &argv, true);
// Flag validation. Each failed check exits via CLEAN_EXIT_WITH_ERROR.
if (!FLAGS_redaction_rules_file.empty()) {
if (VLOG_ROW_IS_ON || !FLAGS_vmodule.empty()) {
CLEAN_EXIT_WITH_ERROR("Redaction cannot be used in combination with log level 3 or "
"higher or the -vmodule option because these log levels may log data in "
"ways redaction rules may not anticipate.");
}
// An empty string returned from SetRedactionRulesFromFile() is treated as success.
const string& error_message = SetRedactionRulesFromFile(FLAGS_redaction_rules_file);
if (!error_message.empty()) CLEAN_EXIT_WITH_ERROR(error_message);
}
if (FLAGS_read_size < READ_SIZE_MIN_VALUE) {
CLEAN_EXIT_WITH_ERROR(Substitute("read_size can not be lower than $0",
READ_SIZE_MIN_VALUE));
}
// NOTE(review): the third ParseMemSpec() argument (0) is presumably the base value used
// for percentage specs - confirm against ParseUtil.
bool is_percent = false; // out-param required by ParseMemSpec(); value is not used
int64_t re2_mem_limit = ParseUtil::ParseMemSpec(FLAGS_re2_mem_limit, &is_percent, 0);
// Zero and negative results are both rejected as invalid.
if (re2_mem_limit <= 0) {
CLEAN_EXIT_WITH_ERROR(
Substitute("Invalid mem limit for re2's regex engine: $0", FLAGS_re2_mem_limit));
} else {
StringFunctions::SetRE2MemLimit(re2_mem_limit);
}
if (FLAGS_reserved_words_version != "2.11.0" && FLAGS_reserved_words_version != "3.0.0")
{
CLEAN_EXIT_WITH_ERROR(Substitute("Invalid flag reserved_words_version. The value must"
" be one of [\"2.11.0\", \"3.0.0\"], while the provided value is $0.",
FLAGS_reserved_words_version));
}
// Enforce a minimum value for thrift_rpc_max_message_size, as configuring the limit to
// a small value is very unlikely to work. The check is skipped in tests and when the
// flag is not positive.
if (!impala::TestInfo::is_test() && FLAGS_thrift_rpc_max_message_size > 0
&& FLAGS_thrift_rpc_max_message_size < ThriftDefaultMaxMessageSize()) {
CLEAN_EXIT_WITH_ERROR(
Substitute("Invalid $0: $1 is less than the minimum value of $2.",
"thrift_rpc_max_message_size", FLAGS_thrift_rpc_max_message_size,
ThriftDefaultMaxMessageSize()));
}
// Enforce a minimum value for thrift_external_rpc_max_message_size, as configuring the
// limit to a small value is very unlikely to work. The check is skipped in tests and
// when the flag is not positive.
if (!impala::TestInfo::is_test() && FLAGS_thrift_external_rpc_max_message_size > 0
&& FLAGS_thrift_external_rpc_max_message_size < ThriftDefaultMaxMessageSize()) {
CLEAN_EXIT_WITH_ERROR(
Substitute("Invalid $0: $1 is less than the minimum value of $2.",
"thrift_external_rpc_max_message_size",
FLAGS_thrift_external_rpc_max_message_size, ThriftDefaultMaxMessageSize()));
}
impala::InitGoogleLoggingSafe(argv[0]);
// Breakpad needs flags and logging to initialize.
if (!external_fe) {
ABORT_IF_ERROR(RegisterMinidump(argv[0]));
}
impala::InitThreading();
impala::datetime_parse_util::SimpleDateFormatTokenizer::InitCtx();
impala::SeedOpenSSLRNG();
ABORT_IF_ERROR(impala::InitAuth(argv[0]));
// Initialize log_maintenance_thread after InitGoogleLoggingSafe and InitThreading.
// The thread handles used below are variables declared outside this function.
Status thread_spawn_status = Thread::Create("common", "log-maintenance-thread",
&LogMaintenanceThread, &log_maintenance_thread);
if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
thread_spawn_status =
Thread::Create("common", "pause-monitor", &PauseMonitorLoop, &pause_monitor);
if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
// Start the log fault injection thread only if the debug_actions flag contains the
// substring "LOG_MAINTENANCE_STDERR".
if (strstr(FLAGS_debug_actions.c_str(), "LOG_MAINTENANCE_STDERR") != NULL) {
thread_spawn_status = Thread::Create("common", "log-fault-inject-thread",
&LogFaultInjectionThread, &log_fault_inject_thread);
if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
}
// Implement timeout for backend tests: a watchdog thread sleeps for BE_TEST_TIMEOUT_S
// seconds and then logs at FATAL severity.
if (impala::TestInfo::is_be_test()) {
thread_spawn_status = Thread::Create("common", "be-test-timeout-thread",
[]() {
SleepForMs(BE_TEST_TIMEOUT_S * 1000L);
LOG(FATAL) << "Backend test timed out after " << BE_TEST_TIMEOUT_S << "s";
},
&be_timeout_thread);
if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
}
PeriodicCounterUpdater::Init();
// Log the version, the effective hostname and locale, and the command-line flags.
LOG(INFO) << impala::GetVersionString();
LOG(INFO) << "Using hostname: " << FLAGS_hostname;
LOG(INFO) << "Using locale: " << std::locale("").name();
impala::LogCommandLineFlags();
// When a process calls send(2) on a socket closed on the other end, linux generates
// SIGPIPE. MSG_NOSIGNAL can be passed to send(2) to disable it, which thrift does. But
// OpenSSL doesn't have place for this parameter so the signal must be disabled
// manually.
signal(SIGPIPE, SIG_IGN);
InitThriftLogging();
// Log the machine and process information.
LOG(INFO) << CpuInfo::DebugString();
LOG(INFO) << DiskInfo::DebugString();
LOG(INFO) << MemInfo::DebugString();
LOG(INFO) << OsInfo::DebugString();
LOG(INFO) << CGroupUtil::DebugString();
LOG(INFO) << "Process ID: " << getpid();
LOG(INFO) << "Default AES cipher mode for spill-to-disk: "
<< EncryptionKey::ModeToString(EncryptionKey::GetSupportedDefaultMode());
// After initializing logging and printing the machine information, verify the
// minimal CPU requirements and exit if they are not met.
Status cpu_requirement_status = CpuInfo::EnforceCpuRequirements();
if (!cpu_requirement_status.ok()) {
CLEAN_EXIT_WITH_ERROR(cpu_requirement_status.GetDetail());
}
// If requested, replace the hostname flag with the IP address it resolves to. Note that
// the "Using hostname" line logged above still shows the unresolved name.
if (FLAGS_use_resolved_hostname) {
IpAddr ip_address;
Status status = HostnameToIpAddr(FLAGS_hostname, &ip_address);
if (!status.ok()) CLEAN_EXIT_WITH_ERROR(status.GetDetail());
LOG(INFO) << Substitute("Resolved hostname $0 to $1", FLAGS_hostname, ip_address);
FLAGS_hostname = ip_address;
}
// Required for the FE's Catalog
ABORT_IF_ERROR(impala::LibCache::Init(external_fe));
Status fs_cache_init_status = impala::HdfsFsCache::Init();
if (!fs_cache_init_status.ok()) CLEAN_EXIT_WITH_ERROR(fs_cache_init_status.GetDetail());
if (init_jvm) {
// These steps run before JniUtil::Init() and are skipped for an external frontend.
if (!external_fe) {
ABORT_IF_ERROR(InitializeJavaWeigher());
ABORT_IF_ERROR(JavaSetProcessName(filesystem::path(argv[0]).filename().string()));
JniUtil::InitLibhdfs();
}
ABORT_IF_ERROR(JniUtil::Init());
InitJvmLoggingSupport();
// The JVM pause monitor is likewise skipped for an external frontend.
if (!external_fe) {
ABORT_IF_ERROR(JniUtil::InitJvmPauseMonitor());
}
ZipUtil::InitJvm();
}
if (argc == -1) {
// Should not be called. We need BuiltinsInit() so the builtin symbols are
// not stripped.
DCHECK(false);
ScalarExprEvaluator::InitBuiltinsDummy();
}
if (impala::KuduIsAvailable()) impala::InitKuduLogging();
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
// tcmalloc and address sanitizer can not be used together. This block is compiled out
// of both address sanitizer and thread sanitizer builds.
if (FLAGS_enable_process_lifetime_heap_profiling) {
HeapProfilerStart(FLAGS_heap_profile_dir.c_str());
}
#endif
// Signal handler for handling the SIGTERM. We want to log a message when catalogd or
// impalad or statestored is being shutdown using a SIGTERM.
struct sigaction action;
// Zero the whole struct so that every field not set explicitly below is 0.
memset(&action, 0, sizeof(struct sigaction));
// SA_SIGINFO selects the sa_sigaction (three-argument) handler form.
action.sa_sigaction = &HandleSigTerm;
action.sa_flags = SA_SIGINFO;
if (sigaction(SIGTERM, &action, nullptr) == -1) {
stringstream error_msg;
error_msg << "Failed to register action for SIGTERM: " << GetStrErrMsg();
CLEAN_EXIT_WITH_ERROR(error_msg.str());
}
if (external_fe || test_mode == TestInfo::FE_TEST) {
// Explicitly load the timezone database for external FEs and FE tests.
// Impala daemons load it through ImpaladMain
ABORT_IF_ERROR(TimezoneDatabase::Initialize());
}
}