public void run()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [250:358]


    /**
     * Subscribes the consumer to {@code topic} and starts one polling task per message queue.
     *
     * <p>The subscription is filtered by the SQL expression in {@link RocketMQConfig#CONSUMER_SQL}
     * when that property is non-empty, otherwise by the configured tag. A watermark taken from
     * {@code waterMarkForAll} is emitted every 5 seconds. Each entry of {@code messageQueues} gets
     * its own task on {@code executor}; the task is run through {@code RetryUtil.call} and keeps
     * polling while {@code runningChecker.isRunning()} is true. Finally {@code awaitTermination()}
     * is called on the calling thread.
     *
     * @param context source context used to emit records and watermarks
     * @throws Exception propagated from subscribing the consumer or from {@code
     *     awaitTermination()}
     */
    public void run(SourceContext context) throws Exception {
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
        String tag =
                props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
        int pullBatchSize =
                RocketMQUtils.getInteger(
                        props,
                        RocketMQConfig.CONSUMER_BATCH_SIZE,
                        RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
        // Loop-invariant: resolve the poll timeout once instead of re-parsing the property on
        // every poll of every queue.
        int pollTimeout =
                RocketMQUtils.getInteger(
                        props,
                        RocketMQConfig.CONSUMER_TIMEOUT,
                        RocketMQConfig.DEFAULT_CONSUMER_TIMEOUT);

        // A single watermark source (waterMarkForAll) is used for all queues; watermarks are not
        // tracked per queue.
        // NOTE(review): this callback does not take checkPointLock, unlike record emission below
        // -- confirm that emitting watermarks without the lock is intended.
        timer.scheduleAtFixedRate(
                () -> {
                    context.emitWatermark(waterMarkForAll.getCurrentWatermark());
                },
                5,
                5,
                TimeUnit.SECONDS);

        if (StringUtils.isEmpty(sql)) {
            consumer.subscribe(topic, tag);
        } else {
            // Pulling with SQL does not support block pull.
            consumer.subscribe(topic, MessageSelector.bySql(sql));
        }

        // Every queue task shares this consumer instance and uses the same batch size, so it is
        // applied once here rather than on each loop iteration.
        consumer.setPullBatchSize(pullBatchSize);

        for (MessageQueue mq : messageQueues) {
            this.executor.execute(
                    () ->
                            RetryUtil.call(
                                    () -> {
                                        while (runningChecker.isRunning()) {
                                            try {
                                                pollAndEmitOnce(context, mq, pollTimeout);
                                            } catch (Exception e) {
                                                // Surface any failure to RetryUtil.call as an
                                                // unchecked exception, keeping the cause.
                                                throw new RuntimeException(e);
                                            }
                                        }
                                        return true;
                                    },
                                    "RuntimeException",
                                    runningChecker));
        }

        awaitTermination();
    }

    /**
     * Runs one seek / poll / emit / offset-update cycle for a single message queue.
     *
     * <p>Waits {@link RocketMQConfig#DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND} milliseconds
     * via {@code RetryUtil.waitForMs} when the poll returned no messages.
     *
     * @param context source context used to emit records
     * @param mq queue whose stored offset is sought before polling
     * @param pollTimeout timeout passed to {@code consumer.poll}
     * @throws IllegalStateException if {@code offsetTable} holds no offset for {@code mq}
     * @throws Exception any failure raised by the consumer or by deserialization
     */
    private void pollAndEmitOnce(SourceContext context, MessageQueue mq, int pollTimeout)
            throws Exception {
        Long offset = offsetTable.get(mq);
        if (offset == null) {
            // Fail with a descriptive message instead of an anonymous NullPointerException from
            // implicit unboxing in consumer.seek(mq, offset).
            throw new IllegalStateException("No offset recorded for message queue " + mq);
        }
        consumer.seek(mq, offset);

        List<MessageExt> messages = consumer.poll(pollTimeout);
        boolean found = CollectionUtils.isNotEmpty(messages);
        if (found) {
            long fetchTime = System.currentTimeMillis();
            for (MessageExt msg : messages) {
                emitRecord(context, msg, fetchTime);
            }
        }

        synchronized (checkPointLock) {
            updateMessageQueueOffset(mq, consumer.committed(mq));
        }

        if (!found) {
            RetryUtil.waitForMs(RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
        }
    }

    /**
     * Deserializes one message and emits it, updating the watermark source and metrics.
     *
     * @param context source context the record is collected into
     * @param msg message to deserialize and emit
     * @param fetchTime wall-clock time in milliseconds at which the batch containing {@code msg}
     *     was fetched
     * @throws Exception any failure raised by deserialization or emission
     */
    private void emitRecord(SourceContext context, MessageExt msg, long fetchTime)
            throws Exception {
        String keys = msg.getKeys();
        byte[] key = keys != null ? keys.getBytes(StandardCharsets.UTF_8) : null;
        byte[] value = msg.getBody();
        OUT data = schema.deserializeKeyAndValue(key, value);

        // Build the debug line only when it will be written, and outside the checkpoint lock so
        // the lock is held only for emission and state updates.
        if (log.isDebugEnabled()) {
            log.debug(
                    msg.getMsgId()
                            + "_"
                            + msg.getBrokerName()
                            + " "
                            + msg.getQueueId()
                            + " "
                            + msg.getQueueOffset());
        }

        // output and state update are atomic
        synchronized (checkPointLock) {
            context.collectWithTimestamp(data, msg.getBornTimestamp());
            long emitTime = System.currentTimeMillis();
            // update max event time across all queues
            waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
            tpsMetric.markEvent();
            long eventTime = msg.getStoreTimestamp();
            fetchDelay.report(Math.abs(fetchTime - eventTime));
            emitDelay.report(Math.abs(emitTime - eventTime));
        }
    }