in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java [521:562]
private KafkaRecordDeserializationSchema<RowData> createKafkaDeserializationSchema(
        DeserializationSchema<RowData> keyDeserialization,
        DeserializationSchema<RowData> valueDeserialization,
        TypeInformation<RowData> producedTypeInfo) {
    // resolve each requested metadata key to its converter (an unknown key is
    // unexpected at this point, hence the IllegalStateException)
    final MetadataConverter[] metadataConverters =
            metadataKeys.stream()
                    .map(
                            k ->
                                    Stream.of(ReadableMetadata.values())
                                            .filter(rm -> rm.key.equals(k))
                                            .findFirst()
                                            .orElseThrow(IllegalStateException::new))
                    .map(m -> m.converter)
                    .toArray(MetadataConverter[]::new);
    // check if connector metadata is used at all
    final boolean hasMetadata = metadataKeys.size() > 0;

    // adjust physical arity with value format's metadata
    final int adjustedPhysicalArity =
            DataType.getFieldDataTypes(producedDataType).size() - metadataKeys.size();

    // adjust value format projection to include value format's metadata columns at the end
    final int[] adjustedValueProjection =
            IntStream.concat(
                            IntStream.of(valueProjection),
                            IntStream.range(
                                    keyProjection.length + valueProjection.length,
                                    adjustedPhysicalArity))
                    .toArray();
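    // Hypothetical example: with keyProjection = [0], valueProjection = [1, 2],
    // and adjustedPhysicalArity = 4, the adjusted projection is [1, 2, 3]: the
    // value format's metadata column is appended after the physical columns.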
    return new DynamicKafkaDeserializationSchema(
            adjustedPhysicalArity,
            keyDeserialization,
            keyProjection,
            valueDeserialization,
            adjustedValueProjection,
            hasMetadata,
            metadataConverters,
            producedTypeInfo,
            upsertMode);
}
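
To make the projection arithmetic concrete in isolation, here is a minimal, self-contained sketch. This is not Flink code: the class name ProjectionSketch and all input values are hypothetical, and only the IntStream.concat/IntStream.range computation mirrors the method above.

import java.util.Arrays;
import java.util.stream.IntStream;

// Minimal sketch (not Flink code): reproduces the projection arithmetic from
// createKafkaDeserializationSchema with hypothetical inputs.
public class ProjectionSketch {
    public static void main(String[] args) {
        // Hypothetical schema: one key column, two value columns,
        // one connector metadata column, one value-format metadata column.
        int[] keyProjection = {0};
        int[] valueProjection = {1, 2};
        int producedFieldCount = 5;      // all columns in the produced row type
        int connectorMetadataCount = 1;  // size of metadataKeys in the method above

        // Same computation as in the method: strip connector metadata from the arity...
        int adjustedPhysicalArity = producedFieldCount - connectorMetadataCount; // = 4

        // ...then append the value format's metadata columns after the physical ones.
        int[] adjustedValueProjection =
                IntStream.concat(
                                IntStream.of(valueProjection),
                                IntStream.range(
                                        keyProjection.length + valueProjection.length,
                                        adjustedPhysicalArity))
                        .toArray();

        System.out.println(Arrays.toString(adjustedValueProjection)); // prints [1, 2, 3]
    }
}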