public PulsarSource build()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java [545:616]


    public PulsarSource<OUT> build() {
        // Validates the builder state, fills in a default for each optional setting that was
        // left unset, and finally assembles the PulsarSource from the resulting values.

        // A missing subscriber means neither topic names nor a topic pattern were supplied.
        checkNotNull(subscriber, "No topic names or topic pattern are provided.");

        // Fall back to FullRangeGenerator when no range generator was chosen.
        if (rangeGenerator == null) {
            LOG.warn(
                    "No range generator provided, we would use the FullRangeGenerator as the default range generator.");
            rangeGenerator = new FullRangeGenerator();
        }

        // An unset boundedness defaults to CONTINUOUS_UNBOUNDED.
        if (boundedness == null) {
            LOG.warn("No boundedness was set, mark it as a endless stream.");
            boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        }

        // For a bounded source, a non-negative partition discovery interval is forced to -1.
        if (boundedness == Boundedness.BOUNDED) {
            if (configBuilder.get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS) >= 0) {
                LOG.warn(
                        "{} property is overridden to -1 because the source is bounded.",
                        PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
                configBuilder.override(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
            }
        }

        checkNotNull(deserializationSchema, "deserializationSchema should be set.");

        // Schema evolution is only accepted together with a PulsarSchemaWrapper; without the
        // option, a PulsarSchemaWrapper merely triggers an informational hint.
        final boolean schemaEvolutionEnabled =
                Boolean.TRUE.equals(configBuilder.get(PULSAR_READ_SCHEMA_EVOLUTION));
        final boolean wrapsPulsarSchema = deserializationSchema instanceof PulsarSchemaWrapper;
        if (schemaEvolutionEnabled) {
            checkState(
                    wrapsPulsarSchema,
                    "When enabling schema evolution, you must provide a Pulsar Schema in builder's setDeserializationSchema method.");
        } else if (wrapsPulsarSchema) {
            LOG.info(
                    "It seems like you are consuming messages by using Pulsar Schema."
                            + " You can builder.enableSchemaEvolution() to enable schema evolution for better Pulsar Schema check."
                            + " We would use bypass Schema check by default.");
        }

        // Without an explicit crypto setting, the disabled PulsarCrypto instance is used.
        if (pulsarCrypto == null) {
            pulsarCrypto = PulsarCrypto.disabled();
        }

        // A configured consumer name that lacks a "%s" placeholder gets " - %s" appended;
        // an absent name only produces a recommendation in the log.
        if (configBuilder.contains(PULSAR_CONSUMER_NAME)) {
            final String configuredName = configBuilder.get(PULSAR_CONSUMER_NAME);
            if (!configuredName.contains("%s")) {
                configBuilder.override(PULSAR_CONSUMER_NAME, configuredName + " - %s");
            }
        } else {
            LOG.warn(
                    "We recommend set a readable consumer name through setConsumerName(String) in production mode.");
        }

        // Every component handed to the source has to pass the isSerializable check.
        checkState(
                isSerializable(deserializationSchema),
                "PulsarDeserializationSchema isn't serializable");
        checkState(isSerializable(startCursor), "StartCursor isn't serializable");
        checkState(isSerializable(stopCursor), "StopCursor isn't serializable");
        checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable");
        checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable");

        // Produce the SourceConfiguration, applying SOURCE_CONFIG_VALIDATOR to the builder
        // configuration.
        final SourceConfiguration validatedConfiguration =
                configBuilder.build(SOURCE_CONFIG_VALIDATOR, SourceConfiguration::new);

        return new PulsarSource<>(
                validatedConfiguration,
                subscriber,
                rangeGenerator,
                startCursor,
                stopCursor,
                boundedness,
                deserializationSchema,
                pulsarCrypto);
    }