be/src/common/init.cc (526 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "common/init.h"

#include <csignal>
#include <regex>
#include <boost/filesystem.hpp>
#include <gperftools/heap-profiler.h>
#include <third_party/lss/linux_syscall_support.h>

#include "common/global-flags.h"
#include "common/logging.h"
#include "common/status.h"
#include "exec/kudu/kudu-util.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/string-functions.h"
#include "exprs/timezone_db.h"
#include "gutil/atomicops.h"
#include "gutil/strings/substitute.h"
#include "rpc/authentication.h"
#include "rpc/thrift-util.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/datetime-simple-date-format-parser.h"
#include "runtime/exec-env.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-tracker.h"
#include "service/impala-server.h"
#include "util/cgroup-util.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/filesystem-util.h"
#include "util/jni-util.h"
#include "util/logging-support.h"
#include "util/mem-info.h"
#include "util/memory-metrics.h"
#include "util/minidump.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/os-info.h"
#include "util/os-util.h"
#include "util/parse-util.h"
#include "util/periodic-counter-updater.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "util/test-info.h"
#include "util/thread.h"
#include "util/time.h"
#include "util/zip-util.h"

#include "common/names.h"

using namespace impala;
namespace filesystem = boost::filesystem;

// Flags defined in other translation units that process initialization reads or
// overrides.
DECLARE_bool(enable_process_lifetime_heap_profiling);
DECLARE_string(heap_profile_dir);
DECLARE_string(hostname);
DECLARE_bool(use_resolved_hostname);
// TODO: rename this to be more generic when we have a good CM release to do so.
DECLARE_int32(logbufsecs);
DECLARE_int32(max_log_files);
DECLARE_int32(max_minidumps);
DECLARE_string(redaction_rules_file);
DECLARE_bool(redirect_stdout_stderr);
DECLARE_string(re2_mem_limit);
DECLARE_string(reserved_words_version);
DECLARE_bool(symbolize_stacktrace);
DECLARE_string(debug_actions);
DECLARE_int64(thrift_rpc_max_message_size);
DECLARE_int64(thrift_external_rpc_max_message_size);

// Flags owned by this file.

// Interval between iterations of the memory maintenance thread.
DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in milliseconds "
    "between memory maintenance iterations");

// Interval the pause monitor sleeps before measuring how long it actually slept.
DEFINE_int64(pause_monitor_sleep_time_ms, 500, "Sleep time in milliseconds for "
    "pause monitor thread.");

// Threshold above which the pause monitor logs a warning; <= 0 disables the monitor.
DEFINE_int64(pause_monitor_warn_threshold_ms, 10000, "If the pause monitor sleeps "
    "more than this time period, a warning is logged. If set to 0 or less, pause monitor"
    " is disabled.");

DEFINE_string(local_library_dir, "/tmp",
    "Scratch space for local fs operations. Currently used for copying "
    "UDF binaries locally from HDFS and also for initializing the timezone db");

// Controls whether JavaAddOpens() appends --add-opens options to JAVA_TOOL_OPTIONS.
DEFINE_bool(jvm_automatic_add_opens, true,
    "Adds necessary --add-opens options for core Java modules necessary to correctly "
    "calculate catalog metadata cache object sizes.");

// Resolved ("auto" is replaced in place) by InitializeJavaWeigher().
DEFINE_string_hidden(java_weigher, "auto",
    "Choose between 'jamm' (com.github.jbellis:jamm) and 'sizeof' (org.ehcache:sizeof) "
    "weighers for determining catalog metadata cache entry size. 'auto' uses 'sizeof' "
    "for Java 8 - 11, and 'jamm' for Java 15+.");

// Defined by glog. This allows users to specify the log level using a glob. For
// example -vmodule=*scanner*=3 would enable full logging for scanners. If redaction
// is enabled, this option won't be allowed because some logging dumps table data
// in ways the authors of redaction rules can't anticipate.
DECLARE_string(vmodule);

using std::string;

// Log maintenance thread that runs periodically. It flushes glog every logbufsecs sec.
// glog only automatically flushes the log file if logbufsecs has passed since the
// previous flush when a new log is written. That means that on a quiet system, logs
// will be buffered indefinitely. It also rotates log files and minidumps.
static unique_ptr<impala::Thread> log_maintenance_thread;

// Memory maintenance thread that runs every memory_maintenance_sleep_time_ms
// milliseconds to keep memory accounting fresh and free unused memory:
// 1) Releases BufferPool memory that is not currently in use.
// 2) Refreshes the process MemTracker's consumption from its metric and refreshes the
//    aggregate memory metrics.
// NOTE(review): an older version of this comment said the thread also frees excess
// memory that TCMalloc has left in its pageheap; the thread body in this file does not
// call TCMalloc directly - confirm whether that happens inside BufferPool::Maintenance().
static unique_ptr<impala::Thread> memory_maintenance_thread;

// Shutdown signal handler thread that calls sigwait() on IMPALA_SHUTDOWN_SIGNAL and
// initiates a graceful shutdown with a virtually unlimited deadline (one year).
static unique_ptr<impala::Thread> shutdown_signal_handler_thread;

// A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps
// for a short interval of time (FLAGS_pause_monitor_sleep_time_ms), wakes up and
// calculates the actual time slept. If that exceeds
// FLAGS_pause_monitor_warn_threshold_ms, a warning is logged.
static unique_ptr<impala::Thread> pause_monitor;

// Thread only used in backend tests to implement a test timeout.
static unique_ptr<impala::Thread> be_timeout_thread;

// Fault injection thread that is spawned if FLAGS_debug_actions has label
// 'LOG_MAINTENANCE_STDERR'.
static unique_ptr<impala::Thread> log_fault_inject_thread; // Timeout after 2 hours - backend tests should generally run in minutes or tens of // minutes at worst. #if defined(UNDEFINED_SANITIZER_FULL) static const int64_t BE_TEST_TIMEOUT_S = 60L * 60L * 4L; #else static const int64_t BE_TEST_TIMEOUT_S = 60L * 60L * 2L; #endif #ifdef CODE_COVERAGE_ENABLED extern "C" { void __gcov_flush(); } #endif [[noreturn]] static void LogFaultInjectionThread() { const int64_t sleep_duration = 1; while (true) { sleep(sleep_duration); const int64_t now = MonotonicMillis(); Status status = DebugAction(FLAGS_debug_actions, "LOG_MAINTENANCE_STDERR"); if (!status.ok()) { // Fault injection activated. Print the error message several times to cerr. for (int i = 0; i < 128; i++) { std::cerr << now << " " << i << " " << " LOG_MAINTENANCE_STDERR " << status.msg().msg() << endl; } // Check that impalad can always find INFO and ERROR log path. DCHECK(impala::HasLog(google::INFO)); DCHECK(impala::HasLog(google::ERROR)); } } } [[noreturn]] static void LogMaintenanceThread() { int64_t last_flush = MonotonicMillis(); const int64_t sleep_duration = std::min(1, FLAGS_logbufsecs); while (true) { sleep(sleep_duration); const int64_t now = MonotonicMillis(); bool max_log_file_exceeded = RedirectStdoutStderr() && impala::CheckLogSize(false); if ((now - last_flush) / 1000 < FLAGS_logbufsecs && !max_log_file_exceeded) { continue; } google::FlushLogFiles(google::GLOG_INFO); // Check log size again and force log rotation this time if they still big after // FlushLogFiles. if (max_log_file_exceeded && impala::CheckLogSize(true)) impala::ForceRotateLog(); // No need to rotate log files in tests. if (impala::TestInfo::is_test()) continue; // Reattach stdout and stderr if necessary. 
if (impala::RedirectStdoutStderr()) impala::AttachStdoutStderr(); // Check for log rotation in every interval of the maintenance thread impala::CheckAndRotateLogFiles(FLAGS_max_log_files); // Check for minidump rotation in every interval of the maintenance thread. This is // necessary since an arbitrary number of minidumps can be written by sending SIGUSR1 // to the process. impala::CheckAndRotateMinidumps(FLAGS_max_minidumps); // update last_flush. last_flush = MonotonicMillis(); } } [[noreturn]] static void MemoryMaintenanceThread() { while (true) { SleepForMs(FLAGS_memory_maintenance_sleep_time_ms); impala::ExecEnv* env = impala::ExecEnv::GetInstance(); // ExecEnv may not have been created yet or this may be the catalogd or statestored, // which don't have ExecEnvs. if (env != nullptr) { BufferPool* buffer_pool = env->buffer_pool(); if (buffer_pool != nullptr) buffer_pool->Maintenance(); // The process limit as measured by our trackers may get out of sync with the // process usage if memory is allocated or freed without updating a MemTracker. // The metric is refreshed whenever memory is consumed or released via a MemTracker, // so on a system with queries executing it will be refreshed frequently. However // if the system is idle, we need to refresh the tracker occasionally since // untracked memory may be allocated or freed, e.g. by background threads. if (env->process_mem_tracker() != nullptr) { env->process_mem_tracker()->RefreshConsumptionFromMetric(); } } // Periodically refresh values of the aggregate memory metrics to ensure they are // somewhat up-to-date. 
AggregateMemoryMetrics::Refresh(); } } [[noreturn]] static void ImpalaShutdownSignalHandler() { sigset_t signals; CHECK_EQ(0, sigemptyset(&signals)); CHECK_EQ(0, sigaddset(&signals, IMPALA_SHUTDOWN_SIGNAL)); DCHECK(ExecEnv::GetInstance() != nullptr); DCHECK(ExecEnv::GetInstance()->impala_server() != nullptr); ImpalaServer* impala_server = ExecEnv::GetInstance()->impala_server(); while (true) { int signal; int err = sigwait(&signals, &signal); CHECK(err == 0) << "sigwait(): " << GetStrErrMsg(err) << ": " << err; CHECK_EQ(IMPALA_SHUTDOWN_SIGNAL, signal); ShutdownStatusPB shutdown_status; Status status = impala_server->StartShutdown(-1, &shutdown_status); if (!status.ok()) { LOG(ERROR) << "Shutdown signal received but unable to initiate shutdown. Status: " << status.GetDetail(); continue; } LOG(INFO) << "Shutdown signal received. Current Shutdown Status: " << ImpalaServer::ShutdownStatusToString(shutdown_status); } } static void PauseMonitorLoop() { if (FLAGS_pause_monitor_warn_threshold_ms <= 0) return; int64_t time_before_sleep = MonotonicMillis(); while (true) { SleepForMs(FLAGS_pause_monitor_sleep_time_ms); int64_t sleep_time = MonotonicMillis() - time_before_sleep; time_before_sleep += sleep_time; if (sleep_time > FLAGS_pause_monitor_warn_threshold_ms) { LOG(WARNING) << "A process pause was detected for approximately " << PrettyPrinter::Print(sleep_time, TUnit::TIME_MS); } } } // Signal handler for SIGTERM, that prints the message before doing an exit. [[noreturn]] static void HandleSigTerm(int signum, siginfo_t* info, void* context) { const char* msg = "Caught signal: SIGTERM. Daemon will exit.\n"; sys_write(STDOUT_FILENO, msg, strlen(msg)); #ifdef CODE_COVERAGE_ENABLED // On some systems __gcov_flush() only flushes a small subset of the coverage data. // If you run into this problem, there is a workaround that you can use at your own // risk: instead of calling __gcov_flush() and _exit(0) try to invoke exit(0) (no // underscore). 
You should only do this in your dev environment. __gcov_flush(); #endif // _exit() is async signal safe and is equivalent to the behaviour of the default // SIGTERM handler. exit() can run arbitrary code and is *not* safe to use here. _exit(0); } // Helper method that checks the return value of a syscall passed through // 'syscall_ret_val'. If it indicates an error, it writes an error message to stderr along // with the error string fetched via errno and calls exit(). void AbortIfError(const int syscall_ret_val, const string& msg) { if (syscall_ret_val == 0) return; cerr << Substitute("$0 Error: $1", msg, GetStrErrMsg()); exit(1); } // Blocks the IMPALA_SHUTDOWN_SIGNAL signal. Should be called by the process before // spawning any other threads to make sure it gets blocked in all threads and will only be // caught by the thread waiting on it. void BlockImpalaShutdownSignal() { const string error_msg = "Failed to block IMPALA_SHUTDOWN_SIGNAL for all threads."; sigset_t signals; AbortIfError(sigemptyset(&signals), error_msg); AbortIfError(sigaddset(&signals, IMPALA_SHUTDOWN_SIGNAL), error_msg); AbortIfError(pthread_sigmask(SIG_BLOCK, &signals, nullptr), error_msg); } // Returns Java major version, such as 8, 11, or 17. static int GetJavaMajorVersion() { string cmd = "java"; const char* java_home = getenv("JAVA_HOME"); if (java_home != NULL) { cmd = (filesystem::path(java_home) / "bin" / "java").string(); } cmd += " -version 2>&1"; string msg; if (!RunShellProcess(cmd, &msg, false, {"JAVA_TOOL_OPTIONS"})) { LOG(INFO) << Substitute("Unable to determine Java version (default to 8): $0", msg); return 8; } // Find a version string in the first line. string first_line; std::getline(istringstream(msg), first_line); // Need to allow for a wide variety of formats for different JDK implementations. 
// Example: openjdk version "11.0.19" 2023-04-18 std::regex java_version_pattern("\"([0-9]{1,3})\\.[0-9]+\\.[0-9]+[^\"]*\""); std::smatch matches; if (!std::regex_search(first_line, matches, java_version_pattern)) { LOG(INFO) << Substitute("Unable to determine Java version (default to 8): $0", msg); return 8; } DCHECK_EQ(matches.size(), 2); return std::stoi(matches.str(1)); } // Append the javaagent arg to JAVA_TOOL_OPTIONS to load jamm. static Status JavaAddJammAgent() { stringstream val_out; char* current_val_c = getenv("JAVA_TOOL_OPTIONS"); if (current_val_c != NULL) { val_out << current_val_c << " "; } istringstream classpath {getenv("CLASSPATH")}; string jamm_path, test_path; while (getline(classpath, test_path, ':')) { Status status = FileSystemUtil::FindFileInPath(test_path, "jamm-.*.jar", &jamm_path); // Error during FindFileInPath is not fatal if jamm path is found in another path. if (!status.ok()) { LOG(ERROR) << "Error when processing class path: " << status.msg().msg(); } if (!jamm_path.empty()) break; } if (jamm_path.empty()) { return Status("Could not find jamm-*.jar in Java CLASSPATH"); } val_out << "-javaagent:" << jamm_path; if (setenv("JAVA_TOOL_OPTIONS", val_out.str().c_str(), 1) < 0) { return Status(Substitute("Could not update JAVA_TOOL_OPTIONS: $0", GetStrErrMsg())); } return Status::OK(); } // Append add-opens args to JAVA_TOOL_OPTIONS for ehcache and jamm. 
// Does nothing when --jvm_automatic_add_opens is false. Otherwise the options are
// appended, each preceded by a space, after any existing JAVA_TOOL_OPTIONS value. The
// first group is always added; when 'useSizeOf' is true (ehcache sizeof weigher) the
// second group, needed only by ehcache, is added as well.
// Returns an error Status if setenv() fails.
static Status JavaAddOpens(bool useSizeOf) {
  if (!FLAGS_jvm_automatic_add_opens) return Status::OK();

  stringstream val_out;
  char* current_val_c = getenv("JAVA_TOOL_OPTIONS");
  if (current_val_c != NULL) {
    val_out << current_val_c;
  }

  for (const string& param : {
    // Needed for jamm and ehcache (jamm needs it to access lambdas)
    "--add-opens=java.base/java.lang=ALL-UNNAMED",
    "--add-opens=java.base/java.nio=ALL-UNNAMED",
    "--add-opens=java.base/java.util.regex=ALL-UNNAMED",
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
  }) {
    val_out << " " << param;
  }

  if (useSizeOf) {
    for (const string& param : {
      // Only needed for ehcache
      "--add-opens=java.base/java.io=ALL-UNNAMED",
      "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
      "--add-opens=java.base/java.lang.module=ALL-UNNAMED",
      "--add-opens=java.base/java.lang.ref=ALL-UNNAMED",
      "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
      "--add-opens=java.base/java.net=ALL-UNNAMED",
      "--add-opens=java.base/java.nio.charset=ALL-UNNAMED",
      "--add-opens=java.base/java.nio.file.attribute=ALL-UNNAMED",
      "--add-opens=java.base/java.security=ALL-UNNAMED",
      "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
      "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
      "--add-opens=java.base/java.util.jar=ALL-UNNAMED",
      "--add-opens=java.base/java.util.zip=ALL-UNNAMED",
      "--add-opens=java.base/java.util=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.math=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.module=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.perf=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.platform=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.platform.cgroupv1=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED",
      "--add-opens=java.base/jdk.internal.util.jar=ALL-UNNAMED",
      "--add-opens=java.base/sun.net.www.protocol.jar=ALL-UNNAMED",
      "--add-opens=java.base/sun.nio.fs=ALL-UNNAMED",
      "--add-opens=jdk.dynalink/jdk.dynalink.beans=ALL-UNNAMED",
      "--add-opens=jdk.dynalink/jdk.dynalink.linker.support=ALL-UNNAMED",
      "--add-opens=jdk.dynalink/jdk.dynalink.linker=ALL-UNNAMED",
      "--add-opens=jdk.dynalink/jdk.dynalink.support=ALL-UNNAMED",
      "--add-opens=jdk.dynalink/jdk.dynalink=ALL-UNNAMED",
      "--add-opens=jdk.management.jfr/jdk.management.jfr=ALL-UNNAMED",
      "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED"
    }) {
      val_out << " " << param;
    }
  }

  if (setenv("JAVA_TOOL_OPTIONS", val_out.str().c_str(), 1) < 0) {
    return Status(Substitute("Could not update JAVA_TOOL_OPTIONS: $0", GetStrErrMsg()));
  }
  return Status::OK();
}

// Resolves --java_weigher and prepares JAVA_TOOL_OPTIONS for the chosen weigher before
// the JVM starts:
//  - "auto" is replaced in place by "jamm" for Java 15+ and "sizeof" otherwise.
//  - "jamm" adds the jamm -javaagent option.
//  - On Java 9+, --add-opens options are added; any value other than "jamm" (including
//    an unrecognized one) gets the larger ehcache sizeof set.
static Status InitializeJavaWeigher() {
  // This is set up so the default if things go wrong is to continue using ehcache.sizeof.
  int version = GetJavaMajorVersion();
  if (FLAGS_java_weigher == "auto") {
    // Update for backend-gflag-util.cc setting use_jamm_weigher.
    FLAGS_java_weigher = (version >= 15) ? "jamm" : "sizeof";
  }
  LOG(INFO) << "Using Java weigher " << FLAGS_java_weigher;

  if (FLAGS_java_weigher == "jamm") {
    RETURN_IF_ERROR(JavaAddJammAgent());
  }

  if (version >= 9) {
    // add-opens is only supported in Java 9+.
    RETURN_IF_ERROR(JavaAddOpens(FLAGS_java_weigher != "jamm"));
  }

  return Status::OK();
}

// Appends -Dsun.java.command=<name> to JAVA_TOOL_OPTIONS, logging a warning if the
// existing value already mentions -Dsun.java.command. Returns an error Status if
// setenv() fails.
static Status JavaSetProcessName(const string& name) {
  string current_val;
  char* current_val_c = getenv("JAVA_TOOL_OPTIONS");
  if (current_val_c != NULL) {
    current_val = current_val_c;
  }

  if (!current_val.empty() && current_val.find("-Dsun.java.command") != string::npos) {
    LOG(WARNING) << "Overriding sun.java.command in JAVA_TOOL_OPTIONS to " << name;
  }

  stringstream val_out;
  if (!current_val.empty()) {
    val_out << current_val << " ";
  }
  // Set sun.java.command so jps reports the name correctly, and ThreadNameAnnotator can
  // use the process name for the main thread (and correctly restore the process name).
  val_out << "-Dsun.java.command=" << name;

  if (setenv("JAVA_TOOL_OPTIONS", val_out.str().c_str(), 1) < 0) {
    return Status(Substitute("Could not update JAVA_TOOL_OPTIONS: $0", GetStrErrMsg()));
  }
  return Status::OK();
}

// One-time process initialization shared by the daemons, tests and the external
// frontend. Parses command-line flags, validates them, sets up logging, starts the
// log-maintenance and pause-monitor threads (plus test-only threads), logs machine
// info, optionally initializes the JVM, and installs the SIGTERM handler.
//
// Statement order matters: IMPALA_SHUTDOWN_SIGNAL is blocked before any thread is
// spawned, flags are parsed before logging is initialized, and logging/threading are
// initialized before the minidump handler and the maintenance threads.
//
// 'argc'/'argv': the process command line, parsed by google::ParseCommandLineFlags.
// 'init_jvm': whether to initialize the JVM and JNI-dependent state.
// 'test_mode': passed to TestInfo::Init().
// 'external_fe': true when loaded as part of an external frontend; changes logging
//   defaults and skips minidumps, the Java weigher/process-name setup, libhdfs init and
//   the JVM pause monitor, and loads the timezone database here.
void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
    TestInfo::Mode test_mode, bool external_fe) {
  srand(time(NULL));
  // Must run before any other thread is created so the signal is blocked everywhere.
  BlockImpalaShutdownSignal();

  CpuInfo::Init();
  DiskInfo::Init();
  MemInfo::Init();
  OsInfo::Init();
  TestInfo::Init(test_mode);

  // Set the default hostname. The user can override this with the hostname flag.
  ABORT_IF_ERROR(GetHostname(&FLAGS_hostname));

  // Symbolize stacktraces by default only in debug (non-NDEBUG) builds.
#ifdef NDEBUG
  FLAGS_symbolize_stacktrace = false;
# else
  FLAGS_symbolize_stacktrace = true;
#endif

  if (external_fe) {
    // Change defaults for flags when loaded as part of external frontend.
    // Write logs to stderr by default (otherwise logs get written to
    // FeSupport.INFO/ERROR).
    FLAGS_logtostderr = true;
    // Do not redirect stdout/stderr by default.
    FLAGS_redirect_stdout_stderr = false;
  }

  google::SetVersionString(impala::GetBuildVersion());
  google::ParseCommandLineFlags(&argc, &argv, true);
  if (!FLAGS_redaction_rules_file.empty()) {
    if (VLOG_ROW_IS_ON || !FLAGS_vmodule.empty()) {
      CLEAN_EXIT_WITH_ERROR("Redaction cannot be used in combination with log level 3 or "
          "higher or the -vmodule option because these log levels may log data in "
          "ways redaction rules may not anticipate.");
    }
    const string& error_message = SetRedactionRulesFromFile(FLAGS_redaction_rules_file);
    if (!error_message.empty()) CLEAN_EXIT_WITH_ERROR(error_message);
  }

  if (FLAGS_read_size < READ_SIZE_MIN_VALUE) {
    CLEAN_EXIT_WITH_ERROR(Substitute("read_size can not be lower than $0",
        READ_SIZE_MIN_VALUE));
  }

  bool is_percent = false; // not used
  int64_t re2_mem_limit = ParseUtil::ParseMemSpec(FLAGS_re2_mem_limit, &is_percent, 0);
  if (re2_mem_limit <= 0) {
    CLEAN_EXIT_WITH_ERROR(
        Substitute("Invalid mem limit for re2's regex engine: $0", FLAGS_re2_mem_limit));
  } else {
    StringFunctions::SetRE2MemLimit(re2_mem_limit);
  }

  if (FLAGS_reserved_words_version != "2.11.0" && FLAGS_reserved_words_version != "3.0.0")
  {
    CLEAN_EXIT_WITH_ERROR(Substitute("Invalid flag reserved_words_version. The value must"
        " be one of [\"2.11.0\", \"3.0.0\"], while the provided value is $0.",
        FLAGS_reserved_words_version));
  }

  // Enforce a minimum value for thrift_rpc_max_message_size, as configuring the limit
  // to a small value is very unlikely to work. A value <= 0 skips the check, as does
  // running under tests.
  if (!impala::TestInfo::is_test() && FLAGS_thrift_rpc_max_message_size > 0
      && FLAGS_thrift_rpc_max_message_size < ThriftDefaultMaxMessageSize()) {
    CLEAN_EXIT_WITH_ERROR(
        Substitute("Invalid $0: $1 is less than the minimum value of $2.",
            "thrift_rpc_max_message_size", FLAGS_thrift_rpc_max_message_size,
            ThriftDefaultMaxMessageSize()));
  }

  // Enforce a minimum value for thrift_external_rpc_max_message_size, as configuring
  // the limit to a small value is very unlikely to work.
  if (!impala::TestInfo::is_test() && FLAGS_thrift_external_rpc_max_message_size > 0
      && FLAGS_thrift_external_rpc_max_message_size < ThriftDefaultMaxMessageSize()) {
    CLEAN_EXIT_WITH_ERROR(
        Substitute("Invalid $0: $1 is less than the minimum value of $2.",
            "thrift_external_rpc_max_message_size",
            FLAGS_thrift_external_rpc_max_message_size,
            ThriftDefaultMaxMessageSize()));
  }

  impala::InitGoogleLoggingSafe(argv[0]);
  // Breakpad needs flags and logging to initialize.
  if (!external_fe) {
    ABORT_IF_ERROR(RegisterMinidump(argv[0]));
  }
  impala::InitThreading();
  impala::datetime_parse_util::SimpleDateFormatTokenizer::InitCtx();
  impala::SeedOpenSSLRNG();
  ABORT_IF_ERROR(impala::InitAuth(argv[0]));

  // Start the log-maintenance and pause-monitor threads only after
  // InitGoogleLoggingSafe and InitThreading.
  Status thread_spawn_status = Thread::Create("common", "log-maintenance-thread",
      &LogMaintenanceThread, &log_maintenance_thread);
  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());

  thread_spawn_status =
      Thread::Create("common", "pause-monitor", &PauseMonitorLoop, &pause_monitor);
  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());

  // Initialize log fault injection if such a debug action exists.
  if (strstr(FLAGS_debug_actions.c_str(), "LOG_MAINTENANCE_STDERR") != NULL) {
    thread_spawn_status = Thread::Create("common", "log-fault-inject-thread",
        &LogFaultInjectionThread, &log_fault_inject_thread);
    if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
  }

  // Implement timeout for backend tests: LOG(FATAL) after BE_TEST_TIMEOUT_S seconds.
  if (impala::TestInfo::is_be_test()) {
    thread_spawn_status = Thread::Create("common", "be-test-timeout-thread",
        []() {
          SleepForMs(BE_TEST_TIMEOUT_S * 1000L);
          LOG(FATAL) << "Backend test timed out after " << BE_TEST_TIMEOUT_S << "s";
        },
        &be_timeout_thread);
    if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
  }

  PeriodicCounterUpdater::Init();

  LOG(INFO) << impala::GetVersionString();
  LOG(INFO) << "Using hostname: " << FLAGS_hostname;
  LOG(INFO) << "Using locale: " << std::locale("").name();
  impala::LogCommandLineFlags();

  // When a process calls send(2) on a socket closed on the other end, linux generates
  // SIGPIPE. MSG_NOSIGNAL can be passed to send(2) to disable it, which thrift does. But
  // OpenSSL doesn't have place for this parameter so the signal must be disabled
  // manually.
  signal(SIGPIPE, SIG_IGN);
  InitThriftLogging();

  LOG(INFO) << CpuInfo::DebugString();
  LOG(INFO) << DiskInfo::DebugString();
  LOG(INFO) << MemInfo::DebugString();
  LOG(INFO) << OsInfo::DebugString();
  LOG(INFO) << CGroupUtil::DebugString();
  LOG(INFO) << "Process ID: " << getpid();
  LOG(INFO) << "Default AES cipher mode for spill-to-disk: "
            << EncryptionKey::ModeToString(EncryptionKey::GetSupportedDefaultMode());

  // After initializing logging and printing the machine information, verify the
  // minimal CPU requirements and exit if they are not met.
  Status cpu_requirement_status = CpuInfo::EnforceCpuRequirements();
  if (!cpu_requirement_status.ok()) {
    CLEAN_EXIT_WITH_ERROR(cpu_requirement_status.GetDetail());
  }

  // Optionally replace the hostname with its resolved IP address.
  if (FLAGS_use_resolved_hostname) {
    IpAddr ip_address;
    Status status = HostnameToIpAddr(FLAGS_hostname, &ip_address);
    if (!status.ok()) CLEAN_EXIT_WITH_ERROR(status.GetDetail());
    LOG(INFO) << Substitute("Resolved hostname $0 to $1", FLAGS_hostname, ip_address);
    FLAGS_hostname = ip_address;
  }

  // Required for the FE's Catalog
  ABORT_IF_ERROR(impala::LibCache::Init(external_fe));
  Status fs_cache_init_status = impala::HdfsFsCache::Init();
  if (!fs_cache_init_status.ok()) CLEAN_EXIT_WITH_ERROR(fs_cache_init_status.GetDetail());

  if (init_jvm) {
    if (!external_fe) {
      // JAVA_TOOL_OPTIONS must be finalized before JniUtil::Init() below.
      ABORT_IF_ERROR(InitializeJavaWeigher());
      ABORT_IF_ERROR(JavaSetProcessName(filesystem::path(argv[0]).filename().string()));
      JniUtil::InitLibhdfs();
    }
    ABORT_IF_ERROR(JniUtil::Init());
    InitJvmLoggingSupport();
    if (!external_fe) {
      ABORT_IF_ERROR(JniUtil::InitJvmPauseMonitor());
    }
    ZipUtil::InitJvm();
  }

  if (argc == -1) {
    // Should not be called. We need BuiltinsInit() so the builtin symbols are
    // not stripped.
    DCHECK(false);
    ScalarExprEvaluator::InitBuiltinsDummy();
  }

  if (impala::KuduIsAvailable()) impala::InitKuduLogging();

#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
  // tcmalloc and address sanitizer can not be used together
  if (FLAGS_enable_process_lifetime_heap_profiling) {
    HeapProfilerStart(FLAGS_heap_profile_dir.c_str());
  }
#endif

  // Signal handler for handling the SIGTERM. We want to log a message when catalogd or
  // impalad or statestored is being shutdown using a SIGTERM.
  struct sigaction action;
  memset(&action, 0, sizeof(struct sigaction));
  action.sa_sigaction = &HandleSigTerm;
  action.sa_flags = SA_SIGINFO;
  if (sigaction(SIGTERM, &action, nullptr) == -1) {
    stringstream error_msg;
    error_msg << "Failed to register action for SIGTERM: " << GetStrErrMsg();
    CLEAN_EXIT_WITH_ERROR(error_msg.str());
  }
  if (external_fe || test_mode == TestInfo::FE_TEST) {
    // Explicitly load the timezone database for external FEs and FE tests.
    // Impala daemons load it through ImpaladMain
    ABORT_IF_ERROR(TimezoneDatabase::Initialize());
  }
}

// Starts memory_maintenance_thread. Requires the aggregate memory metrics to have been
// registered first (checked in debug builds).
Status impala::StartMemoryMaintenanceThread() {
  DCHECK(AggregateMemoryMetrics::TOTAL_USED != nullptr) << "Mem metrics not registered.";
  return Thread::Create("common", "memory-maintenance-thread",
      &MemoryMaintenanceThread, &memory_maintenance_thread);
}

// Starts the thread that waits for IMPALA_SHUTDOWN_SIGNAL and triggers graceful
// shutdown. ImpalaShutdownSignalHandler() expects ExecEnv and its ImpalaServer to exist.
Status impala::StartImpalaShutdownSignalHandlerThread() {
  return Thread::Create("common", "shutdown-signal-handler", &ImpalaShutdownSignalHandler,
      &shutdown_signal_handler_thread);
}

#if defined(ADDRESS_SANITIZER)
// Default ASAN_OPTIONS. Override by setting environment variable $ASAN_OPTIONS.
extern "C" const char *__asan_default_options() {
  // IMPALA-2746: backend tests don't pass with leak sanitizer enabled.
  return "handle_segv=0 detect_leaks=0 allocator_may_return_null=1";
}
#endif

#if defined(THREAD_SANITIZER)
extern "C" const char* __tsan_default_options() {
  // Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
  // TSAN and Java don't play nicely together because JVM code is not instrumented with
  // TSAN. TSAN requires all libs to be compiled with '-fsanitize=thread' (see
  // https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code),
  // which is not currently possible for Java code. See
  // https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520 for efforts to get
  // TSAN to run against Java code. The flag ignore_noninstrumented_modules tells TSAN to
  // ignore all interceptors called from any non-instrumented libraries (e.g. Java).
  // THREAD_SANITIZER_FULL builds set it to 0 instead.
  return "ignore_noninstrumented_modules="
#if defined(THREAD_SANITIZER_FULL)
         "0 "
#else
         "1 "
#endif
         "halt_on_error=1 history_size=7 allocator_may_return_null=1 "
         "suppressions=" THREAD_SANITIZER_SUPPRESSIONS;
}
#endif

// Default UBSAN_OPTIONS. Override by setting environment variable $UBSAN_OPTIONS.
#if defined(UNDEFINED_SANITIZER)
extern "C" const char *__ubsan_default_options() {
  return "print_stacktrace=1 suppressions=" UNDEFINED_SANITIZER_SUPPRESSIONS;
}
#endif