in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java [189:248]
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
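    // Derive key and value serializers from the table's configured encoding
    // formats, projecting each onto its subset of the physical row.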
    final SerializationSchema<RowData> keySerialization =
            createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
    final SerializationSchema<RowData> valueSerialization =
            createSerialization(context, valueEncodingFormat, valueProjection, null);

    final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
    final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
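    // Kafka transactions (needed for exactly-once delivery) require a stable
    // transactional id prefix, so it is only set when one was configured.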
    if (transactionalIdPrefix != null) {
        sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
    }
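    // Assemble the KafkaSink; per-record concerns (topic, partitioning, key/value
    // encoding, writable metadata) are delegated to the serialization schema.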
    final KafkaSink<RowData> kafkaSink =
            sinkBuilder
                    .setDeliveryGuarantee(deliveryGuarantee)
                    .setBootstrapServers(
                            properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString())
                    .setKafkaProducerConfig(properties)
                    .setRecordSerializer(
                            new DynamicKafkaRecordSerializationSchema(
                                    topic,
                                    partitioner,
                                    keySerialization,
                                    valueSerialization,
                                    getFieldGetters(physicalChildren, keyProjection),
                                    getFieldGetters(physicalChildren, valueProjection),
                                    hasMetadata(),
                                    getMetadataPositions(physicalChildren),
                                    upsertMode))
                    .build();
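    // Upsert tables with buffer flushing enabled get wrapped in a
    // ReducingUpsertSink, which reduces changelog entries per key in a buffer
    // before forwarding them to the inner KafkaSink.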
    if (flushMode.isEnabled() && upsertMode) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(
                    ProviderContext providerContext, DataStream<RowData> dataStream) {
                final boolean objectReuse =
                        dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
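                // With object reuse enabled, rows must be copied before they are
                // buffered; otherwise the identity function avoids the copy.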
                final ReducingUpsertSink<?, ?> sink =
                        new ReducingUpsertSink<>(
                                kafkaSink,
                                physicalDataType,
                                keyProjection,
                                flushMode,
                                objectReuse
                                        ? createRowDataTypeSerializer(
                                                        context,
                                                        dataStream.getExecutionConfig())
                                                ::copy
                                        : rowData -> rowData);
                final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
                providerContext.generateUid(UPSERT_KAFKA_TRANSFORMATION).ifPresent(end::uid);
                if (parallelism != null) {
                    end.setParallelism(parallelism);
                }
                return end;
            }
        };
    }
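    // Default path: hand the KafkaSink straight to the planner.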
    return SinkV2Provider.of(kafkaSink, parallelism);
}
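
For reference, a minimal standalone sketch of the same KafkaSink builder pattern this method drives; the bootstrap address, topic name, and SimpleStringSchema value serializer below are illustrative assumptions, not values taken from KafkaDynamicSink:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Given some DataStream<String> stream:
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // assumed broker address
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic") // assumed topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
stream.sinkTo(sink);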