private void initializeSystemManagers()

in hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java [644:848]


  /**
   * Creates the managers and services this SCM instance relies on and stores
   * them in the corresponding fields.
   *
   * <p>For most components, an instance supplied by {@code configurator}
   * takes precedence; a default implementation is constructed only when the
   * matching configurator getter returns {@code null}. Components are created
   * in dependency order (for example, the HA manager before the finalization
   * manager and sequence id generator, the node manager before the pipeline
   * manager, and the container manager before the replication manager), so
   * the statements in this method should not be reordered casually.
   *
   * <p>Besides assigning fields, this method registers two services with
   * {@code serviceManager} (the expired container replica op scrubber and
   * the replication manager) and subscribes the replication manager to
   * {@code containerReplicaPendingOps}.
   *
   * @param conf configuration used to build the default implementations and
   *     to read the time-duration settings used below.
   * @param configurator holder of optional pre-built components that
   *     override the defaults.
   * @throws IOException declared by the signature; NOTE(review): which of
   *     the calls below actually throw is not visible from this method.
   */
  private void initializeSystemManagers(OzoneConfiguration conf,
      SCMConfigurator configurator) throws IOException {
    // Use SystemClock when data is persisted
    // and used again after system restarts.
    systemClock = Clock.system(ZoneOffset.UTC);

    // Network topology: use the injected instance if present, otherwise
    // build one from the configuration.
    if (configurator.getNetworkTopology() != null) {
      clusterMap = configurator.getNetworkTopology();
    } else {
      clusterMap = new NetworkTopologyImpl(conf);
    }
    // This needs to be done before initializing Ratis.
    ratisReporterList = RatisDropwizardExports
        .registerRatisMetricReporters(ratisMetricsMap, isStopped::get);
    // NOTE(review): `this` is handed to SCMHAManagerImpl while most fields
    // assigned later in this method are still unset.
    if (configurator.getSCMHAManager() != null) {
      scmHAManager = configurator.getSCMHAManager();
    } else {
      scmHAManager = new SCMHAManagerImpl(conf, securityConfig, this);
    }

    // The default lease manager takes its duration from the close-container
    // wait duration setting, read in milliseconds.
    if (configurator.getLeaseManager() != null) {
      leaseManager = configurator.getLeaseManager();
    } else {
      long timeDuration = conf.getTimeDuration(
          OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION,
          OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION_DEFAULT
              .getDuration(), TimeUnit.MILLISECONDS);
      leaseManager = new LeaseManager<>(threadNamePrefix, timeDuration);
    }

    // The layout version manager starts from the layout version recorded in
    // the SCM storage config.
    scmLayoutVersionManager = new HDDSLayoutVersionManager(
        scmStorageConfig.getLayoutVersion());

    // Upgrade finalization: the executor is injectable, while the
    // finalization manager itself is always built here.
    UpgradeFinalizationExecutor<SCMUpgradeFinalizationContext>
        finalizationExecutor;
    if (configurator.getUpgradeFinalizationExecutor() != null) {
      finalizationExecutor = configurator.getUpgradeFinalizationExecutor();
    } else {
      finalizationExecutor = new DefaultUpgradeFinalizationExecutor<>();
    }
    finalizationManager = new FinalizationManagerImpl.Builder()
        .setConfiguration(conf)
        .setLayoutVersionManager(scmLayoutVersionManager)
        .setStorage(scmStorageConfig)
        .setHAManager(scmHAManager)
        .setFinalizationStore(scmMetadataStore.getMetaTable())
        .setFinalizationExecutor(finalizationExecutor)
        .build();

    // inline upgrade for SequenceIdGenerator
    SequenceIdGenerator.upgradeToSequenceId(scmMetadataStore);
    // Distributed sequence id generator
    sequenceIdGen = new SequenceIdGenerator(
        conf, scmHAManager, scmMetadataStore.getSequenceIdTable());

    // The default context depends on finalizationManager (for its
    // checkpoint), so it has to be created after the block above.
    if (configurator.getScmContext() != null) {
      scmContext = configurator.getScmContext();
    } else {
      // non-leader of term 0, in safe mode, preCheck not completed.
      scmContext = new SCMContext.Builder()
          .setLeader(false)
          .setTerm(0)
          .setIsInSafeMode(true)
          .setIsPreCheckComplete(false)
          .setSCM(this)
          .setThreadNamePrefix(threadNamePrefix)
          .setFinalizationCheckpoint(finalizationManager.getCheckpoint())
          .build();
    }

    // Instantiate the configured DNS-to-switch mapping class (TableMapping
    // when the key is unset) and wrap it in a CachedDNSToSwitchMapping
    // unless the instance already is one.
    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
        conf.getClass(
            ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
            TableMapping.class, DNSToSwitchMapping.class);
    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
        dnsToSwitchMappingClass, conf);
    dnsToSwitchMapping =
        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
            : new CachedDNSToSwitchMapping(newInstance));

    // The default node manager receives resolveNodeLocation as a callback;
    // presumably it is used to map nodes to topology locations via
    // dnsToSwitchMapping - confirm against resolveNodeLocation.
    if (configurator.getScmNodeManager() != null) {
      scmNodeManager = configurator.getScmNodeManager();
    } else {
      scmNodeManager = new SCMNodeManager(conf, scmStorageConfig, eventQueue,
          clusterMap, scmContext, scmLayoutVersionManager,
          this::resolveNodeLocation);
    }

    // One metrics object is shared by both placement policies created below.
    placementMetrics = SCMContainerPlacementMetrics.create();
    containerPlacementPolicy =
        ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
            clusterMap, true, placementMetrics);

    ecContainerPlacementPolicy = ContainerPlacementPolicyFactory.getECPolicy(
        conf, scmNodeManager, clusterMap, true, placementMetrics);

    // The validate proxy is handed both the regular and the EC policy.
    placementPolicyValidateProxy = new PlacementPolicyValidateProxy(
        containerPlacementPolicy, ecContainerPlacementPolicy);

    if (configurator.getPipelineManager() != null) {
      pipelineManager = configurator.getPipelineManager();
    } else {
      pipelineManager =
          PipelineManagerImpl.newPipelineManager(
              conf,
              scmHAManager,
              scmNodeManager,
              scmMetadataStore.getPipelineTable(),
              eventQueue,
              scmContext,
              serviceManager,
              systemClock
              );
    }

    // The upgrade context takes the node manager and pipeline manager, so
    // it can only be built once both of them exist.
    finalizationManager.buildUpgradeContext(scmNodeManager, pipelineManager,
        scmContext);

    containerReplicaPendingOps =
        new ContainerReplicaPendingOps(systemClock);

    // Interval between runs of the expired replica op scrubber.
    long containerReplicaOpScrubberIntervalMs = conf.getTimeDuration(
        ScmConfigKeys
            .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
        ScmConfigKeys
            .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);

    // Wait time for the scrubber, read from the wait-after-safe-mode-exit
    // setting.
    long backgroundServiceSafemodeWaitMs = conf.getTimeDuration(
        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
        TimeUnit.MILLISECONDS);

    // Background service whose periodic task removes expired entries from
    // containerReplicaPendingOps.
    final String backgroundServiceName = "ExpiredContainerReplicaOpScrubber";
    BackgroundSCMService expiredContainerReplicaOpScrubber =
        new BackgroundSCMService.Builder().setClock(systemClock)
            .setScmContext(scmContext)
            .setServiceName(backgroundServiceName)
            .setIntervalInMillis(containerReplicaOpScrubberIntervalMs)
            .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs)
            .setPeriodicalTask(() -> containerReplicaPendingOps
                .removeExpiredEntries()).build();

    serviceManager.register(expiredContainerReplicaOpScrubber);

    if (configurator.getContainerManager() != null) {
      containerManager = configurator.getContainerManager();
    } else {
      containerManager = new ContainerManagerImpl(conf, scmHAManager,
          sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable(),
          containerReplicaPendingOps);
    }

    // Two pipeline choose policies from the same factory, differing only in
    // the boolean flag (false vs. true); presumably the flag selects the EC
    // variant - confirm against PipelineChoosePolicyFactory.
    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
    pipelineChoosePolicy = PipelineChoosePolicyFactory
        .getPolicy(scmNodeManager, scmConfig, false);
    ecPipelineChoosePolicy = PipelineChoosePolicyFactory
        .getPolicy(scmNodeManager, scmConfig, true);
    // NOTE(review): the two default implementations below receive `this`;
    // presumably they read managers assigned earlier in this method, which
    // is why they are created this late - verify before moving them.
    if (configurator.getWritableContainerFactory() != null) {
      writableContainerFactory = configurator.getWritableContainerFactory();
    } else {
      writableContainerFactory = new WritableContainerFactory(this);
    }
    if (configurator.getScmBlockManager() != null) {
      scmBlockManager = configurator.getScmBlockManager();
    } else {
      scmBlockManager = new BlockManagerImpl(conf, scmConfig, this);
    }
    // Only a ReplicationManager built here has its config registered with
    // the reconfiguration handler; an injected one is used as-is.
    if (configurator.getReplicationManager() != null) {
      replicationManager = configurator.getReplicationManager();
    }  else {
      replicationManager = new ReplicationManager(
          conf,
          containerManager,
          containerPlacementPolicy,
          ecContainerPlacementPolicy,
          eventQueue,
          scmContext,
          scmNodeManager,
          systemClock,
          containerReplicaPendingOps);
      reconfigurationHandler.register(replicationManager.getConfig());
    }
    serviceManager.register(replicationManager);
    // RM gets notified of expired pending delete from containerReplicaPendingOps by subscribing to it
    // so it can resend them.
    containerReplicaPendingOps.registerSubscriber(replicationManager);
    if (configurator.getScmSafeModeManager() != null) {
      scmSafeModeManager = configurator.getScmSafeModeManager();
    } else {
      scmSafeModeManager = new SCMSafeModeManager(conf,
          containerManager, pipelineManager, scmNodeManager, eventQueue,
          serviceManager, scmContext);
    }

    // The decommission manager is always built here (no configurator
    // override) and needs the replication manager created above.
    scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager, containerManager,
        scmContext, eventQueue, replicationManager);

    // State manager for stateful services, backed by the stateful service
    // config table and wired to the HA manager's DB transaction buffer and
    // Ratis server.
    statefulServiceStateManager = StatefulServiceStateManagerImpl.newBuilder()
        .setStatefulServiceConfig(
            scmMetadataStore.getStatefulServiceConfigTable())
        .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer())
        .setRatisServer(scmHAManager.getRatisServer())
        .build();
  }