in hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java [155:298]
public OzoneContainer(HddsDatanodeService hddsDatanodeService,
DatanodeDetails datanodeDetails, ConfigurationSource conf,
StateContext context, CertificateClient certClient,
SecretKeyVerifierClient secretKeyClient,
VolumeChoosingPolicy volumeChoosingPolicy) throws IOException {
config = conf;
this.datanodeDetails = datanodeDetails;
this.context = context;
this.volumeChecker = new StorageVolumeChecker(conf, new Timer(),
datanodeDetails.threadNamePrefix());
volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
context, VolumeType.DATA_VOLUME, volumeChecker);
volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
metaVolumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
context, VolumeType.META_VOLUME, volumeChecker);
dbVolumeSet = HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
context, VolumeType.DB_VOLUME, volumeChecker);
final DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
if (SchemaV3.isFinalizedAndEnabled(config)) {
HddsVolumeUtil.loadAllHddsVolumeDbStore(
volumeSet, dbVolumeSet, false, LOG);
if (dnConf.autoCompactionSmallSstFile()) {
this.dbCompactionExecutorService = Executors.newScheduledThreadPool(
dnConf.getAutoCompactionSmallSstFileThreads(),
new ThreadFactoryBuilder().setNameFormat(
datanodeDetails.threadNamePrefix() +
"RocksDBCompactionThread-%d").build());
this.dbCompactionExecutorService.scheduleWithFixedDelay(this::compactDb,
dnConf.getAutoCompactionSmallSstFileIntervalMinutes(),
dnConf.getAutoCompactionSmallSstFileIntervalMinutes(),
TimeUnit.MINUTES);
}
}
long recoveringContainerTimeout = config.getTimeDuration(
OZONE_RECOVERING_CONTAINER_TIMEOUT,
OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
this.witnessedContainerMetadataStore = WitnessedContainerMetadataStoreImpl.get(conf);
containerSet = ContainerSet.newRwContainerSet(witnessedContainerMetadataStore.getContainerIdsTable(),
recoveringContainerTimeout);
metadataScanner = null;
metrics = ContainerMetrics.create(conf);
handlers = Maps.newHashMap();
IncrementalReportSender<Container> icrSender = container -> {
synchronized (containerSet) {
ContainerReplicaProto containerReport = container.getContainerReport();
IncrementalContainerReportProto icr = IncrementalContainerReportProto
.newBuilder()
.addReport(containerReport)
.build();
context.addIncrementalReport(icr);
context.getParent().triggerHeartbeat();
}
};
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, volumeChoosingPolicy, metrics, icrSender));
}
SecurityConfig secConf = new SecurityConfig(conf);
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
handlers, context, metrics,
TokenVerifier.create(secConf, secretKeyClient));
/*
* ContainerController is the control plane
* XceiverServerRatis is the write channel
* XceiverServerGrpc is the read channel
*/
controller = new ContainerController(containerSet, handlers);
writeChannel = XceiverServerRatis.newXceiverServerRatis(hddsDatanodeService,
datanodeDetails, config, hddsDispatcher, controller, certClient,
context);
replicationServer = new ReplicationServer(
controller,
conf.getObject(ReplicationConfig.class),
secConf,
certClient,
new ContainerImporter(conf, containerSet, controller,
volumeSet, volumeChoosingPolicy),
datanodeDetails.threadNamePrefix());
readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, certClient);
Duration blockDeletingSvcInterval = dnConf.getBlockDeletionInterval();
long blockDeletingServiceTimeout = config
.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
int blockDeletingServiceWorkerSize = config
.getInt(OZONE_BLOCK_DELETING_SERVICE_WORKERS,
OZONE_BLOCK_DELETING_SERVICE_WORKERS_DEFAULT);
blockDeletingService =
new BlockDeletingService(this, blockDeletingSvcInterval.toMillis(),
blockDeletingServiceTimeout, TimeUnit.MILLISECONDS,
blockDeletingServiceWorkerSize, config,
datanodeDetails.threadNamePrefix(),
context.getParent().getReconfigurationHandler());
Duration recoveringContainerScrubbingSvcInterval =
dnConf.getRecoveringContainerScrubInterval();
long recoveringContainerScrubbingServiceTimeout = config
.getTimeDuration(OZONE_RECOVERING_CONTAINER_SCRUBBING_SERVICE_TIMEOUT,
OZONE_RECOVERING_CONTAINER_SCRUBBING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
int recoveringContainerScrubbingServiceWorkerSize = config
.getInt(OZONE_RECOVERING_CONTAINER_SCRUBBING_SERVICE_WORKERS,
OZONE_RECOVERING_CONTAINER_SCRUBBING_SERVICE_WORKERS_DEFAULT);
recoveringContainerScrubbingService =
new StaleRecoveringContainerScrubbingService(
recoveringContainerScrubbingSvcInterval.toMillis(),
TimeUnit.MILLISECONDS,
recoveringContainerScrubbingServiceWorkerSize,
recoveringContainerScrubbingServiceTimeout,
containerSet);
if (certClient != null && secConf.isGrpcTlsEnabled()) {
tlsClientConfig = new GrpcTlsConfig(
certClient.getKeyManager(),
certClient.getTrustManager(), true);
} else {
tlsClientConfig = null;
}
initializingStatus =
new AtomicReference<>(InitializingStatus.UNINITIALIZED);
}