private void configPulsarCrypto()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [254:279]


    private void configPulsarCrypto(ProducerBuilder<?> builder) {
        CryptoKeyReader cryptoKeyReader = pulsarCrypto.cryptoKeyReader();
        if (cryptoKeyReader == null) {
            return;
        }

        // Set the message crypto key reader.
        builder.cryptoKeyReader(cryptoKeyReader);

        // Set the encrypt keys.
        Set<String> encryptKeys = pulsarCrypto.encryptKeys();
        if (encryptKeys == null || encryptKeys.isEmpty()) {
            throw new IllegalArgumentException("You should provide encryptKeys in PulsarCrypto");
        }
        encryptKeys.forEach(builder::addEncryptionKey);

        // Set the message crypto if provided.
        // Pulsar forgets to expose the config in producer builder.
        // See issue https://github.com/apache/pulsar/issues/19139
        MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto =
                pulsarCrypto.messageCrypto();
        if (messageCrypto != null) {
            ProducerConfigurationData producerConfig = ((ProducerBuilderImpl<?>) builder).getConf();
            producerConfig.setMessageCrypto(messageCrypto);
        }
    }