in flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java [76:96]
/**
 * Creates the {@link OpensearchDynamicSink} for the table described by {@code context}.
 *
 * <p>The steps run in a fixed order: primary-key types are resolved from the context, the
 * serialization format is discovered through the factory helper, the connector configuration
 * is built, and only then are the helper and the configuration validated.
 *
 * @param context factory context supplying the physical row data type and the configuration
 *     used for the local time zone lookup
 * @return a new {@link OpensearchDynamicSink} built from the discovered format and
 *     configuration
 */
public DynamicTableSink createDynamicTableSink(Context context) {
    // Resolve the primary-key columns before any option handling takes place.
    final List<LogicalTypeWithIndex> keyTypes = getPrimaryKeyLogicalTypesWithIndex(context);

    final FactoryUtil.TableFactoryHelper factoryHelper =
            FactoryUtil.createTableFactoryHelper(this, context);
    final EncodingFormat<SerializationSchema<RowData>> encodingFormat =
            factoryHelper.discoverEncodingFormat(
                    SerializationFormatFactory.class, FORMAT_OPTION);
    final OpensearchConfiguration sinkConfig = getConfiguration(factoryHelper);

    // Validation is deliberately kept after the format and configuration lookups above.
    // NOTE(review): presumably the helper needs those lookups to have happened before it
    // validates the options - confirm against FactoryUtil before reordering.
    factoryHelper.validate();
    validateConfiguration(sinkConfig);

    return new OpensearchDynamicSink(
            encodingFormat,
            sinkConfig,
            keyTypes,
            context.getPhysicalRowDataType(),
            capitalize(FACTORY_IDENTIFIER),
            getLocalTimeZoneId(context.getConfiguration()));
}