public TypedMessageBuilder createMessageBuilder()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [146:160]


    /**
     * Builds a {@link TypedMessageBuilder} bound to the producer registered for {@code topic}.
     *
     * <p>The steps are performed in this order:
     *
     * <ol>
     *   <li>Resolve the schema: if {@code schema} is {@code null} or reports {@link
     *       SchemaType#BYTES}, the result of {@code getBytesSchema(topic)} is used instead.
     *   <li>Obtain the producer through {@code getOrCreateProducer(topic, resolvedSchema)}.
     *   <li>Only when the sink is configured with {@link DeliveryGuarantee#EXACTLY_ONCE}, obtain a
     *       transaction through {@code getOrCreateTransaction(topic)}; for every other guarantee
     *       the builder is created with a {@code null} transaction.
     * </ol>
     *
     * @param topic the topic passed to the schema, producer and transaction lookups
     * @param schema the schema to build the message with; may be {@code null}
     * @param <T> the message type expected by the caller; the returned builder is cast to it
     *     without a runtime check
     * @return a new message builder wrapping the producer, the resolved schema and the (possibly
     *     {@code null}) transaction
     * @throws PulsarClientException propagated from the helper calls made here (presumably
     *     producer or transaction creation; confirm against those helpers)
     */
    public <T> TypedMessageBuilder<T> createMessageBuilder(String topic, @Nullable Schema<?> schema)
            throws PulsarClientException {
        // Keep the caller's schema only when it is present and is not the plain BYTES schema.
        final Schema<?> resolvedSchema;
        if (schema != null && schema.getSchemaInfo().getType() != SchemaType.BYTES) {
            resolvedSchema = schema;
        } else {
            resolvedSchema = getBytesSchema(topic);
        }

        // The producer is looked up before the transaction, mirroring the established call order.
        final ProducerBase<?> topicProducer =
                (ProducerBase<?>) getOrCreateProducer(topic, resolvedSchema);

        final boolean exactlyOnce =
                sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE;
        final TransactionImpl txn =
                exactlyOnce ? (TransactionImpl) getOrCreateTransaction(topic) : null;

        // Unchecked: the builder's type argument comes from the wildcard schema, not from T.
        @SuppressWarnings("unchecked")
        final TypedMessageBuilder<T> builder =
                (TypedMessageBuilder<T>)
                        new TypedMessageBuilderImpl<>(topicProducer, resolvedSchema, txn);
        return builder;
    }