in src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java [105:151]
/**
 * Assembles a {@link RocketMQSourceReader} from its collaborators.
 *
 * <p>The steps, in order, are:
 *
 * <ol>
 *   <li>create the queue that is shared between the fetcher manager and the source reader;
 *   <li>open the deserialization schema with a context backed by {@code readerContext};
 *   <li>create the reader metrics on top of the reader's metric group;
 *   <li>create one split reader and expose it through a supplier that always hands back
 *       that same instance;
 *   <li>wire the fetcher manager, the record emitter and finally the source reader.
 * </ol>
 *
 * @param readerContext context of the reader being created; it supplies the metric group and
 *     the user code class loader, and is passed on to the split reader and the source reader
 * @return the fully wired source reader
 * @throws Exception propagated from the calls made while assembling the reader (for example
 *     from {@code deserializationSchema.open})
 */
public SourceReader<OUT, RocketMQSourceSplit> createReader(SourceReaderContext readerContext)
        throws Exception {
    // Queue handed to both the fetcher manager and the source reader below.
    FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>> recordsQueue =
            new FutureCompletingBlockingQueue<>();

    // Context for the schema: metrics go under the "deserializer" sub-group of the reader's
    // metric group, and the class loader is the reader's user code class loader.
    final DeserializationSchema.InitializationContext schemaInitContext =
            new DeserializationSchema.InitializationContext() {
                @Override
                public MetricGroup getMetricGroup() {
                    return readerContext.metricGroup().addGroup("deserializer");
                }

                @Override
                public UserCodeClassLoader getUserCodeClassLoader() {
                    return readerContext.getUserCodeClassLoader();
                }
            };
    deserializationSchema.open(schemaInitContext);

    final RocketMQSourceReaderMetrics readerMetrics =
            new RocketMQSourceReaderMetrics(readerContext.metricGroup());

    // Exactly one split reader is built here; the supplier never creates another one and
    // returns this instance on every call.
    final RocketMQSplitReader<OUT> splitReader =
            new RocketMQSplitReader<>(
                    configuration, readerContext, deserializationSchema, readerMetrics);
    final Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier =
            () -> splitReader;

    // The third argument is a callback that intentionally does nothing.
    final RocketMQSourceFetcherManager fetcherManager =
            new RocketMQSourceFetcherManager(recordsQueue, splitReaderSupplier, unused -> {});

    final RocketMQRecordEmitter<OUT> emitter =
            new RocketMQRecordEmitter<>(deserializationSchema);

    return new RocketMQSourceReader<>(
            recordsQueue,
            fetcherManager,
            emitter,
            configuration,
            readerContext,
            readerMetrics,
            splitReaderSupplier);
}