in metadata/src/main/java/org/apache/kafka/controller/QuorumController.java [1460:1612]
private QuorumController(
FaultHandler nonFatalFaultHandler,
FaultHandler fatalFaultHandler,
LogContext logContext,
int nodeId,
String clusterId,
KafkaEventQueue queue,
Time time,
KafkaConfigSchema configSchema,
RaftClient<ApiMessageAndVersion> raftClient,
QuorumFeatures quorumFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
ReplicaPlacer replicaPlacer,
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
long sessionTimeoutNs,
OptionalLong fenceStaleBrokerIntervalNs,
QuorumControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator configurationValidator,
Map<String, Object> staticConfig,
BootstrapMetadata bootstrapMetadata,
int maxRecordsPerBatch,
DelegationTokenCache tokenCache,
String tokenSecretKeyString,
long delegationTokenMaxLifeMs,
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs,
long uncleanLeaderElectionCheckIntervalMs,
String interBrokerListenerName,
long controllerPerformanceSamplePeriodMs,
long controllerPerformanceAlwaysLogThresholdMs
) {
this.nonFatalFaultHandler = nonFatalFaultHandler;
this.fatalFaultHandler = fatalFaultHandler;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
this.clusterId = clusterId;
this.queue = queue;
this.time = time;
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.resourceExists = new ConfigResourceExistenceChecker();
this.clientQuotaControlManager = new ClientQuotaControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
build();
this.clusterSupportDescriber = new QuorumClusterFeatureSupportDescriber();
this.queueAccessor = new PeriodicTaskControlManagerQueueAccessor();
this.periodicControl = new PeriodicTaskControlManager.Builder().
setLogContext(logContext).
setTime(time).
setQueueAccessor(queueAccessor).
build();
this.featureControl = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(quorumFeatures).
setSnapshotRegistry(snapshotRegistry).
setClusterFeatureSupportDescriber(clusterSupportDescriber).
setKRaftVersionAccessor(new RaftClientKRaftVersionAccessor(raftClient)).
build();
this.clusterControl = new ClusterControlManager.Builder().
setLogContext(logContext).
setClusterId(clusterId).
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(sessionTimeoutNs).
setReplicaPlacer(replicaPlacer).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler(this::handleBrokerShutdown).
setInterBrokerListenerName(interBrokerListenerName).
build();
this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setKafkaConfigSchema(configSchema).
setExistenceChecker(resourceExists).
setAlterConfigPolicy(alterConfigPolicy).
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
setFeatureControl(featureControl).
build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setClusterControlManager(clusterControl).
build();
this.replicationControl = new ReplicationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
setDefaultNumPartitions(defaultNumPartitions).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
build();
this.scramControlManager = new ScramControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
build();
this.delegationTokenControlManager = new DelegationTokenControlManager.Builder().
setLogContext(logContext).
setTokenCache(tokenCache).
setDelegationTokenSecretKey(tokenSecretKeyString).
setDelegationTokenMaxLifeMs(delegationTokenMaxLifeMs).
setDelegationTokenExpiryTimeMs(delegationTokenExpiryTimeMs).
build();
this.aclControlManager = new AclControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
build();
this.raftClient = raftClient;
this.bootstrapMetadata = bootstrapMetadata;
this.maxRecordsPerBatch = maxRecordsPerBatch;
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.recordRedactor = new RecordRedactor(configSchema);
this.performanceMonitor = new EventPerformanceMonitor.Builder().
setLogContext(logContext).
setPeriodNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceSamplePeriodMs)).
setAlwaysLogThresholdNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceAlwaysLogThresholdMs)).
build();
if (maxIdleIntervalNs.isPresent()) {
registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
}
if (fenceStaleBrokerIntervalNs.isPresent()) {
registerMaybeFenceStaleBroker(fenceStaleBrokerIntervalNs.getAsLong());
} else {
registerMaybeFenceStaleBroker(maybeFenceStaleBrokerPeriodNs(sessionTimeoutNs));
}
if (leaderImbalanceCheckIntervalNs.isPresent()) {
registerElectPreferred(leaderImbalanceCheckIntervalNs.getAsLong());
}
registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs));
registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs));
registerGeneratePeriodicPerformanceMessage();
// OffsetControlManager must be initialized last, because its constructor will take the
// initial in-memory snapshot of all extant timeline data structures.
this.offsetControl = new OffsetControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setMetrics(controllerMetrics).
setTime(time).
build();
log.info("Creating new QuorumController with clusterId {}", clusterId);
this.raftClient.register(metaLogListener);
}