public static IdealState expandCustomRebalanceModeIdealStateFor()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/IdealStateBuilder.java [63:132]


  public static IdealState expandCustomRebalanceModeIdealStateFor(IdealState oldIdealState,
      String topicName, int newNumTopicPartitions, ControllerConf controllerConf,
      PriorityQueue<InstanceTopicPartitionHolder> currentServingInstances) {
    final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(topicName);

    customModeIdealStateBuilder
        .setStateModel(OnlineOfflineStateModel.name)
        .setNumPartitions(newNumTopicPartitions).setNumReplica(1)
        .setMaxPartitionsPerNode(newNumTopicPartitions);

    int numOldPartitions = oldIdealState.getNumPartitions();
    for (int i = 0; i < numOldPartitions; ++i) {
      String partitionName = Integer.toString(i);
      try {
        String instanceName =
            oldIdealState.getInstanceStateMap(partitionName).keySet().iterator().next();
        customModeIdealStateBuilder.assignInstanceAndState(partitionName, instanceName, "ONLINE");
      } catch (Exception e) {
        // No worker added into the cluster.
      }
    }

    ZkClient zkClient = null;
    String topicPath = "/consumers/" + controllerConf.getGroupId() + "/offsets/" + topicName;
    String consumerOffsetPath = topicPath + "/";
    String zkString = controllerConf.getConsumerCommitZkPath().isEmpty() ?
        controllerConf.getSrcKafkaZkPath() : controllerConf.getConsumerCommitZkPath();
    boolean pathExisted = false;
    if (!StringUtils.isEmpty(zkString)) {
      zkClient = new ZkClient(zkString, 30000, 30000, ZKStringSerializer$.MODULE$);
      try {
        if (!zkClient.exists(topicPath)) {
          zkClient.createPersistent(topicPath);
        }
        pathExisted = true;
      } catch (Exception e) {
        LOGGER.warn("Fails to create path {}", topicPath, e);
      }
    }
    // Assign new partitions to as many workers as possible
    List<InstanceTopicPartitionHolder> instancesForNewPartitions = new ArrayList<>();
    while (instancesForNewPartitions.size() < newNumTopicPartitions - numOldPartitions
        && !currentServingInstances.isEmpty()) {
      instancesForNewPartitions.add(currentServingInstances.poll());
    }
    if (!instancesForNewPartitions.isEmpty()) {
      for (int i = numOldPartitions; i < newNumTopicPartitions; ++i) {
        if (pathExisted) {
          Object obj = zkClient.readData(consumerOffsetPath + i, true);
          if (obj == null) {
            zkClient.createPersistent(consumerOffsetPath + i, "0");
            LOGGER.info("Create new zk node " + zkString + consumerOffsetPath + i);
          }
        }

        InstanceTopicPartitionHolder liveInstance =
            instancesForNewPartitions.get((i - numOldPartitions) % instancesForNewPartitions.size());
        customModeIdealStateBuilder.assignInstanceAndState(Integer.toString(i),
            liveInstance.getInstanceName(),
            "ONLINE");
        liveInstance.addTopicPartition(new TopicPartition(topicName, i));
        LOGGER.info("Assign new partition " + topicName + ":" + i + " to instance " + liveInstance.getInstanceName());
      }
      currentServingInstances.addAll(instancesForNewPartitions);
    }
    if (zkClient != null) {
      zkClient.close();
    }
    return customModeIdealStateBuilder.build();
  }