in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java [169:233]
/**
 * Builds the task configurations for this connector.
 *
 * <p>The source topic/tag map is resolved from {@code SRC_INSTANCEID} and {@code SRC_TOPICTAGS},
 * the message queues of those topics are fetched (and remembered in {@code curMessageQueues}),
 * and the queues are split into {@code min(queueCount, maxTasks)} groups. One configuration is
 * produced per group; the resulting list is sorted by {@code buildCompareString} so that the
 * order is stable across invocations.
 *
 * @param maxTasks upper bound on the number of task configurations to generate
 * @return the sorted task configurations; an empty list when there is no queue to replicate
 *         or {@code maxTasks} is not positive
 * @throws ConnectException if no source topic/tag is configured
 */
public List<KeyValue> taskConfigs(int maxTasks) {
    // normal topic
    Map<String, String> topicTagMap = ReplicatorConnectorConfig.getSrcTopicTagMap(
        connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
        connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
    if (MapUtils.isEmpty(topicTagMap)) {
        throw new ConnectException("Source topics & tags config cannot be null, please check the config info");
    }
    List<String> topics = new ArrayList<>(topicTagMap.keySet());
    // TODO: use the native rebalance; both the runtime and the connector persist the offset.
    // get queue
    curMessageQueues = fetchMessageQueues(topics);
    int taskNum = Math.min(curMessageQueues.size(), maxTasks);
    log.info("messageQueue : " + curMessageQueues.size() + " " + curMessageQueues);
    if (taskNum <= 0) {
        // Nothing to distribute: avoid asking divide(...) for zero groups and hand back no tasks.
        log.warn("No task config generated, messageQueue size : " + curMessageQueues.size() + ", maxTasks : " + maxTasks);
        return new ArrayList<>();
    }
    // divide
    List<List<MessageQueue>> normalDivided = divide(new ArrayList<>(curMessageQueues), taskNum);
    log.info("normalDivided : " + normalDivided.size() + " " + normalDivided);
    List<KeyValue> configs = new ArrayList<>(taskNum);
    for (int i = 0; i < taskNum; i++) {
        KeyValue keyValue = buildTaskConfig(normalDivided.get(i));
        configs.add(keyValue);
        // NOTE(review): if KeyValue#toString prints its entries, this line writes the access/secret
        // keys to the log — consider masking them.
        log.info("ReplicatorSourceConnector sub task config : " + keyValue);
    }
    // sort config's items for consistent rebalance
    configs = ReplicatorUtils.sortList(configs, new Comparator<DefaultKeyValue>() {
        @Override
        public int compare(DefaultKeyValue o1, DefaultKeyValue o2) {
            return buildCompareString(o1).compareTo(buildCompareString(o2));
        }
    });
    return configs;
}

/**
 * Creates the configuration of a single task: the queues assigned to it plus the source,
 * destination and throttling settings copied from the connector configuration.
 *
 * @param assignedQueues the message queues this task is responsible for
 * @return a new task configuration
 */
private KeyValue buildTaskConfig(List<MessageQueue> assignedQueues) {
    KeyValue keyValue = new DefaultKeyValue();
    keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES, JSON.toJSONString(assignedQueues));
    // CONNECTOR_ID is not fulfilled by rebalance, so it is not propagated to the task
    // (ERRORS_TOLERANCE_CONFIG is likewise not propagated).

    // source side
    keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLOUD));
    keyValue.put(ReplicatorConnectorConfig.SRC_REGION, connectorConfig.getString(ReplicatorConnectorConfig.SRC_REGION));
    keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
    // the instance id is optional and only copied when configured
    if (null != connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID)) {
        keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID, connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
    }
    keyValue.put(ReplicatorConnectorConfig.SRC_ENDPOINT, connectorConfig.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
    keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS, connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
    keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
    keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, ""));
    keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, ""));

    // destination side
    keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD, connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLOUD));
    keyValue.put(ReplicatorConnectorConfig.DEST_REGION, connectorConfig.getString(ReplicatorConnectorConfig.DEST_REGION));
    keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER, connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
    // the instance id is optional and only copied when configured
    if (null != connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) {
        keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID, connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
    }
    keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
    keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC, connectorConfig.getString(ReplicatorConnectorConfig.DEST_TOPIC));
    keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
    keyValue.put(ReplicatorConnectorConfig.DEST_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, ""));
    keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));

    // throttling and offset commit cadence (commit interval falls back to 10 seconds)
    keyValue.put(ReplicatorConnectorConfig.SYNC_TPS, connectorConfig.getInt(ReplicatorConnectorConfig.SYNC_TPS, ReplicatorConnectorConfig.DEFAULT_SYNC_TPS));
    keyValue.put(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, connectorConfig.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, 10 * 1000L));
    return keyValue;
}