public void open()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [148:248]


    public void open(Configuration parameters) throws Exception {
        log.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");

        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);

        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");

        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG);
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
        Validate.isTrue(
                !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                "Consumer tag and sql can not set value at the same time");

        this.enableCheckpoint =
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();

        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }
        if (pendingOffsetsToCommit == null) {
            pendingOffsetsToCommit = new LinkedMap();
        }
        if (checkPointLock == null) {
            checkPointLock = new ReentrantLock();
        }
        if (waterMarkPerQueue == null) {
            waterMarkPerQueue = new WaterMarkPerQueue(5000);
        }
        if (waterMarkForAll == null) {
            waterMarkForAll = new WaterMarkForAll(5000);
        }
        if (timer == null) {
            timer = Executors.newSingleThreadScheduledExecutor();
        }

        runningChecker = new RunningChecker();
        runningChecker.setRunning(true);

        final ThreadFactory threadFactory =
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("rmq-pull-thread-%d")
                        .build();
        executor = Executors.newCachedThreadPool(threadFactory);

        int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
        consumer = new DefaultLitePullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
        RocketMQConfig.buildConsumerConfigs(props, consumer);

        // set unique instance name, avoid exception:
        // https://help.aliyun.com/document_detail/29646.html
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        String instanceName =
                RocketMQUtils.getInstanceName(
                        runtimeName,
                        topic,
                        group,
                        String.valueOf(indexOfThisSubTask),
                        String.valueOf(System.nanoTime()));
        consumer.setInstanceName(instanceName);
        consumer.start();

        Counter outputCounter =
                getRuntimeContext()
                        .getMetricGroup()
                        .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
        tpsMetric =
                getRuntimeContext()
                        .getMetricGroup()
                        .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));

        getRuntimeContext()
                .getMetricGroup()
                .gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
        getRuntimeContext()
                .getMetricGroup()
                .gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);

        final RuntimeContext ctx = getRuntimeContext();
        // The lock that guarantees that record emission and state updates are atomic,
        // from the view of taking a checkpoint.
        int taskNumber = ctx.getNumberOfParallelSubtasks();
        int taskIndex = ctx.getIndexOfThisSubtask();
        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
        Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
        messageQueues =
                RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
        // If the job recovers from the state, the state has already contained the offsets of last
        // commit.
        if (restored) {
            initOffsetTableFromRestoredOffsets(messageQueues);
        } else {
            initOffsets(messageQueues);
        }
    }