public List taskConfigs()

in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java [169:233]


    public List<KeyValue> taskConfigs(int maxTasks) {
        // normal topic
        Map<String, String> topicTagMap = ReplicatorConnectorConfig.getSrcTopicTagMap(
                connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID),
                connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
        if (MapUtils.isEmpty(topicTagMap)) {
            throw new ConnectException("Source topics & tags config cannot be null, please check the config info");
        }
        List<String> topics = new LinkedList<>(topicTagMap.keySet());
        // todo rebalance 使用原生的;runtime & connector 都保存offset;
        // get queue
        curMessageQueues = fetchMessageQueues(topics);
        int taskNum;
        taskNum = Math.min(curMessageQueues.size(), maxTasks);
        log.info("messageQueue : " + curMessageQueues.size() + " " + curMessageQueues);
        // divide
        List<List<MessageQueue>> normalDivided = divide(new ArrayList<>(curMessageQueues), taskNum);
        log.info("normalDivided : " + normalDivided + " " + normalDivided);

        List<KeyValue> configs = new ArrayList<>();
        for (int i = 0; i < taskNum; i++) {
            KeyValue keyValue = new DefaultKeyValue();
            keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES, JSON.toJSONString(normalDivided.get(i)));

            // CONNECTOR_ID is not fulfilled by rebalance
//            keyValue.put(CONNECTOR_ID, connectorConfig.getString(CONNECTOR_ID));
//            keyValue.put(ERRORS_TOLERANCE_CONFIG, connectorConfig.getString(ERRORS_TOLERANCE_CONFIG, ToleranceType.ALL.name()));
            keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLOUD));
            keyValue.put(ReplicatorConnectorConfig.SRC_REGION, connectorConfig.getString(ReplicatorConnectorConfig.SRC_REGION));
            keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
            if (null != connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID)) {
                keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID, connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
            }
            keyValue.put(ReplicatorConnectorConfig.SRC_ENDPOINT, connectorConfig.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
            keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS, connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
            keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
            keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, ""));
            keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, ""));
            keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD, connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLOUD));
            keyValue.put(ReplicatorConnectorConfig.DEST_REGION, connectorConfig.getString(ReplicatorConnectorConfig.DEST_REGION));
            keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER, connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
            if (null != connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) {
                keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID, connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
            }
            keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
            keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC, connectorConfig.getString(ReplicatorConnectorConfig.DEST_TOPIC));
            keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
            keyValue.put(ReplicatorConnectorConfig.DEST_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, ""));
            keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));

            keyValue.put(ReplicatorConnectorConfig.SYNC_TPS, connectorConfig.getInt(ReplicatorConnectorConfig.SYNC_TPS, ReplicatorConnectorConfig.DEFAULT_SYNC_TPS));
            keyValue.put(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, connectorConfig.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, 10 * 1000L));

            configs.add(keyValue);
            log.info("ReplicatorSourceConnector sub task config : " + keyValue);
        }
        // sort config's items for consistent rebalance
        configs = ReplicatorUtils.sortList(configs, new Comparator<DefaultKeyValue>() {
            @Override
            public int compare(DefaultKeyValue o1, DefaultKeyValue o2) {
                return buildCompareString(o1).compareTo(buildCompareString(o2));
            }
        });
        return configs;
    }