public DynamicTableSink createDynamicTableSink()

in flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java [84:106]


    public DynamicTableSink createDynamicTableSink(Context context) {
        // Builds the Elasticsearch 6 sink for the catalog table carried by the context.
        // The steps below run in a fixed order: primary-key check, format discovery,
        // helper validation, connector-option validation, then sink construction.

        // The primary key is checked first, before any option is looked at.
        TableSchema schema = context.getCatalogTable().getSchema();
        ElasticsearchValidationUtils.validatePrimaryKey(schema);

        // The encoding format is discovered before helper.validate() is invoked.
        // NOTE(review): presumably validate() relies on the format options having been
        // discovered already -- confirm against FactoryUtil before reordering.
        FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);
        EncodingFormat<SerializationSchema<RowData>> encodingFormat =
                factoryHelper.discoverEncodingFormat(
                        SerializationFormatFactory.class, FORMAT_OPTION);
        factoryHelper.validate();

        // Copy every catalog table option into a Configuration; it backs the
        // connector-specific configuration wrapper and is also passed to validate(...).
        Configuration rawOptions = new Configuration();
        context.getCatalogTable()
                .getOptions()
                .forEach((key, value) -> rawOptions.setString(key, value));
        Elasticsearch6Configuration sinkConfig =
                new Elasticsearch6Configuration(rawOptions, context.getClassLoader());
        validate(sinkConfig, rawOptions);

        // The sink is handed the physical schema derived from the table schema.
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
        return new Elasticsearch6DynamicSink(
                encodingFormat,
                sinkConfig,
                physicalSchema,
                getLocalTimeZoneId(context.getConfiguration()));
    }