public DynamicTableSink createDynamicTableSink()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java [110:132]


    public DynamicTableSink createDynamicTableSink(Context context) {
        // Assembles an ElasticsearchDynamicSink from the given context. The steps below run in a
        // fixed order: key types, format discovery, configuration, helper validation, then
        // connector-specific configuration validation.
        final List<LogicalTypeWithIndex> keyTypes = getPrimaryKeyLogicalTypesWithIndex(context);

        // The helper is used for format discovery, for building the configuration, and for
        // validation.
        final FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final EncodingFormat<SerializationSchema<RowData>> encodingFormat =
                factoryHelper.discoverEncodingFormat(
                        SerializationFormatFactory.class, FORMAT_OPTION);

        // The configuration is built first, then the helper validates, and only afterwards is the
        // configuration itself checked.
        final ElasticsearchConfiguration esConfig = getConfiguration(factoryHelper);
        factoryHelper.validate();
        validateConfiguration(esConfig);

        final DynamicTableSink sink =
                new ElasticsearchDynamicSink(
                        encodingFormat,
                        esConfig,
                        keyTypes,
                        context.getPhysicalRowDataType(),
                        capitalize(factoryIdentifier),
                        sinkBuilderSupplier,
                        getDocumentType(esConfig),
                        getLocalTimeZoneId(context.getConfiguration()));
        return sink;
    }