in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java [172:223]
public DynamicTableSink createDynamicTableSink(Context context) {
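    // The helper validates declared options; autoCompleteSchemaRegistrySubject(...)
    // pre-fills the schema-registry subject options for registry-backed formats
    // (e.g. avro-confluent) when they are not set explicitly.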
    FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(
                    this, autoCompleteSchemaRegistrySubject(context));

    final ReadableConfig tableOptions = helper.getOptions();
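    // Discover the key and value serialization formats; unlike the plain 'kafka'
    // connector, upsert-kafka requires both 'key.format' and 'value.format'.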
    EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
            helper.discoverEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
    EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
            helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT);
    // Validate all table options except the pass-through Kafka client properties
    // ('properties.*'), which are forwarded to the producer as-is.
    helper.validateExcept(PROPERTIES_PREFIX);
    validateSink(
            tableOptions,
            keyEncodingFormat,
            valueEncodingFormat,
            context.getPrimaryKeyIndexes());
    KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
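    // Split the physical schema into key and value field projections; for
    // upsert-kafka the key fields are derived from the declared PRIMARY KEY.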
    Tuple2<int[], int[]> keyValueProjections =
            createKeyValueProjections(context.getCatalogTable());
    final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
    final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
    Integer parallelism = tableOptions.get(SINK_PARALLELISM);
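    // Optional sink buffering: updates on the same key are compacted in memory
    // and flushed when either the row-count or the time threshold is reached.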
    int batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
    Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
    SinkBufferFlushMode flushMode =
            new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
    // Use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}:
    // it hash-partitions records on the message key when one is set, and falls
    // back to round-robin behaviour otherwise.
    return new KafkaDynamicSink(
            context.getPhysicalRowDataType(), // consumed data type
            context.getPhysicalRowDataType(), // physical data type
            keyEncodingFormat,
            new EncodingFormatWrapper(valueEncodingFormat),
            keyValueProjections.f0,
            keyValueProjections.f1,
            keyPrefix,
            tableOptions.get(TOPIC).get(0), // upsert-kafka supports a single topic
            properties,
            null, // no custom partitioner, see comment above
            tableOptions.get(DELIVERY_GUARANTEE),
            true, // upsert mode
            flushMode,
            parallelism,
            tableOptions.get(TRANSACTIONAL_ID_PREFIX));
}
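For reference, a minimal sketch (hypothetical table and topic names) of the DDL that exercises this factory; each WITH option below maps onto one of the ReadableConfig keys read above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE pageviews_per_region ("
                        + "  region STRING,"
                        + "  view_count BIGINT,"
                        + "  PRIMARY KEY (region) NOT ENFORCED" // -> context.getPrimaryKeyIndexes()
                        + ") WITH ("
                        + "  'connector' = 'upsert-kafka',"
                        + "  'topic' = 'pageviews_per_region'," // -> TOPIC
                        + "  'properties.bootstrap.servers' = 'localhost:9092'," // -> PROPERTIES_PREFIX
                        + "  'key.format' = 'json'," // -> KEY_FORMAT
                        + "  'value.format' = 'json'," // -> VALUE_FORMAT
                        + "  'sink.buffer-flush.max-rows' = '1000'," // -> SINK_BUFFER_FLUSH_MAX_ROWS
                        + "  'sink.buffer-flush.interval' = '1 s'" // -> SINK_BUFFER_FLUSH_INTERVAL
                        + ")");
    }
}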