in hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java [965:1396]
private void finishActiveMasterInitialization() throws IOException, InterruptedException,
KeeperException, ReplicationException, DeserializationException {
/*
 * We are active master now... go initialize components we need to run.
 *
 * Rough order of the steps below. Several inline comments state that the order matters, so do
 * not reorder statements without checking those constraints:
 *  1. master file system, WAL manager, optional table descriptor pre-load
 *  2. publish the cluster id to ZooKeeper; optionally write the hbck1 lock file
 *  3. master local region, region server list storage, ServerManager, WAL replay/split managers
 *  4. procedure executor, assignment manager, regions in transition, RegionServerTracker
 *  5. table state manager, ZK based trackers, last flushed sequence ids
 *  6. mark this master active and start the initialization monitor thread
 *  7. coprocessors, or an in-process region server when in maintenance mode
 *  8. InitMetaProcedure if needed, balancer, replication queue migration if needed
 *  9. service threads, wait for region servers, wait for hbase:meta
 * 10. join the cluster, reconcile the hbase:meta replica count, schedule chores
 * 11. cluster schema service, configuration observers, setInitialized(true)
 * 12. post-initialization work (skipped in maintenance mode)
 *
 * The method returns early, before setInitialized(true) is reached, when isStopped() is true
 * after waiting for region servers, or when waitForMetaOnline() or waitForNamespaceOnline()
 * returns false.
 */
startupTaskGroup.addTask("Initializing Master file system");
// Remember when we became active; used at the end to log how long initialization took.
this.masterActiveTime = EnvironmentEdgeManager.currentTime();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
// always initialize the MemStoreLAB as we use a region to store data in master now, see
// localStore.
initializeMemStoreChunkCreator(null);
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
// warm-up HTDs cache on master initialization
if (preLoadTableDescriptors) {
startupTaskGroup.addTask("Pre-loading table descriptors");
this.tableDescriptors.getAll();
}
// Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
// only after it has checked in with the Master. At least a few tests ask Master for clusterId
// before it has called its run method and before RegionServer has done the reportForDuty.
ClusterId clusterId = fileSystemManager.getClusterId();
startupTaskGroup.addTask("Publishing Cluster ID " + clusterId + " in ZooKeeper");
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
this.clusterId = clusterId.toString();
// Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
// hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
// hbase.write.hbck1.lock.file to false.
if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
Pair<Path, FSDataOutputStream> result = null;
try {
result = HBaseFsck.checkAndMarkRunningHbck(this.conf,
HBaseFsck.createLockRetryCounterFactory(this.conf).create());
} finally {
// Only the output stream of the returned pair is closed here.
// NOTE(review): the 'true' argument presumably tells Closeables to swallow an IOException
// thrown by close() -- confirm against the Closeables class imported by this file.
if (result != null) {
Closeables.close(result.getSecond(), true);
}
}
}
startupTaskGroup.addTask("Initialize ServerManager and schedule SCP for crash servers");
// The below two managers must be created before loading procedures, as they will be used during
// loading.
// initialize master local region
masterRegion = MasterRegionFactory.create(this);
rsListStorage = new MasterRegionServerList(masterRegion, this);
// Initialize the ServerManager and register it as a configuration observer
this.serverManager = createServerManager(this, rsListStorage);
this.configurationManager.registerObserver(this.serverManager);
this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
// The SplitWALManager is only created when WAL splitting is NOT coordinated by ZooKeeper;
// otherwise the splitWALManager field is not assigned here.
if (
!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
) {
this.splitWALManager = new SplitWALManager(this);
}
tryMigrateMetaLocationsFromZooKeeper();
createProcedureExecutor();
// Group the executor's active procedures by concrete class. The map is consulted below for
// TransitRegionStateProcedure (to set up RIT) and ServerCrashProcedure (for the tracker).
Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = procedureExecutor
.getActiveProceduresNoCopy().stream().collect(Collectors.groupingBy(p -> p.getClass()));
// Create Assignment Manager
this.assignmentManager = createAssignmentManager(this, masterRegion);
this.assignmentManager.start();
// TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
// completed, it could still be in the procedure list. This is a bit strange but is another
// story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
// Hence the explicit filter on !isFinished() when building the RIT list.
List<TransitRegionStateProcedure> ritList =
procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
.filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
.collect(Collectors.toList());
this.assignmentManager.setupRIT(ritList);
// Start RegionServerTracker with listing of servers found with existing SCPs -- these should
// be registered in the deadServers set -- and the servernames loaded from the WAL directory
// and master local region that COULD BE 'alive'(we'll schedule SCPs for each and let SCP figure
// it out).
// We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
// TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
// Arguments, in order: server names taken from the ServerCrashProcedures, the union of servers
// from the region server list storage and the WAL directory, and the servers whose WAL
// directories are already being split.
this.regionServerTracker.upgrade(
procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
.map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
walManager.getSplittingServersFromWALDir());
// This manager must be accessed AFTER hbase:meta is confirmed online.
this.tableStateManager = new TableStateManager(this);
startupTaskGroup.addTask("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
startupTaskGroup.addTask("Loading last flushed sequence id of regions");
// Best effort: an IOException here is only logged and initialization continues.
try {
this.serverManager.loadLastFlushedSequenceIds();
} catch (IOException e) {
LOG.info("Failed to load last flushed sequence id of regions" + " from file system", e);
}
// Set ourselves as active Master now our claim has succeeded up in zk.
this.activeMaster = true;
// Start the Zombie master detector after setting master as active, see HBASE-21535
// The thread is a daemon and is interrupted near the end of this method, after the
// postStartMaster coprocessor hook has been called.
Thread zombieDetector = new Thread(new MasterInitializationMonitor(this),
"ActiveMasterInitializationMonitor-" + EnvironmentEdgeManager.currentTime());
zombieDetector.setDaemon(true);
zombieDetector.start();
// Coprocessors (and the quotas observer) are only set up outside of maintenance mode.
if (!maintenanceMode) {
startupTaskGroup.addTask("Initializing master coprocessors");
setQuotasObserver(conf);
initializeCoprocessorHost(conf);
} else {
// start an in process region server for carrying system regions
maintenanceRegionServer =
JVMClusterUtil.createRegionServerThread(getConfiguration(), HRegionServer.class, 0);
maintenanceRegionServer.start();
}
// Checking if meta needs initializing.
startupTaskGroup.addTask("Initializing meta table if this is a new deploy");
InitMetaProcedure initMetaProc = null;
// Only when the assignment manager has no region states for hbase:meta do we look for an
// InitMetaProcedure already known to the executor, or submit a new one if none is found.
// (An earlier comment here mentioned printing the state of hbase:meta; nothing is printed.)
if (!this.assignmentManager.getRegionStates().hasTableRegionStates(TableName.META_TABLE_NAME)) {
Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
.filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
initMetaProc = optProc.orElseGet(() -> {
// schedule an init meta procedure if meta has not been deployed yet
InitMetaProcedure temp = new InitMetaProcedure();
procedureExecutor.submitProcedure(temp);
return temp;
});
}
// initialize load balancer
this.balancer.setMasterServices(this);
this.balancer.initialize();
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
// try migrate replication data
ZKReplicationQueueStorageForMigration oldReplicationQueueStorage =
new ZKReplicationQueueStorageForMigration(zooKeeper, conf);
// check whether there are something to migrate and we haven't scheduled a migration procedure
// yet
if (
oldReplicationQueueStorage.hasData() && procedureExecutor.getProcedures().stream()
.allMatch(p -> !(p instanceof MigrateReplicationQueueFromZkToTableProcedure))
) {
procedureExecutor.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
}
// start up all service threads.
startupTaskGroup.addTask("Initializing master service threads");
startServiceThreads();
// wait meta to be initialized after we start procedure executor
// initMetaProc is non-null only if hbase:meta had no region states in the check above.
if (initMetaProc != null) {
initMetaProc.await();
}
// Wake up this server to check in
sleeper.skipSleepCycle();
// Wait for region servers to report in.
// With this as part of master initialization, it precludes our being able to start a single
// server that is both Master and RegionServer. Needs more thought. TODO.
String statusStr = "Wait for region servers to report in";
MonitoredTask waitRegionServer = startupTaskGroup.addTask(statusStr);
LOG.info(Objects.toString(waitRegionServer));
waitForRegionServers(waitRegionServer);
// Check if master is shutting down because issue initializing regionservers or balancer.
// Returning here leaves the master without setInitialized(true) having been called.
if (isStopped()) {
return;
}
startupTaskGroup.addTask("Starting assignment manager");
// FIRST HBASE:META READ!!!!
// The below cannot make progress w/o hbase:meta being online.
// This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
// as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
// if it is down. It may take a while to come online. So, wait here until meta is for sure
// available. That's what waitForMetaOnline does.
if (!waitForMetaOnline()) {
return;
}
// Capture the hbase:meta descriptor and two of its column families now. Further down, both
// family descriptors being null is treated as the HBase 1 to 2 upgrade case: it excuses a
// NoSuchColumnFamilyException from initClusterSchemaService() and later triggers
// createMissingCFsInMetaDuringUpgrade() followed by a PleaseRestartMasterException.
TableDescriptor metaDescriptor = tableDescriptors.get(TableName.META_TABLE_NAME);
final ColumnFamilyDescriptor tableFamilyDesc =
metaDescriptor.getColumnFamily(HConstants.TABLE_FAMILY);
final ColumnFamilyDescriptor replBarrierFamilyDesc =
metaDescriptor.getColumnFamily(HConstants.REPLICATION_BARRIER_FAMILY);
this.assignmentManager.joinCluster();
// The below depends on hbase:meta being online.
this.assignmentManager.processOfflineRegions();
// this must be called after the above processOfflineRegions to prevent race
this.assignmentManager.wakeMetaLoadedEvent();
// for migrating from a version without HBASE-25099, and also for honoring the configuration
// first.
// Nothing is done here unless the meta replica count is explicitly present in the configuration.
if (conf.get(HConstants.META_REPLICAS_NUM) != null) {
int replicasNumInConf =
conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM);
TableDescriptor metaDesc = tableDescriptors.get(TableName.META_TABLE_NAME);
if (metaDesc.getRegionReplication() != replicasNumInConf) {
// it is possible that we already have some replicas before upgrading, so we must set the
// region replication number in meta TableDescriptor directly first, without creating a
// ModifyTableProcedure, otherwise it may cause a double assign for the meta replicas.
int existingReplicasCount =
assignmentManager.getRegionStates().getRegionsOfTable(TableName.META_TABLE_NAME).size();
if (existingReplicasCount > metaDesc.getRegionReplication()) {
LOG.info("Update replica count of hbase:meta from {}(in TableDescriptor)"
+ " to {}(existing ZNodes)", metaDesc.getRegionReplication(), existingReplicasCount);
metaDesc = TableDescriptorBuilder.newBuilder(metaDesc)
.setRegionReplication(existingReplicasCount).build();
tableDescriptors.update(metaDesc);
}
// check again, and issue a ModifyTableProcedure if needed
if (metaDesc.getRegionReplication() != replicasNumInConf) {
LOG.info(
"The {} config is {} while the replica count in TableDescriptor is {}"
+ " for hbase:meta, altering...",
HConstants.META_REPLICAS_NUM, replicasNumInConf, metaDesc.getRegionReplication());
procedureExecutor.submitProcedure(new ModifyTableProcedure(
procedureExecutor.getEnvironment(), TableDescriptorBuilder.newBuilder(metaDesc)
.setRegionReplication(replicasNumInConf).build(),
null, metaDesc, false, true));
}
}
}
// Initialize after meta is up as below scans meta
FavoredNodesManager fnm = getFavoredNodesManager();
if (fnm != null) {
fnm.initializeFromMeta();
}
// set cluster status again after user regions are assigned
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
// Start balancer and meta catalog janitor after meta and regions have been assigned.
startupTaskGroup.addTask("Starting balancer and catalog janitor");
this.clusterStatusChore = new ClusterStatusChore(this, balancer);
getChoreService().scheduleChore(clusterStatusChore);
// The balancer chore is always created, but only scheduled when not disabled for tests.
this.balancerChore = new BalancerChore(this);
if (!disableBalancerChoreForTest) {
getChoreService().scheduleChore(balancerChore);
}
if (regionNormalizerManager != null) {
getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
}
this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore);
this.hbckChore = new HbckChore(this);
getChoreService().scheduleChore(hbckChore);
this.serverManager.startChore();
// Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
if (!waitForNamespaceOnline()) {
return;
}
startupTaskGroup.addTask("Starting cluster schema service");
try {
initClusterSchemaService();
} catch (IllegalStateException e) {
// Tolerated only when the cause is a NoSuchColumnFamilyException AND both meta column family
// descriptors captured earlier are null; any other IllegalStateException is rethrown.
if (
e.getCause() != null && e.getCause() instanceof NoSuchColumnFamilyException
&& tableFamilyDesc == null && replBarrierFamilyDesc == null
) {
LOG.info("ClusterSchema service could not be initialized. This is "
+ "expected during HBase 1 to 2 upgrade", e);
} else {
throw e;
}
}
// cpHost may be null here (it is null-checked); a failing hook is logged and not propagated.
if (this.cpHost != null) {
try {
this.cpHost.preMasterInitialization();
} catch (IOException e) {
LOG.error("Coprocessor preMasterInitialization() hook failed", e);
}
}
// Elapsed time since masterActiveTime was recorded at the top of this method, in seconds.
LOG.info(String.format("Master has completed initialization %.3fsec",
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
// Register the balancer and the cleaner components as configuration observers. The shared
// HFile cleaner pool and the HFile cleaner list are only registered when non-null.
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
configurationManager.registerObserver(this.exclusiveHFileCleanerPool);
if (this.sharedHFileCleanerPool != null) {
configurationManager.registerObserver(this.sharedHFileCleanerPool);
}
if (this.hfileCleaners != null) {
for (HFileCleaner cleaner : hfileCleaners) {
configurationManager.registerObserver(cleaner);
}
}
// Set master as 'initialized'.
setInitialized(true);
startupTaskGroup.markComplete("Initialization successful");
// A separate monitored task tracks the steps that run after the master is 'initialized'.
MonitoredTask status =
TaskMonitor.get().createStatus("Progress after master initialized", false, true);
if (tableFamilyDesc == null && replBarrierFamilyDesc == null) {
// create missing CFs in meta table after master is set to 'initialized'.
createMissingCFsInMetaDuringUpgrade(metaDescriptor);
// Throwing this Exception to abort active master is painful but this
// seems the only way to add missing CFs in meta while upgrading from
// HBase 1 to 2 (where HBase 2 has HBASE-23055 & HBASE-23782 checked-in).
// So, why do we abort active master after adding missing CFs in meta?
// When we reach here, we would have already bypassed NoSuchColumnFamilyException
// in initClusterSchemaService(), meaning ClusterSchemaService is not
// correctly initialized but we bypassed it. Similarly, we bypassed
// tableStateManager.start() as well. Hence, we should better abort
// current active master because our main task - adding missing CFs
// in meta table is done (possible only after master state is set as
// initialized) at the expense of bypassing few important tasks as part
// of active master init routine. So now we abort active master so that
// next active master init will not face any issues and all mandatory
// services will be started during master init phase.
throw new PleaseRestartMasterException("Aborting active master after missing"
+ " CFs are successfully added in meta. Subsequent active master "
+ "initialization should be uninterrupted");
}
// In maintenance mode everything below (quota manager, table creators, ACL check, MOB cleaner,
// replication sync up cleanup, postStartMaster hook, balancer post-startup, remaining chores)
// is skipped.
if (maintenanceMode) {
LOG.info("Detected repair mode, skipping final initialization steps.");
return;
}
assignmentManager.checkIfShouldMoveSystemRegionAsync();
status.setStatus("Starting quota manager");
initQuotaManager();
if (QuotaUtil.isQuotaEnabled(conf)) {
// Create the quota snapshot notifier
spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
spaceQuotaSnapshotNotifier.initialize(getConnection());
this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
// Start the chore to read the region FS space reports and act on them
getChoreService().scheduleChore(quotaObserverChore);
this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
// Start the chore to read snapshots and add their usage to table/NS quotas
getChoreService().scheduleChore(snapshotQuotaChore);
}
final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
slowLogMasterService.init();
WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
// Create REPLICATION.SINK_TRACKER table if needed.
ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);
// clear the dead servers with same host name and port of online server because we are not
// removing dead server with same hostname and port of rs which is trying to check in before
// master initialization. See HBASE-5916.
this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
// Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
status.setStatus("Checking ZNode ACLs");
zooKeeper.checkAndSetZNodeAcls();
status.setStatus("Initializing MOB Cleaner");
initMobCleaner();
// delete the stale data for replication sync up tool if necessary
status.setStatus("Cleanup ReplicationSyncUp status if necessary");
Path replicationSyncUpInfoFile =
new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
if (dataFs.exists(replicationSyncUpInfoFile)) {
// info file is available, load the timestamp and use it to clean up stale data in replication
// queue storage.
byte[] data;
try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
data = ByteStreams.toByteArray(in);
}
// info stays null when the file cannot be parsed; in that case neither the stale data nor the
// info file itself is removed.
ReplicationSyncUpToolInfo info = null;
try {
info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
} catch (JsonParseException e) {
// usually this should be a partial file, which means the ReplicationSyncUp tool did not
// finish properly, so not a problem. Here we do not clean up the status as we do not know
// the reason why the tool did not finish properly, so let users clean the status up
// manually
LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
}
if (info != null) {
LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
.format(Instant.ofEpochMilli(info.getStartTimeMs())));
replicationPeerManager.getQueueStorage()
.removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
// delete the file after removing the stale data, so next time we do not need to do this
// again.
dataFs.delete(replicationSyncUpInfoFile, false);
}
}
status.setStatus("Calling postStartMaster coprocessors");
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
try {
this.cpHost.postStartMaster();
} catch (IOException ioe) {
LOG.error("Coprocessor postStartMaster() hook failed", ioe);
}
}
// Interrupt the initialization monitor thread started earlier in this method.
// NOTE(review): presumably the interrupt makes the monitor exit -- confirm in
// MasterInitializationMonitor. The early-return paths above do not interrupt it.
zombieDetector.interrupt();
/*
 * After master has started up, lets do balancer post startup initialization. Since this runs in
 * activeMasterManager thread, it should be fine.
 */
long start = EnvironmentEdgeManager.currentTime();
this.balancer.postMasterStartupInitialize();
if (LOG.isDebugEnabled()) {
// Integer division: the elapsed time is reported in whole seconds.
LOG.debug("Balancer post startup initialization complete, took "
+ ((EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
}
this.rollingUpgradeChore = new RollingUpgradeChore(this);
getChoreService().scheduleChore(rollingUpgradeChore);
this.oldWALsDirSizeChore = new OldWALsDirSizeChore(this);
getChoreService().scheduleChore(this.oldWALsDirSizeChore);
status.markComplete("Progress after master initialized complete");
}