in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java [396:488]
/**
 * Builds a {@link PulsarSink} from the options collected by this builder.
 *
 * <p>This method is not side-effect free. Depending on the configured delivery guarantee and
 * producer name it overrides entries in the builder's configuration. It also stores default
 * values into the builder's metadata listener, topic routing mode, message delayer and crypto
 * fields when they were not set. The steps run in a fixed order. A later validation failure can
 * therefore leave earlier overrides applied to this builder.
 *
 * @return a new {@link PulsarSink} backed by an unmodifiable {@code SinkConfiguration}.
 * @throws NullPointerException if no serialization schema was set, or if neither topic names
 *     (metadata listener) nor a custom topic router were provided.
 * @throws IllegalStateException if schema evolution is enabled without a Pulsar Schema based
 *     serialization schema, or if the serialization schema, message delayer or crypto is not
 *     serializable (raised through {@code checkState}).
 */
public PulsarSink<IN> build() {
    applyDeliveryGuaranteeSettings();
    applyProducerNameTemplate();
    validateSerializationSchema();
    fillInTopicAndRoutingDefaults();
    checkComponentsSerializable();

    // This is an unmodifiable configuration for Pulsar.
    // We don't use Pulsar's built-in configure classes for compatible requirement.
    SinkConfiguration sinkConfiguration =
            configBuilder.build(SINK_CONFIG_VALIDATOR, SinkConfiguration::new);
    return new PulsarSink<>(
            sinkConfiguration,
            serializationSchema,
            metadataListener,
            topicRoutingMode,
            topicRouter,
            messageDelayer,
            pulsarCrypto);
}

/**
 * Logs hints for the configured delivery guarantee.
 *
 * <p>For EXACTLY_ONCE it also enables Pulsar transactions and sets the send timeout to 0.
 */
private void applyDeliveryGuaranteeSettings() {
    DeliveryGuarantee deliveryGuarantee = configBuilder.get(PULSAR_WRITE_DELIVERY_GUARANTEE);
    if (deliveryGuarantee == DeliveryGuarantee.NONE) {
        LOG.warn(
                "You haven't set delivery guarantee or set it to NONE, this would cause data loss. Make sure you have known this shortcoming.");
        return;
    }
    if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
        return;
    }

    LOG.info(
            "Exactly once require flink checkpoint and your pulsar cluster should support the transaction.");
    configBuilder.override(PULSAR_ENABLE_TRANSACTION, true);
    // NOTE(review): presumably Pulsar only allows transactional sends from producers whose send
    // timeout is disabled (0) — confirm against the Pulsar client documentation.
    configBuilder.override(PULSAR_SEND_TIMEOUT_MS, 0L);

    if (!configBuilder.contains(PULSAR_WRITE_TRANSACTION_TIMEOUT)) {
        LOG.warn(
                "The default pulsar transaction timeout is 3 hours, make sure it was greater than your checkpoint interval.");
    } else {
        Long timeout = configBuilder.get(PULSAR_WRITE_TRANSACTION_TIMEOUT);
        LOG.warn(
                "The configured transaction timeout is {} milliseconds, make sure it was greater than your checkpoint interval.",
                timeout);
    }
}

/**
 * Warns when no producer name was configured.
 *
 * <p>Otherwise, appends a {@code " - %s"} placeholder to the configured name when it does not
 * already contain {@code "%s"}.
 */
private void applyProducerNameTemplate() {
    if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) {
        LOG.warn(
                "We recommend set a readable producer name through setProducerName(String) in production mode.");
        return;
    }
    String producerName = configBuilder.get(PULSAR_PRODUCER_NAME);
    // NOTE(review): the "%s" placeholder is presumably filled in later, at producer creation,
    // to make names unique — confirm at the use site.
    if (!producerName.contains("%s")) {
        configBuilder.override(PULSAR_PRODUCER_NAME, producerName + " - %s");
    }
}

/**
 * Requires a serialization schema to be set.
 *
 * <p>When schema evolution is enabled, it must be a {@code PulsarSchemaWrapper}.
 */
private void validateSerializationSchema() {
    checkNotNull(serializationSchema, "serializationSchema must be set.");
    if (Boolean.TRUE.equals(configBuilder.get(PULSAR_WRITE_SCHEMA_EVOLUTION))) {
        checkState(
                serializationSchema instanceof PulsarSchemaWrapper,
                "When enabling schema evolution, you must provide a Pulsar Schema in builder's setSerializationSchema method.");
    } else if (serializationSchema instanceof PulsarSchemaWrapper) {
        LOG.info(
                "It seems like you are sending messages by using Pulsar Schema."
                        + " You can builder.enableSchemaEvolution() to enable schema evolution for better Pulsar Schema check."
                        + " We would use bypass Schema check by default.");
    }
}

/**
 * Ensures topics or a custom router exist, then stores defaults into unset fields.
 *
 * <p>The defaulted fields are the metadata listener, topic routing mode, message delayer and
 * crypto.
 */
private void fillInTopicAndRoutingDefaults() {
    // Topic metadata listener validation.
    if (metadataListener == null) {
        if (topicRouter == null) {
            throw new NullPointerException(
                    "No topic names or custom topic router are provided.");
        }
        LOG.warn(
                "No topic set has been provided, make sure your custom topic router support empty topic set.");
        this.metadataListener = new MetadataListener();
    }
    // Topic routing mode validation.
    if (topicRoutingMode == null) {
        LOG.info("No topic routing mode has been chosen. We use round-robin mode as default.");
        this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN;
    }
    if (messageDelayer == null) {
        this.messageDelayer = MessageDelayer.never();
    }
    if (pulsarCrypto == null) {
        this.pulsarCrypto = PulsarCrypto.disabled();
    }
}

/**
 * Verifies that the user-supplied components shipped with the sink are serializable.
 */
private void checkComponentsSerializable() {
    checkState(
            isSerializable(serializationSchema),
            "PulsarSerializationSchema isn't serializable");
    checkState(isSerializable(messageDelayer), "MessageDelayer isn't serializable");
    checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable");
}