in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/ManagerControllerHelix.java [220:256]
public boolean handleTopicAssignmentEvent(String topic, String srcCluster, String dstCluster, String routePartition, String toState) {
synchronized (_handlerLock) {
if (_currentControllerInstance == null) {
if (toState.equals("OFFLINE") || toState.equals("DROPPED")) {
LOGGER.error(
"Controller is not started yet. Failed to action={} topic={} for srcCluster={}, dstCluster={}, routePartition={}",
toState, topic, srcCluster, dstCluster, routePartition);
return false;
}
LOGGER.info(
"Controller is not started yet. Start a new instance: srcCluster={}, dstCluster={}, routePartition={}",
srcCluster, dstCluster, routePartition);
handleRouteAssignmentOnline(srcCluster, dstCluster, routePartition);
}
if (!(srcCluster.equals(_currentSrcCluster) && dstCluster.equals(_currentDstCluster))) {
String msg = String.format("Inconsistent route assignment: expected src=%s, dst=%s, but given src=%s, dst=%s, toState=%s",
_currentSrcCluster, _currentDstCluster, srcCluster, dstCluster, toState);
LOGGER.error(msg);
if (!toState.equals("OFFLINE") && !toState.equals("DROPPED")) {
throw new IllegalArgumentException(msg);
} else {
return false;
}
}
if (toState.equals("ONLINE")) {
return handleTopicAssignmentOnline(topic, srcCluster, dstCluster);
} else if (toState.equals("OFFLINE")) {
return handleTopicAssignmentOffline(topic, srcCluster, dstCluster);
} else if (toState.equals("DROPPED")) {
return handleTopicAssignmentDropped(topic, srcCluster, dstCluster);
} else {
String msg = "Invalid topic assignement state: " + toState;
LOGGER.error(msg);
throw new IllegalArgumentException(msg);
}
}
}