in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [254:279]
private void configPulsarCrypto(ProducerBuilder<?> builder) {
CryptoKeyReader cryptoKeyReader = pulsarCrypto.cryptoKeyReader();
if (cryptoKeyReader == null) {
return;
}
// Set the message crypto key reader.
builder.cryptoKeyReader(cryptoKeyReader);
// Set the encrypt keys.
Set<String> encryptKeys = pulsarCrypto.encryptKeys();
if (encryptKeys == null || encryptKeys.isEmpty()) {
throw new IllegalArgumentException("You should provide encryptKeys in PulsarCrypto");
}
encryptKeys.forEach(builder::addEncryptionKey);
// Set the message crypto if provided.
// Pulsar forgets to expose the config in producer builder.
// See issue https://github.com/apache/pulsar/issues/19139
MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto =
pulsarCrypto.messageCrypto();
if (messageCrypto != null) {
ProducerConfigurationData producerConfig = ((ProducerBuilderImpl<?>) builder).getConf();
producerConfig.setMessageCrypto(messageCrypto);
}
}