in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java [174:238]
public void start(KeyValue config) {
    // Starts the heartbeat task:
    //   1. copies the task settings from `config` into connectorConfig;
    //   2. prepares the pull consumer group, the producer and the consumer;
    //   3. schedules a periodic "ping" message to the configured heartbeat topic.
    // Any failure in steps 2-3 releases the resources created so far (cleanResource)
    // and is rethrown as StartTaskException carrying the original cause.
    // NOTE(review): `config` is logged verbatim; if it can carry ACL credentials they
    // will end up in the log — confirm against the connector's config keys.
    log.info("ReplicatorHeartbeatTask init " + config);
    log.info("sourceTaskContextConfigs : " + sourceTaskContext.configs());

    // build connectConfig
    // The task id is whatever follows the connector name inside the task name.
    connectorConfig.setTaskId(sourceTaskContext.getTaskName().substring(sourceTaskContext.getConnectorName().length()));
    connectorConfig.setConnectorId(sourceTaskContext.getConnectorName());
    connectorConfig.setSrcCloud(config.getString(connectorConfig.SRC_CLOUD));
    connectorConfig.setSrcRegion(config.getString(connectorConfig.SRC_REGION));
    connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
    connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
    connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
    connectorConfig.setSrcTopicTags(config.getString(connectorConfig.SRC_TOPICTAGS));
    connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
    connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
    connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
    connectorConfig.setDestInstanceId(config.getString(connectorConfig.DEST_INSTANCEID));
    connectorConfig.setDestEndpoint(config.getString(connectorConfig.DEST_ENDPOINT));
    connectorConfig.setDestTopic(config.getString(connectorConfig.DEST_TOPIC));
    // ACL flags default to "true" when the key is absent; parseBoolean avoids a needless box.
    connectorConfig.setDestAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true")));
    connectorConfig.setSrcAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true")));
    // Keep the value already held by connectorConfig when no interval is configured.
    connectorConfig.setHeartbeatIntervalMs(config.getInt(connectorConfig.HEARTBEAT_INTERVALS_MS, connectorConfig.getHeartbeatIntervalMs()));
    connectorConfig.setHeartbeatTopic(config.getString(connectorConfig.HEARTBEAT_TOPIC, connectorConfig.DEFAULT_HEARTBEAT_TOPIC));
    log.info("ReplicatorHeartbeatTask connectorConfig : " + connectorConfig);

    try {
        // init consumer group
        createAndUpdatePullConsumerGroup(connectorConfig.getDestCluster(), consumeGroup);
        // build producer
        reBuildProducer();
        // build consumer
        reBuildConsumer();

        // start schedule task send msg to src heartbeat topic;
        executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, ReplicatorHeartbeatTask.class.getName() + "-producer");
            }
        });
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    log.info("heartbeat prepare send message.");
                    Message message = new Message(connectorConfig.getHeartbeatTopic(),
                        "ping".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    message.putUserProperty("src", connectorConfig.getSrcCloud());
                    message.putUserProperty("dest", connectorConfig.getDestCloud());
                    message.putUserProperty("heartbeatStartAt", System.currentTimeMillis() + "");
                    producer.send(message);
                    // Only reached when send() returned without throwing.
                    producerLastSendOk = System.currentTimeMillis();
                    log.info("heartbeat send message ok");
                } catch (Exception e) {
                    // Must not propagate: an exception escaping run() makes the scheduler
                    // suppress every subsequent heartbeat.
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt so the executor can observe it.
                        Thread.currentThread().interrupt();
                    }
                    log.error("heartbeat producer to src " + connectorConfig.getHeartbeatTopic() + " error,", e);
                }
            }
        }, connectorConfig.getHeartbeatIntervalMs(), connectorConfig.getHeartbeatIntervalMs(), TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        // Record the root cause first so it is not lost if cleanup itself fails.
        log.error("start ReplicatorHeartbeatTask error,", e);
        try {
            cleanResource();
        } catch (Exception cleanupError) {
            // Keep the cleanup failure attached to the original error instead of replacing it.
            e.addSuppressed(cleanupError);
            log.error("clean resource after failed start error,", cleanupError);
        }
        throw new StartTaskException("Start Replicator heartbeat task error, errMsg : " + e.getMessage(), e);
    }
}