in llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java [137:385]
/**
 * Creates an LLAP daemon, validating its configuration and wiring up its child services.
 *
 * <p>The constructor validates port numbers and related settings, re-logs in with Kerberos when
 * security is enabled, initializes metrics and the JVM pause monitor, sets up token/secret
 * management, and registers the child services. These are the query tracker, executor service,
 * container runner, optional web UI, optional load generator, protocol server, AM reporter and
 * local-dir cleaner. Registration order is deliberate: the protocol server is added after all
 * other components, and the AM reporter after the server.
 *
 * @param daemonConf daemon configuration; the metrics session id is written back into it
 * @param numExecutors number of executors; must be greater than 0
 * @param executorMemoryBytes requested executor memory; the Xmx headroom is subtracted from it
 * @param ioEnabled whether LLAP IO is enabled
 * @param isDirectCache whether the IO cache is direct; if so it is not counted against the heap
 * @param ioMemoryBytes IO cache size in bytes
 * @param localDirs work directories; must be non-null and non-empty
 * @param srvPort server RPC port, 1025-65535 or 0 for automatic selection
 * @param externalClientCloudSetupEnabled whether external-client cloud setup is enabled; the
 *     external-client RPC port is only validated when this is true
 * @param externalClientsRpcPort RPC port for external clients, 1025-65535 or 0
 * @param mngPort management RPC port, 1025-65535 or 0
 * @param shufflePort shuffle port, 1025-65535 or 0
 * @param webPort web UI port; in test mode the UI is disabled when this equals the default port
 * @param appName application name, used as part of the daemon id
 * @throws IllegalArgumentException if an argument or a related configuration value is invalid
 * @throws IllegalStateException if the JVM max memory is smaller than the configured memory
 * @throws RuntimeException wrapping an IOException from Kerberos login or current-user lookup
 */
public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
    boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
    boolean externalClientCloudSetupEnabled, int externalClientsRpcPort,
    int mngPort, int shufflePort, int webPort, String appName) {
  super("LlapDaemon");
  printAsciiArt();

  // ---- Argument validation (order kept so the first failing check is unchanged) ----
  Preconditions.checkArgument(numExecutors > 0,
      "Number of executors must be greater than 0 (was %s)", numExecutors);
  checkPortArgument(srvPort, "Server RPC Port");
  if (externalClientCloudSetupEnabled) {
    checkPortArgument(externalClientsRpcPort, "Server RPC port for external clients");
  }
  checkPortArgument(mngPort, "Management RPC Port");
  Preconditions.checkArgument(localDirs != null && localDirs.length > 0,
      "Work dirs must be specified");
  checkPortArgument(shufflePort, "Shuffle Port");
  int outputFormatServicePort =
      HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
  checkPortArgument(outputFormatServicePort, "OutputFormatService Port");

  String hosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
  if (hosts.startsWith("@")) {
    // A leading '@' in the service hosts setting requires a ZooKeeper quorum.
    String zkHosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.HIVE_ZOOKEEPER_QUORUM);
    LOG.info("Zookeeper Quorum: {}", zkHosts);
    Preconditions.checkArgument(zkHosts != null && !zkHosts.trim().isEmpty(),
        "LLAP service hosts startswith '@' but hive.zookeeper.quorum is not set." +
            " hive.zookeeper.quorum must be set.");
  }

  String hostName = MetricsUtils.getHostName();
  try {
    // re-login with kerberos. This makes sure all daemons have the same login user.
    if (UserGroupInformation.isSecurityEnabled()) {
      final String daemonPrincipal = HiveConf.getVar(daemonConf, ConfVars.LLAP_KERBEROS_PRINCIPAL);
      final String daemonKeytab = HiveConf.getVar(daemonConf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
      LlapUtil.loginWithKerberosAndUpdateCurrentUser(daemonPrincipal, daemonKeytab);
    }
    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
    LOG.info("Starting daemon as user: {}", currentUser);
    daemonId = new DaemonId(currentUser, LlapUtil.generateClusterName(daemonConf),
        hostName, appName, System.currentTimeMillis());
  } catch (IOException ex) {
    throw new RuntimeException(ex);
  }

  // ---- Memory and scheduler settings ----
  this.maxJvmMemory = getTotalHeapSize();
  this.llapIoEnabled = ioEnabled;
  long xmxHeadRoomBytes = determineXmxHeadroom(daemonConf, executorMemoryBytes, maxJvmMemory);
  this.executorMemoryPerInstance = executorMemoryBytes - xmxHeadRoomBytes;
  this.ioMemoryPerInstance = ioMemoryBytes;
  this.numExecutors = numExecutors;
  this.localDirs = localDirs;

  int waitQueueSize = HiveConf.getIntVar(
      daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
  boolean enablePreemption = HiveConf.getBoolVar(
      daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
  int timedWindowAverageDataPoints = HiveConf.getIntVar(
      daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_DATA_POINTS);
  long timedWindowAverageWindowLength = HiveConf.getTimeVar(
      daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_WINDOW_LENGTH, TimeUnit.NANOSECONDS);
  int simpleAverageWindowDataSize = HiveConf.getIntVar(
      daemonConf, ConfVars.LLAP_DAEMON_METRICS_SIMPLE_AVERAGE_DATA_POINTS);

  Preconditions.checkArgument(timedWindowAverageDataPoints >= 0,
      "hive.llap.daemon.metrics.timed.window.average.data.points should be greater or equal to 0");
  Preconditions.checkArgument(timedWindowAverageDataPoints == 0 || timedWindowAverageWindowLength > 0,
      "hive.llap.daemon.metrics.timed.window.average.window.length should be greater than 0 if " +
          "hive.llap.daemon.metrics.timed.window.average.data.points is set to greater than 0");
  Preconditions.checkArgument(simpleAverageWindowDataSize >= 0,
      "hive.llap.daemon.metrics.simple.average.data.points should be greater or equal to 0");

  if (ioEnabled) {
    int numThreads = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
    Preconditions.checkArgument(numThreads >= numExecutors,
        "hive.llap.io.threadpool.size (%s) should be greater or equal to hive.llap.daemon.num.executors (%s)",
        numThreads, numExecutors);
  }

  final String logMsg = "Attempting to start LlapDaemon with the following configuration: " +
      "maxJvmMemory=" + maxJvmMemory + " ("
      + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
      ", requestedExecutorMemory=" + executorMemoryBytes +
      " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
      ", llapIoCacheSize=" + ioMemoryBytes + " ("
      + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
      ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
      + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
      ", adjustedExecutorMemory=" + executorMemoryPerInstance +
      " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
      ", numExecutors=" + numExecutors +
      ", llapIoEnabled=" + ioEnabled +
      ", llapIoCacheIsDirect=" + isDirectCache +
      ", rpcListenerPort=" + srvPort +
      ", externalClientCloudSetupEnabled=" + externalClientCloudSetupEnabled +
      ", rpcListenerPortForExternalClients=" + externalClientsRpcPort +
      ", mngListenerPort=" + mngPort +
      ", webPort=" + webPort +
      ", outputFormatSvcPort=" + outputFormatServicePort +
      ", workDirs=" + Arrays.toString(localDirs) +
      ", shufflePort=" + shufflePort +
      ", waitQueueSize= " + waitQueueSize +
      ", enablePreemption= " + enablePreemption +
      ", timedWindowAverageDataPoints= " + timedWindowAverageDataPoints +
      ", timedWindowAverageWindowLength= " + timedWindowAverageWindowLength +
      ", simpleAverageWindowDataSize= " + simpleAverageWindowDataSize +
      ", versionInfo= (" + HiveVersionInfo.getBuildVersion() + ")";
  LOG.info(logMsg);
  final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
  // Time based log retrieval may not fetch the above log line so logging to stderr for debugging purpose.
  System.err.println(currTSISO8601 + " " + logMsg);

  // A direct (off-heap) IO cache does not need to fit in the heap.
  long memRequired =
      executorMemoryBytes + (ioEnabled && !isDirectCache ? ioMemoryBytes : 0);
  // TODO: this check is somewhat bogus as the maxJvmMemory != Xmx parameters (see annotation in LlapServiceDriver)
  Preconditions.checkState(maxJvmMemory >= memRequired,
      "Invalid configuration. Xmx value too small. maxAvailable=" + LlapUtil.humanReadableByteCount(maxJvmMemory) +
          ", configured(exec + io if enabled)=" + LlapUtil.humanReadableByteCount(memRequired));

  // ---- Shuffle handler configuration (a copy, so daemonConf is not modified here) ----
  this.shuffleHandlerConf = new Configuration(daemonConf);
  this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort);
  this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS,
      StringUtils.arrayToString(localDirs));
  this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED,
      HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED));

  // Less frequently set parameter, not passing in as a param.
  int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);

  // Initialize the function localizer.
  final ClassLoader executorClassLoader;
  if (HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS)) {
    this.fnLocalizer = new FunctionLocalizer(daemonConf, localDirs[0]);
    executorClassLoader = fnLocalizer.getClassLoader();
    // Set up the hook that will disallow creating non-whitelisted UDFs anywhere in the plan.
    // We are not using a specific hook for GenericUDFBridge - that doesn't work in MiniLlap
    // because the daemon is embedded, so the client also gets this hook and Kryo is brittle.
    SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(fnLocalizer));
  } else {
    this.fnLocalizer = null;
    SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(new StaticPermanentFunctionChecker(daemonConf)));
    executorClassLoader = Thread.currentThread().getContextClassLoader();
  }

  // Initialize the metrics system
  LlapMetricsSystem.initialize("LlapDaemon");
  this.pauseMonitor = new JvmPauseMonitor(daemonConf);
  pauseMonitor.start();
  String displayNameJvm = "LlapDaemonJvmMetrics-" + hostName;
  String sessionId = MetricsUtils.getUUID();
  LlapDaemonJvmMetrics.create(displayNameJvm, sessionId, daemonConf);
  String displayName = "LlapDaemonExecutorMetrics-" + hostName;
  daemonConf.set("llap.daemon.metrics.sessionid", sessionId);
  String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf,
      HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS);
  this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, waitQueueSize,
      parsePreemptionMetricsIntervals(strIntervals), timedWindowAverageDataPoints,
      timedWindowAverageWindowLength, simpleAverageWindowDataSize);
  this.metrics.setMemoryPerInstance(executorMemoryPerInstance);
  this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
  this.metrics.setJvmMaxMemory(maxJvmMemory);
  this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this);
  LOG.info("Started LlapMetricsSystem with displayName: {} sessionId: {}", displayName, sessionId);

  // ---- AM reporter, security and the protocol server ----
  int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
  this.socketFactory = NetUtils.getDefaultSocketFactory(daemonConf);
  this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress,
      new QueryFailedHandlerProxy(), daemonConf, daemonId, socketFactory);

  SecretManager sm = null;
  if (UserGroupInformation.isSecurityEnabled()) {
    sm = SecretManager.createSecretManager(daemonConf, daemonId.getClusterString());
    this.llapTokenManager = new DefaultLlapTokenManager(daemonConf, sm);
  } else {
    this.llapTokenManager = new DummyTokenManager();
  }
  this.secretManager = sm;
  this.server = new LlapProtocolServerImpl(secretManager, numHandlers, this, srvAddress, mngAddress, srvPort,
      externalClientsRpcPort, mngPort, daemonId, metrics).withTokenManager(this.llapTokenManager);

  // ---- Task execution services ----
  LlapUgiManager llapUgiManager = LlapUgiManager.getInstance(daemonConf);
  QueryTracker queryTracker = new QueryTracker(daemonConf, localDirs,
      daemonId.getClusterString());
  String waitQueueSchedulerClassName = HiveConf.getVar(
      daemonConf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
  Scheduler<TaskRunnerCallable> executorService = new TaskExecutorService(numExecutors, waitQueueSize,
      waitQueueSchedulerClassName, enablePreemption, executorClassLoader, metrics, null);
  addIfService(queryTracker);
  addIfService(executorService);

  // this.shufflePort (the field, not the constructor parameter) is passed to the container runner.
  this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors,
      this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
      amReporter, queryTracker, executorService, daemonId, llapUgiManager, socketFactory);
  addIfService(containerRunner);

  // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.
  this.registry = new LlapRegistryService(true);

  // disable web UI in test mode until a specific port was configured
  if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)
      && Integer.parseInt(ConfVars.LLAP_DAEMON_WEB_PORT.getDefaultValue()) == webPort) {
    LOG.info("Web UI was disabled in test mode because hive.llap.daemon.web.port was not "
        + "specified or has default value ({})", webPort);
    this.webServices = null;
  } else {
    this.webServices = new LlapWebServices(webPort, this, registry);
    addIfService(webServices);
  }

  if (HiveConf.getVar(daemonConf, ConfVars.HIVE_TEST_LOAD_HOSTNAMES).isEmpty()) {
    this.llapLoadGeneratorService = null;
  } else {
    this.llapLoadGeneratorService = new LlapLoadGeneratorService();
    addIfService(llapLoadGeneratorService);
  }

  // Bring up the server only after all other components have started.
  addIfService(server);
  // AMReporter after the server so that it gets the correct address. It knows how to deal with
  // requests before it is started.
  addIfService(amReporter);
  addIfService(new LocalDirCleaner(localDirs, daemonConf));
}

/** Returns true if {@code port} is 0 (automatic selection) or lies in 1025-65535 inclusive. */
private static boolean isValidDaemonPort(int port) {
  return port == 0 || (port > 1024 && port < 65536);
}

/**
 * Validates a listening port argument.
 *
 * @param port the port to check
 * @param portName human-readable name of the port, used in the error message
 * @throws IllegalArgumentException if the port is neither 0 nor within 1025-65535
 */
private static void checkPortArgument(int port, String portName) {
  Preconditions.checkArgument(isValidDaemonPort(port),
      "%s must be between 1025 and 65535, or 0 for automatic selection (was %s)", portName, port);
}

/**
 * Parses the task pre-emption metrics intervals. Entries that are not valid integers are logged
 * and skipped.
 *
 * @param strIntervals the raw interval strings; may be null
 * @return the parsed intervals in input order; empty if {@code strIntervals} is null
 */
private static int[] parsePreemptionMetricsIntervals(String[] strIntervals) {
  if (strIntervals == null) {
    return new int[0];
  }
  List<Integer> intervalList = new ArrayList<>(strIntervals.length);
  for (String strInterval : strIntervals) {
    try {
      intervalList.add(Integer.valueOf(strInterval));
    } catch (NumberFormatException e) {
      LOG.warn("Ignoring task pre-emption metrics interval {} from {} as it is invalid",
          strInterval, Arrays.toString(strIntervals));
    }
  }
  return Ints.toArray(intervalList);
}