public void write()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java [141:179]


    /**
     * Serializes {@code element} into a Pulsar message, routes it to one of the currently
     * available topic partitions and hands it to the producer asynchronously.
     *
     * <p>With {@link DeliveryGuarantee#NONE} the result of the send is never inspected. For every
     * other guarantee the message is counted in {@code pendingMessages} until its send future
     * completes, and a failed send is reported through the mailbox executor.
     *
     * @param element the record to write
     * @param context the writer context, forwarded to {@code createMessageBuilder}
     * @throws IOException declared by the signature; propagated from the calls made here
     * @throws InterruptedException declared by the signature; propagated from the calls made here
     */
    public void write(IN element, Context context) throws IOException, InterruptedException {
        PulsarMessage<?> serialized = serializationSchema.serialize(element, sinkContext);

        // Routing: the router picks one partition out of those the metadata listener currently
        // reports, using the message key taken from the serialized message.
        String topic =
                topicRouter
                        .route(
                                element,
                                serialized.getKey(),
                                metadataListener.availablePartitions(),
                                sinkContext)
                        .getFullTopicName();

        TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, serialized);

        // Delayed delivery is opt-in: only a positive timestamp is applied to the builder.
        long deliveryTimestamp = messageDelayer.deliverAt(element, sinkContext);
        if (deliveryTimestamp > 0) {
            builder.deliverAt(deliveryTimestamp);
        }

        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
            // Fire and forget: the returned future is dropped, so a failed send goes unnoticed
            // and the record may be lost.
            builder.sendAsync();
            return;
        }

        // Track the in-flight message until its send future completes, successfully or not.
        pendingMessages.incrementAndGet();
        builder.sendAsync()
                .whenComplete(
                        (messageId, failure) -> {
                            // NOTE(review): the counter drops before a failure is handed to the
                            // mailbox; confirm that whoever waits on pendingMessages also
                            // observes the queued failure.
                            pendingMessages.decrementAndGet();
                            if (failure == null) {
                                LOG.debug(
                                        "Sent message to Pulsar {} with message id {}",
                                        topic,
                                        messageId);
                                return;
                            }
                            // Hand the failure to the mailbox executor instead of throwing
                            // from inside the completion callback.
                            mailboxExecutor.execute(
                                    () -> throwSendingException(topic, failure),
                                    "Failed to send data to Pulsar");
                        });
    }