in src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java [105:149]
public SourceReader<OUT, RocketMQSourceSplit> createReader(SourceReaderContext readerContext)
        throws Exception {
    // Builds the reader for this source: opens the deserialization schema, then wires the
    // hand-over queue, split-reader factory, fetcher manager and record emitter together.

    // The schema is opened first, with a context that exposes a dedicated "deserializer"
    // metric sub-group and the user-code class loader taken from the reader context.
    final DeserializationSchema.InitializationContext schemaInitContext =
            new DeserializationSchema.InitializationContext() {
                @Override
                public MetricGroup getMetricGroup() {
                    return readerContext.metricGroup().addGroup("deserializer");
                }

                @Override
                public UserCodeClassLoader getUserCodeClassLoader() {
                    return readerContext.getUserCodeClassLoader();
                }
            };
    deserializationSchema.open(schemaInitContext);

    // A single metrics object is shared by every split reader and by the source reader.
    final RocketMQSourceReaderMetrics readerMetrics =
            new RocketMQSourceReaderMetrics(readerContext.metricGroup());

    // Factory handed to the fetcher manager; each call yields a new split reader.
    final Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderFactory =
            () ->
                    new RocketMQSplitReader<>(
                            configuration, readerContext, deserializationSchema, readerMetrics);

    // Queue shared between the fetcher manager and the source reader.
    final FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>> handoverQueue =
            new FutureCompletingBlockingQueue<>();

    // The third argument is a callback that intentionally does nothing.
    final RocketMQSourceFetcherManager fetcherManager =
            new RocketMQSourceFetcherManager(handoverQueue, splitReaderFactory, ignored -> {});

    final RocketMQRecordEmitter<OUT> emitter = new RocketMQRecordEmitter<>(deserializationSchema);

    final SourceReader<OUT, RocketMQSourceSplit> sourceReader =
            new RocketMQSourceReader<>(
                    handoverQueue,
                    fetcherManager,
                    emitter,
                    configuration,
                    readerContext,
                    readerMetrics);
    return sourceReader;
}