public DynamicTableSink createDynamicTableSink()

in flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java [83:106]


    public DynamicTableSink createDynamicTableSink(Context context) {
        // Builds the Elasticsearch 7 table sink for the catalog table carried by the context.
        // Step order is deliberate and must be kept: primary-key validation on the schema,
        // then format discovery, then the factory helper's own validation, and finally the
        // connector-level validation of the assembled configuration.
        final TableSchema schema = context.getCatalogTable().getSchema();
        ElasticsearchValidationUtils.validatePrimaryKey(schema);

        final FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final EncodingFormat<SerializationSchema<RowData>> encodingFormat =
                factoryHelper.discoverEncodingFormat(
                        SerializationFormatFactory.class, FORMAT_OPTION);
        factoryHelper.validate();

        // Copy every raw table option, as string key/value pairs, into a fresh Configuration.
        final Configuration tableOptions = new Configuration();
        context.getCatalogTable()
                .getOptions()
                .forEach((key, value) -> tableOptions.setString(key, value));

        final Elasticsearch7Configuration esConfig =
                new Elasticsearch7Configuration(tableOptions, context.getClassLoader());
        validate(esConfig, tableOptions);

        // The sink receives the physical schema derived from the table schema, plus the
        // local time zone id resolved from the context's configuration.
        return new Elasticsearch7DynamicSink(
                encodingFormat,
                esConfig,
                TableSchemaUtils.getPhysicalSchema(schema),
                getLocalTimeZoneId(context.getConfiguration()));
    }