in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java [545:616]
public PulsarSource<OUT> build() {
// Ensure the topic subscriber for pulsar.
checkNotNull(subscriber, "No topic names or topic pattern are provided.");
if (rangeGenerator == null) {
LOG.warn(
"No range generator provided, we would use the FullRangeGenerator as the default range generator.");
this.rangeGenerator = new FullRangeGenerator();
}
if (boundedness == null) {
LOG.warn("No boundedness was set, mark it as a endless stream.");
this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
}
if (boundedness == Boundedness.BOUNDED
&& configBuilder.get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS) >= 0) {
LOG.warn(
"{} property is overridden to -1 because the source is bounded.",
PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
configBuilder.override(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
}
checkNotNull(deserializationSchema, "deserializationSchema should be set.");
// Schema evolution validation.
if (Boolean.TRUE.equals(configBuilder.get(PULSAR_READ_SCHEMA_EVOLUTION))) {
checkState(
deserializationSchema instanceof PulsarSchemaWrapper,
"When enabling schema evolution, you must provide a Pulsar Schema in builder's setDeserializationSchema method.");
} else if (deserializationSchema instanceof PulsarSchemaWrapper) {
LOG.info(
"It seems like you are consuming messages by using Pulsar Schema."
+ " You can builder.enableSchemaEvolution() to enable schema evolution for better Pulsar Schema check."
+ " We would use bypass Schema check by default.");
}
if (pulsarCrypto == null) {
this.pulsarCrypto = PulsarCrypto.disabled();
}
if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
LOG.warn(
"We recommend set a readable consumer name through setConsumerName(String) in production mode.");
} else {
String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
if (!consumerName.contains("%s")) {
configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s");
}
}
// Make sure they are serializable.
checkState(
isSerializable(deserializationSchema),
"PulsarDeserializationSchema isn't serializable");
checkState(isSerializable(startCursor), "StartCursor isn't serializable");
checkState(isSerializable(stopCursor), "StopCursor isn't serializable");
checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable");
checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable");
// Check builder configuration.
SourceConfiguration sourceConfiguration =
configBuilder.build(SOURCE_CONFIG_VALIDATOR, SourceConfiguration::new);
return new PulsarSource<>(
sourceConfiguration,
subscriber,
rangeGenerator,
startCursor,
stopCursor,
boundedness,
deserializationSchema,
pulsarCrypto);
}