in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [285:334]
public static void checkForOrCreateReplicationPeer(Configuration hbaseConf, CuratorFramework zk,
String basePath, String peerName, boolean createIfMissing, boolean enablePeer) {
try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin()) {
boolean peerThere = false;
while (!peerThere) {
try {
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerName);
if (peerConfig != null) {
peerThere = true;
}
} catch (ReplicationPeerNotFoundException e) {
if (createIfMissing) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
// get the current cluster's ZK config
String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") + ":"
+ hbaseConf.get("hbase.zookeeper.property.clientPort");
String znodePath = zookeeperQ + ":" + basePath;
ReplicationPeerConfig rconf = builder.setClusterKey(znodePath).build();
admin.addReplicationPeer(peerName, rconf);
peerThere = true;
}
}
if (peerThere) {
if (enablePeer) {
LOG.info("enable peer,{}", peerName);
List<ReplicationPeerDescription> peers = admin.listReplicationPeers().stream()
.filter(peer -> peer.getPeerId().equals(peerName)).filter(peer -> !peer.isEnabled())
.collect(Collectors.toList());
if (!peers.isEmpty()) {
admin.enableReplicationPeer(peerName);
}
}
break;
} else {
LOG.info("peer " + peerName
+ " not found, service will not completely start until the peer exists");
}
Thread.sleep(5000);
}
LOG.info("found replication peer " + peerName);
} catch (Exception e) {
LOG.error("Exception running proxy ", e);
}
}