in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java [1122:1316]
/**
 * Replays one coordinator record by routing it to the manager that handles
 * its record type.
 *
 * <p>The record type is resolved from the api key of the record's key.
 * Offset commit records (legacy and current) are handed to
 * {@code offsetMetadataManager} together with {@code offset} and
 * {@code producerId}; every other known record type is handed to
 * {@code groupMetadataManager}. In all cases the value is unwrapped with
 * {@code Utils.messageOrNull} before being passed on, so the managers
 * receive whatever that helper yields for the record's value.
 *
 * @param offset        The offset forwarded to the offset metadata manager
 *                      for offset commit records.
 * @param producerId    The producer id forwarded to the offset metadata
 *                      manager for offset commit records.
 * @param producerEpoch The producer epoch; not read by this method.
 * @param record        The record whose key and value are replayed.
 * @throws IllegalStateException if the key's api key cannot be mapped to a
 *                               record type, or if the resolved type has no
 *                               handler in this method.
 */
public void replay(
    long offset,
    long producerId,
    short producerEpoch,
    CoordinatorRecord record
) throws RuntimeException {
    ApiMessage recordKey = record.key();
    ApiMessageAndVersion recordValue = record.value();

    // Map the key's api key to a record type. A failed lookup is rethrown
    // as an IllegalStateException that keeps the original cause attached.
    CoordinatorRecordType type;
    try {
        type = CoordinatorRecordType.fromId(recordKey.apiKey());
    } catch (UnsupportedVersionException e) {
        throw new IllegalStateException("Received an unknown record type " + recordKey.apiKey()
            + " in " + record, e);
    }

    // The casts below select the matching replay(...) overload on each
    // manager, so each key/value pair must keep its concrete type.
    switch (type) {
        // Offset commits: legacy records are converted to the current
        // key/value classes before being replayed.
        case LEGACY_OFFSET_COMMIT -> offsetMetadataManager.replay(
            offset,
            producerId,
            convertLegacyOffsetCommitKey((LegacyOffsetCommitKey) recordKey),
            convertLegacyOffsetCommitValue((LegacyOffsetCommitValue) Utils.messageOrNull(recordValue))
        );
        case OFFSET_COMMIT -> offsetMetadataManager.replay(
            offset,
            producerId,
            (OffsetCommitKey) recordKey,
            (OffsetCommitValue) Utils.messageOrNull(recordValue)
        );

        // Group metadata record.
        case GROUP_METADATA -> groupMetadataManager.replay(
            (GroupMetadataKey) recordKey,
            (GroupMetadataValue) Utils.messageOrNull(recordValue)
        );

        // Consumer group records.
        case CONSUMER_GROUP_METADATA -> groupMetadataManager.replay(
            (ConsumerGroupMetadataKey) recordKey,
            (ConsumerGroupMetadataValue) Utils.messageOrNull(recordValue)
        );
        case CONSUMER_GROUP_PARTITION_METADATA -> groupMetadataManager.replay(
            (ConsumerGroupPartitionMetadataKey) recordKey,
            (ConsumerGroupPartitionMetadataValue) Utils.messageOrNull(recordValue)
        );
        case CONSUMER_GROUP_MEMBER_METADATA -> groupMetadataManager.replay(
            (ConsumerGroupMemberMetadataKey) recordKey,
            (ConsumerGroupMemberMetadataValue) Utils.messageOrNull(recordValue)
        );
        case CONSUMER_GROUP_TARGET_ASSIGNMENT_METADATA -> groupMetadataManager.replay(
            (ConsumerGroupTargetAssignmentMetadataKey) recordKey,
            (ConsumerGroupTargetAssignmentMetadataValue) Utils.messageOrNull(recordValue)
        );
        case CONSUMER_GROUP_TARGET_ASSIGNMENT_MEMBER -> groupMetadataManager.replay(
            (ConsumerGroupTargetAssignmentMemberKey) recordKey,
            (ConsumerGroupTargetAssignmentMemberValue) Utils.messageOrNull(recordValue)
        );
        case CONSUMER_GROUP_CURRENT_MEMBER_ASSIGNMENT -> groupMetadataManager.replay(
            (ConsumerGroupCurrentMemberAssignmentKey) recordKey,
            (ConsumerGroupCurrentMemberAssignmentValue) Utils.messageOrNull(recordValue)
        );

        // Share group records.
        case SHARE_GROUP_PARTITION_METADATA -> groupMetadataManager.replay(
            (ShareGroupPartitionMetadataKey) recordKey,
            (ShareGroupPartitionMetadataValue) Utils.messageOrNull(recordValue)
        );
        case SHARE_GROUP_MEMBER_METADATA -> groupMetadataManager.replay(
            (ShareGroupMemberMetadataKey) recordKey,
            (ShareGroupMemberMetadataValue) Utils.messageOrNull(recordValue)
        );
        case SHARE_GROUP_METADATA -> groupMetadataManager.replay(
            (ShareGroupMetadataKey) recordKey,
            (ShareGroupMetadataValue) Utils.messageOrNull(recordValue)
        );
        case SHARE_GROUP_TARGET_ASSIGNMENT_METADATA -> groupMetadataManager.replay(
            (ShareGroupTargetAssignmentMetadataKey) recordKey,
            (ShareGroupTargetAssignmentMetadataValue) Utils.messageOrNull(recordValue)
        );
        case SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER -> groupMetadataManager.replay(
            (ShareGroupTargetAssignmentMemberKey) recordKey,
            (ShareGroupTargetAssignmentMemberValue) Utils.messageOrNull(recordValue)
        );
        case SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT -> groupMetadataManager.replay(
            (ShareGroupCurrentMemberAssignmentKey) recordKey,
            (ShareGroupCurrentMemberAssignmentValue) Utils.messageOrNull(recordValue)
        );
        case SHARE_GROUP_STATE_PARTITION_METADATA -> groupMetadataManager.replay(
            (ShareGroupStatePartitionMetadataKey) recordKey,
            (ShareGroupStatePartitionMetadataValue) Utils.messageOrNull(recordValue)
        );

        // Consumer group regular expression record.
        case CONSUMER_GROUP_REGULAR_EXPRESSION -> groupMetadataManager.replay(
            (ConsumerGroupRegularExpressionKey) recordKey,
            (ConsumerGroupRegularExpressionValue) Utils.messageOrNull(recordValue)
        );

        // Streams group records.
        case STREAMS_GROUP_METADATA -> groupMetadataManager.replay(
            (StreamsGroupMetadataKey) recordKey,
            (StreamsGroupMetadataValue) Utils.messageOrNull(recordValue)
        );
        case STREAMS_GROUP_PARTITION_METADATA -> groupMetadataManager.replay(
            (StreamsGroupPartitionMetadataKey) recordKey,
            (StreamsGroupPartitionMetadataValue) Utils.messageOrNull(recordValue)
        );
        case STREAMS_GROUP_MEMBER_METADATA -> groupMetadataManager.replay(
            (StreamsGroupMemberMetadataKey) recordKey,
            (StreamsGroupMemberMetadataValue) Utils.messageOrNull(recordValue)
        );
        case STREAMS_GROUP_TARGET_ASSIGNMENT_METADATA -> groupMetadataManager.replay(
            (StreamsGroupTargetAssignmentMetadataKey) recordKey,
            (StreamsGroupTargetAssignmentMetadataValue) Utils.messageOrNull(recordValue)
        );
        case STREAMS_GROUP_TARGET_ASSIGNMENT_MEMBER -> groupMetadataManager.replay(
            (StreamsGroupTargetAssignmentMemberKey) recordKey,
            (StreamsGroupTargetAssignmentMemberValue) Utils.messageOrNull(recordValue)
        );
        case STREAMS_GROUP_CURRENT_MEMBER_ASSIGNMENT -> groupMetadataManager.replay(
            (StreamsGroupCurrentMemberAssignmentKey) recordKey,
            (StreamsGroupCurrentMemberAssignmentValue) Utils.messageOrNull(recordValue)
        );
        case STREAMS_GROUP_TOPOLOGY -> groupMetadataManager.replay(
            (StreamsGroupTopologyKey) recordKey,
            (StreamsGroupTopologyValue) Utils.messageOrNull(recordValue)
        );

        // A record type that resolved successfully but has no handler here.
        default -> throw new IllegalStateException("Received an unknown record type " + type
            + " in " + record);
    }
}