in hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java [179:357]
public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
StorageContainerServiceProvider scmServiceProvider,
ContainerCountBySizeDao containerCountBySizeDao,
UtilizationSchemaDefinition utilizationSchemaDefinition,
ContainerHealthSchemaManager containerHealthSchemaManager,
ReconContainerMetadataManager reconContainerMetadataManager,
ReconUtils reconUtils,
ReconSafeModeManager safeModeManager,
ReconContext reconContext,
DataSource dataSource,
ReconTaskStatusUpdaterManager taskStatusUpdaterManager)
throws IOException {
reconNodeDetails = reconUtils.getReconNodeDetails(conf);
this.threadNamePrefix = reconNodeDetails.threadNamePrefix();
this.eventQueue = new EventQueue(threadNamePrefix);
eventQueue.setSilent(true);
this.reconContext = reconContext;
this.scmContext = new SCMContext.Builder()
.setIsPreCheckComplete(true)
.setSCM(this)
.build();
this.ozoneConfiguration = getReconScmConfiguration(conf);
long scmClientRPCTimeOut = conf.getTimeDuration(
OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_KEY,
OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_DEFAULT,
TimeUnit.MILLISECONDS);
long scmClientMaxRetryTimeOut = conf.getTimeDuration(
OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_KEY,
OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
int scmClientFailOverMaxRetryCount = conf.getInt(
OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_KEY,
OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT);
conf.setLong(HDDS_SCM_CLIENT_RPC_TIME_OUT, scmClientRPCTimeOut);
conf.setLong(HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT, scmClientMaxRetryTimeOut);
conf.setLong(HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY,
scmClientFailOverMaxRetryCount);
this.scmStorageConfig = new ReconStorageConfig(conf, reconUtils);
this.clusterMap = new NetworkTopologyImpl(conf);
this.dbStore = DBStoreBuilder.createDBStore(ozoneConfiguration, ReconSCMDBDefinition.get());
this.scmLayoutVersionManager =
new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion());
this.scmhaManager = SCMHAManagerStub.getInstance(
true, new SCMDBTransactionBufferImpl());
this.sequenceIdGen = new SequenceIdGenerator(
conf, scmhaManager, ReconSCMDBDefinition.SEQUENCE_ID.getTable(dbStore));
reconContext.setClusterId(scmStorageConfig.getClusterID());
this.nodeManager =
new ReconNodeManager(conf, scmStorageConfig, eventQueue, clusterMap,
ReconSCMDBDefinition.NODES.getTable(dbStore),
this.scmLayoutVersionManager, reconContext);
placementMetrics = SCMContainerPlacementMetrics.create();
this.containerPlacementPolicy =
ContainerPlacementPolicyFactory.getPolicy(conf, nodeManager,
clusterMap, true, placementMetrics);
this.datanodeProtocolServer = new ReconDatanodeProtocolServer(
conf, this, eventQueue);
this.pipelineManager = ReconPipelineManager.newReconPipelineManager(
conf, nodeManager,
ReconSCMDBDefinition.PIPELINES.getTable(dbStore),
eventQueue,
scmhaManager,
scmContext);
ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
Clock.system(ZoneId.systemDefault()));
this.containerManager = new ReconContainerManager(conf,
dbStore,
ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
pipelineManager, scmServiceProvider,
containerHealthSchemaManager, reconContainerMetadataManager,
scmhaManager, sequenceIdGen, pendingOps);
this.scmServiceProvider = scmServiceProvider;
this.isSyncDataFromSCMRunning = new AtomicBoolean();
this.containerCountBySizeDao = containerCountBySizeDao;
NodeReportHandler nodeReportHandler =
new NodeReportHandler(nodeManager);
this.safeModeManager = safeModeManager;
ReconPipelineReportHandler pipelineReportHandler =
new ReconPipelineReportHandler(safeModeManager,
pipelineManager, scmContext, conf, scmServiceProvider);
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, scmContext);
ReconTaskConfig reconTaskConfig = conf.getObject(ReconTaskConfig.class);
PipelineSyncTask pipelineSyncTask = new PipelineSyncTask(pipelineManager, nodeManager,
scmServiceProvider, reconTaskConfig, taskStatusUpdaterManager);
containerHealthTask = new ContainerHealthTask(containerManager, scmServiceProvider,
containerHealthSchemaManager, containerPlacementPolicy,
reconTaskConfig, reconContainerMetadataManager, conf, taskStatusUpdaterManager);
this.containerSizeCountTask = new ContainerSizeCountTask(containerManager,
reconTaskConfig, containerCountBySizeDao, utilizationSchemaDefinition, taskStatusUpdaterManager);
this.dataSource = dataSource;
StaleNodeHandler staleNodeHandler =
new ReconStaleNodeHandler(nodeManager, pipelineManager, pipelineSyncTask);
DeadNodeHandler deadNodeHandler = new ReconDeadNodeHandler(nodeManager,
pipelineManager, containerManager, scmServiceProvider,
containerHealthTask, pipelineSyncTask);
ContainerReportHandler containerReportHandler =
new ReconContainerReportHandler(nodeManager, containerManager);
IncrementalContainerReportHandler icrHandler =
new ReconIncrementalContainerReportHandler(nodeManager,
containerManager, scmContext);
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(
pipelineManager, containerManager, scmContext,
null, 0);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
ReconNewNodeHandler newNodeHandler = new ReconNewNodeHandler(nodeManager);
// Use the same executor for both ICR and FCR.
// The Executor maps the event to a thread for DN.
// Dispatcher should always dispatch FCR first followed by ICR
// conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
// .queue.wait.threshold
long waitQueueThreshold = ozoneConfiguration.getInt(
ScmUtils.getContainerReportConfPrefix() + ".queue.wait.threshold",
OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT);
// conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
// .execute.wait.threshold
long execWaitThreshold = ozoneConfiguration.getInt(
ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
List<BlockingQueue<ContainerReport>> queues
= ReconUtils.initContainerReportQueue(ozoneConfiguration);
List<ThreadPoolExecutor> executors
= FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(
threadNamePrefix, queues);
Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
= new ConcurrentHashMap<>();
FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode,
ContainerReport> containerReportExecutors =
new FixedThreadPoolWithAffinityExecutor<>(
EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
containerReportHandler),
containerReportHandler, queues, eventQueue,
ContainerReportFromDatanode.class, executors,
reportExecutorMap);
containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
containerReportExecutors.setExecWaitThreshold(execWaitThreshold);
FixedThreadPoolWithAffinityExecutor<IncrementalContainerReportFromDatanode,
ContainerReport> incrementalReportExecutors =
new FixedThreadPoolWithAffinityExecutor<>(
EventQueue.getExecutorName(
SCMEvents.INCREMENTAL_CONTAINER_REPORT,
icrHandler),
icrHandler, queues, eventQueue,
IncrementalContainerReportFromDatanode.class, executors,
reportExecutorMap);
incrementalReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
incrementalReportExecutors.setExecWaitThreshold(execWaitThreshold);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
containerReportHandler);
eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
incrementalReportExecutors, icrHandler);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, nodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
reconScmTasks.add(pipelineSyncTask);
reconScmTasks.add(containerHealthTask);
reconScmTasks.add(containerSizeCountTask);
reconSafeModeMgrTask = new ReconSafeModeMgrTask(
containerManager, nodeManager, safeModeManager,
reconTaskConfig, ozoneConfiguration);
}