in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java [186:241]
private TypedMessageBuilder<?> createMessageBuilder(
String topic, Context context, PulsarMessage<?> message) throws PulsarClientException {
Schema<?> schema = message.getSchema();
TypedMessageBuilder<?> builder = producerRegister.createMessageBuilder(topic, schema);
byte[] orderingKey = message.getOrderingKey();
if (orderingKey != null && orderingKey.length > 0) {
builder.orderingKey(orderingKey);
}
String key = message.getKey();
if (!Strings.isNullOrEmpty(key)) {
builder.key(key);
}
long eventTime = message.getEventTime();
if (eventTime > 0) {
builder.eventTime(eventTime);
} else {
// Set default message timestamp if flink has provided one.
Long timestamp = context.timestamp();
if (timestamp != null && timestamp > 0L) {
builder.eventTime(timestamp);
}
}
// Schema evolution would serialize the message by Pulsar Schema in TypedMessageBuilder.
// The type has been checked in PulsarMessageBuilder#value.
Object value = message.getValue();
if (value == null) {
LOG.warn("Send a message with empty payloads, this is a tombstone message in Pulsar.");
}
((TypedMessageBuilder) builder).value(value);
Map<String, String> properties = message.getProperties();
if (properties != null && !properties.isEmpty()) {
builder.properties(properties);
}
Long sequenceId = message.getSequenceId();
if (sequenceId != null) {
builder.sequenceId(sequenceId);
}
List<String> clusters = message.getReplicationClusters();
if (clusters != null && !clusters.isEmpty()) {
builder.replicationClusters(clusters);
}
if (message.isDisableReplication()) {
builder.disableReplication();
}
return builder;
}