in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [146:160]
/**
 * Creates a message builder bound to the producer of the given topic.
 *
 * <p>When {@code schema} is {@code null} or its schema type is {@link SchemaType#BYTES}, the
 * schema returned by {@code getBytesSchema(topic)} is used instead. A transaction is attached
 * only when the sink is configured with {@link DeliveryGuarantee#EXACTLY_ONCE}; otherwise the
 * builder is constructed with a {@code null} transaction.
 *
 * @param topic the topic whose producer (and, if required, transaction) is looked up
 * @param schema the schema to write with, or {@code null} to fall back to the bytes schema
 * @param <T> the message type expected by the caller; the result is cast to it unchecked
 * @return a new message builder for the resolved producer, schema and transaction
 * @throws PulsarClientException declared for failures propagated from the helper lookups
 */
public <T> TypedMessageBuilder<T> createMessageBuilder(String topic, @Nullable Schema<?> schema)
        throws PulsarClientException {
    // Keep the caller's schema only if it is present and not a BYTES schema.
    final Schema<?> effectiveSchema;
    if (schema != null && schema.getSchemaInfo().getType() != SchemaType.BYTES) {
        effectiveSchema = schema;
    } else {
        effectiveSchema = getBytesSchema(topic);
    }

    // The producer is resolved before the transaction, matching the original call order.
    final ProducerBase<?> topicProducer =
            (ProducerBase<?>) getOrCreateProducer(topic, effectiveSchema);

    final boolean exactlyOnce =
            sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE;
    final TransactionImpl txn =
            exactlyOnce ? (TransactionImpl) getOrCreateTransaction(topic) : null;

    // NOTE(review): unchecked cast; T is not tied to the schema's type here — confirm callers
    // only request a T compatible with the schema they pass in.
    return (TypedMessageBuilder<T>)
            new TypedMessageBuilderImpl<>(topicProducer, effectiveSchema, txn);
}