in src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java [65:108]
/**
 * Streams one batch of buffered writes to the server and waits for the batch result.
 *
 * <p>Each element of {@code data} is cast to {@link Holder}; its write entity is built into a
 * request and pushed to a stream observer obtained from {@code buildStreamObserver}. An entry
 * whose request cannot be built has its own future failed immediately and is skipped, without
 * affecting the remaining entries. Once the {@code batch} future completes, the futures of all
 * entries that were actually sent are completed with the same outcome.
 *
 * <p>The calling thread blocks for at most 30 seconds waiting for {@code batch}. Failures while
 * waiting are logged and not rethrown; if the wait is interrupted, the thread's interrupt status
 * is restored before returning.
 *
 * @param data the buffered entries to flush; every element must be a {@link Holder}
 */
protected void flush(List data) {
    final CompletableFuture<Void> batch = new CompletableFuture<>();
    // NOTE(review): batch is handed to buildStreamObserver, which is presumably what completes it
    // when the server responds or the call fails - confirm against the implementations.
    final StreamObserver<REQ> writeRequestStreamObserver
        = this.buildStreamObserver(stub.withDeadlineAfter(flushInterval, TimeUnit.SECONDS), batch);
    // Only entries that were really pushed to the stream; build failures are settled on the spot.
    final List<Holder> sentData = new ArrayList<>(data.size());
    try {
        for (final Object item : data) {
            final Holder h = (Holder) item;
            final AbstractWrite<REQ> entity = (AbstractWrite<REQ>) h.writeEntity();
            REQ request;
            try {
                request = entity.build();
            } catch (Throwable bt) {
                // Pass the entity itself rather than entity.toString(): a null entity must not
                // raise a second exception here and abort the rest of the batch.
                log.error("building the entity fails: {}", entity, bt);
                h.future().completeExceptionally(bt);
                continue;
            }
            writeRequestStreamObserver.onNext(request);
            sentData.add(h);
        }
    } catch (Throwable t) {
        log.error("Transform and send request to BanyanDB fail.", t);
        batch.completeExceptionally(t);
    } finally {
        // Always close the request side of the stream, even when sending failed part-way.
        writeRequestStreamObserver.onCompleted();
    }
    // Registered after sending so that sentData is fully populated before the callback reads it.
    batch.whenComplete((ignored, exp) -> {
        if (exp != null) {
            for (final Holder sent : sentData) {
                sent.future().completeExceptionally(exp);
            }
            log.error("Failed to execute requests in bulk", exp);
        } else {
            // Report what was actually sent; entries that failed to build are not included.
            log.debug("Succeeded to execute {} requests in bulk", sentData.size());
            for (final Holder sent : sentData) {
                sent.future().complete(null);
            }
        }
    });
    try {
        batch.get(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt status so the owning thread/executor can observe the interruption.
        Thread.currentThread().interrupt();
        log.error("Waiting responses from BanyanDB fail.", e);
    } catch (Throwable t) {
        log.error("Waiting responses from BanyanDB fail.", t);
    }
}