public static void checkForOrCreateReplicationPeer()

in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [307:361]


  /**
   * Blocks until the replication peer {@code peerName} exists, optionally creating it when it is
   * missing and optionally enabling it once it is present.
   *
   * <p>The peer is looked up through {@link Admin#getReplicationPeerConfig(String)}. While it
   * cannot be found (and is not created here) the lookup is retried every five seconds. Any
   * exception other than the "peer not found" case is logged and ends the method; nothing is
   * propagated to the caller. If the waiting thread is interrupted, its interrupt status is
   * restored before the method returns.
   *
   * @param hbaseConf configuration used to open the HBase connection; its
   *        {@code hbase.zookeeper.quorum} and {@code hbase.zookeeper.property.clientPort} values
   *        are also used to build the cluster key of a newly created peer
   * @param zk not used by this method; kept so existing callers keep compiling
   * @param basePath path appended (after a {@code ':'}) to the quorum and client port to form
   *        the cluster key of a newly created peer
   * @param peerName id of the replication peer to look for
   * @param createIfMissing when {@code true}, a peer that is not found is added instead of
   *        being waited for
   * @param enablePeer when {@code true}, the peer is enabled if it is listed as disabled
   */
  public static void checkForOrCreateReplicationPeer(Configuration hbaseConf,
                        CuratorFramework zk,
                        String basePath,
                        String peerName, boolean createIfMissing,
                        boolean enablePeer) {
    // Pause between two lookups while the peer does not exist yet.
    final long retryIntervalMs = 5000L;

    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
       Admin admin = conn.getAdmin()) {

      while (true) {
        boolean peerThere = false;
        try {
          peerThere = admin.getReplicationPeerConfig(peerName) != null;
        } catch (ReplicationPeerNotFoundException e) {
          if (createIfMissing) {
            // Build the cluster key from the current cluster's ZK config:
            // <quorum>:<clientPort>:<basePath>
            String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum")
                + ":"
                + hbaseConf.get("hbase.zookeeper.property.clientPort");
            String clusterKey = zookeeperQ + ":" + basePath;
            ReplicationPeerConfig rconf =
                ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build();
            admin.addReplicationPeer(peerName, rconf);
            peerThere = true;
          }
        }

        if (peerThere) {
          break;
        }

        LOG.info("peer {} not found, service will not completely start until the peer exists",
            peerName);
        Thread.sleep(retryIntervalMs);
      }

      if (enablePeer) {
        LOG.info("enable peer,{}", peerName);
        // Only issue the enable call when the peer is currently listed as disabled.
        boolean peerDisabled = admin.listReplicationPeers().stream()
            .anyMatch(peer -> peer.getPeerId().equals(peerName) && !peer.isEnabled());
        if (peerDisabled) {
          admin.enableReplicationPeer(peerName);
        }
      }

      LOG.info("found replication peer {}", peerName);

    } catch (InterruptedException e) {
      // Restore the interrupt status so the caller can observe that the wait was cancelled.
      Thread.currentThread().interrupt();
      LOG.error("Exception running proxy ", e);
    } catch (Exception e) {
      LOG.error("Exception running proxy ",e);
    }
  }