in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/IdealStateBuilder.java [63:132]
/**
 * Builds a custom-mode ideal state for {@code topicName} after its partition count has been
 * expanded to {@code newNumTopicPartitions}.
 *
 * <p>Partitions {@code [0, oldIdealState.getNumPartitions())} keep the instance they are mapped
 * to in {@code oldIdealState}; an old partition that has no instance mapped is left unassigned.
 * Partitions {@code [oldIdealState.getNumPartitions(), newNumTopicPartitions)} are spread
 * round-robin over instances polled from {@code currentServingInstances} (at most one instance
 * per new partition). Every polled instance is put back into the queue before this method
 * returns, including when an exception is thrown while assigning.
 *
 * <p>If a ZooKeeper connection string is configured (consumer-commit path, falling back to the
 * source Kafka path), the offset node
 * {@code /consumers/<groupId>/offsets/<topicName>/<partition>} is created with value
 * {@code "0"} for each newly assigned partition that has no data yet. Failure to create the
 * topic-level offsets path is logged and disables offset-node creation; failures while creating
 * per-partition nodes propagate to the caller.
 *
 * <p>If no serving instance is available, the new partitions are left unassigned and no
 * per-partition offset nodes are created.
 *
 * @param oldIdealState ideal state holding the current partition-to-instance mapping
 * @param topicName topic whose ideal state is being expanded
 * @param newNumTopicPartitions total number of partitions after the expansion
 * @param controllerConf source of the consumer group id and ZooKeeper connection strings
 * @param currentServingInstances queue of candidate instances; polled instances get the new
 *     {@link TopicPartition}s added to them and are re-added to this queue
 * @return the newly built ideal state
 */
public static IdealState expandCustomRebalanceModeIdealStateFor(IdealState oldIdealState,
    String topicName, int newNumTopicPartitions, ControllerConf controllerConf,
    PriorityQueue<InstanceTopicPartitionHolder> currentServingInstances) {
  final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(topicName);
  customModeIdealStateBuilder
      .setStateModel(OnlineOfflineStateModel.name)
      .setNumPartitions(newNumTopicPartitions).setNumReplica(1)
      .setMaxPartitionsPerNode(newNumTopicPartitions);

  // Carry over the existing assignment for every old partition.
  final int numOldPartitions = oldIdealState.getNumPartitions();
  for (int i = 0; i < numOldPartitions; ++i) {
    String partitionName = Integer.toString(i);
    java.util.Map<String, String> instanceStateMap =
        oldIdealState.getInstanceStateMap(partitionName);
    if (instanceStateMap == null || instanceStateMap.isEmpty()) {
      // No worker added into the cluster for this partition; leave it unassigned.
      continue;
    }
    String instanceName = instanceStateMap.keySet().iterator().next();
    customModeIdealStateBuilder.assignInstanceAndState(partitionName, instanceName, "ONLINE");
  }

  final String topicPath =
      "/consumers/" + controllerConf.getGroupId() + "/offsets/" + topicName;
  final String consumerOffsetPath = topicPath + "/";
  // Null-safe: a missing consumer-commit path falls back to the source Kafka ZK path.
  final String zkString = StringUtils.isEmpty(controllerConf.getConsumerCommitZkPath())
      ? controllerConf.getSrcKafkaZkPath() : controllerConf.getConsumerCommitZkPath();

  ZkClient zkClient = null;
  try {
    boolean pathExisted = false;
    if (!StringUtils.isEmpty(zkString)) {
      zkClient = new ZkClient(zkString, 30000, 30000, ZKStringSerializer$.MODULE$);
      try {
        if (!zkClient.exists(topicPath)) {
          zkClient.createPersistent(topicPath);
        }
        pathExisted = true;
      } catch (Exception e) {
        LOGGER.warn("Fails to create path {}", topicPath, e);
      }
    }

    // Assign new partitions to as many workers as possible
    List<InstanceTopicPartitionHolder> instancesForNewPartitions = new ArrayList<>();
    while (instancesForNewPartitions.size() < newNumTopicPartitions - numOldPartitions
        && !currentServingInstances.isEmpty()) {
      instancesForNewPartitions.add(currentServingInstances.poll());
    }

    if (!instancesForNewPartitions.isEmpty()) {
      try {
        for (int i = numOldPartitions; i < newNumTopicPartitions; ++i) {
          if (pathExisted) {
            String offsetNodePath = consumerOffsetPath + i;
            // readData(path, true) returns null instead of throwing when the node is absent.
            Object obj = zkClient.readData(offsetNodePath, true);
            if (obj == null) {
              zkClient.createPersistent(offsetNodePath, "0");
              LOGGER.info("Create new zk node {}{}", zkString, offsetNodePath);
            }
          }
          InstanceTopicPartitionHolder liveInstance = instancesForNewPartitions
              .get((i - numOldPartitions) % instancesForNewPartitions.size());
          customModeIdealStateBuilder.assignInstanceAndState(Integer.toString(i),
              liveInstance.getInstanceName(),
              "ONLINE");
          liveInstance.addTopicPartition(new TopicPartition(topicName, i));
          LOGGER.info("Assign new partition {}:{} to instance {}", topicName, i,
              liveInstance.getInstanceName());
        }
      } finally {
        // Always hand the polled workers back so a ZK failure cannot drop them from the queue.
        currentServingInstances.addAll(instancesForNewPartitions);
      }
    }
  } finally {
    // Close the ZK connection on every exit path, not only on success.
    if (zkClient != null) {
      zkClient.close();
    }
  }
  return customModeIdealStateBuilder.build();
}