public LlapDaemon()

in llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java [137:385]


  public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
      boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
      boolean externalClientCloudSetupEnabled, int externalClientsRpcPort,
      int mngPort, int shufflePort, int webPort, String appName) {
    super("LlapDaemon");

    printAsciiArt();

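    // Validate the basic configuration up front: executor count, the various service ports,
    // the work directories and, when service discovery goes through ZooKeeper, the ZooKeeper quorum.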
    Preconditions.checkArgument(numExecutors > 0);
    Preconditions.checkArgument(srvPort == 0 || (srvPort > 1024 && srvPort < 65536),
        "Server RPC Port must be between 1025 and 65535, or 0 automatic selection");
    if (externalClientCloudSetupEnabled) {
      Preconditions.checkArgument(
          externalClientsRpcPort == 0 || (externalClientsRpcPort > 1024 && externalClientsRpcPort < 65536),
          "Server RPC port for external clients must be between 1025 and 65535, or 0 automatic selection");
    }

    Preconditions.checkArgument(mngPort == 0 || (mngPort > 1024 && mngPort < 65536),
        "Management RPC Port must be between 1025 and 65535, or 0 automatic selection");
    Preconditions.checkArgument(localDirs != null && localDirs.length > 0,
        "Work dirs must be specified");
    Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536),
        "Shuffle Port must be between 1024 and 65535, or 0 for automatic selection");
    int outputFormatServicePort = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
    Preconditions.checkArgument(outputFormatServicePort == 0
        || (outputFormatServicePort > 1024 && outputFormatServicePort < 65536),
        "OutputFormatService Port must be between 1024 and 65535, or 0 for automatic selection");
    String hosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
    if (hosts.startsWith("@")) {
      String zkHosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.HIVE_ZOOKEEPER_QUORUM);
      LOG.info("Zookeeper Quorum: {}", zkHosts);
      Preconditions.checkArgument(zkHosts != null && !zkHosts.trim().isEmpty(),
          "LLAP service hosts startswith '@' but hive.zookeeper.quorum is not set." +
              " hive.zookeeper.quorum must be set.");
    }
    String hostName = MetricsUtils.getHostName();
    try {
      // Re-login with Kerberos. This makes sure all daemons have the same login user.
      if (UserGroupInformation.isSecurityEnabled()) {
        final String daemonPrincipal = HiveConf.getVar(daemonConf, ConfVars.LLAP_KERBEROS_PRINCIPAL);
        final String daemonKeytab = HiveConf.getVar(daemonConf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
        LlapUtil.loginWithKerberosAndUpdateCurrentUser(daemonPrincipal, daemonKeytab);
      }
      String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
      LOG.info("Starting daemon as user: {}", currentUser);
      daemonId = new DaemonId(currentUser, LlapUtil.generateClusterName(daemonConf),
        hostName, appName, System.currentTimeMillis());
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }

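    // Memory bookkeeping: subtract the configured Xmx headroom from the requested executor memory.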
    this.maxJvmMemory = getTotalHeapSize();
    this.llapIoEnabled = ioEnabled;

    long xmxHeadRoomBytes = determineXmxHeadroom(daemonConf, executorMemoryBytes, maxJvmMemory);
    this.executorMemoryPerInstance = executorMemoryBytes - xmxHeadRoomBytes;
    this.ioMemoryPerInstance = ioMemoryBytes;
    this.numExecutors = numExecutors;
    this.localDirs = localDirs;


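    // Pull the task-scheduler and metrics-window settings from the configuration and validate them.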
    int waitQueueSize = HiveConf.getIntVar(
        daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
    boolean enablePreemption = HiveConf.getBoolVar(
        daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
    int timedWindowAverageDataPoints = HiveConf.getIntVar(
        daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_DATA_POINTS);
    long timedWindowAverageWindowLength = HiveConf.getTimeVar(
        daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_WINDOW_LENGTH, TimeUnit.NANOSECONDS);
    int simpleAverageWindowDataSize = HiveConf.getIntVar(
        daemonConf, ConfVars.LLAP_DAEMON_METRICS_SIMPLE_AVERAGE_DATA_POINTS);

    Preconditions.checkArgument(timedWindowAverageDataPoints >= 0,
        "hive.llap.daemon.metrics.timed.window.average.data.points should be greater than or equal to 0");
    Preconditions.checkArgument(timedWindowAverageDataPoints == 0 || timedWindowAverageWindowLength > 0,
        "hive.llap.daemon.metrics.timed.window.average.window.length should be greater than 0 if " +
            "hive.llap.daemon.metrics.timed.window.average.data.points is set to a value greater than 0");
    Preconditions.checkArgument(simpleAverageWindowDataSize >= 0,
        "hive.llap.daemon.metrics.simple.average.data.points should be greater than or equal to 0");

    if (ioEnabled) {
      int numThreads = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
      Preconditions.checkArgument(numThreads >= numExecutors,
          "hive.llap.io.threadpool.size (%s) should be greater or equal to hive.llap.daemon.num.executors (%s)",
          numThreads, numExecutors);
    }

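    // Summarize the effective daemon configuration in the log.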
    final String logMsg = "Attempting to start LlapDaemon with the following configuration: " +
      "maxJvmMemory=" + maxJvmMemory + " ("
      + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
      ", requestedExecutorMemory=" + executorMemoryBytes +
      " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
      ", llapIoCacheSize=" + ioMemoryBytes + " ("
      + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
      ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
      + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
      ", adjustedExecutorMemory=" + executorMemoryPerInstance +
      " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
      ", numExecutors=" + numExecutors +
      ", llapIoEnabled=" + ioEnabled +
      ", llapIoCacheIsDirect=" + isDirectCache +
      ", rpcListenerPort=" + srvPort +
      ", externalClientCloudSetupEnabled=" + externalClientCloudSetupEnabled +
      ", rpcListenerPortForExternalClients=" + externalClientsRpcPort +
      ", mngListenerPort=" + mngPort +
      ", webPort=" + webPort +
      ", outputFormatSvcPort=" + outputFormatServicePort +
      ", workDirs=" + Arrays.toString(localDirs) +
      ", shufflePort=" + shufflePort +
      ", waitQueueSize= " + waitQueueSize +
      ", enablePreemption= " + enablePreemption +
      ", timedWindowAverageDataPoints= " + timedWindowAverageDataPoints +
      ", timedWindowAverageWindowLength= " + timedWindowAverageWindowLength +
      ", simpleAverageWindowDataSize= " + simpleAverageWindowDataSize +
      ", versionInfo= (" + HiveVersionInfo.getBuildVersion() + ")";
    LOG.info(logMsg);
    final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
    // Time based log retrieval may not fetch the above log line so logging to stderr for debugging purpose.
    System.err.println(currTSISO8601 + " " + logMsg);


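    // Sanity-check that the JVM heap can hold the executor memory, plus the IO cache when it is on-heap.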
    long memRequired =
        executorMemoryBytes + (ioEnabled && !isDirectCache ? ioMemoryBytes : 0);
    // TODO: this check is somewhat bogus as the maxJvmMemory != Xmx parameters (see annotation in LlapServiceDriver)
    Preconditions.checkState(maxJvmMemory >= memRequired,
        "Invalid configuration. Xmx value too small. maxAvailable=" + LlapUtil.humanReadableByteCount(maxJvmMemory) +
            ", configured(exec + io if enabled)=" + LlapUtil.humanReadableByteCount(memRequired));

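    // Configuration handed to the shuffle handler: port, local dirs and the dir-watcher flag.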
    this.shuffleHandlerConf = new Configuration(daemonConf);
    this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort);
    this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS,
        StringUtils.arrayToString(localDirs));
    this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED,
        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED));

    // Less frequently set parameter, so it is read here rather than passed in as a constructor argument.
    int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);

    // Initialize the function localizer.
    ClassLoader executorClassLoader = null;
    if (HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS)) {
      this.fnLocalizer = new FunctionLocalizer(daemonConf, localDirs[0]);
      executorClassLoader = fnLocalizer.getClassLoader();
      // Set up the hook that disallows creating non-whitelisted UDFs anywhere in the plan.
      // A hook specific to GenericUDFBridge is not used because it does not work in MiniLlap:
      // the daemon is embedded there, so the client also gets this hook, and Kryo is brittle.
      SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(fnLocalizer));
    } else {
      this.fnLocalizer = null;
      SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(new StaticPermanentFunctionChecker(daemonConf)));
      executorClassLoader = Thread.currentThread().getContextClassLoader();
    }

    // Initialize the metrics system
    LlapMetricsSystem.initialize("LlapDaemon");
    this.pauseMonitor = new JvmPauseMonitor(daemonConf);
    pauseMonitor.start();
    String displayNameJvm = "LlapDaemonJvmMetrics-" + hostName;
    String sessionId = MetricsUtils.getUUID();
    LlapDaemonJvmMetrics.create(displayNameJvm, sessionId, daemonConf);
    String displayName = "LlapDaemonExecutorMetrics-" + hostName;
    daemonConf.set("llap.daemon.metrics.sessionid", sessionId);
    String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf,
        HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS);
    List<Integer> intervalList = new ArrayList<>();
    if (strIntervals != null) {
      for (String strInterval : strIntervals) {
        try {
          intervalList.add(Integer.valueOf(strInterval));
        } catch (NumberFormatException e) {
          LOG.warn("Ignoring task pre-emption metrics interval {} from {} as it is invalid",
              strInterval, Arrays.toString(strIntervals));
        }
      }
    }
    this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, waitQueueSize,
        Ints.toArray(intervalList), timedWindowAverageDataPoints, timedWindowAverageWindowLength,
        simpleAverageWindowDataSize);
    this.metrics.setMemoryPerInstance(executorMemoryPerInstance);
    this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
    this.metrics.setJvmMaxMemory(maxJvmMemory);
    this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
    this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this);
    LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
        " sessionId: " + sessionId);

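    // The AMReporter reports node liveness and task state back to the Tez AMs.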
    int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
    this.socketFactory = NetUtils.getDefaultSocketFactory(daemonConf);
    this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress,
        new QueryFailedHandlerProxy(), daemonConf, daemonId, socketFactory);

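    // Security setup: with Kerberos enabled, create a real secret manager (scoped to the cluster
    // string) and token manager; otherwise fall back to a dummy token manager.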
    SecretManager sm = null;
    if (UserGroupInformation.isSecurityEnabled()) {
      sm = SecretManager.createSecretManager(daemonConf, daemonId.getClusterString());
      this.llapTokenManager = new DefaultLlapTokenManager(daemonConf, sm);
    } else {
      this.llapTokenManager = new DummyTokenManager();
    }

    this.secretManager = sm;
    this.server = new LlapProtocolServerImpl(secretManager, numHandlers, this, srvAddress, mngAddress, srvPort,
        externalClientsRpcPort, mngPort, daemonId, metrics).withTokenManager(this.llapTokenManager);

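    // Core execution plumbing: UGI management, per-query tracking and the task executor service
    // with its configurable wait-queue comparator.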
    LlapUgiManager llapUgiManager = LlapUgiManager.getInstance(daemonConf);

    QueryTracker queryTracker = new QueryTracker(daemonConf, localDirs,
        daemonId.getClusterString());

    String waitQueueSchedulerClassName = HiveConf.getVar(
        daemonConf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);

    Scheduler<TaskRunnerCallable> executorService = new TaskExecutorService(numExecutors, waitQueueSize,
        waitQueueSchedulerClassName, enablePreemption, executorClassLoader, metrics, null);

    addIfService(queryTracker);
    addIfService(executorService);

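    // The container runner ties the RPC layer to the query tracker and executor service.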
    this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors,
        this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
        amReporter, queryTracker, executorService, daemonId, llapUgiManager, socketFactory);
    addIfService(containerRunner);

    // Not adding the registry as a service, since we need to control when it is initialized; the conf is used to pick up properties.
    this.registry = new LlapRegistryService(true);

    // Disable the web UI in test mode unless a specific port was configured.
    if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)
        && Integer.parseInt(ConfVars.LLAP_DAEMON_WEB_PORT.getDefaultValue()) == webPort) {
      LOG.info("Web UI was disabled in test mode because hive.llap.daemon.web.port was not "
               + "specified or has default value ({})", webPort);
      this.webServices = null;
    } else {
      this.webServices = new LlapWebServices(webPort, this, registry);
      addIfService(webServices);
    }

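    // Optional synthetic load generator, only enabled when HIVE_TEST_LOAD_HOSTNAMES is set (test-only).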
    if (HiveConf.getVar(daemonConf, ConfVars.HIVE_TEST_LOAD_HOSTNAMES).isEmpty()) {
      this.llapLoadGeneratorService = null;
    } else {
      this.llapLoadGeneratorService = new LlapLoadGeneratorService();
      addIfService(llapLoadGeneratorService);
    }
    // Bring up the server only after all other components have started.
    addIfService(server);
    // Start the AMReporter after the server so that it picks up the correct address. It knows how
    // to deal with requests that arrive before it is started.
    addIfService(amReporter);
    addIfService(new LocalDirCleaner(localDirs, daemonConf));
  }