in flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java [83:115]
public Elasticsearch8AsyncWriter(
ElementConverter<InputT, Operation> elementConverter,
WriterInitContext context,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
NetworkConfig networkConfig,
Collection<BufferedRequestState<Operation>> state) {
super(
elementConverter,
context,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeInBytes(maxBatchSizeInBytes)
.setMaxInFlightRequests(maxInFlightRequests)
.setMaxBufferedRequests(maxBufferedRequests)
.setMaxTimeInBufferMS(maxTimeInBufferMS)
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
.build(),
state);
this.esClient = networkConfig.createEsClient();
final SinkWriterMetricGroup metricGroup = context.metricGroup();
checkNotNull(metricGroup);
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.numRecordsSendPartialFailureCounter =
metricGroup.counter("numRecordsSendPartialFailure");
this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
}