in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java [100:125]
/**
 * Deserializes one Kafka record and emits the resulting rows to {@code collector}.
 *
 * <p>When there is neither a key deserialization schema nor metadata, the record value is
 * handed straight to the value deserialization schema. Otherwise the key is first deserialized
 * into the key buffer, and the value is emitted through {@code outputCollector}, which is
 * given the record, the buffered key rows and the target collector.
 *
 * <p>The key buffer is emptied before this method exits, including when one of the delegated
 * calls throws, so key rows buffered for one record are never left behind for the next one.
 *
 * @param record the Kafka record to deserialize
 * @param collector the collector that receives the produced rows
 * @throws IOException if one of the delegated deserialization calls fails
 */
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
        throws IOException {
    // shortcut in case no output projection is required,
    // also not for a cartesian product with the keys
    if (keyDeserialization == null && !hasMetadata) {
        valueDeserialization.deserialize(record.value(), collector);
        return;
    }

    try {
        // buffer key(s)
        if (keyDeserialization != null) {
            keyDeserialization.deserialize(record.key(), keyCollector);
        }

        // project output while emitting values
        outputCollector.inputRecord = record;
        outputCollector.physicalKeyRows = keyCollector.buffer;
        outputCollector.outputCollector = collector;
        if (record.value() == null && upsertMode) {
            // collect tombstone messages in upsert mode by hand
            outputCollector.collect(null);
        } else {
            valueDeserialization.deserialize(record.value(), outputCollector);
        }
    } finally {
        // The key collector is reused across records: clear it even on failure so that
        // partially or fully buffered keys cannot leak into a subsequent record.
        keyCollector.buffer.clear();
    }
}