in src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java [67:107]
/**
 * Opens a measure write stream on the given stub and returns the request-side observer
 * produced by {@code stub.write(...)}.
 *
 * <p>The response-side observer registered here reacts to each write response:
 * a succeeded status is ignored, an expired-schema status triggers a refresh of the
 * client's measure metadata cache, and any other status is logged as a warning.
 *
 * @param stub  the measure service stub on which the write stream is opened
 * @param batch future completed with {@code null} when the stream completes, or completed
 *              exceptionally with the reported throwable when the stream fails
 * @return the observer returned by {@code stub.write(...)} for sending write requests
 */
protected StreamObserver<BanyandbMeasure.WriteRequest> buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub,
                                                                           CompletableFuture<Void> batch) {
    final StreamObserver<BanyandbMeasure.WriteResponse> responseObserver =
        new StreamObserver<BanyandbMeasure.WriteResponse>() {
            // Keys ("group.name") of schemas this observer has already refreshed successfully.
            private final Set<String> refreshedSchemaKeys = new HashSet<>();

            @Override
            public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
                final BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus());
                switch (status) {
                    case STATUS_SUCCEED:
                        return;
                    case STATUS_EXPIRED_SCHEMA:
                        refreshExpiredSchema(writeResponse.getMetadata());
                        return;
                    default:
                        log.warn("Write measure failed with status: {}", status);
                }
            }

            @Override
            public void onError(Throwable t) {
                batch.completeExceptionally(t);
                log.error("Error occurs in flushing measures", t);
            }

            @Override
            public void onCompleted() {
                batch.complete(null);
            }

            // Refreshes the client's metadata cache for the schema named by the metadata,
            // unless this observer has already refreshed that schema successfully.
            private void refreshExpiredSchema(BanyandbCommon.Metadata metadata) {
                final String group = metadata.getGroup();
                final String name = metadata.getName();
                final String schemaKey = group + "." + name;
                if (refreshedSchemaKeys.contains(schemaKey)) {
                    return;
                }
                log.warn("The schema {} is expired, trying update the schema...", schemaKey);
                try {
                    client.updateMeasureMetadataCacheFromSever(group, name);
                    // Recorded only after a successful refresh, so a failed attempt
                    // is tried again on the next expired-schema response.
                    refreshedSchemaKeys.add(schemaKey);
                } catch (BanyanDBException e) {
                    log.error(e.getMessage(), e);
                }
            }
        };
    return stub.write(responseObserver);
}