in metadata/src/main/java/org/apache/kafka/controller/QuorumController.java [1193:1289]
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
if (log.isTraceEnabled()) {
if (snapshotId.isPresent()) {
log.trace("Replaying snapshot {} record {}",
Snapshots.filenameFromSnapshotId(snapshotId.get()),
recordRedactor.toLoggableString(message));
} else {
log.trace("Replaying log record {} with offset {}",
recordRedactor.toLoggableString(message), offset);
}
}
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
clusterControl.replay((RegisterBrokerRecord) message, offset);
break;
case UNREGISTER_BROKER_RECORD:
clusterControl.replay((UnregisterBrokerRecord) message);
break;
case TOPIC_RECORD:
replicationControl.replay((TopicRecord) message);
break;
case PARTITION_RECORD:
replicationControl.replay((PartitionRecord) message);
break;
case CONFIG_RECORD:
configurationControl.replay((ConfigRecord) message);
break;
case PARTITION_CHANGE_RECORD:
replicationControl.replay((PartitionChangeRecord) message);
break;
case FENCE_BROKER_RECORD:
clusterControl.replay((FenceBrokerRecord) message);
break;
case UNFENCE_BROKER_RECORD:
clusterControl.replay((UnfenceBrokerRecord) message);
break;
case REMOVE_TOPIC_RECORD:
replicationControl.replay((RemoveTopicRecord) message);
break;
case FEATURE_LEVEL_RECORD:
featureControl.replay((FeatureLevelRecord) message);
break;
case CLIENT_QUOTA_RECORD:
clientQuotaControlManager.replay((ClientQuotaRecord) message);
break;
case PRODUCER_IDS_RECORD:
producerIdControlManager.replay((ProducerIdsRecord) message);
break;
case BROKER_REGISTRATION_CHANGE_RECORD:
clusterControl.replay((BrokerRegistrationChangeRecord) message);
break;
case ACCESS_CONTROL_ENTRY_RECORD:
aclControlManager.replay((AccessControlEntryRecord) message);
break;
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
aclControlManager.replay((RemoveAccessControlEntryRecord) message);
break;
case USER_SCRAM_CREDENTIAL_RECORD:
scramControlManager.replay((UserScramCredentialRecord) message);
break;
case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
scramControlManager.replay((RemoveUserScramCredentialRecord) message);
break;
case DELEGATION_TOKEN_RECORD:
delegationTokenControlManager.replay((DelegationTokenRecord) message);
break;
case REMOVE_DELEGATION_TOKEN_RECORD:
delegationTokenControlManager.replay((RemoveDelegationTokenRecord) message);
break;
case NO_OP_RECORD:
// NoOpRecord is an empty record and doesn't need to be replayed
break;
case ZK_MIGRATION_STATE_RECORD:
// In 4.0, although migration is no longer supported and ZK has been removed from Kafka,
// users might migrate from ZK to KRaft in version 3.x and then perform a rolling upgrade to 4.0.
// Therefore, this case needs to be retained but will be a no-op.
break;
case BEGIN_TRANSACTION_RECORD:
offsetControl.replay((BeginTransactionRecord) message, offset);
break;
case END_TRANSACTION_RECORD:
offsetControl.replay((EndTransactionRecord) message, offset);
break;
case ABORT_TRANSACTION_RECORD:
offsetControl.replay((AbortTransactionRecord) message, offset);
break;
case REGISTER_CONTROLLER_RECORD:
clusterControl.replay((RegisterControllerRecord) message);
break;
case CLEAR_ELR_RECORD:
replicationControl.replay((ClearElrRecord) message);
break;
default:
throw new RuntimeException("Unhandled record type " + type);
}
}