private TypedMessageBuilder createMessageBuilder()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java [186:241]


    /**
     * Creates a {@link TypedMessageBuilder} for the given topic and copies every attribute that
     * is set on the {@link PulsarMessage} onto it.
     *
     * <p>Optional attributes (ordering key, key, properties, sequence id, replication clusters)
     * are only applied when they are present and non-empty. The event time falls back to the
     * timestamp exposed by the sink {@code context} when the message carries no positive event
     * time of its own.
     *
     * @param topic the topic the builder is requested for from the producer register
     * @param context the sink context, queried for a fallback record timestamp
     * @param message the message whose schema, value and metadata are copied onto the builder
     * @return the populated message builder
     * @throws PulsarClientException declared for failures while obtaining the builder from the
     *     producer register
     */
    private TypedMessageBuilder<?> createMessageBuilder(
            String topic, Context context, PulsarMessage<?> message) throws PulsarClientException {

        Schema<?> schema = message.getSchema();
        TypedMessageBuilder<?> builder = producerRegister.createMessageBuilder(topic, schema);

        byte[] orderingKey = message.getOrderingKey();
        if (orderingKey != null && orderingKey.length > 0) {
            builder.orderingKey(orderingKey);
        }

        String key = message.getKey();
        if (!Strings.isNullOrEmpty(key)) {
            builder.key(key);
        }

        long eventTime = message.getEventTime();
        if (eventTime > 0) {
            builder.eventTime(eventTime);
        } else {
            // Set default message timestamp if flink has provided one.
            Long timestamp = context.timestamp();
            if (timestamp != null && timestamp > 0L) {
                builder.eventTime(timestamp);
            }
        }

        // Schema evolution would serialize the message by Pulsar Schema in TypedMessageBuilder.
        // The type has been checked in PulsarMessageBuilder#value.
        Object value = message.getValue();
        if (value == null) {
            LOG.warn("Send a message with empty payloads, this is a tombstone message in Pulsar.");
        }
        // The builder's type parameter is a wildcard here, so value(...) cannot be called on it
        // directly. Use a parameterized (not raw) view with the warning suppressed on this single
        // declaration only; the value's type was validated against the schema upstream (see the
        // comment above).
        @SuppressWarnings("unchecked")
        TypedMessageBuilder<Object> valueBuilder = (TypedMessageBuilder<Object>) builder;
        valueBuilder.value(value);

        Map<String, String> properties = message.getProperties();
        if (properties != null && !properties.isEmpty()) {
            builder.properties(properties);
        }

        Long sequenceId = message.getSequenceId();
        if (sequenceId != null) {
            builder.sequenceId(sequenceId);
        }

        List<String> clusters = message.getReplicationClusters();
        if (clusters != null && !clusters.isEmpty()) {
            builder.replicationClusters(clusters);
        }

        if (message.isDisableReplication()) {
            builder.disableReplication();
        }

        return builder;
    }