in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java [132:179]
public DynamicTableSource createDynamicTableSource(Context context) {
    // Assembles the scan source for an upsert-kafka table: discovers the key and value
    // decoding formats, validates the table options, derives the key/value projections and
    // the Kafka client properties, and finally wires everything into a KafkaDynamicSource.
    final FactoryUtil.TableFactoryHelper factoryHelper =
            FactoryUtil.createTableFactoryHelper(this, context);
    final ReadableConfig options = factoryHelper.getOptions();

    // Both formats are discovered before validateExcept(...) runs below; keep that order.
    final DecodingFormat<DeserializationSchema<RowData>> keyFormat =
            factoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
    final DecodingFormat<DeserializationSchema<RowData>> valueFormat =
            factoryHelper.discoverDecodingFormat(
                    DeserializationFormatFactory.class, VALUE_FORMAT);

    // Validate the option data types. PROPERTIES_PREFIX is handed to validateExcept,
    // presumably so that pass-through Kafka client properties are not rejected.
    factoryHelper.validateExcept(PROPERTIES_PREFIX);
    validateSource(options, keyFormat, valueFormat, context.getPrimaryKeyIndexes());

    // f0 and f1 are passed on separately to the source; judging by the helper's name they
    // are the key and value projections respectively.
    final Tuple2<int[], int[]> projections =
            createKeyValueProjections(context.getCatalogTable());
    // Stays null when KEY_FIELDS_PREFIX is not set.
    final String keyFieldsPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(null);
    final Properties kafkaProperties =
            getKafkaProperties(context.getCatalogTable().getOptions());
    final BoundedOptions bounds = getBoundedOptions(options);
    final Integer scanParallelism = options.get(SCAN_PARALLELISM);

    return new KafkaDynamicSource(
            context.getPhysicalRowDataType(),
            keyFormat,
            new DecodingFormatWrapper(valueFormat),
            projections.f0,
            projections.f1,
            keyFieldsPrefix,
            getTopics(options),
            getTopicPattern(options),
            kafkaProperties,
            // always use earliest to keep data integrity
            StartupMode.EARLIEST,
            // The empty map and the 0 below are presumably the specific startup offsets and
            // the startup timestamp, neither of which matters when starting from EARLIEST.
            Collections.emptyMap(),
            0,
            bounds.boundedMode,
            bounds.specificOffsets,
            bounds.boundedTimestampMillis,
            // presumably the upsert-mode flag of KafkaDynamicSource -- confirm against its
            // constructor
            true,
            context.getObjectIdentifier().asSummaryString(),
            scanParallelism);
}