protected void flush()

in src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java [65:108]


    /**
     * Streams one batch of buffered writes to BanyanDB and waits for the batch outcome.
     *
     * <p>A stream observer is obtained from {@code buildStreamObserver} using a stub whose
     * deadline is {@code flushInterval} seconds, together with a {@code batch} future that
     * represents the outcome of the whole stream (presumably completed by that observer when
     * the server answers or the call fails; confirm against {@code buildStreamObserver}).
     *
     * <p>Per-holder outcome:
     * <ul>
     *   <li>if building the request fails, only that holder's future is failed and the
     *       remaining holders are still processed;</li>
     *   <li>if the request was put on the stream, the holder's future mirrors the result of
     *       {@code batch};</li>
     *   <li>if the send loop aborts, every holder that was not reached is failed with the
     *       aborting error so that no caller is left waiting on a future nobody completes.</li>
     * </ul>
     *
     * <p>The method blocks for at most 30 seconds waiting for {@code batch}; failures of that
     * wait are logged, not rethrown.
     *
     * @param data holders to flush; each element is expected to be a {@code Holder} wrapping
     *             an {@code AbstractWrite}
     */
    protected void flush(List data) {
        final CompletableFuture<Void> batch = new CompletableFuture<>();
        final StreamObserver<REQ> writeRequestStreamObserver
                = this.buildStreamObserver(stub.withDeadlineAfter(flushInterval, TimeUnit.SECONDS), batch);

        final List sentData = new ArrayList(data.size());
        // Count of leading holders whose fate is already decided: either the build failed
        // (future failed on the spot) or the request was handed to the stream.
        int settled = 0;
        try {
            for (Object item : data) {
                Holder h = (Holder) item;
                AbstractWrite<REQ> entity = (AbstractWrite<REQ>) h.writeEntity();
                REQ request;
                try {
                    request = entity.build();
                } catch (Throwable bt) {
                    log.error("building the entity fails: {}", entity.toString(), bt);
                    h.future().completeExceptionally(bt);
                    settled++;
                    continue;
                }
                writeRequestStreamObserver.onNext(request);
                sentData.add(h);
                settled++;
            }
        } catch (Throwable t) {
            log.error("Transform and send request to BanyanDB fail.", t);
            batch.completeExceptionally(t);
            // Holders from the aborting one onwards are not in sentData, so the batch callback
            // below would never touch them; fail them here instead of leaving them pending.
            for (int i = settled; i < data.size(); i++) {
                Object skipped = data.get(i);
                if (skipped instanceof Holder) {
                    ((Holder) skipped).future().completeExceptionally(t);
                }
            }
        } finally {
            try {
                writeRequestStreamObserver.onCompleted();
            } catch (Throwable t) {
                // Must not escape: the callback that settles the sent holders is registered
                // after this point. No-op on batch if it has already been completed.
                log.error("Closing the write stream to BanyanDB fail.", t);
                batch.completeExceptionally(t);
            }
        }
        // Registered after the send loop so sentData is no longer mutated when the callback runs.
        batch.whenComplete((ignored, exp) -> {
            if (exp != null) {
                sentData.stream().map((Function<Object, CompletableFuture<Void>>) o -> ((Holder) o).future())
                        .forEach((Consumer<CompletableFuture<Void>>) it -> it.completeExceptionally(exp));
                log.error("Failed to execute requests in bulk", exp);
            } else {
                // Report what was actually streamed; holders whose build failed were never sent.
                log.debug("Succeeded to execute {} requests in bulk", sentData.size());
                sentData.stream().map((Function<Object, CompletableFuture<Void>>) o -> ((Holder) o).future())
                        .forEach((Consumer<CompletableFuture<Void>>) it -> it.complete(null));
            }
        });
        try {
            batch.get(30, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so the owning thread/executor can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Waiting responses from BanyanDB fail.", ie);
        } catch (Throwable t) {
            log.error("Waiting responses from BanyanDB fail.", t);
        }
    }