public SinkRuntimeProvider getSinkRuntimeProvider()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java [200:261]


    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        final SerializationSchema<RowData> keySerialization =
                createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);

        final SerializationSchema<RowData> valueSerialization =
                createSerialization(context, valueEncodingFormat, valueProjection, null);

        final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
        final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
        if (transactionalIdPrefix != null) {
            sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
        }
        final KafkaSink<RowData> kafkaSink =
                sinkBuilder
                        .setDeliveryGuarantee(deliveryGuarantee)
                        .setBootstrapServers(
                                properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString())
                        .setKafkaProducerConfig(properties)
                        .setRecordSerializer(
                                new DynamicKafkaRecordSerializationSchema(
                                        topics,
                                        topicPattern,
                                        partitioner,
                                        keySerialization,
                                        valueSerialization,
                                        getFieldGetters(physicalChildren, keyProjection),
                                        getFieldGetters(physicalChildren, valueProjection),
                                        hasMetadata(),
                                        getMetadataPositions(physicalChildren),
                                        upsertMode))
                        .setTransactionNamingStrategy(transactionNamingStrategy)
                        .build();
        if (flushMode.isEnabled() && upsertMode) {
            return new DataStreamSinkProvider() {
                @Override
                public DataStreamSink<?> consumeDataStream(
                        ProviderContext providerContext, DataStream<RowData> dataStream) {
                    final boolean objectReuse =
                            dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
                    final ReducingUpsertSink<?, ?> sink =
                            new ReducingUpsertSink<>(
                                    kafkaSink,
                                    physicalDataType,
                                    keyProjection,
                                    flushMode,
                                    objectReuse
                                            ? createRowDataTypeSerializer(
                                                            context,
                                                            dataStream.getExecutionConfig())
                                                    ::copy
                                            : rowData -> rowData);
                    final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
                    providerContext.generateUid(UPSERT_KAFKA_TRANSFORMATION).ifPresent(end::uid);
                    if (parallelism != null) {
                        end.setParallelism(parallelism);
                    }
                    return end;
                }
            };
        }
        return SinkV2Provider.of(kafkaSink, parallelism);
    }