in flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java [227:315]
private OpensearchSink(
Map<String, String> userConfig,
List<HttpHost> httpHosts,
OpensearchSinkFunction<T> opensearchSinkFunction,
ActionRequestFailureHandler failureHandler,
RestClientFactory restClientFactory) {
checkArgument(httpHosts != null && !httpHosts.isEmpty());
this.httpHosts = httpHosts;
this.restClientFactory = checkNotNull(restClientFactory);
this.opensearchSinkFunction = checkNotNull(opensearchSinkFunction);
this.failureHandler = checkNotNull(failureHandler);
// we eagerly check if the user-provided sink function and failure handler is serializable;
// otherwise, if they aren't serializable, users will merely get a non-informative error
// message
// "OpensearchSink is not serializable"
checkArgument(
InstantiationUtil.isSerializable(opensearchSinkFunction),
"The implementation of the provided OpensearchSinkFunction is not serializable. "
+ "The object probably contains or references non-serializable fields.");
checkArgument(
InstantiationUtil.isSerializable(failureHandler),
"The implementation of the provided ActionRequestFailureHandler is not serializable. "
+ "The object probably contains or references non-serializable fields.");
// extract and remove bulk processor related configuration from the user-provided config,
// so that the resulting user config only contains configuration related to the
// Opensearch client.
checkNotNull(userConfig);
// copy config so we can remove entries without side-effects
userConfig = new HashMap<>(userConfig);
ParameterTool params = ParameterTool.fromMap(userConfig);
if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
} else {
bulkProcessorFlushMaxActions = null;
}
if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
} else {
bulkProcessorFlushMaxSizeMb = null;
}
if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
} else {
bulkProcessorFlushIntervalMillis = null;
}
boolean bulkProcessorFlushBackoffEnable =
params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
if (bulkProcessorFlushBackoffEnable) {
this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
bulkProcessorFlushBackoffPolicy.setBackoffType(
FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
}
if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
bulkProcessorFlushBackoffPolicy.setMaxRetryCount(
params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
}
if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
bulkProcessorFlushBackoffPolicy.setDelayMillis(
params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
}
} else {
bulkProcessorFlushBackoffPolicy = null;
}
this.userConfig = userConfig;
}