in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [148:248]
/**
 * Prepares this source for execution.
 *
 * <p>In order, this method validates the consumer properties, creates any bookkeeping fields
 * that are still {@code null}, builds the pull thread pool, creates and starts the RocketMQ
 * pull consumer under a unique instance name, registers the throughput and lag metrics, and
 * finally resolves the message queues and starting offsets assigned to this subtask.
 *
 * @param parameters configuration passed in by the caller; it is not read by this method
 * @throws Exception propagated from property validation, consumer start-up, the queue lookup,
 *     or offset initialization
 */
public void open(Configuration parameters) throws Exception {
    log.debug("source open....");

    // Mandatory settings: the properties themselves, the topic and the consumer group.
    Validate.notEmpty(props, "Consumer properties can not be empty");
    this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
    this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
    Validate.notEmpty(topic, "Consumer topic can not be empty");
    Validate.notEmpty(group, "Consumer group can not be empty");

    // A tag filter and a SQL filter are mutually exclusive: at least one must be empty.
    final String tagFilter = props.getProperty(RocketMQConfig.CONSUMER_TAG);
    final String sqlFilter = props.getProperty(RocketMQConfig.CONSUMER_SQL);
    final boolean atMostOneFilter =
            !StringUtils.isNotEmpty(tagFilter) || !StringUtils.isNotEmpty(sqlFilter);
    Validate.isTrue(atMostOneFilter, "Consumer tag and sql can not set value at the same time");

    // Look the runtime context up once and reuse it for everything below.
    final RuntimeContext ctx = getRuntimeContext();
    this.enableCheckpoint = ((StreamingRuntimeContext) ctx).isCheckpointingEnabled();

    // Only create the fields that have not been populated yet.
    if (offsetTable == null) {
        offsetTable = new ConcurrentHashMap<>();
    }
    if (restoredOffsets == null) {
        restoredOffsets = new ConcurrentHashMap<>();
    }
    if (pendingOffsetsToCommit == null) {
        pendingOffsetsToCommit = new LinkedMap();
    }
    if (checkPointLock == null) {
        checkPointLock = new ReentrantLock();
    }
    if (waterMarkPerQueue == null) {
        waterMarkPerQueue = new WaterMarkPerQueue(5000);
    }
    if (waterMarkForAll == null) {
        waterMarkForAll = new WaterMarkForAll(5000);
    }
    if (timer == null) {
        timer = Executors.newSingleThreadScheduledExecutor();
    }

    runningChecker = new RunningChecker();
    runningChecker.setRunning(true);

    // Cached pool of daemon threads named "rmq-pull-thread-N".
    executor =
            Executors.newCachedThreadPool(
                    new ThreadFactoryBuilder()
                            .setDaemon(true)
                            .setNameFormat("rmq-pull-thread-%d")
                            .build());

    final int subtaskIndex = ctx.getIndexOfThisSubtask();

    consumer = new DefaultLitePullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
    RocketMQConfig.buildConsumerConfigs(props, consumer);

    // Set a unique instance name to avoid the exception described at
    // https://help.aliyun.com/document_detail/29646.html
    final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
    final String subtaskLabel = String.valueOf(subtaskIndex);
    final String uniqueSuffix = String.valueOf(System.nanoTime());
    consumer.setInstanceName(
            RocketMQUtils.getInstanceName(jvmName, topic, group, subtaskLabel, uniqueSuffix));
    consumer.start();

    // Throughput meter (backed by a counter, 60 passed as the MeterView time span)
    // plus the fetch/emit lag gauges.
    final Counter emittedRecords =
            ctx.getMetricGroup()
                    .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
    tpsMetric =
            ctx.getMetricGroup()
                    .meter(MetricUtils.METRICS_TPS, new MeterView(emittedRecords, 60));
    ctx.getMetricGroup().gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
    ctx.getMetricGroup().gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);

    final int parallelism = ctx.getNumberOfParallelSubtasks();
    log.info(
            "Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}",
            parallelism,
            subtaskIndex);

    // Fetch every queue of the topic, then keep the share allocated to this subtask.
    final Collection<MessageQueue> topicQueues = consumer.fetchMessageQueues(topic);
    messageQueues = RocketMQUtils.allocate(topicQueues, parallelism, subtaskIndex);

    // When the job recovers from state, the state already contains the offsets of the
    // last commit, so those are used; otherwise offsets are initialized from scratch.
    if (!restored) {
        initOffsets(messageQueues);
    } else {
        initOffsetTableFromRestoredOffsets(messageQueues);
    }
}