in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java [141:179]
public void write(IN element, Context context) throws IOException, InterruptedException {
PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);
// Choose the right topic to send.
String key = message.getKey();
List<TopicPartition> partitions = metadataListener.availablePartitions();
TopicPartition partition = topicRouter.route(element, key, partitions, sinkContext);
String topic = partition.getFullTopicName();
// Create message builder for sending messages.
TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
// Message Delay delivery.
long deliverAt = messageDelayer.deliverAt(element, sinkContext);
if (deliverAt > 0) {
builder.deliverAt(deliverAt);
}
// Perform message sending.
if (deliveryGuarantee == DeliveryGuarantee.NONE) {
// We would just ignore the sending exception. This may cause data loss.
builder.sendAsync();
} else {
// Increase the pending message count.
pendingMessages.incrementAndGet();
CompletableFuture<MessageId> future = builder.sendAsync();
future.whenComplete(
(id, ex) -> {
pendingMessages.decrementAndGet();
if (ex != null) {
mailboxExecutor.execute(
() -> throwSendingException(topic, ex),
"Failed to send data to Pulsar");
} else {
LOG.debug("Sent message to Pulsar {} with message id {}", topic, id);
}
});
}
}