public void start()

in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java [174:238]


    /**
     * Starts the heartbeat task.
     *
     * <p>The method first copies the task settings from {@code config} into
     * {@code connectorConfig}, then creates/updates the pull consumer group on the
     * destination cluster, (re)builds the producer and the consumer, and finally
     * schedules a single-threaded job that periodically sends a {@code "ping"} message
     * to the configured heartbeat topic.
     *
     * <p>Defaults applied when a key is absent from {@code config}: both ACL switches
     * default to {@code "true"}, the heartbeat interval defaults to the value already
     * held by {@code connectorConfig}, and the heartbeat topic defaults to
     * {@code DEFAULT_HEARTBEAT_TOPIC}.
     *
     * <p>Failures while reading the configuration (before the resources are built)
     * propagate unchanged; only failures while building resources or scheduling the
     * job are wrapped.
     *
     * @param config key/value settings for this task
     * @throws StartTaskException if preparing the consumer group, producer, consumer
     *         or the scheduled sender fails; {@code cleanResource()} is invoked before
     *         the exception is thrown
     */
    public void start(KeyValue config) {
        log.info("ReplicatorHeartbeatTask init " + config);
        log.info("sourceTaskContextConfigs : " + sourceTaskContext.configs());
        // build connectConfig
        // The task id is whatever follows the first connectorName.length() characters of
        // the task name. NOTE(review): this presumes the task name is prefixed with the
        // connector name - confirm against the runtime that assigns task names.
        connectorConfig.setTaskId(sourceTaskContext.getTaskName().substring(sourceTaskContext.getConnectorName().length()));
        connectorConfig.setConnectorId(sourceTaskContext.getConnectorName());
        connectorConfig.setSrcCloud(config.getString(connectorConfig.SRC_CLOUD));
        connectorConfig.setSrcRegion(config.getString(connectorConfig.SRC_REGION));
        connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
        connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
        connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
        connectorConfig.setSrcTopicTags(config.getString(connectorConfig.SRC_TOPICTAGS));
        connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
        connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
        connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
        connectorConfig.setDestInstanceId(config.getString(connectorConfig.DEST_INSTANCEID));
        connectorConfig.setDestEndpoint(config.getString(connectorConfig.DEST_ENDPOINT));
        connectorConfig.setDestTopic(config.getString(connectorConfig.DEST_TOPIC));

        // ACL is treated as enabled unless the config explicitly says otherwise.
        connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true")));
        connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true")));

        connectorConfig.setHeartbeatIntervalMs(config.getInt(connectorConfig.HEARTBEAT_INTERVALS_MS, connectorConfig.getHeartbeatIntervalMs()));
        connectorConfig.setHeartbeatTopic(config.getString(connectorConfig.HEARTBEAT_TOPIC, connectorConfig.DEFAULT_HEARTBEAT_TOPIC));

        log.info("ReplicatorHeartbeatTask connectorConfig : " + connectorConfig);

        try {
            // init consumer group
            String destClusterName = connectorConfig.getDestCluster();
            createAndUpdatePullConsumerGroup(destClusterName, consumeGroup);
            // build producer
            reBuildProducer();
            // build consumer
            reBuildConsumer();
            // start schedule task send msg to src heartbeat topic;
            executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, ReplicatorHeartbeatTask.class.getName() + "-producer");
                }
            });
            // The first ping is sent after one full interval, then once per interval.
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // Exceptions are caught and logged here because a task that throws
                    // out of scheduleAtFixedRate has all its later executions suppressed.
                    try {
                        log.info("heartbeat prepare send message.");
                        Message message = new Message(connectorConfig.getHeartbeatTopic(),
                            "ping".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        message.putUserProperty("src", connectorConfig.getSrcCloud());
                        message.putUserProperty("dest", connectorConfig.getDestCloud());
                        message.putUserProperty("heartbeatStartAt", String.valueOf(System.currentTimeMillis()));
                        producer.send(message);
                        // Only updated when send() returned without throwing.
                        producerLastSendOk = System.currentTimeMillis();
                        log.info("heartbeat send message ok");
                    } catch (Exception e) {
                        log.error("heartbeat producer to src " + connectorConfig.getHeartbeatTopic() + " error,", e);
                    }
                }
            }, connectorConfig.getHeartbeatIntervalMs(), connectorConfig.getHeartbeatIntervalMs(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Log the root cause first so it is recorded even if the cleanup below fails.
            log.error("start ReplicatorHeartbeatTask error,", e);
            try {
                cleanResource();
            } catch (Exception cleanupError) {
                // A cleanup failure must not replace the original start-up failure;
                // keep it attached to the cause so it is still visible to the caller.
                e.addSuppressed(cleanupError);
                log.error("clean resource after failed start error,", cleanupError);
            }
            throw new StartTaskException("Start Replicator heartbeat task error, errMsg : " + e.getMessage(), e);
        }
    }