in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java [244:255]
public void flush(boolean endOfInput) throws IOException {
    // Flushing is skipped only when the input has not ended AND the delivery
    // guarantee is NONE; in every other combination we drain below.
    if (!endOfInput && deliveryGuarantee == DeliveryGuarantee.NONE) {
        return;
    }

    LOG.info("Flush the pending messages to Pulsar.");

    // Always issue at least one flush, then repeat until the pending-message
    // counter reads zero, so no message is left unflushed when we return.
    // NOTE(review): the loop re-invokes flush() back-to-back with no pause or
    // yield between iterations; presumably the counter is decremented elsewhere
    // (e.g. by send callbacks) -- confirm it always reaches zero.
    do {
        producerRegister.flush();
    } while (pendingMessages.longValue() > 0);
}