public SinkFunctionProvider getSinkRuntimeProvider()

in flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java [138:189]


    /**
     * Creates the runtime provider for this Elasticsearch table sink.
     *
     * <p>Nothing is built eagerly: the returned provider assembles the sink when it is invoked. It
     * creates the row encoder from the configured format, wraps it in a {@link
     * RowElasticsearchSinkFunction}, applies the bulk-flush and failure-handling settings from the
     * sink configuration, and installs a REST client factory (with or without credentials).
     *
     * @param context context handed to the format when the runtime encoder is created
     * @return a provider that yields the fully configured {@link ElasticsearchSink}
     */
    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
        return () -> {
            final SerializationSchema<RowData> rowEncoder =
                    this.format.createRuntimeEncoder(context, schema.toRowDataType());

            final RowElasticsearchSinkFunction rowSinkFunction =
                    new RowElasticsearchSinkFunction(
                            IndexGeneratorFactory.createIndexGenerator(
                                    config.getIndex(), schema, localTimeZoneId),
                            config.getDocumentType(),
                            rowEncoder,
                            XContentType.JSON,
                            REQUEST_FACTORY,
                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));

            final ElasticsearchSink.Builder<RowData> sinkBuilder =
                    builderProvider.createBuilder(config.getHosts(), rowSinkFunction);

            sinkBuilder.setFailureHandler(config.getFailureHandler());

            // Bulk flush thresholds. The builder takes the size limit in megabytes while the
            // configuration exposes bytes; the shift by 20 bits drops any sub-megabyte remainder.
            final int bulkFlushMaxSizeMb = (int) (config.getBulkFlushMaxByteSize() >> 20);
            sinkBuilder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
            sinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
            sinkBuilder.setBulkFlushInterval(config.getBulkFlushInterval());

            // Backoff behaviour; the optional settings are applied only when configured.
            sinkBuilder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
            config.getBulkFlushBackoffType().ifPresent(sinkBuilder::setBulkFlushBackoffType);
            config.getBulkFlushBackoffRetries().ifPresent(sinkBuilder::setBulkFlushBackoffRetries);
            config.getBulkFlushBackoffDelay().ifPresent(sinkBuilder::setBulkFlushBackoffDelay);

            // Credentials count as configured only when both username and password are present
            // and neither is null or whitespace-only.
            final boolean credentialsConfigured =
                    config.getUsername()
                                    .filter(name -> !StringUtils.isNullOrWhitespaceOnly(name))
                                    .isPresent()
                            && config.getPassword()
                                    .filter(secret -> !StringUtils.isNullOrWhitespaceOnly(secret))
                                    .isPresent();

            // The REST client factory is always set explicitly: the builder's default factory is
            // defined with a lambda, which is affected by a lambda-serialization bug under
            // shading (see FLINK-18006).
            if (!credentialsConfigured) {
                sinkBuilder.setRestClientFactory(
                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
            } else {
                sinkBuilder.setRestClientFactory(
                        new AuthRestClientFactory(
                                config.getPathPrefix().orElse(null),
                                config.getUsername().get(),
                                config.getPassword().get()));
            }

            final ElasticsearchSink<RowData> elasticsearchSink = sinkBuilder.build();

            if (config.isDisableFlushOnCheckpoint()) {
                elasticsearchSink.disableFlushOnCheckpoint();
            }

            return elasticsearchSink;
        };
    }