public void start()

in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java [669:733]


    public void start(KeyValue config) {
        log.info("ReplicatorSourceTask init " + config);
        log.info("sourceTaskContextConfigs : " + sourceTaskContext.configs());
        // build connectConfig
        connectorConfig.setTaskId(sourceTaskContext.getTaskName().substring(sourceTaskContext.getConnectorName().length()) + 1);
        connectorConfig.setConnectorId(sourceTaskContext.getConnectorName());
        connectorConfig.setSrcCloud(config.getString(ReplicatorConnectorConfig.SRC_CLOUD));
        connectorConfig.setSrcRegion(config.getString(ReplicatorConnectorConfig.SRC_REGION));
        connectorConfig.setSrcCluster(config.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
        connectorConfig.setSrcInstanceId(config.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
        connectorConfig.setSrcEndpoint(config.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
        connectorConfig.setSrcTopicTags(config.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
        connectorConfig.setDestCloud(config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
        connectorConfig.setDestRegion(config.getString(ReplicatorConnectorConfig.DEST_REGION));
        connectorConfig.setDestCluster(config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
        connectorConfig.setDestInstanceId(config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
        connectorConfig.setDestEndpoint(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
        connectorConfig.setDestTopic(config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
        connectorConfig.setDestAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true")));
        connectorConfig.setSrcAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true")));
        connectorConfig.setAutoCreateInnerConsumergroup(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP, "false")));

        connectorConfig.setSyncTps(config.getInt(ReplicatorConnectorConfig.SYNC_TPS));
        connectorConfig.setDividedNormalQueues(config.getString(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES));
        connectorConfig.setSrcAccessKey(config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY));
        connectorConfig.setSrcSecretKey(config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY));

        connectorConfig.setCommitOffsetIntervalMs(config.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, 10 * 1000));

        connectorConfig.setConsumeFromWhere(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name()));
        if (connectorConfig.getConsumeFromWhere() == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
            connectorConfig.setConsumeFromTimestamp(Long.parseLong(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
        }
        log.info("ReplicatorSourceTask connectorConfig : " + connectorConfig);

        try {
            log.info("prepare init ....");
            // get pull consumer group & create group
            String srcClusterName = connectorConfig.getSrcCluster();
            String pullConsumerGroup = connectorConfig.generateTaskIdWithIndexAsConsumerGroup();
            buildMqAdminClient();
            if (connectorConfig.isAutoCreateInnerConsumergroup()) {
                createAndUpdatePullConsumerGroup(srcClusterName, pullConsumerGroup);
            }
            log.info("createAndUpdatePullConsumerGroup " + pullConsumerGroup + " finished.");
            // init converter
            // init pullConsumer
            buildConsumer();
            log.info("buildConsumer finished.");
            // init limiter
            tpsLimit = connectorConfig.getSyncTps();
            log.info("RateLimiter init finished.");
            // subscribe topic & start consumer
            subscribeTopicAndStartConsumer();
            // init sync delay metrics monitor
            execScheduleTask();
            log.info("RateLimiter init finished.");
            log.info("QueueOffsetManager init finished.");
        } catch (Exception e) {
            log.error("start ReplicatorSourceTask error, please check connectorConfig.", e);
            cleanResource();
            throw new StartTaskException("Start replicator source task error, errMsg : " + e.getMessage(), e);
        }

    }