in flink-taxi-stream-processor/src/main/java/com/amazonaws/flink/refarch/utils/ElasticsearchJestSink.java [106:136]
public void open(Configuration configuration) {
    // Prepares this sink for use: creates the Jest client whose HTTP requests are
    // passed through an AWS signing interceptor, then allocates the document buffer.
    // The "region" and "es-endpoint" settings are both looked up in userConfig via
    // getRequired.
    final ParameterTool settings = ParameterTool.fromMap(userConfig);

    // Signer inputs: credentials from the default provider chain and a clock that
    // reports the current time in UTC.
    final AWSCredentialsProvider awsCredentials = new DefaultAWSCredentialsProviderChain();
    final Supplier<LocalDateTime> utcNow = () -> LocalDateTime.now(ZoneOffset.UTC);
    final String region = settings.getRequired("region");
    final AWSSigningRequestInterceptor signingInterceptor =
            new AWSSigningRequestInterceptor(
                    new AWSSigner(awsCredentials, region, ES_SERVICE_NAME, utcNow));

    // Register the signing interceptor last on both the blocking and the async
    // HTTP client builders used by the factory.
    final JestClientFactory clientFactory = new JestClientFactory() {
        @Override
        protected HttpClientBuilder configureHttpClient(HttpClientBuilder builder) {
            return builder.addInterceptorLast(signingInterceptor);
        }

        @Override
        protected HttpAsyncClientBuilder configureHttpClient(HttpAsyncClientBuilder builder) {
            return builder.addInterceptorLast(signingInterceptor);
        }
    };

    final String endpoint = settings.getRequired("es-endpoint");
    final HttpClientConfig clientConfig = new HttpClientConfig.Builder(endpoint)
            .multiThreaded(true)
            .build();
    clientFactory.setHttpClientConfig(clientConfig);

    jestClient = clientFactory.getObject();

    // Buffer is pre-sized to batchSize.
    documentBuffer = new ArrayList<>(batchSize);
}