in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java [233:271]
/**
 * Assembles a single output row from the deserialized key row, the deserialized value row and
 * the record metadata, then hands it to {@code outputCollector}.
 *
 * <p>The produced row has {@code physicalArity + metadataConverters.length} fields. Key fields
 * are written to the positions listed in {@code keyProjection}, value fields to the positions
 * listed in {@code valueProjection}, and one metadata field per entry of {@code
 * metadataConverters} is appended starting at index {@code physicalArity}.
 *
 * <p>A {@code null} value row is only accepted in upsert mode, where it yields a row of kind
 * {@link RowKind#DELETE} whose value positions are never written. Otherwise the row kind is
 * taken over from the value row.
 *
 * @param physicalKeyRow deserialized key row; must not be {@code null} when {@code
 *     keyProjection} is non-empty
 * @param physicalValueRow deserialized value row, or {@code null} if the record carries no value
 * @throws DeserializationException if the value row is {@code null} outside upsert mode, or if
 *     the key row is {@code null} although key fields have to be projected
 */
private void emitRow(
        @Nullable GenericRowData physicalKeyRow,
        @Nullable GenericRowData physicalValueRow) {
    final RowKind rowKind;
    if (physicalValueRow != null) {
        rowKind = physicalValueRow.getRowKind();
    } else if (upsertMode) {
        // A missing value is interpreted as a deletion of the key in upsert mode.
        rowKind = RowKind.DELETE;
    } else {
        throw new DeserializationException(
                "Invalid null value received in non-upsert mode. Could not set row kind for output record.");
    }

    // Explicit guard instead of an `assert`: assertions are disabled by default at runtime, so
    // a missing key would otherwise only show up as a NullPointerException without context.
    if (physicalKeyRow == null && keyProjection.length > 0) {
        throw new DeserializationException(
                "Invalid null key received although key fields are projected into the output record. Could not set key fields for output record.");
    }

    final int metadataArity = metadataConverters.length;
    final GenericRowData producedRow =
            new GenericRowData(rowKind, physicalArity + metadataArity);

    // The loop body is only reached with a non-null key row (see guard above).
    for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
        producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
    }

    // Value positions are skipped entirely when there is no value row (DELETE in upsert mode).
    if (physicalValueRow != null) {
        for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
            producedRow.setField(
                    valueProjection[valuePos], physicalValueRow.getField(valuePos));
        }
    }

    // Metadata fields are appended after all physical fields, in converter order.
    for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
        producedRow.setField(
                physicalArity + metadataPos,
                metadataConverters[metadataPos].read(inputRecord));
    }

    outputCollector.collect(producedRow);
}