in core/src/main/scala/kafka/controller/KafkaController.scala [1786:1852]
override def process(event: ControllerEvent): Unit = {
try {
event match {
case event: MockEvent =>
// Used only in test cases
event.process()
case ShutdownEventThread =>
error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")
case AutoPreferredReplicaLeaderElection =>
processAutoPreferredReplicaLeaderElection()
case ReplicaLeaderElection(partitions, electionType, electionTrigger, callback) =>
processReplicaLeaderElection(partitions, electionType, electionTrigger, callback)
case UncleanLeaderElectionEnable =>
processUncleanLeaderElectionEnable()
case TopicUncleanLeaderElectionEnable(topic) =>
processTopicUncleanLeaderElectionEnable(topic)
case ControlledShutdown(id, brokerEpoch, callback) =>
processControlledShutdown(id, brokerEpoch, callback)
case LeaderAndIsrResponseReceived(response, brokerId) =>
processLeaderAndIsrResponseReceived(response, brokerId)
case UpdateMetadataResponseReceived(response, brokerId) =>
processUpdateMetadataResponseReceived(response, brokerId)
case TopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors) =>
processTopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors)
case BrokerChange =>
processBrokerChange()
case BrokerModifications(brokerId) =>
processBrokerModification(brokerId)
case ControllerChange =>
processControllerChange()
case Reelect =>
processReelect()
case RegisterBrokerAndReelect =>
processRegisterBrokerAndReelect()
case Expire =>
processExpire()
case TopicChange =>
processTopicChange()
case LogDirEventNotification =>
processLogDirEventNotification()
case PartitionModifications(topic) =>
processPartitionModifications(topic)
case TopicDeletion =>
processTopicDeletion()
case ApiPartitionReassignment(reassignments, callback) =>
processApiPartitionReassignment(reassignments, callback)
case ZkPartitionReassignment =>
processZkPartitionReassignment()
case ListPartitionReassignments(partitions, callback) =>
processListPartitionReassignments(partitions, callback)
case PartitionReassignmentIsrChange(partition) =>
processPartitionReassignmentIsrChange(partition)
case IsrChangeNotification =>
processIsrChangeNotification()
case Startup =>
processStartup()
}
} catch {
case e: ControllerMovedException =>
info(s"Controller moved to another broker when processing $event.", e)
maybeResign()
case e: Throwable =>
error(s"Error processing event $event", e)
} finally {
updateMetrics()
}
}