private void finishActiveMasterInitialization()

in hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java [965:1396]


  private void finishActiveMasterInitialization() throws IOException, InterruptedException,
    KeeperException, ReplicationException, DeserializationException {
    /*
     * We are active master now... go initialize components we need to run.
     */
    startupTaskGroup.addTask("Initializing Master file system");

    this.masterActiveTime = EnvironmentEdgeManager.currentTime();
    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.

    // always initialize the MemStoreLAB as we use a region to store data in master now, see
    // localStore.
    initializeMemStoreChunkCreator(null);
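    // Set up access to the HBase root directory and the WAL directories. The file system manager
    // supplies the cluster id published just below; the WAL manager is used later to find servers
    // with live or splitting WAL directories.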
    this.fileSystemManager = new MasterFileSystem(conf);
    this.walManager = new MasterWalManager(this);

    // warm up the table descriptors (HTD) cache on master initialization
    if (preLoadTableDescriptors) {
      startupTaskGroup.addTask("Pre-loading table descriptors");
      this.tableDescriptors.getAll();
    }

    // Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
    // only after it has checked in with the Master. At least a few tests ask Master for clusterId
    // before it has called its run method and before RegionServer has done the reportForDuty.
    ClusterId clusterId = fileSystemManager.getClusterId();
    startupTaskGroup.addTask("Publishing Cluster ID " + clusterId + " in ZooKeeper");
    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
    this.clusterId = clusterId.toString();

    // Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
    // hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
    // hbase.write.hbck1.lock.file to false.
    if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
      Pair<Path, FSDataOutputStream> result = null;
      try {
        result = HBaseFsck.checkAndMarkRunningHbck(this.conf,
          HBaseFsck.createLockRetryCounterFactory(this.conf).create());
      } finally {
        if (result != null) {
          Closeables.close(result.getSecond(), true);
        }
      }
    }

    startupTaskGroup.addTask("Initialize ServerManager and schedule SCP for crash servers");
    // The below two managers must be created before loading procedures, as they will be used during
    // loading.
    // initialize master local region
    masterRegion = MasterRegionFactory.create(this);
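    // The region server list is kept in the master local region created above.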
    rsListStorage = new MasterRegionServerList(masterRegion, this);

    // Initialize the ServerManager and register it as a configuration observer
    this.serverManager = createServerManager(this, rsListStorage);
    this.configurationManager.registerObserver(this.serverManager);

    this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
    if (
      !conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
    ) {
      this.splitWALManager = new SplitWALManager(this);
    }

    tryMigrateMetaLocationsFromZooKeeper();

    createProcedureExecutor();
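    // Group the procedures recovered by the procedure executor by class; the groups are used
    // below to hand in-flight region transitions to the assignment manager and crashed servers to
    // the region server tracker.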
    Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = procedureExecutor
      .getActiveProceduresNoCopy().stream().collect(Collectors.groupingBy(p -> p.getClass()));

    // Create Assignment Manager
    this.assignmentManager = createAssignmentManager(this, masterRegion);
    this.assignmentManager.start();
    // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
    // completed, it could still be in the procedure list. This is a bit strange but is another
    // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
    List<TransitRegionStateProcedure> ritList =
      procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
        .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
        .collect(Collectors.toList());
    this.assignmentManager.setupRIT(ritList);

    // Start RegionServerTracker with listing of servers found with existing SCPs -- these should
    // be registered in the deadServers set -- and the servernames loaded from the WAL directory
    // and master local region that COULD BE 'alive'(we'll schedule SCPs for each and let SCP figure
    // it out).
    // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
    // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
    this.regionServerTracker.upgrade(
      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
        .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
      Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
      walManager.getSplittingServersFromWALDir());
    // This manager must be accessed AFTER hbase:meta is confirmed online.
    this.tableStateManager = new TableStateManager(this);

    startupTaskGroup.addTask("Initializing ZK system trackers");
    initializeZKBasedSystemTrackers();
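    // NOTE: initializeZKBasedSystemTrackers() is expected to set up, among other trackers, the
    // load balancer and the replication peer manager that are referenced later in this method.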
    startupTaskGroup.addTask("Loading last flushed sequence id of regions");
    try {
      this.serverManager.loadLastFlushedSequenceIds();
    } catch (IOException e) {
      LOG.info("Failed to load last flushed sequence id of regions" + " from file system", e);
    }
    // Set ourselves as active Master now our claim has succeeded up in zk.
    this.activeMaster = true;

    // Start the Zombie master detector after setting master as active, see HBASE-21535
    Thread zombieDetector = new Thread(new MasterInitializationMonitor(this),
      "ActiveMasterInitializationMonitor-" + EnvironmentEdgeManager.currentTime());
    zombieDetector.setDaemon(true);
    zombieDetector.start();
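    // The monitor watches for initialization taking too long (and can halt the master if so
    // configured); it is interrupted at the end of this method once initialization has finished.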

    if (!maintenanceMode) {
      startupTaskGroup.addTask("Initializing master coprocessors");
      setQuotasObserver(conf);
      initializeCoprocessorHost(conf);
    } else {
      // start an in-process region server for carrying system regions
      maintenanceRegionServer =
        JVMClusterUtil.createRegionServerThread(getConfiguration(), HRegionServer.class, 0);
      maintenanceRegionServer.start();
    }

    // Checking if meta needs initializing.
    startupTaskGroup.addTask("Initializing meta table if this is a new deploy");
    InitMetaProcedure initMetaProc = null;
    // Print out state of hbase:meta on startup; helps debugging.
    if (!this.assignmentManager.getRegionStates().hasTableRegionStates(TableName.META_TABLE_NAME)) {
      Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
        .filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
      initMetaProc = optProc.orElseGet(() -> {
        // schedule an init meta procedure if meta has not been deployed yet
        InitMetaProcedure temp = new InitMetaProcedure();
        procedureExecutor.submitProcedure(temp);
        return temp;
      });
    }

    // initialize load balancer
    this.balancer.setMasterServices(this);
    this.balancer.initialize();
    this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

    // try to migrate replication data
    ZKReplicationQueueStorageForMigration oldReplicationQueueStorage =
      new ZKReplicationQueueStorageForMigration(zooKeeper, conf);
    // check whether there is something to migrate and we haven't scheduled a migration procedure
    // yet
    if (
      oldReplicationQueueStorage.hasData() && procedureExecutor.getProcedures().stream()
        .allMatch(p -> !(p instanceof MigrateReplicationQueueFromZkToTableProcedure))
    ) {
      procedureExecutor.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
    }
    // start up all service threads.
    startupTaskGroup.addTask("Initializing master service threads");
    startServiceThreads();
    // wait for meta to be initialized after we start the procedure executor
    if (initMetaProc != null) {
      initMetaProc.await();
    }
    // Wake up this server to check in
    sleeper.skipSleepCycle();

    // Wait for region servers to report in.
    // With this as part of master initialization, it precludes our being able to start a single
    // server that is both Master and RegionServer. Needs more thought. TODO.
    String statusStr = "Wait for region servers to report in";
    MonitoredTask waitRegionServer = startupTaskGroup.addTask(statusStr);
    LOG.info(Objects.toString(waitRegionServer));
    waitForRegionServers(waitRegionServer);

    // Check if the master is shutting down because of an issue initializing region servers or the
    // balancer.
    if (isStopped()) {
      return;
    }

    startupTaskGroup.addTask("Starting assignment manager");
    // FIRST HBASE:META READ!!!!
    // The below cannot make progress w/o hbase:meta being online.
    // This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
    // as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
    // if it is down. It may take a while to come online. So, wait here until meta is for sure
    // available. That's what waitForMetaOnline does.
    if (!waitForMetaOnline()) {
      return;
    }

    TableDescriptor metaDescriptor = tableDescriptors.get(TableName.META_TABLE_NAME);
    final ColumnFamilyDescriptor tableFamilyDesc =
      metaDescriptor.getColumnFamily(HConstants.TABLE_FAMILY);
    final ColumnFamilyDescriptor replBarrierFamilyDesc =
      metaDescriptor.getColumnFamily(HConstants.REPLICATION_BARRIER_FAMILY);
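    // If both families are missing we are dealing with a pre-HBase-2 meta layout; the missing
    // column families are added near the end of this method as part of the upgrade handling.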

    this.assignmentManager.joinCluster();
    // The below depends on hbase:meta being online.
    this.assignmentManager.processOfflineRegions();
    // this must be called after the above processOfflineRegions to prevent a race
    this.assignmentManager.wakeMetaLoadedEvent();

    // for migrating from a version without HBASE-25099, and also for honoring the configuration
    // first.
    if (conf.get(HConstants.META_REPLICAS_NUM) != null) {
      int replicasNumInConf =
        conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM);
      TableDescriptor metaDesc = tableDescriptors.get(TableName.META_TABLE_NAME);
      if (metaDesc.getRegionReplication() != replicasNumInConf) {
        // it is possible that we already have some replicas before upgrading, so we must set the
        // region replication number in meta TableDescriptor directly first, without creating a
        // ModifyTableProcedure, otherwise it may cause a double assign for the meta replicas.
        int existingReplicasCount =
          assignmentManager.getRegionStates().getRegionsOfTable(TableName.META_TABLE_NAME).size();
        if (existingReplicasCount > metaDesc.getRegionReplication()) {
          LOG.info("Update replica count of hbase:meta from {}(in TableDescriptor)"
            + " to {}(existing ZNodes)", metaDesc.getRegionReplication(), existingReplicasCount);
          metaDesc = TableDescriptorBuilder.newBuilder(metaDesc)
            .setRegionReplication(existingReplicasCount).build();
          tableDescriptors.update(metaDesc);
        }
        // check again, and issue a ModifyTableProcedure if needed
        if (metaDesc.getRegionReplication() != replicasNumInConf) {
          LOG.info(
            "The {} config is {} while the replica count in TableDescriptor is {}"
              + " for hbase:meta, altering...",
            HConstants.META_REPLICAS_NUM, replicasNumInConf, metaDesc.getRegionReplication());
          procedureExecutor.submitProcedure(new ModifyTableProcedure(
            procedureExecutor.getEnvironment(), TableDescriptorBuilder.newBuilder(metaDesc)
              .setRegionReplication(replicasNumInConf).build(),
            null, metaDesc, false, true));
        }
      }
    }
    // Initialize after meta is up, as the code below scans meta
    FavoredNodesManager fnm = getFavoredNodesManager();
    if (fnm != null) {
      fnm.initializeFromMeta();
    }

    // set cluster status again after user regions are assigned
    this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

    // Start balancer and meta catalog janitor after meta and regions have been assigned.
    startupTaskGroup.addTask("Starting balancer and catalog janitor");
    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
    getChoreService().scheduleChore(clusterStatusChore);
    this.balancerChore = new BalancerChore(this);
    if (!disableBalancerChoreForTest) {
      getChoreService().scheduleChore(balancerChore);
    }
    if (regionNormalizerManager != null) {
      getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
    }
    this.catalogJanitorChore = new CatalogJanitor(this);
    getChoreService().scheduleChore(catalogJanitorChore);
    this.hbckChore = new HbckChore(this);
    getChoreService().scheduleChore(hbckChore);
    this.serverManager.startChore();

    // Only for rolling upgrade, where we need to migrate the data in the namespace table to the
    // meta table.
    if (!waitForNamespaceOnline()) {
      return;
    }
    startupTaskGroup.addTask("Starting cluster schema service");
    try {
      initClusterSchemaService();
    } catch (IllegalStateException e) {
      if (
        e.getCause() != null && e.getCause() instanceof NoSuchColumnFamilyException
          && tableFamilyDesc == null && replBarrierFamilyDesc == null
      ) {
        LOG.info("ClusterSchema service could not be initialized. This is "
          + "expected during HBase 1 to 2 upgrade", e);
      } else {
        throw e;
      }
    }

    if (this.cpHost != null) {
      try {
        this.cpHost.preMasterInitialization();
      } catch (IOException e) {
        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
      }
    }

    LOG.info(String.format("Master has completed initialization %.3fsec",
      (EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
    this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
    configurationManager.registerObserver(this.balancer);
    configurationManager.registerObserver(this.logCleanerPool);
    configurationManager.registerObserver(this.logCleaner);
    configurationManager.registerObserver(this.regionsRecoveryConfigManager);
    configurationManager.registerObserver(this.exclusiveHFileCleanerPool);
    if (this.sharedHFileCleanerPool != null) {
      configurationManager.registerObserver(this.sharedHFileCleanerPool);
    }
    if (this.hfileCleaners != null) {
      for (HFileCleaner cleaner : hfileCleaners) {
        configurationManager.registerObserver(cleaner);
      }
    }
    // Set master as 'initialized'.
    setInitialized(true);
    startupTaskGroup.markComplete("Initialization successful");
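    // The startup task group is complete; the remaining post-initialization steps below report
    // their progress on a separate monitored task.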
    MonitoredTask status =
      TaskMonitor.get().createStatus("Progress after master initialized", false, true);

    if (tableFamilyDesc == null && replBarrierFamilyDesc == null) {
      // create missing CFs in meta table after master is set to 'initialized'.
      createMissingCFsInMetaDuringUpgrade(metaDescriptor);

      // Throwing this exception to abort the active master is painful, but it seems the only way
      // to add the missing CFs in meta while upgrading from HBase 1 to 2 (where HBase 2 has
      // HBASE-23055 & HBASE-23782 checked in).
      // So, why abort the active master after adding the missing CFs in meta? When we reach here,
      // we have already bypassed the NoSuchColumnFamilyException in initClusterSchemaService(),
      // meaning ClusterSchemaService was not correctly initialized but we carried on anyway.
      // Similarly, we bypassed tableStateManager.start(). Hence, we had better abort the current
      // active master: our main task -- adding the missing CFs in meta (possible only after the
      // master state is set as initialized) -- is done, but at the expense of bypassing a few
      // important tasks of the active master init routine. Aborting now means the next active
      // master's init will not face any issues, and all mandatory services will be started during
      // its init phase.
      throw new PleaseRestartMasterException("Aborting active master after missing"
        + " CFs are successfully added in meta. Subsequent active master "
        + "initialization should be uninterrupted");
    }

    if (maintenanceMode) {
      LOG.info("Detected repair mode, skipping final initialization steps.");
      return;
    }

    assignmentManager.checkIfShouldMoveSystemRegionAsync();
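    // (Asynchronously checks whether system table regions should be moved, e.g. onto the region
    // servers running the highest version when server versions differ.)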
    status.setStatus("Starting quota manager");
    initQuotaManager();
    if (QuotaUtil.isQuotaEnabled(conf)) {
      // Create the quota snapshot notifier
      spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
      spaceQuotaSnapshotNotifier.initialize(getConnection());
      this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
      // Start the chore to read the region FS space reports and act on them
      getChoreService().scheduleChore(quotaObserverChore);

      this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
      // Start the chore to read snapshots and add their usage to table/NS quotas
      getChoreService().scheduleChore(snapshotQuotaChore);
    }
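    // Set up the service backing the slow/large RPC log feature on the master.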
    final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
    slowLogMasterService.init();

    WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
    // Create REPLICATION.SINK_TRACKER table if needed.
    ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);

    // Clear dead servers that have the same host name and port as an online server, because we do
    // not remove a dead server with the same hostname and port as a region server that tries to
    // check in before master initialization. See HBASE-5916.
    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();

    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
    status.setStatus("Checking ZNode ACLs");
    zooKeeper.checkAndSetZNodeAcls();

    status.setStatus("Initializing MOB Cleaner");
    initMobCleaner();

    // delete the stale data for replication sync up tool if necessary
    status.setStatus("Cleanup ReplicationSyncUp status if necessary");
    Path replicationSyncUpInfoFile =
      new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
    if (dataFs.exists(replicationSyncUpInfoFile)) {
      // The info file is available; load the timestamp and use it to clean up stale data in the
      // replication queue storage.
      byte[] data;
      try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
        data = ByteStreams.toByteArray(in);
      }
      ReplicationSyncUpToolInfo info = null;
      try {
        info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
      } catch (JsonParseException e) {
        // Usually this means a partial file, i.e. the ReplicationSyncUp tool did not finish
        // properly, so it is not a problem by itself. We do not clean up the status here because
        // we do not know why the tool did not finish properly; let users clean the status up
        // manually.
        LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
      }
      if (info != null) {
        LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
          info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
            .format(Instant.ofEpochMilli(info.getStartTimeMs())));
        replicationPeerManager.getQueueStorage()
          .removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
        // delete the file after removing the stale data, so next time we do not need to do this
        // again.
        dataFs.delete(replicationSyncUpInfoFile, false);
      }
    }
    status.setStatus("Calling postStartMaster coprocessors");
    if (this.cpHost != null) {
      // don't let cp initialization errors kill the master
      try {
        this.cpHost.postStartMaster();
      } catch (IOException ioe) {
        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
      }
    }

    zombieDetector.interrupt();
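    // Initialization finished without stalling, so stop the monitor thread started earlier.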

    /*
     * After the master has started up, let's do the balancer post-startup initialization. Since
     * this runs in the activeMasterManager thread, it should be fine.
     */
    long start = EnvironmentEdgeManager.currentTime();
    this.balancer.postMasterStartupInitialize();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Balancer post startup initialization complete, took "
        + ((EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
    }

    this.rollingUpgradeChore = new RollingUpgradeChore(this);
    getChoreService().scheduleChore(rollingUpgradeChore);

    this.oldWALsDirSizeChore = new OldWALsDirSizeChore(this);
    getChoreService().scheduleChore(this.oldWALsDirSizeChore);

    status.markComplete("Progress after master initialized complete");
  }