in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [250:358]
public void run(SourceContext context) throws Exception {
String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
String tag =
props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
int pullBatchSize =
RocketMQUtils.getInteger(
props,
RocketMQConfig.CONSUMER_BATCH_SIZE,
RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
timer.scheduleAtFixedRate(
() -> {
// context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
context.emitWatermark(waterMarkForAll.getCurrentWatermark());
},
5,
5,
TimeUnit.SECONDS);
if (StringUtils.isEmpty(sql)) {
consumer.subscribe(topic, tag);
} else {
// pull with sql do not support block pull.
consumer.subscribe(topic, MessageSelector.bySql(sql));
}
for (MessageQueue mq : messageQueues) {
this.executor.execute(
() ->
RetryUtil.call(
() -> {
while (runningChecker.isRunning()) {
try {
Long offset = offsetTable.get(mq);
consumer.setPullBatchSize(pullBatchSize);
consumer.seek(mq, offset);
boolean found = false;
List<MessageExt> messages =
consumer.poll(
RocketMQUtils.getInteger(
props,
RocketMQConfig
.CONSUMER_TIMEOUT,
RocketMQConfig
.DEFAULT_CONSUMER_TIMEOUT));
if (CollectionUtils.isNotEmpty(messages)) {
long fetchTime = System.currentTimeMillis();
for (MessageExt msg : messages) {
byte[] key =
msg.getKeys() != null
? msg.getKeys()
.getBytes(
StandardCharsets
.UTF_8)
: null;
byte[] value = msg.getBody();
OUT data =
schema.deserializeKeyAndValue(
key, value);
// output and state update are atomic
synchronized (checkPointLock) {
log.debug(
msg.getMsgId()
+ "_"
+ msg.getBrokerName()
+ " "
+ msg.getQueueId()
+ " "
+ msg.getQueueOffset());
context.collectWithTimestamp(
data, msg.getBornTimestamp());
long emitTime =
System.currentTimeMillis();
// update max eventTime per queue
// waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
waterMarkForAll.extractTimestamp(
msg.getBornTimestamp());
tpsMetric.markEvent();
long eventTime =
msg.getStoreTimestamp();
fetchDelay.report(
Math.abs(
fetchTime - eventTime));
emitDelay.report(
Math.abs(emitTime - eventTime));
}
}
found = true;
}
synchronized (checkPointLock) {
updateMessageQueueOffset(
mq, consumer.committed(mq));
}
if (!found) {
RetryUtil.waitForMs(
RocketMQConfig
.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return true;
},
"RuntimeException",
runningChecker));
}
awaitTermination();
}