public void flush()

in java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java [298:428]


    public void flush() {
        List<RetryableBulkOperation<Context>> sentRequests = new ArrayList<>();
        RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
            () -> {
                // May happen on manual and periodic flushes
                return !operations.isEmpty() && operations.stream()
                    .anyMatch(RetryableBulkOperation::isSendable);
            },
            () -> {
                // Selecting operations that can be sent immediately,
                // Dividing actual operations from contexts
                List<BulkOperation> immediateOps = new ArrayList<>();
                List<Context> contexts = new ArrayList<>();

                for(Iterator<RetryableBulkOperation<Context>> it = operations.iterator(); it.hasNext();){
                    RetryableBulkOperation<Context> op = it.next();
                    if (op.isSendable()) {
                        immediateOps.add(op.operation());
                        contexts.add(op.context());

                        sentRequests.add(op);
                        it.remove();
                    }
                }

                // Build the request
                BulkRequest request = newRequest().operations(immediateOps).build();

                // Prepare for next round
                currentSize = operations.size();
                addCondition.signalIfReady();

                long id = sendRequestCondition.invocations();

                if (listener != null) {
                    // synchronous execution to make sure it actually runs before
                    listener.beforeBulk(id, request, contexts);
                }

                CompletionStage<BulkResponse> result = client.bulk(request);
                requestsInFlightCount++;

                if (listener == null) {
                    // No need to keep the request around, it can be GC'ed
                    request = null;
                }

                return new RequestExecution<>(id, request, contexts, result);
            });

        if (exec != null) {
            // A request was actually sent
            exec.futureResponse.handle((resp, thr) -> {
                if (resp != null) {

                    // Success? Checking if total or partial
                    List<BulkResponseItem> failedRequestsCanRetry = resp.items().stream()
                        .filter(i -> i.error() != null && i.status() == 429)
                        .collect(Collectors.toList());

                    if (failedRequestsCanRetry.isEmpty() || backoffPolicy.equals(BackoffPolicy.noBackoff())) {
                        // Total success! ...or there's no retry policy implemented. Either way, can call
                        listenerAfterBulkSuccess(resp, exec);
                    } else {
                        // Partial success, retrying failed requests if policy allows it
                        // Keeping list of retryable requests/responses, to exclude them for calling
                        // listener later
                        List<RetryableBulkOperation<Context>> retryableReq = new ArrayList<>();
                        List<RetryableBulkOperation<Context>> refires = new ArrayList<>();
                        List<BulkResponseItem> retryableResp = new ArrayList<>();

                        for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) {
                            int index = resp.items().indexOf(bulkItemResponse);
                            selectingRetries(index, bulkItemResponse, sentRequests, retryableResp, retryableReq, refires);
                        }
                        // Scheduling flushes for just sent out retryable requests
                        if (!refires.isEmpty()) {
                            scheduleRetries(refires);
                        }
                        // Retrieving list of remaining successful or not retryable requests
                        retryableReq.forEach(sentRequests::remove);
                        if (!sentRequests.isEmpty()) {
                            if (listener != null) {
                                // Creating partial BulkRequest
                                List<BulkOperation> partialOps = new ArrayList<>();
                                List<Context> partialCtx = new ArrayList<>();
                                for (RetryableBulkOperation<Context> op : sentRequests) {
                                    partialOps.add(op.operation());
                                    partialCtx.add(op.context());
                                }
                                BulkRequest partialRequest = newRequest().operations(partialOps).build();

                                // Filtering response
                                List<BulkResponseItem> partialItems = resp.items()
                                    .stream()
                                    .filter(i -> !retryableResp.contains(i))
                                    .collect(Collectors.toList());

                                BulkResponse partialResp = BulkResponse.of(br -> br
                                    .items(partialItems)
                                    .errors(resp.errors())
                                    .took(resp.took())
                                    .ingestTook(resp.ingestTook()));

                                listenerInProgressCount.incrementAndGet();
                                scheduler.submit(() -> {
                                    try {
                                        listener.afterBulk(exec.id, partialRequest, partialCtx, partialResp);
                                    } finally {
                                        if (listenerInProgressCount.decrementAndGet() == 0) {
                                            closeCondition.signalIfReady();
                                        }
                                    }
                                });
                            }
                        }

                    }
                } else {
                    // Failure
                    listenerAfterBulkException(thr, exec);
                }

                sendRequestCondition.signalIfReadyAfter(() -> {
                    requestsInFlightCount--;
                    closeCondition.signalAllIfReady();
                });
                return null;
            });
        }
    }