private void replay()

in metadata/src/main/java/org/apache/kafka/controller/QuorumController.java [1193:1289]


    /**
     * Applies a single metadata record to the in-memory controller state by handing it
     * to the control manager responsible for that record type.
     *
     * <p>The record type is resolved from {@code message.apiKey()} and the message is cast
     * to the matching concrete record class before being forwarded.
     *
     * @param message    the metadata record to replay
     * @param snapshotId the snapshot the record came from, if any; in this method it only
     *                   selects which trace message is emitted
     * @param offset     the offset associated with the record; forwarded to the broker
     *                   registration and transaction record handlers, and included in the
     *                   trace message for log records
     * @throws RuntimeException if the record type has no handler in this method
     */
    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
        // Trace output is guarded so the loggable string is only built when it will be used.
        if (log.isTraceEnabled()) {
            if (snapshotId.isEmpty()) {
                log.trace("Replaying log record {} with offset {}",
                    recordRedactor.toLoggableString(message), offset);
            } else {
                OffsetAndEpoch sourceSnapshot = snapshotId.get();
                log.trace("Replaying snapshot {} record {}",
                    Snapshots.filenameFromSnapshotId(sourceSnapshot),
                    recordRedactor.toLoggableString(message));
            }
        }

        MetadataRecordType recordType = MetadataRecordType.fromId(message.apiKey());
        switch (recordType) {
            // Records handled by clusterControl.
            case REGISTER_BROKER_RECORD ->
                clusterControl.replay((RegisterBrokerRecord) message, offset);
            case UNREGISTER_BROKER_RECORD ->
                clusterControl.replay((UnregisterBrokerRecord) message);
            case FENCE_BROKER_RECORD ->
                clusterControl.replay((FenceBrokerRecord) message);
            case UNFENCE_BROKER_RECORD ->
                clusterControl.replay((UnfenceBrokerRecord) message);
            case BROKER_REGISTRATION_CHANGE_RECORD ->
                clusterControl.replay((BrokerRegistrationChangeRecord) message);
            case REGISTER_CONTROLLER_RECORD ->
                clusterControl.replay((RegisterControllerRecord) message);

            // Records handled by replicationControl.
            case TOPIC_RECORD ->
                replicationControl.replay((TopicRecord) message);
            case PARTITION_RECORD ->
                replicationControl.replay((PartitionRecord) message);
            case PARTITION_CHANGE_RECORD ->
                replicationControl.replay((PartitionChangeRecord) message);
            case REMOVE_TOPIC_RECORD ->
                replicationControl.replay((RemoveTopicRecord) message);
            case CLEAR_ELR_RECORD ->
                replicationControl.replay((ClearElrRecord) message);

            // Configuration, feature, quota and producer id records.
            case CONFIG_RECORD ->
                configurationControl.replay((ConfigRecord) message);
            case FEATURE_LEVEL_RECORD ->
                featureControl.replay((FeatureLevelRecord) message);
            case CLIENT_QUOTA_RECORD ->
                clientQuotaControlManager.replay((ClientQuotaRecord) message);
            case PRODUCER_IDS_RECORD ->
                producerIdControlManager.replay((ProducerIdsRecord) message);

            // ACL, SCRAM credential and delegation token records.
            case ACCESS_CONTROL_ENTRY_RECORD ->
                aclControlManager.replay((AccessControlEntryRecord) message);
            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD ->
                aclControlManager.replay((RemoveAccessControlEntryRecord) message);
            case USER_SCRAM_CREDENTIAL_RECORD ->
                scramControlManager.replay((UserScramCredentialRecord) message);
            case REMOVE_USER_SCRAM_CREDENTIAL_RECORD ->
                scramControlManager.replay((RemoveUserScramCredentialRecord) message);
            case DELEGATION_TOKEN_RECORD ->
                delegationTokenControlManager.replay((DelegationTokenRecord) message);
            case REMOVE_DELEGATION_TOKEN_RECORD ->
                delegationTokenControlManager.replay((RemoveDelegationTokenRecord) message);

            // Transaction marker records handled by offsetControl.
            case BEGIN_TRANSACTION_RECORD ->
                offsetControl.replay((BeginTransactionRecord) message, offset);
            case END_TRANSACTION_RECORD ->
                offsetControl.replay((EndTransactionRecord) message, offset);
            case ABORT_TRANSACTION_RECORD ->
                offsetControl.replay((AbortTransactionRecord) message, offset);

            // Records that are accepted but intentionally ignored.
            case NO_OP_RECORD -> {
                // NoOpRecord is an empty record, so there is nothing to replay.
            }
            case ZK_MIGRATION_STATE_RECORD -> {
                // Migration is no longer supported in 4.0 and ZK has been removed from Kafka,
                // but a cluster may have migrated from ZK to KRaft on 3.x and then been
                // rolling-upgraded to 4.0. The record can therefore still appear in the
                // metadata log and must be accepted here as a no-op.
            }

            default -> throw new RuntimeException("Unhandled record type " + recordType);
        }
    }