in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java [92:114]
public DynamicTableSink createDynamicTableSink(Context context) {
List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex =
getPrimaryKeyLogicalTypesWithIndex(context);
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
ElasticsearchConfiguration config = getConfiguration(helper);
helper.validate();
validateConfiguration(config);
return new ElasticsearchDynamicSink(
format,
config,
primaryKeyLogicalTypesWithIndex,
context.getPhysicalRowDataType(),
capitalize(factoryIdentifier),
sinkBuilderSupplier,
getDocumentType(config),
getLocalTimeZoneId(context.getConfiguration()));
}