public SinkRuntimeProvider getSinkRuntimeProvider(Context context)

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java [120:178]


    /**
     * Builds the runtime provider for this sink from the table configuration.
     *
     * <p>Creates a {@code RowElasticsearchEmitter} from the runtime encoder, the index generator
     * and the key extractor, configures the builder obtained from {@code builderSupplier} with
     * the hosts, delivery guarantee and bulk-flush settings plus the optional backoff, credential,
     * path-prefix and timeout settings, and wraps the built sink in a {@code SinkV2Provider}
     * together with the configured parallelism ({@code null} when none is configured).
     *
     * @param context context handed to the format when creating the runtime encoder
     * @return provider wrapping the sink produced by the configured builder
     * @throws java.util.NoSuchElementException if a bulk flush backoff type is configured but the
     *     backoff retries or the backoff delay is missing
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        SerializationSchema<RowData> format =
                this.format.createRuntimeEncoder(context, physicalRowDataType);

        final RowElasticsearchEmitter rowElasticsearchEmitter =
                new RowElasticsearchEmitter(
                        createIndexGenerator(),
                        format,
                        XContentType.JSON,
                        documentType,
                        createKeyExtractor());

        ElasticsearchSinkBuilderBase<RowData, ? extends ElasticsearchSinkBuilderBase> builder =
                builderSupplier.get();
        builder.setEmitter(rowElasticsearchEmitter);
        builder.setHosts(config.getHosts().toArray(new HttpHost[0]));
        builder.setDeliveryGuarantee(config.getDeliveryGuarantee());
        builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
        builder.setBulkFlushMaxSizeMb(config.getBulkFlushMaxByteSize().getMebiBytes());
        builder.setBulkFlushInterval(config.getBulkFlushInterval());

        if (config.getBulkFlushBackoffType().isPresent()) {
            FlushBackoffType backoffType = config.getBulkFlushBackoffType().get();
            // Fail with a message naming the missing setting instead of the bare
            // "No value present" that an unchecked Optional.get() would produce.
            int backoffMaxRetries =
                    config.getBulkFlushBackoffRetries()
                            .orElseThrow(
                                    () ->
                                            new java.util.NoSuchElementException(
                                                    "Bulk flush backoff retries must be configured"
                                                            + " when a bulk flush backoff type is set."));
            long backoffDelayMs =
                    config.getBulkFlushBackoffDelay()
                            .orElseThrow(
                                    () ->
                                            new java.util.NoSuchElementException(
                                                    "Bulk flush backoff delay must be configured"
                                                            + " when a bulk flush backoff type is set."));

            builder.setBulkFlushBackoffStrategy(backoffType, backoffMaxRetries, backoffDelayMs);
        }

        // Credentials and path prefix are only applied when they carry non-blank text.
        config.getUsername()
                .filter(username -> !StringUtils.isNullOrWhitespaceOnly(username))
                .ifPresent(builder::setConnectionUsername);

        config.getPassword()
                .filter(password -> !StringUtils.isNullOrWhitespaceOnly(password))
                .ifPresent(builder::setConnectionPassword);

        config.getPathPrefix()
                .filter(pathPrefix -> !StringUtils.isNullOrWhitespaceOnly(pathPrefix))
                .ifPresent(builder::setConnectionPathPrefix);

        // NOTE(review): the timeouts below are handed to the builder as whole seconds (narrowed
        // to int). Confirm that the builder expects seconds and not milliseconds.
        config.getConnectionRequestTimeout()
                .map(timeout -> (int) timeout.getSeconds())
                .ifPresent(builder::setConnectionRequestTimeout);

        config.getConnectionTimeout()
                .map(timeout -> (int) timeout.getSeconds())
                .ifPresent(builder::setConnectionTimeout);

        config.getSocketTimeout()
                .map(timeout -> (int) timeout.getSeconds())
                .ifPresent(builder::setSocketTimeout);

        return SinkV2Provider.of(builder.build(), config.getParallelism().orElse(null));
    }