public ElasticsearchSinkBase()

in streampipes-extensions/streampipes-pipeline-elements-experimental-flink/src/main/java/org/apache/streampipes/pe/flink/sink/elasticsearch/elastic/ElasticsearchSinkBase.java [209:286]


  public ElasticsearchSinkBase(
      ElasticsearchApiCallBridge callBridge,
      Map<String, String> userConfig,
      ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
      ActionRequestFailureHandler failureHandler) {

    this.callBridge = checkNotNull(callBridge);
    this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
    this.failureHandler = checkNotNull(failureHandler);

    // we eagerly check if the user-provided sink function and failure handler is serializable;
    // otherwise, if they aren't serializable, users will merely get a non-informative error message
    // "ElasticsearchSinkBase is not serializable"

    checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction),
        "The implementation of the provided ElasticsearchSinkFunction is not serializable. "
            + "The object probably contains or references non-serializable fields.");

    checkArgument(InstantiationUtil.isSerializable(failureHandler),
        "The implementation of the provided ActionRequestFailureHandler is not serializable. "
            + "The object probably contains or references non-serializable fields.");

    // extract and remove bulk processor related configuration from the user-provided config,
    // so that the resulting user config only contains configuration related to the Elasticsearch client.

    checkNotNull(userConfig);

    ParameterTool params = ParameterTool.fromMap(userConfig);

    if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
      bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
      userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
    } else {
      bulkProcessorFlushMaxActions = null;
    }

    if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
      bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
      userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
    } else {
      bulkProcessorFlushMaxSizeMb = null;
    }

    if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
      bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
      userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
    } else {
      bulkProcessorFlushIntervalMillis = null;
    }

    boolean bulkProcessorFlushBackoffEnable = params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
    userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);

    if (bulkProcessorFlushBackoffEnable) {
      this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();

      if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
        bulkProcessorFlushBackoffPolicy.setBackoffType(
            FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
        userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
      }

      if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
        bulkProcessorFlushBackoffPolicy.setMaxRetryCount(params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
        userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
      }

      if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
        bulkProcessorFlushBackoffPolicy.setDelayMillis(params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
        userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
      }

    } else {
      bulkProcessorFlushBackoffPolicy = null;
    }

    this.userConfig = userConfig;
  }