public PulsarSink build()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java [396:488]


    public PulsarSink<IN> build() {
        // Change delivery guarantee.
        DeliveryGuarantee deliveryGuarantee = configBuilder.get(PULSAR_WRITE_DELIVERY_GUARANTEE);
        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
            LOG.warn(
                    "You haven't set delivery guarantee or set it to NONE, this would cause data loss. Make sure you have known this shortcoming.");
        } else if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            LOG.info(
                    "Exactly once require flink checkpoint and your pulsar cluster should support the transaction.");
            configBuilder.override(PULSAR_ENABLE_TRANSACTION, true);
            configBuilder.override(PULSAR_SEND_TIMEOUT_MS, 0L);

            if (!configBuilder.contains(PULSAR_WRITE_TRANSACTION_TIMEOUT)) {
                LOG.warn(
                        "The default pulsar transaction timeout is 3 hours, make sure it was greater than your checkpoint interval.");
            } else {
                Long timeout = configBuilder.get(PULSAR_WRITE_TRANSACTION_TIMEOUT);
                LOG.warn(
                        "The configured transaction timeout is {} mille seconds, make sure it was greater than your checkpoint interval.",
                        timeout);
            }
        }

        if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) {
            LOG.warn(
                    "We recommend set a readable producer name through setProducerName(String) in production mode.");
        } else {
            String producerName = configBuilder.get(PULSAR_PRODUCER_NAME);
            if (!producerName.contains("%s")) {
                configBuilder.override(PULSAR_PRODUCER_NAME, producerName + " - %s");
            }
        }

        checkNotNull(serializationSchema, "serializationSchema must be set.");
        // Schema evolution validation.
        if (Boolean.TRUE.equals(configBuilder.get(PULSAR_WRITE_SCHEMA_EVOLUTION))) {
            checkState(
                    serializationSchema instanceof PulsarSchemaWrapper,
                    "When enabling schema evolution, you must provide a Pulsar Schema in builder's setSerializationSchema method.");
        } else if (serializationSchema instanceof PulsarSchemaWrapper) {
            LOG.info(
                    "It seems like you are sending messages by using Pulsar Schema."
                            + " You can builder.enableSchemaEvolution() to enable schema evolution for better Pulsar Schema check."
                            + " We would use bypass Schema check by default.");
        }

        // Topic metadata listener validation.
        if (metadataListener == null) {
            if (topicRouter == null) {
                throw new NullPointerException(
                        "No topic names or custom topic router are provided.");
            } else {
                LOG.warn(
                        "No topic set has been provided, make sure your custom topic router support empty topic set.");
                this.metadataListener = new MetadataListener();
            }
        }

        // Topic routing mode validation.
        if (topicRoutingMode == null) {
            LOG.info("No topic routing mode has been chosen. We use round-robin mode as default.");
            this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN;
        }

        if (messageDelayer == null) {
            this.messageDelayer = MessageDelayer.never();
        }

        if (pulsarCrypto == null) {
            this.pulsarCrypto = PulsarCrypto.disabled();
        }

        // Make sure they are serializable.
        checkState(
                isSerializable(serializationSchema),
                "PulsarSerializationSchema isn't serializable");
        checkState(isSerializable(messageDelayer), "MessageDelayer isn't serializable");
        checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable");

        // This is an unmodifiable configuration for Pulsar.
        // We don't use Pulsar's built-in configure classes for compatible requirement.
        SinkConfiguration sinkConfiguration =
                configBuilder.build(SINK_CONFIG_VALIDATOR, SinkConfiguration::new);

        return new PulsarSink<>(
                sinkConfiguration,
                serializationSchema,
                metadataListener,
                topicRoutingMode,
                topicRouter,
                messageDelayer,
                pulsarCrypto);
    }