in common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java [249:323]
/**
 * Opens the bi-directional request stream on the given stub and registers the observer that
 * handles everything the server sends over that stream.
 *
 * <p>For every incoming payload the observer parses it into a {@code Request}:
 * <ul>
 *   <li>a {@code SetupAckRequest} is handed to {@code setupRequestHandler} and is not answered;</li>
 *   <li>any other request goes through {@code handleServerRequest}; a non-null response is tagged
 *       with the request id and sent back via {@code sendResponse};</li>
 *   <li>if handling throws, an {@code ErrorResponse} with {@code NacosException.CLIENT_ERROR} is
 *       sent back for that request id;</li>
 *   <li>if parsing (or the cast to {@code Request}) throws, the failure is logged and
 *       {@code recAbilityContext.release(null)} is called.</li>
 * </ul>
 *
 * <p>When the stream errors or completes while the client is running and the connection is not
 * abandoned, the client status is moved from RUNNING to UNHEALTHY and, if that transition
 * succeeds, {@code switchServerAsync()} is triggered. Otherwise the event is only logged.
 *
 * @param streamStub stub on which {@code requestBiStream} is invoked
 * @param grpcConn   connection whose id is used in log lines and whose abandon flag is consulted
 * @return the observer returned by {@code requestBiStream}
 */
private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {
    return streamStub.requestBiStream(new StreamObserver<Payload>() {
        
        @Override
        public void onNext(Payload payload) {
            // Pass the payload object itself so it is only stringified if the debug line is emitted.
            LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}",
                    grpcConn.getConnectionId(), payload);
            try {
                Object parseBody = GrpcUtils.parse(payload);
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        if (request instanceof SetupAckRequest) {
                            // there is no connection ready this time
                            setupRequestHandler.requestReply(request, null);
                            return;
                        }
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            response.setRequestId(request.getRequestId());
                            sendResponse(response);
                        } else {
                            LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(),
                                    request.getRequestId());
                        }
                    } catch (Exception e) {
                        // The exception is the trailing argument (no placeholder) so the logger
                        // records it with its stack trace instead of silently dropping it.
                        LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
                                grpcConn.getConnectionId(), payload, e);
                        Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR,
                                "Handle server request error");
                        errResponse.setRequestId(request.getRequestId());
                        sendResponse(errResponse);
                    }
                }
            } catch (Exception e) {
                // Keep the cause: without it a parse/cast failure is impossible to diagnose.
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
                        grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8(), e);
                // remove and notify
                recAbilityContext.release(null);
            }
        }
        
        @Override
        public void onError(Throwable throwable) {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
                        grpcConn.getConnectionId(), throwable);
                // Only the caller that wins the RUNNING -> UNHEALTHY transition triggers the switch.
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}",
                        grpcConn.getConnectionId(), isRunning, isAbandon);
            }
        }
        
        @Override
        public void onCompleted() {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server",
                        grpcConn.getConnectionId());
                // Only the caller that wins the RUNNING -> UNHEALTHY transition triggers the switch.
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}",
                        grpcConn.getConnectionId(), isRunning, isAbandon);
            }
        }
    });
}