in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java [103:124]
public <K, V> PulsarSchema(
        Schema<KeyValue<K, V>> kvSchema, Class<K> keyClass, Class<V> valueClass) {
    // This constructor only accepts schemas whose SchemaInfo reports the KEY_VALUE type;
    // anything else is rejected up front.
    final SchemaInfo kvInfo = kvSchema.getSchemaInfo();
    final boolean isKeyValue = kvInfo.getType() == SchemaType.KEY_VALUE;
    checkArgument(isKeyValue, "This constructor could only be applied for KeyValueSchema");

    // Split the composite schema info into its key part and its value part.
    final KeyValue<SchemaInfo, SchemaInfo> parts = decodeKeyValueSchemaInfo(kvInfo);

    // The key side is processed and validated first, so an invalid key info is
    // reported before the value side is touched.
    // NOTE(review): encodeClassInfo presumably records the given class on the
    // SchemaInfo so it can be recovered later - confirm against the helper.
    final SchemaInfo keyInfo = encodeClassInfo(parts.getKey(), keyClass);
    validateSchemaInfo(keyInfo);

    final SchemaInfo valueInfo = encodeClassInfo(parts.getValue(), valueClass);
    validateSchemaInfo(valueInfo);

    // Reassemble a key/value SchemaInfo from the processed parts, keeping the
    // encoding type and the name carried by the incoming schema info.
    final KeyValueEncodingType encoding = decodeKeyValueEncodingType(kvInfo);
    final String schemaName = kvInfo.getName();
    final SchemaInfo wrapped =
            encodeClassInfo(
                    encodeKeyValueSchemaInfo(schemaName, keyInfo, valueInfo, encoding),
                    KeyValue.class);

    // The runtime schema instance is built from the very same SchemaInfo that is stored.
    this.schemaInfo = wrapped;
    this.schema = createSchema(wrapped);
}