private void startServices()

in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java [1854:2036]


  /**
   * Creates and starts the background services of this region server.
   * <p>
   * In order, this method:
   * <ol>
   * <li>calls {@code initializeThreads()} unless the server is already stopped or aborted;</li>
   * <li>creates and starts the {@code SecureBulkLoadManager};</li>
   * <li>creates the optional health-check and executor-status chores, the WAL roller, the flush
   * throughput controller and the remote procedure result reporter;</li>
   * <li>creates and schedules the compacted-files discharger chore;</li>
   * <li>starts one executor pool per {@code ExecutorType} used here, each sized from
   * configuration;</li>
   * <li>starts the WAL roller, cache flusher, procedure result reporter and lease manager
   * threads;</li>
   * <li>schedules every chore field that is non-null;</li>
   * <li>starts the {@code SplitLogWorker} when {@code csm} is set and ZK-coordinated WAL
   * splitting is enabled;</li>
   * <li>starts the heap memory manager and then initializes the memstore chunk creator.</li>
   * </ol>
   * @throws IOException declared by this method; which of the calls below can raise it is not
   *                     visible from this method alone
   */
  private void startServices() throws IOException {
    // Only initialize threads when the server is not already stopping/aborting. Everything
    // below still runs either way.
    // NOTE(review): presumably this is why the thread/chore fields further down are
    // null-checked before use -- confirm against initializeThreads().
    if (!isStopped() && !isAborted()) {
      initializeThreads();
    }
    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
    this.secureBulkLoadManager.start();

    // Health checker thread. Only created here; it is scheduled further down together with the
    // other chores.
    if (isHealthCheckerConfigured()) {
      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
    }
    // Executor status collect thread. Only created here (when enabled by configuration); it is
    // scheduled further down together with the other chores.
    if (
      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
    ) {
      int sleepTime =
        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
        this.metricsRegionServer.getMetricsSource());
    }

    // These three are only constructed here; walRoller and procedureResultReporter are started
    // as daemon threads after the executor pools have been started.
    this.walRoller = new LogRoller(this);
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
    this.procedureResultReporter = new RemoteProcedureResultReporter(this);

    // Create the CompactedFileDischarger chore executorService. This chore helps to
    // remove the compacted files that will no longer be used in reads.
    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
    // 2 mins so that compacted files can be archived before the TTLCleaner runs
    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
    choreService.scheduleChore(compactedFileDischarger);

    // Start executor services. Each pool's core size is read from its own configuration key,
    // with the default given inline.
    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
    final int openPriorityRegionThreads =
      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
    executorService.startExecutorService(
      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
        .setCorePoolSize(openPriorityRegionThreads));
    final int closeRegionThreads =
      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
    // The parallel-seek pool is optional (disabled by default) and lets its core threads time
    // out.
    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
      final int storeScannerParallelSeekThreads =
        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
      executorService.startExecutorService(
        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
    }
    // The log-replay pool is sized by the WAL max-splitter setting and also lets its core
    // threads time out.
    final int logReplayOpsThreads =
      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
    executorService.startExecutorService(
      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
    // Start the threads for compacted files discharger
    final int compactionDischargerThreads =
      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
      .setCorePoolSize(compactionDischargerThreads));
    // The region-replica flush pool is only started when waiting for the primary flush is
    // enabled. If its own thread count is not configured, it falls back to the configured
    // open-region thread count (itself defaulting to 3).
    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
      final int regionReplicaFlushThreads =
        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
      executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
        .setCorePoolSize(regionReplicaFlushThreads));
    }
    final int refreshPeerThreads =
      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
    final int replaySyncReplicationWALThreads =
      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
      .setCorePoolSize(replaySyncReplicationWALThreads));
    final int switchRpcThrottleThreads =
      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
    executorService.startExecutorService(
      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
        .setCorePoolSize(switchRpcThrottleThreads));
    final int claimReplicationQueueThreads =
      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
    executorService.startExecutorService(
      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
        .setCorePoolSize(claimReplicationQueueThreads));
    final int rsSnapshotOperationThreads =
      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
    executorService.startExecutorService(
      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
        .setCorePoolSize(rsSnapshotOperationThreads));
    final int rsFlushOperationThreads =
      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));

    // Start the long-running worker threads. All of them are handed the server's
    // uncaughtExceptionHandler; thread names are prefixed with this server's name.
    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
      uncaughtExceptionHandler);
    if (this.cacheFlusher != null) {
      this.cacheFlusher.start(uncaughtExceptionHandler);
    }
    Threads.setDaemonThreadRunning(this.procedureResultReporter,
      getName() + ".procedureResultReporter", uncaughtExceptionHandler);

    // Schedule every chore that has been created. A null field is skipped; for example,
    // healthCheckChore and executorStatusChore above are only created when enabled.
    if (this.compactionChecker != null) {
      choreService.scheduleChore(compactionChecker);
    }
    if (this.periodicFlusher != null) {
      choreService.scheduleChore(periodicFlusher);
    }
    if (this.healthCheckChore != null) {
      choreService.scheduleChore(healthCheckChore);
    }
    if (this.executorStatusChore != null) {
      choreService.scheduleChore(executorStatusChore);
    }
    if (this.nonceManagerChore != null) {
      choreService.scheduleChore(nonceManagerChore);
    }
    if (this.storefileRefresher != null) {
      choreService.scheduleChore(storefileRefresher);
    }
    if (this.fsUtilizationChore != null) {
      choreService.scheduleChore(fsUtilizationChore);
    }
    if (this.namedQueueServiceChore != null) {
      choreService.scheduleChore(namedQueueServiceChore);
    }
    if (this.brokenStoreFileCleaner != null) {
      choreService.scheduleChore(brokenStoreFileCleaner);
    }
    if (this.rsMobFileCleanerChore != null) {
      choreService.scheduleChore(rsMobFileCleanerChore);
    }
    if (replicationMarkerChore != null) {
      LOG.info("Starting replication marker chore");
      choreService.scheduleChore(replicationMarkerChore);
    }

    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
    // an unhandled exception, it will just exit.
    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
      uncaughtExceptionHandler);

    // Create the log splitting worker and start it
    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
    // quite a while inside Connection layer. The worker won't be available for other
    // tasks even after current task is preempted after a split task times out.
    // The overrides are set on sinkConf, a Configuration derived from conf, and sinkConf is
    // what gets passed to the SplitLogWorker below.
    Configuration sinkConf = HBaseConfiguration.create(conf);
    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    if (
      this.csm != null
        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
    ) {
      // SplitLogWorker needs csm. If none, don't start this.
      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
      splitLogWorker.start();
      LOG.debug("SplitLogWorker started");
    }

    // Memstore services.
    startHeapMemoryManager();
    // Call it after starting HeapMemoryManager.
    initializeMemStoreChunkCreator(hMemManager);
  }