public CompletableFuture write()

in src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java [222:289]


    /**
     * Sends a single stream write request and reports the outcome asynchronously.
     *
     * <p>The request produced by {@code streamWrite.build()} is sent on a call opened
     * through the stream service stub, with a deadline of {@code getOptions().getDeadline()}
     * seconds. The request side is completed right after that one message.
     *
     * <p>The returned future is resolved from the response observer callbacks:
     * <ul>
     *   <li>{@code onCompleted} with no non-success status recorded: completes with {@code null};</li>
     *   <li>{@code onCompleted} after a non-success status: completes exceptionally with the
     *       {@link BanyanDBException} mapped from the most recent non-success status;</li>
     *   <li>{@code onError}: completes exceptionally with the throwable passed to the callback.</li>
     * </ul>
     *
     * <p>If building or sending the request throws, the call is terminated with
     * {@code onError} and the exception is rethrown to the caller; no future is returned
     * in that case.
     *
     * @param streamWrite the write to send; its {@code build()} result is the request message
     * @return a future tracking the outcome of the write as described above
     */
    public CompletableFuture<Void> write(StreamWrite streamWrite) {
        // Fail fast when the stub is absent (checkState is defined outside this block).
        checkState(this.streamServiceStub != null, "stream service is null");

        CompletableFuture<Void> future = new CompletableFuture<>();
        final StreamObserver<BanyandbStream.WriteRequest> writeRequestStreamObserver
                = this.streamServiceStub
                .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
                .write(
                        new StreamObserver<BanyandbStream.WriteResponse>() {
                            // Set by the most recent non-success response; a later
                            // STATUS_SUCCEED response does not reset it.
                            private BanyanDBException responseException;

                            @Override
                            public void onNext(BanyandbStream.WriteResponse writeResponse) {
                                BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus());
                                switch (status) {
                                    case STATUS_SUCCEED:
                                        break;
                                    case STATUS_INVALID_TIMESTAMP:
                                        responseException = new InvalidArgumentException(
                                                "Invalid timestamp: " + streamWrite.getTimestamp(), null, Status.Code.INVALID_ARGUMENT, false);
                                        break;
                                    case STATUS_NOT_FOUND:
                                        responseException = new InvalidArgumentException(
                                                "Invalid metadata: " + streamWrite.entityMetadata, null, Status.Code.INVALID_ARGUMENT, false);
                                        break;
                                    case STATUS_EXPIRED_SCHEMA:
                                        BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
                                        log.warn("The schema {}.{} is expired, trying update the schema...",
                                                metadata.getGroup(), metadata.getName());
                                        // Best-effort refresh: a failure here is only logged; this write
                                        // is still reported as failed with the "Expired revision" error below.
                                        try {
                                            BanyanDBClient.this.updateStreamMetadataCacheFromSever(metadata.getGroup(), metadata.getName());
                                        } catch (BanyanDBException e) {
                                            String warnMessage = String.format("Failed to refresh the stream schema %s.%s",
                                                    metadata.getGroup(), metadata.getName());
                                            log.warn(warnMessage, e);
                                        }
                                        responseException = new InvalidArgumentException(
                                                "Expired revision: " + metadata.getModRevision(), null, Status.Code.INVALID_ARGUMENT, true);
                                        break;
                                    default:
                                        // Any status not handled above is surfaced as an internal server error.
                                        responseException = new InternalException(
                                                String.format("Internal error (%s) occurs in server", writeResponse.getStatus()), null, Status.Code.INTERNAL, true);
                                        break;
                                }
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                log.error("Error occurs in flushing streams.", throwable);
                                future.completeExceptionally(throwable);
                            }

                            @Override
                            public void onCompleted() {
                                if (responseException == null) {
                                    future.complete(null);
                                } else {
                                    future.completeExceptionally(responseException);
                                }
                            }
                        });
        try {
            writeRequestStreamObserver.onNext(streamWrite.build());
        } catch (RuntimeException | Error e) {
            // The request could not be built or sent: terminate the call with onError
            // rather than onCompleted, so a call whose request was never delivered is
            // not half-closed as if it had finished normally. The same exception is
            // rethrown so callers still observe the failure synchronously.
            writeRequestStreamObserver.onError(e);
            throw e;
        }
        // Only signal normal completion of the request side once the message was handed over.
        writeRequestStreamObserver.onCompleted();
        return future;
    }