in flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java [84:106]
public DynamicTableSink createDynamicTableSink(Context context) {
TableSchema tableSchema = context.getCatalogTable().getSchema();
ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
helper.validate();
Configuration configuration = new Configuration();
context.getCatalogTable().getOptions().forEach(configuration::setString);
Elasticsearch6Configuration config =
new Elasticsearch6Configuration(configuration, context.getClassLoader());
validate(config, configuration);
return new Elasticsearch6DynamicSink(
format,
config,
TableSchemaUtils.getPhysicalSchema(tableSchema),
getLocalTimeZoneId(context.getConfiguration()));
}