protected StreamObserver buildStreamObserver()

in src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java [68:108]


    protected StreamObserver<BanyandbStream.WriteRequest> buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub, CompletableFuture<Void> batch) {
        return stub.write(
                new StreamObserver<BanyandbStream.WriteResponse>() {
                    private final Set<String> schemaExpired = new HashSet<>();

                    @Override
                    public void onNext(BanyandbStream.WriteResponse writeResponse) {
                        BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus());
                        switch (status) {
                            case STATUS_SUCCEED:
                                break;
                            case STATUS_EXPIRED_SCHEMA:
                                BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
                                String schemaKey = metadata.getGroup() + "." + metadata.getName();
                                if (!schemaExpired.contains(schemaKey)) {
                                    log.warn("The schema {} is expired, trying update the schema...", schemaKey);
                                    try {
                                        client.updateStreamMetadataCacheFromSever(metadata.getGroup(), metadata.getName());
                                        schemaExpired.add(schemaKey);
                                    } catch (BanyanDBException e) {
                                        log.error(e.getMessage(), e);
                                    }
                                }
                                break;
                            default:
                                log.warn("Write stream failed with status: {}", status);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        batch.completeExceptionally(t);
                        log.error("Error occurs in flushing streams", t);
                    }

                    @Override
                    public void onCompleted() {
                        batch.complete(null);
                    }
                });
    }