in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [111:134]
public ProducerRegister(
SinkConfiguration sinkConfiguration,
PulsarCrypto pulsarCrypto,
SinkWriterMetricGroup metricGroup)
throws PulsarClientException {
this.pulsarClient = createClient(sinkConfiguration);
this.sinkConfiguration = sinkConfiguration;
this.pulsarCrypto = pulsarCrypto;
this.metricGroup = metricGroup;
this.schemas = new HashMap<>();
this.producers = new HashMap<>();
this.transactions = new HashMap<>();
if (sinkConfiguration.isEnableMetrics()) {
metricGroup.setCurrentSendTimeGauge(this::currentSendTimeGauge);
}
// Check if we have enabled the transaction in the exactly-once delivery guarantee.
if (sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE) {
this.coordinatorClient = getTcClient(pulsarClient);
} else {
this.coordinatorClient = null;
}
}