in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java [176:235]
public DynamicTableSource createDynamicTableSource(Context context) {
final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
getKeyDecodingFormat(helper);
final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
getValueDecodingFormat(helper);
helper.validateExcept(PROPERTIES_PREFIX);
final ReadableConfig tableOptions = helper.getOptions();
validateTableSourceOptions(tableOptions);
validatePKConstraints(
context.getObjectIdentifier(),
context.getPrimaryKeyIndexes(),
context.getCatalogTable().getOptions(),
valueDecodingFormat);
final StartupOptions startupOptions = getStartupOptions(tableOptions);
final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
// add topic-partition discovery
final Duration partitionDiscoveryInterval =
tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);
properties.setProperty(
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
Long.toString(partitionDiscoveryInterval.toMillis()));
final DataType physicalDataType = context.getPhysicalRowDataType();
final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
getSourceTopics(tableOptions),
getSourceTopicPattern(tableOptions),
properties,
startupOptions.startupMode,
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis,
boundedOptions.boundedMode,
boundedOptions.specificOffsets,
boundedOptions.boundedTimestampMillis,
context.getObjectIdentifier().asSummaryString());
}