flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java [102:146]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            TableSchema schema,
            ZoneId localTimeZoneId,
            ElasticSearchBuilderProvider builderProvider) {
        this.format = format;
        this.schema = schema;
        this.config = config;
        this.localTimeZoneId = localTimeZoneId;
        this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
        this.builderProvider = builderProvider;
    }

    // --------------------------------------------------------------
    // End of hack to make configuration testing possible
    // --------------------------------------------------------------

    public boolean isDynamicIndexWithSystemTime() {
        IndexGeneratorFactory.IndexHelper indexHelper = new IndexGeneratorFactory.IndexHelper();
        return indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex());
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        for (RowKind kind : requestedMode.getContainedKinds()) {
            if (kind != RowKind.UPDATE_BEFORE) {
                builder.addContainedKind(kind);
            }
        }
        if (isDynamicIndexWithSystemTime && !requestedMode.containsOnly(RowKind.INSERT)) {
            throw new ValidationException(
                    "Dynamic indexing based on system time only works on append only stream.");
        }
        return builder.build();
    }

    @Override
    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
        return () -> {
            SerializationSchema<RowData> format =
                    this.format.createRuntimeEncoder(context, schema.toRowDataType());

            final RowElasticsearchSinkFunction upsertFunction =
                    new RowElasticsearchSinkFunction(
                            IndexGeneratorFactory.createIndexGenerator(
                                    config.getIndex(), schema, localTimeZoneId),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java [97:141]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            TableSchema schema,
            ZoneId localTimeZoneId,
            ElasticSearchBuilderProvider builderProvider) {
        this.format = format;
        this.schema = schema;
        this.config = config;
        this.localTimeZoneId = localTimeZoneId;
        this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
        this.builderProvider = builderProvider;
    }

    // --------------------------------------------------------------
    // End of hack to make configuration testing possible
    // --------------------------------------------------------------

    public boolean isDynamicIndexWithSystemTime() {
        IndexGeneratorFactory.IndexHelper indexHelper = new IndexGeneratorFactory.IndexHelper();
        return indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex());
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        for (RowKind kind : requestedMode.getContainedKinds()) {
            if (kind != RowKind.UPDATE_BEFORE) {
                builder.addContainedKind(kind);
            }
        }
        if (isDynamicIndexWithSystemTime && !requestedMode.containsOnly(RowKind.INSERT)) {
            throw new ValidationException(
                    "Dynamic indexing based on system time only works on append only stream.");
        }
        return builder.build();
    }

    @Override
    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
        return () -> {
            SerializationSchema<RowData> format =
                    this.format.createRuntimeEncoder(context, schema.toRowDataType());

            final RowElasticsearchSinkFunction upsertFunction =
                    new RowElasticsearchSinkFunction(
                            IndexGeneratorFactory.createIndexGenerator(
                                    config.getIndex(), schema, localTimeZoneId),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



