public void open()

in flink-taxi-stream-processor/src/main/java/com/amazonaws/flink/refarch/utils/ElasticsearchJestSink.java [106:136]


  /**
   * Prepares this sink for use: builds the Jest client and allocates the document buffer.
   *
   * <p>Settings are read from {@code userConfig}; the keys {@code "region"} and
   * {@code "es-endpoint"} are both fetched with {@code getRequired}. An
   * {@link AWSSigningRequestInterceptor} is registered as the last request interceptor on the
   * synchronous as well as the asynchronous HTTP client builder used by the Jest factory.
   *
   * @param configuration not read by this method
   */
  public void open(Configuration configuration) {
    final ParameterTool settings = ParameterTool.fromMap(userConfig);

    // Clock handed to the signer; it always reports the current time in UTC.
    final Supplier<LocalDateTime> utcNow = () -> LocalDateTime.now(ZoneOffset.UTC);
    final AWSCredentialsProvider credentials = new DefaultAWSCredentialsProviderChain();
    final String region = settings.getRequired("region");
    final AWSSigningRequestInterceptor signingInterceptor =
        new AWSSigningRequestInterceptor(
            new AWSSigner(credentials, region, ES_SERVICE_NAME, utcNow));

    // Attach the signing interceptor to both HttpClientBuilder and HttpAsyncClientBuilder.
    final JestClientFactory signingFactory = new JestClientFactory() {
      @Override
      protected HttpClientBuilder configureHttpClient(HttpClientBuilder builder) {
        return builder.addInterceptorLast(signingInterceptor);
      }

      @Override
      protected HttpAsyncClientBuilder configureHttpClient(HttpAsyncClientBuilder builder) {
        return builder.addInterceptorLast(signingInterceptor);
      }
    };

    final String endpoint = settings.getRequired("es-endpoint");
    final HttpClientConfig clientConfig = new HttpClientConfig.Builder(endpoint)
        .multiThreaded(true)
        .build();
    signingFactory.setHttpClientConfig(clientConfig);

    jestClient = signingFactory.getObject();
    // Pre-size the buffer to the configured batch size.
    documentBuffer = new ArrayList<>(batchSize);
  }