in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java [117:175]
@Override
public ProducerRecord<byte[], byte[]> serialize(
RowData consumedRow, KafkaSinkContext context, Long timestamp) {
// Fast path: without a key format or writable metadata, no projection is needed and
// the consumed row can be serialized directly as the record value.
if (keySerialization == null && !hasMetadata) {
final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
final String targetTopic = getTargetTopic(consumedRow);
return new ProducerRecord<>(
targetTopic,
extractPartition(
consumedRow,
targetTopic,
null, // no serialized key available on this path
valueSerialized,
context.getPartitionsForTopic(targetTopic)),
null, // record key stays null when no key format is configured
valueSerialized);
}
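// Serialize the key only when a key format is configured; otherwise the record is
// value-only and the partitioner sees a null key.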
final byte[] keySerialized;
if (keySerialization == null) {
keySerialized = null;
} else {
final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
keySerialized = keySerialization.serialize(keyRow);
}
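// Derive the record value from the row kind. In upsert mode the changelog is mapped
// onto Kafka's compacted-topic semantics: retractions become tombstones.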
final byte[] valueSerialized;
final RowKind kind = consumedRow.getRowKind();
if (upsertMode) {
if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
// encode DELETE / UPDATE_BEFORE as a tombstone (null value) so log
// compaction eventually removes the key
valueSerialized = null;
} else {
// normalize the row kind to INSERT so the insert-only value format can
// serialize the row
final RowData valueRow =
DynamicKafkaRecordSerializationSchema.createProjectedRow(
consumedRow, kind, valueFieldGetters);
valueRow.setRowKind(RowKind.INSERT);
valueSerialized = valueSerialization.serialize(valueRow);
}
} else {
final RowData valueRow =
DynamicKafkaRecordSerializationSchema.createProjectedRow(
consumedRow, kind, valueFieldGetters);
valueSerialized = valueSerialization.serialize(valueRow);
}
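// Assemble the full record, reading the writable TIMESTAMP and HEADERS metadata
// columns from the consumed row when they are present.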
final String targetTopic = getTargetTopic(consumedRow);
return new ProducerRecord<>(
targetTopic,
extractPartition(
consumedRow,
targetTopic,
keySerialized,
valueSerialized,
context.getPartitionsForTopic(targetTopic)),
readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.TIMESTAMP),
keySerialized,
valueSerialized,
readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
}
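// A minimal usage sketch (hypothetical "schema" and "context" variables, not part of
// this file): in upsert mode a DELETE row is serialized into a tombstone, i.e. the
// returned ProducerRecord carries a null value:
//
//   GenericRowData row = GenericRowData.of(StringData.fromString("k1"), 42L);
//   row.setRowKind(RowKind.DELETE);
//   ProducerRecord<byte[], byte[]> record = schema.serialize(row, context, null);
//   // record.value() == null  -> Kafka treats this as a deletion marker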