protected StreamObserver buildStreamObserver()

in src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java [67:107]


    protected StreamObserver<BanyandbMeasure.WriteRequest> buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub,
                                                                               CompletableFuture<Void> batch) {
        return stub.write(new StreamObserver<BanyandbMeasure.WriteResponse>() {
            private final Set<String> schemaExpired = new HashSet<>();

            @Override
            public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
                BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus());
                switch (status) {
                    case STATUS_SUCCEED:
                        break;
                    case STATUS_EXPIRED_SCHEMA:
                        BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
                        String schemaKey = metadata.getGroup() + "." + metadata.getName();
                        if (!schemaExpired.contains(schemaKey)) {
                            log.warn("The schema {} is expired, trying update the schema...", schemaKey);
                            try {
                                client.updateMeasureMetadataCacheFromSever(metadata.getGroup(), metadata.getName());
                                schemaExpired.add(schemaKey);
                            } catch (BanyanDBException e) {
                                log.error(e.getMessage(), e);
                            }
                        }
                        break;
                    default:
                        log.warn("Write measure failed with status: {}", status);
                }
            }

            @Override
            public void onError(Throwable t) {
                batch.completeExceptionally(t);
                log.error("Error occurs in flushing measures", t);
            }

            @Override
            public void onCompleted() {
                batch.complete(null);
            }
        });
    }