in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java [261:288]
private static void bindAddress(
PulsarConfiguration configuration,
ConfigOption<String> option,
boolean allowRandomPort,
ObjIntConsumer<String> setter) {
if (!configuration.contains(option)) {
return;
}
String address = configuration.get(option);
if (address.contains(":")) {
try {
String[] addresses = address.split(":");
String host = addresses[0];
int port = Integer.parseInt(addresses[1]);
setter.accept(host, port);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Invalid address '" + address + "', port should be int.");
}
} else if (allowRandomPort) {
setter.accept(address, 0);
} else {
throw new IllegalArgumentException(
"The address '" + address + "' should be in host:port format.");
}
}