in hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java [644:848]
private void initializeSystemManagers(OzoneConfiguration conf,
SCMConfigurator configurator) throws IOException {
// Use SystemClock when data is persisted
// and used again after system restarts.
systemClock = Clock.system(ZoneOffset.UTC);
if (configurator.getNetworkTopology() != null) {
clusterMap = configurator.getNetworkTopology();
} else {
clusterMap = new NetworkTopologyImpl(conf);
}
// This needs to be done before initializing Ratis.
ratisReporterList = RatisDropwizardExports
.registerRatisMetricReporters(ratisMetricsMap, isStopped::get);
if (configurator.getSCMHAManager() != null) {
scmHAManager = configurator.getSCMHAManager();
} else {
scmHAManager = new SCMHAManagerImpl(conf, securityConfig, this);
}
if (configurator.getLeaseManager() != null) {
leaseManager = configurator.getLeaseManager();
} else {
long timeDuration = conf.getTimeDuration(
OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION,
OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION_DEFAULT
.getDuration(), TimeUnit.MILLISECONDS);
leaseManager = new LeaseManager<>(threadNamePrefix, timeDuration);
}
scmLayoutVersionManager = new HDDSLayoutVersionManager(
scmStorageConfig.getLayoutVersion());
UpgradeFinalizationExecutor<SCMUpgradeFinalizationContext>
finalizationExecutor;
if (configurator.getUpgradeFinalizationExecutor() != null) {
finalizationExecutor = configurator.getUpgradeFinalizationExecutor();
} else {
finalizationExecutor = new DefaultUpgradeFinalizationExecutor<>();
}
finalizationManager = new FinalizationManagerImpl.Builder()
.setConfiguration(conf)
.setLayoutVersionManager(scmLayoutVersionManager)
.setStorage(scmStorageConfig)
.setHAManager(scmHAManager)
.setFinalizationStore(scmMetadataStore.getMetaTable())
.setFinalizationExecutor(finalizationExecutor)
.build();
// inline upgrade for SequenceIdGenerator
SequenceIdGenerator.upgradeToSequenceId(scmMetadataStore);
// Distributed sequence id generator
sequenceIdGen = new SequenceIdGenerator(
conf, scmHAManager, scmMetadataStore.getSequenceIdTable());
if (configurator.getScmContext() != null) {
scmContext = configurator.getScmContext();
} else {
// non-leader of term 0, in safe mode, preCheck not completed.
scmContext = new SCMContext.Builder()
.setLeader(false)
.setTerm(0)
.setIsInSafeMode(true)
.setIsPreCheckComplete(false)
.setSCM(this)
.setThreadNamePrefix(threadNamePrefix)
.setFinalizationCheckpoint(finalizationManager.getCheckpoint())
.build();
}
Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
conf.getClass(
ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
TableMapping.class, DNSToSwitchMapping.class);
DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
dnsToSwitchMappingClass, conf);
dnsToSwitchMapping =
((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
: new CachedDNSToSwitchMapping(newInstance));
if (configurator.getScmNodeManager() != null) {
scmNodeManager = configurator.getScmNodeManager();
} else {
scmNodeManager = new SCMNodeManager(conf, scmStorageConfig, eventQueue,
clusterMap, scmContext, scmLayoutVersionManager,
this::resolveNodeLocation);
}
placementMetrics = SCMContainerPlacementMetrics.create();
containerPlacementPolicy =
ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
clusterMap, true, placementMetrics);
ecContainerPlacementPolicy = ContainerPlacementPolicyFactory.getECPolicy(
conf, scmNodeManager, clusterMap, true, placementMetrics);
placementPolicyValidateProxy = new PlacementPolicyValidateProxy(
containerPlacementPolicy, ecContainerPlacementPolicy);
if (configurator.getPipelineManager() != null) {
pipelineManager = configurator.getPipelineManager();
} else {
pipelineManager =
PipelineManagerImpl.newPipelineManager(
conf,
scmHAManager,
scmNodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
scmContext,
serviceManager,
systemClock
);
}
finalizationManager.buildUpgradeContext(scmNodeManager, pipelineManager,
scmContext);
containerReplicaPendingOps =
new ContainerReplicaPendingOps(systemClock);
long containerReplicaOpScrubberIntervalMs = conf.getTimeDuration(
ScmConfigKeys
.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
ScmConfigKeys
.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
long backgroundServiceSafemodeWaitMs = conf.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
final String backgroundServiceName = "ExpiredContainerReplicaOpScrubber";
BackgroundSCMService expiredContainerReplicaOpScrubber =
new BackgroundSCMService.Builder().setClock(systemClock)
.setScmContext(scmContext)
.setServiceName(backgroundServiceName)
.setIntervalInMillis(containerReplicaOpScrubberIntervalMs)
.setWaitTimeInMillis(backgroundServiceSafemodeWaitMs)
.setPeriodicalTask(() -> containerReplicaPendingOps
.removeExpiredEntries()).build();
serviceManager.register(expiredContainerReplicaOpScrubber);
if (configurator.getContainerManager() != null) {
containerManager = configurator.getContainerManager();
} else {
containerManager = new ContainerManagerImpl(conf, scmHAManager,
sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable(),
containerReplicaPendingOps);
}
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
pipelineChoosePolicy = PipelineChoosePolicyFactory
.getPolicy(scmNodeManager, scmConfig, false);
ecPipelineChoosePolicy = PipelineChoosePolicyFactory
.getPolicy(scmNodeManager, scmConfig, true);
if (configurator.getWritableContainerFactory() != null) {
writableContainerFactory = configurator.getWritableContainerFactory();
} else {
writableContainerFactory = new WritableContainerFactory(this);
}
if (configurator.getScmBlockManager() != null) {
scmBlockManager = configurator.getScmBlockManager();
} else {
scmBlockManager = new BlockManagerImpl(conf, scmConfig, this);
}
if (configurator.getReplicationManager() != null) {
replicationManager = configurator.getReplicationManager();
} else {
replicationManager = new ReplicationManager(
conf,
containerManager,
containerPlacementPolicy,
ecContainerPlacementPolicy,
eventQueue,
scmContext,
scmNodeManager,
systemClock,
containerReplicaPendingOps);
reconfigurationHandler.register(replicationManager.getConfig());
}
serviceManager.register(replicationManager);
// RM gets notified of expired pending delete from containerReplicaPendingOps by subscribing to it
// so it can resend them.
containerReplicaPendingOps.registerSubscriber(replicationManager);
if (configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
} else {
scmSafeModeManager = new SCMSafeModeManager(conf,
containerManager, pipelineManager, scmNodeManager, eventQueue,
serviceManager, scmContext);
}
scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager, containerManager,
scmContext, eventQueue, replicationManager);
statefulServiceStateManager = StatefulServiceStateManagerImpl.newBuilder()
.setStatefulServiceConfig(
scmMetadataStore.getStatefulServiceConfigTable())
.setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer())
.setRatisServer(scmHAManager.getRatisServer())
.build();
}