in pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java [111:129]
private void createClient(NioEventLoops eventLoops) {
String[] hosts = aerospikeSinkConfig.getSeedHosts().split(",");
if (hosts.length <= 0) {
throw new RuntimeException("Invalid Seed Hosts");
}
Host[] aeroSpikeHosts = new Host[hosts.length];
for (int i = 0; i < hosts.length; ++i) {
String[] hostPort = hosts[i].split(":");
aeroSpikeHosts[i] = new Host(hostPort[0], Integer.valueOf(hostPort[1]));
}
ClientPolicy policy = new ClientPolicy();
if (aerospikeSinkConfig.getUserName() != null && !aerospikeSinkConfig.getUserName().isEmpty()
&& aerospikeSinkConfig.getPassword() != null && !aerospikeSinkConfig.getPassword().isEmpty()) {
policy.user = aerospikeSinkConfig.getUserName();
policy.password = aerospikeSinkConfig.getPassword();
}
policy.eventLoops = eventLoops;
client = new AerospikeClient(policy, aeroSpikeHosts);
}