in src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java [222:289]
/**
 * Writes a single stream entity to the server and reports the outcome asynchronously.
 *
 * <p>A streaming call is opened on the stream service stub with a deadline of
 * {@code getOptions().getDeadline()} seconds. Exactly one request, built from
 * {@code streamWrite}, is pushed to the request stream, after which the request stream is
 * completed. Any exception thrown while building or sending the request propagates to the
 * caller after the request stream has been completed.
 *
 * <p>Each server response is inspected for its status. A non-successful status is turned into
 * a {@link BanyanDBException} that is remembered until the response stream completes; if
 * several responses arrive, the most recent non-successful one wins. When the server reports an
 * expired schema, a refresh of the cached stream metadata is attempted first; a failed refresh
 * is only logged.
 *
 * @param streamWrite the entity to write; also used to describe the failure when the server
 *                    rejects the timestamp or the metadata
 * @return a future that completes with {@code null} when the response stream completes without
 *         a remembered failure, completes exceptionally with that failure otherwise, and
 *         completes exceptionally with the reported throwable when the call itself errors
 */
public CompletableFuture<Void> write(StreamWrite streamWrite) {
    checkState(this.streamServiceStub != null, "stream service is null");

    final CompletableFuture<Void> writeResult = new CompletableFuture<>();

    // Receives the server's answers and settles writeResult once the response stream ends.
    final StreamObserver<BanyandbStream.WriteResponse> responseObserver =
            new StreamObserver<BanyandbStream.WriteResponse>() {
                // Failure derived from the latest non-successful response, if any.
                private BanyanDBException pendingFailure;

                @Override
                public void onNext(BanyandbStream.WriteResponse response) {
                    final BanyandbModel.Status code =
                            StatusUtil.convertStringToStatus(response.getStatus());
                    final BanyanDBException failure;
                    switch (code) {
                        case STATUS_SUCCEED:
                            // Nothing to record; keep whatever was remembered before.
                            return;
                        case STATUS_INVALID_TIMESTAMP:
                            failure = new InvalidArgumentException(
                                    "Invalid timestamp: " + streamWrite.getTimestamp(),
                                    null, Status.Code.INVALID_ARGUMENT, false);
                            break;
                        case STATUS_NOT_FOUND:
                            failure = new InvalidArgumentException(
                                    "Invalid metadata: " + streamWrite.entityMetadata,
                                    null, Status.Code.INVALID_ARGUMENT, false);
                            break;
                        case STATUS_EXPIRED_SCHEMA: {
                            final BanyandbCommon.Metadata expired = response.getMetadata();
                            final String group = expired.getGroup();
                            final String name = expired.getName();
                            log.warn("The schema {}.{} is expired, trying update the schema...",
                                    group, name);
                            try {
                                BanyanDBClient.this.updateStreamMetadataCacheFromSever(group, name);
                            } catch (BanyanDBException refreshError) {
                                // Best effort: the write is reported as failed either way.
                                log.warn(
                                        String.format("Failed to refresh the stream schema %s.%s",
                                                group, name),
                                        refreshError);
                            }
                            failure = new InvalidArgumentException(
                                    "Expired revision: " + expired.getModRevision(),
                                    null, Status.Code.INVALID_ARGUMENT, true);
                            break;
                        }
                        default:
                            failure = new InternalException(
                                    String.format("Internal error (%s) occurs in server",
                                            response.getStatus()),
                                    null, Status.Code.INTERNAL, true);
                            break;
                    }
                    pendingFailure = failure;
                }

                @Override
                public void onError(Throwable cause) {
                    log.error("Error occurs in flushing streams.", cause);
                    writeResult.completeExceptionally(cause);
                }

                @Override
                public void onCompleted() {
                    if (pendingFailure != null) {
                        writeResult.completeExceptionally(pendingFailure);
                        return;
                    }
                    writeResult.complete(null);
                }
            };

    final StreamObserver<BanyandbStream.WriteRequest> requestObserver =
            this.streamServiceStub
                    .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
                    .write(responseObserver);

    // The request stream is always completed, even if building or sending the request throws.
    try {
        requestObserver.onNext(streamWrite.build());
    } finally {
        requestObserver.onCompleted();
    }
    return writeResult;
}