public void configure()

in flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java [244:351]


  /**
   * Configures this sink from the supplied Flume {@link Context}.
   *
   * <p>Every setting is optional in the context: a field is only overwritten
   * when the corresponding key holds a non-blank value, so whatever value the
   * field held before this call is kept otherwise. After all settings are
   * read, the index name, index type, cluster name and batch size are
   * validated.
   *
   * <p>Sub-properties under {@code CLIENT_PREFIX}, {@code SERIALIZER_PREFIX}
   * and {@code INDEX_NAME_BUILDER_PREFIX} are forwarded to the client context,
   * the serializer and the index name builder respectively.
   *
   * @param context the sink configuration
   * @throws IllegalStateException if a required setting is missing or a
   *     numeric setting is out of range (raised by the
   *     {@code Preconditions.checkState} calls)
   * @throws RuntimeException if the serializer or the index name builder
   *     cannot be instantiated or configured (propagated via
   *     {@code Throwables.propagate})
   */
  public void configure(Context context) {
    // Host names are only needed when talking to a remote cluster.
    if (!isLocal) {
      if (StringUtils.isNotBlank(context.getString(HOSTNAMES))) {
        // Whitespace is stripped first so "a, b" and "a,b" are equivalent.
        serverAddresses = StringUtils.deleteWhitespace(
            context.getString(HOSTNAMES)).split(",");
      }
      Preconditions.checkState(serverAddresses != null
          && serverAddresses.length > 0, "Missing Param:" + HOSTNAMES);
    }

    if (StringUtils.isNotBlank(context.getString(INDEX_NAME))) {
      this.indexName = context.getString(INDEX_NAME);
    }

    if (StringUtils.isNotBlank(context.getString(INDEX_TYPE))) {
      this.indexType = context.getString(INDEX_TYPE);
    }

    if (StringUtils.isNotBlank(context.getString(CLUSTER_NAME))) {
      this.clusterName = context.getString(CLUSTER_NAME);
    }

    if (StringUtils.isNotBlank(context.getString(BATCH_SIZE))) {
      this.batchSize = Integer.parseInt(context.getString(BATCH_SIZE));
    }

    if (StringUtils.isNotBlank(context.getString(TTL))) {
      this.ttlMs = parseTTL(context.getString(TTL));
      Preconditions.checkState(ttlMs > 0, TTL
          + " must be greater than 0 or not set.");
    }

    if (StringUtils.isNotBlank(context.getString(CLIENT_TYPE))) {
      clientType = context.getString(CLIENT_TYPE);
    }

    // Settings under the client prefix are kept in a dedicated context.
    elasticSearchClientContext = new Context();
    elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX));

    String serializerClazz = DEFAULT_SERIALIZER_CLASS;
    if (StringUtils.isNotBlank(context.getString(SERIALIZER))) {
      serializerClazz = context.getString(SERIALIZER);
    }

    Context serializerContext = new Context();
    serializerContext.putAll(context.getSubProperties(SERIALIZER_PREFIX));

    try {
      @SuppressWarnings("unchecked")
      Class<? extends Configurable> clazz = (Class<? extends Configurable>) Class
          .forName(serializerClazz);
      Configurable serializer = clazz.newInstance();

      // The configured class may be either a request-builder factory or a
      // plain event serializer; anything else is rejected.
      if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) {
        indexRequestFactory
            = (ElasticSearchIndexRequestBuilderFactory) serializer;
        indexRequestFactory.configure(serializerContext);
      } else if (serializer instanceof ElasticSearchEventSerializer) {
        eventSerializer = (ElasticSearchEventSerializer) serializer;
        eventSerializer.configure(serializerContext);
      } else {
        throw new IllegalArgumentException(serializerClazz
            + " is not an ElasticSearchEventSerializer");
      }
    } catch (Exception e) {
      logger.error("Could not instantiate event serializer.", e);
      Throwables.propagate(e);
    }

    // Create the counter only once so repeated configure() calls reuse it.
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }

    String indexNameBuilderClass = DEFAULT_INDEX_NAME_BUILDER_CLASS;
    if (StringUtils.isNotBlank(context.getString(INDEX_NAME_BUILDER))) {
      indexNameBuilderClass = context.getString(INDEX_NAME_BUILDER);
    }

    // The builder's own sub-properties must go into the builder's context.
    // (They were previously added to serializerContext by mistake, so the
    // index name builder never received its configured settings.)
    Context indexnameBuilderContext = new Context();
    indexnameBuilderContext.putAll(
            context.getSubProperties(INDEX_NAME_BUILDER_PREFIX));

    try {
      @SuppressWarnings("unchecked")
      Class<? extends IndexNameBuilder> clazz
              = (Class<? extends IndexNameBuilder>) Class
              .forName(indexNameBuilderClass);
      indexNameBuilder = clazz.newInstance();
      // Put last so the sink's resolved index name takes precedence over
      // any same-named key among the builder sub-properties.
      indexnameBuilderContext.put(INDEX_NAME, indexName);
      indexNameBuilder.configure(indexnameBuilderContext);
    } catch (Exception e) {
      logger.error("Could not instantiate index name builder.", e);
      Throwables.propagate(e);
    }

    // Final validation of the effective settings.
    Preconditions.checkState(StringUtils.isNotBlank(indexName),
        "Missing Param:" + INDEX_NAME);
    Preconditions.checkState(StringUtils.isNotBlank(indexType),
        "Missing Param:" + INDEX_TYPE);
    Preconditions.checkState(StringUtils.isNotBlank(clusterName),
        "Missing Param:" + CLUSTER_NAME);
    Preconditions.checkState(batchSize >= 1, BATCH_SIZE
        + " must be greater than 0");
  }