public DynamicTableSink createDynamicTableSink()

in flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java [76:96]


    public DynamicTableSink createDynamicTableSink(Context context) {
        // Builds an OpensearchDynamicSink from the table context: primary-key types, the
        // discovered encoding format and the validated connector configuration.

        // Primary-key lookup happens first, before any option handling.
        final List<LogicalTypeWithIndex> keyTypesWithIndex =
                getPrimaryKeyLogicalTypesWithIndex(context);

        // One helper instance is used for format discovery, configuration and validation.
        final FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final EncodingFormat<SerializationSchema<RowData>> encodingFormat =
                factoryHelper.discoverEncodingFormat(
                        SerializationFormatFactory.class, FORMAT_OPTION);
        final OpensearchConfiguration sinkConfig = getConfiguration(factoryHelper);

        // Validate only after the format and configuration were obtained from the helper:
        // first the helper's own checks, then the connector-specific ones.
        factoryHelper.validate();
        validateConfiguration(sinkConfig);

        final String capitalizedIdentifier = capitalize(FACTORY_IDENTIFIER);
        return new OpensearchDynamicSink(
                encodingFormat,
                sinkConfig,
                keyTypesWithIndex,
                context.getPhysicalRowDataType(),
                capitalizedIdentifier,
                getLocalTimeZoneId(context.getConfiguration()));
    }