in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java [243:280]
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    // Build separate deserializers for the record key and value; only the key
    // may carry a field-name prefix, so the value gets null.
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    final KafkaSource<RowData> kafkaSource =
            createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);

    return new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(
                ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
            // Fall back to emitting no watermarks when the planner did not
            // push a watermark strategy into this source.
            if (watermarkStrategy == null) {
                watermarkStrategy = WatermarkStrategy.noWatermarks();
            }
            DataStreamSource<RowData> sourceStream =
                    execEnv.fromSource(
                            kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
            // Assign a stable uid so source state can be matched up on restore.
            providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid);
            return sourceStream;
        }

        @Override
        public boolean isBounded() {
            return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
        }

        @Override
        public Optional<Integer> getParallelism() {
            // Expose the user-configured source parallelism, if any, to the planner.
            return Optional.ofNullable(parallelism);
        }
    };
}
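
For context, here is a minimal standalone sketch of the same fromSource(...) pattern this provider uses, built with the public KafkaSource builder API. The bootstrap server, topic, and group id are placeholder values, and SimpleStringSchema stands in for the key/value RowData deserializers that createKafkaSource(...) actually wires up from the table options:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings; in KafkaDynamicSource the equivalent
        // configuration is derived from the table's connector options instead.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("input-topic")
                        .setGroupId("example-group")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Same shape as produceDataStream(...): source + watermark strategy + operator name.
        DataStreamSource<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource-example");
        stream.print();

        env.execute("kafka-source-sketch");
    }
}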