in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java [136:162]
public KafkaWriter<IN> restoreWriter(
WriterInitContext context, Collection<KafkaWriterState> recoveredState) {
KafkaWriter<IN> writer;
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
writer =
new ExactlyOnceKafkaWriter<>(
deliveryGuarantee,
kafkaProducerConfig,
transactionalIdPrefix,
context,
recordSerializer,
context.asSerializationSchemaInitializationContext(),
transactionNamingStrategy.getAbortImpl(),
transactionNamingStrategy.getImpl(),
recoveredState);
} else {
writer =
new KafkaWriter<>(
deliveryGuarantee,
kafkaProducerConfig,
context,
recordSerializer,
context.asSerializationSchemaInitializationContext());
}
writer.initialize();
return writer;
}