in src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java [144:175]
/**
 * Sends one batch of write requests over a freshly built request stream.
 *
 * <p>Each holder's entity is built into a request and handed to the stream. A holder whose
 * entity fails to build gets its own future completed exceptionally and is skipped; the rest
 * of the batch is still sent. A holder's future is completed as soon as its request has been
 * handed to the stream, not when the server responds.
 *
 * <p>If handing a request to the stream fails, the stream is terminated with {@code onError},
 * every holder future that is still pending is failed with the same cause (so nobody waits
 * forever), and the failure is rethrown to the caller.
 *
 * @param data the holders whose write entities should be sent in this batch
 * @return the batch future that was handed to {@code buildStreamObserver}; presumably it is
 *         completed by the response observer built there (TODO confirm against that method)
 */
protected CompletableFuture<Void> doFlush(final List<Holder> data) {
    // The batch is used to control the completion of the flush operation.
    // There is at most one error per batch,
    // because the database server would terminate the batch process when the first error occurs.
    final CompletableFuture<Void> batch = new CompletableFuture<>();
    final StreamObserver<REQ> writeRequestStreamObserver
        = this.buildStreamObserver(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS), batch);
    try {
        for (final Holder h : data) {
            AbstractWrite<REQ> entity = (AbstractWrite<REQ>) h.writeEntity();
            REQ request;
            try {
                request = entity.build();
            } catch (Throwable bt) {
                // Pass the entity itself rather than entity.toString(): a null entity must not
                // raise a second exception from inside this handler.
                log.error("building the entity fails: {}", entity, bt);
                h.future().completeExceptionally(bt);
                continue;
            }
            writeRequestStreamObserver.onNext(request);
            h.future().complete(null);
        }
    } catch (RuntimeException | Error sendFailure) {
        // The stream is broken: signal the failure instead of a normal completion.
        try {
            writeRequestStreamObserver.onError(sendFailure);
        } catch (RuntimeException terminateFailure) {
            sendFailure.addSuppressed(terminateFailure);
        }
        // Fail every holder that has not been settled yet; completeExceptionally is a no-op
        // on futures that were already completed earlier in the loop.
        if (data != null) {
            for (final Holder pending : data) {
                pending.future().completeExceptionally(sendFailure);
            }
        }
        throw sendFailure;
    }
    writeRequestStreamObserver.onCompleted();
    batch.whenComplete((ignored, exp) -> {
        if (exp != null) {
            log.error("Failed to execute requests in bulk", exp);
        }
    });
    return batch;
}