in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java [248:299]
public DynamicTableSink createDynamicTableSink(Context context) {
    // Assembles the Kafka table sink for the given context: resolves the key/value
    // encoding formats, runs the option validations, derives the key/value field
    // projections and finally delegates to createKafkaTableSink.
    // The statement order below is deliberate: it determines which validation
    // error is reported first for a misconfigured table.
    final TableFactoryHelper factoryHelper =
            FactoryUtil.createTableFactoryHelper(
                    this, autoCompleteSchemaRegistrySubject(context));

    // The key format is optional, the value format is always resolved. Both lookups
    // happen before validateExcept(...) is invoked.
    // NOTE(review): presumably format discovery must precede validation so that the
    // format options count as consumed — confirm against FactoryUtil.
    final Optional<EncodingFormat<SerializationSchema<RowData>>> optionalKeyFormat =
            getKeyEncodingFormat(factoryHelper);
    final EncodingFormat<SerializationSchema<RowData>> valueFormat =
            getValueEncodingFormat(factoryHelper);

    // Options under PROPERTIES_PREFIX are excluded from this validation pass.
    factoryHelper.validateExcept(PROPERTIES_PREFIX);

    final ReadableConfig options = factoryHelper.getOptions();

    // validateDeprecatedSemantic both validates and yields the delivery guarantee
    // that is handed to the sink.
    final DeliveryGuarantee guarantee = validateDeprecatedSemantic(options);
    validateTableSinkOptions(options);
    KafkaConnectorOptionsUtil.validateDeliveryGuarantee(options);
    validatePKConstraints(
            context.getObjectIdentifier(),
            context.getPrimaryKeyIndexes(),
            context.getCatalogTable().getOptions(),
            valueFormat);

    final DataType rowType = context.getPhysicalRowDataType();
    final int[] keyFieldIndexes = createKeyFormatProjection(options, rowType);
    final int[] valueFieldIndexes = createValueFormatProjection(options, rowType);

    // Both settings are passed on as null when the corresponding option is not set.
    final String keyFieldsPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(null);
    final Integer sinkParallelism = options.getOptional(SINK_PARALLELISM).orElse(null);

    // An absent key format is represented as null for createKafkaTableSink.
    final EncodingFormat<SerializationSchema<RowData>> keyFormatOrNull =
            optionalKeyFormat.orElse(null);

    // Kafka client properties are read from the raw catalog table options rather
    // than from the validated ReadableConfig.
    return createKafkaTableSink(
            rowType,
            keyFormatOrNull,
            valueFormat,
            keyFieldIndexes,
            valueFieldIndexes,
            keyFieldsPrefix,
            getTopics(options),
            getTopicPattern(options),
            getKafkaProperties(context.getCatalogTable().getOptions()),
            getFlinkKafkaPartitioner(options, context.getClassLoader()).orElse(null),
            guarantee,
            sinkParallelism,
            options.get(TRANSACTIONAL_ID_PREFIX),
            options.get(TRANSACTION_NAMING_STRATEGY));
}