in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java [237:259]
private static Authentication createAuthentication(PulsarConfiguration configuration)
throws PulsarClientException {
if (configuration.contains(PULSAR_AUTH_PLUGIN_CLASS_NAME)) {
String authPluginClassName = configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME);
if (configuration.contains(PULSAR_AUTH_PARAMS)) {
String authParamsString = configuration.get(PULSAR_AUTH_PARAMS);
return AuthenticationFactory.create(authPluginClassName, authParamsString);
} else {
Map<String, String> paramsMap = configuration.getProperties(PULSAR_AUTH_PARAM_MAP);
if (paramsMap.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"No %s or %s provided",
PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key()));
}
return AuthenticationFactory.create(authPluginClassName, paramsMap);
}
}
return AuthenticationDisabled.INSTANCE;
}