in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java [669:733]
public void start(KeyValue config) {
log.info("ReplicatorSourceTask init " + config);
log.info("sourceTaskContextConfigs : " + sourceTaskContext.configs());
// build connectConfig
connectorConfig.setTaskId(sourceTaskContext.getTaskName().substring(sourceTaskContext.getConnectorName().length()) + 1);
connectorConfig.setConnectorId(sourceTaskContext.getConnectorName());
connectorConfig.setSrcCloud(config.getString(ReplicatorConnectorConfig.SRC_CLOUD));
connectorConfig.setSrcRegion(config.getString(ReplicatorConnectorConfig.SRC_REGION));
connectorConfig.setSrcCluster(config.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
connectorConfig.setSrcInstanceId(config.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
connectorConfig.setSrcEndpoint(config.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
connectorConfig.setSrcTopicTags(config.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
connectorConfig.setDestCloud(config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
connectorConfig.setDestRegion(config.getString(ReplicatorConnectorConfig.DEST_REGION));
connectorConfig.setDestCluster(config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
connectorConfig.setDestInstanceId(config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
connectorConfig.setDestEndpoint(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
connectorConfig.setDestTopic(config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
connectorConfig.setDestAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true")));
connectorConfig.setSrcAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true")));
connectorConfig.setAutoCreateInnerConsumergroup(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP, "false")));
connectorConfig.setSyncTps(config.getInt(ReplicatorConnectorConfig.SYNC_TPS));
connectorConfig.setDividedNormalQueues(config.getString(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES));
connectorConfig.setSrcAccessKey(config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY));
connectorConfig.setSrcSecretKey(config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY));
connectorConfig.setCommitOffsetIntervalMs(config.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, 10 * 1000));
connectorConfig.setConsumeFromWhere(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name()));
if (connectorConfig.getConsumeFromWhere() == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
connectorConfig.setConsumeFromTimestamp(Long.parseLong(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
}
log.info("ReplicatorSourceTask connectorConfig : " + connectorConfig);
try {
log.info("prepare init ....");
// get pull consumer group & create group
String srcClusterName = connectorConfig.getSrcCluster();
String pullConsumerGroup = connectorConfig.generateTaskIdWithIndexAsConsumerGroup();
buildMqAdminClient();
if (connectorConfig.isAutoCreateInnerConsumergroup()) {
createAndUpdatePullConsumerGroup(srcClusterName, pullConsumerGroup);
}
log.info("createAndUpdatePullConsumerGroup " + pullConsumerGroup + " finished.");
// init converter
// init pullConsumer
buildConsumer();
log.info("buildConsumer finished.");
// init limiter
tpsLimit = connectorConfig.getSyncTps();
log.info("RateLimiter init finished.");
// subscribe topic & start consumer
subscribeTopicAndStartConsumer();
// init sync delay metrics monitor
execScheduleTask();
log.info("RateLimiter init finished.");
log.info("QueueOffsetManager init finished.");
} catch (Exception e) {
log.error("start ReplicatorSourceTask error, please check connectorConfig.", e);
cleanResource();
throw new StartTaskException("Start replicator source task error, errMsg : " + e.getMessage(), e);
}
}