in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [307:361]
public static void checkForOrCreateReplicationPeer(Configuration hbaseConf,
CuratorFramework zk,
String basePath,
String peerName, boolean createIfMissing,
boolean enablePeer) {
try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin()) {
boolean peerThere = false;
while (!peerThere) {
try {
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerName);
if (peerConfig !=null) {
peerThere=true;
}
} catch (ReplicationPeerNotFoundException e) {
if (createIfMissing) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
// get the current cluster's ZK config
String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") +
":" +
hbaseConf.get("hbase.zookeeper.property.clientPort");
String znodePath = zookeeperQ + ":"+basePath;
ReplicationPeerConfig rconf = builder.setClusterKey(znodePath).build();
admin.addReplicationPeer(peerName, rconf);
peerThere = true;
}
}
if (peerThere) {
if (enablePeer){
LOG.info("enable peer,{}", peerName);
List<ReplicationPeerDescription> peers = admin.listReplicationPeers().stream()
.filter(peer -> peer.getPeerId().equals(peerName))
.filter(peer -> !peer.isEnabled())
.collect(Collectors.toList());
if (!peers.isEmpty()){
admin.enableReplicationPeer(peerName);
}
}
break;
} else {
LOG.info("peer "+
peerName+" not found, service will not completely start until the peer exists");
}
Thread.sleep(5000);
}
LOG.info("found replication peer " + peerName);
} catch (Exception e) {
LOG.error("Exception running proxy ",e);
}
}