public static void checkForOrCreateReplicationPeer()

in kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java [285:334]


  public static void checkForOrCreateReplicationPeer(Configuration hbaseConf, CuratorFramework zk,
    String basePath, String peerName, boolean createIfMissing, boolean enablePeer) {
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
      Admin admin = conn.getAdmin()) {

      boolean peerThere = false;

      while (!peerThere) {
        try {
          ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerName);
          if (peerConfig != null) {
            peerThere = true;
          }
        } catch (ReplicationPeerNotFoundException e) {
          if (createIfMissing) {
            ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
            // get the current cluster's ZK config
            String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") + ":"
              + hbaseConf.get("hbase.zookeeper.property.clientPort");
            String znodePath = zookeeperQ + ":" + basePath;
            ReplicationPeerConfig rconf = builder.setClusterKey(znodePath).build();
            admin.addReplicationPeer(peerName, rconf);
            peerThere = true;
          }
        }

        if (peerThere) {
          if (enablePeer) {
            LOG.info("enable peer,{}", peerName);
            List<ReplicationPeerDescription> peers = admin.listReplicationPeers().stream()
              .filter(peer -> peer.getPeerId().equals(peerName)).filter(peer -> !peer.isEnabled())
              .collect(Collectors.toList());
            if (!peers.isEmpty()) {
              admin.enableReplicationPeer(peerName);
            }
          }
          break;
        } else {
          LOG.info("peer " + peerName
            + " not found, service will not completely start until the peer exists");
        }
        Thread.sleep(5000);
      }

      LOG.info("found replication peer " + peerName);

    } catch (Exception e) {
      LOG.error("Exception running proxy ", e);
    }
  }