in core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java [134:211]
/**
 * Handles a single payload received on the bi-directional stream.
 *
 * <p>The payload is parsed with {@code GrpcUtils.parse} and dispatched on the parsed type:
 * <ul>
 *   <li>{@code ConnectionSetupRequest}: builds a {@code GrpcConnection} and tries to register it with the
 *       connection manager. The connection is closed instead when registration fails or when the request
 *       comes from an SDK source while the application has not started yet. After a successful registration
 *       the server abilities are sent back, but only if the request carried a non-null ability table.</li>
 *   <li>{@code Response}: notifies {@code RpcAckCallbackSynchronizer} and refreshes the active time of the
 *       connection.</li>
 *   <li>anything else (including a parse failure or a {@code null} parse result): a warning is logged and
 *       the payload is dropped.</li>
 * </ul>
 *
 * @param payload          payload received from the stream
 * @param connectionId     id used to register the connection and as the prefix of every log line
 * @param localPort        local port recorded in the connection meta
 * @param remotePort       remote port recorded in the connection meta
 * @param remoteIp         remote ip recorded in the connection meta
 * @param responseObserver observer handed to the created {@code GrpcConnection}
 */
private void streamObserverOnNext(Payload payload, String connectionId, Integer localPort, int remotePort,
        String remoteIp, StreamObserver<Payload> responseObserver) {
    String clientIp = payload.getMetadata().getClientIp();
    traceDetailIfNecessary(payload);
    Object parseObj;
    try {
        parseObj = GrpcUtils.parse(payload);
    } catch (Throwable throwable) {
        Loggers.REMOTE_DIGEST.warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
        return;
    }
    if (parseObj == null) {
        Loggers.REMOTE_DIGEST.warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
        return;
    }
    if (parseObj instanceof ConnectionSetupRequest) {
        ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
        Map<String, String> labels = setUpRequest.getLabels();
        // "-" is the placeholder app name when the client did not send an app name label.
        String appName = "-";
        if (labels != null && labels.containsKey(Constants.APPNAME)) {
            appName = labels.get(Constants.APPNAME);
        }
        ConnectionMeta metaInfo = new ConnectionMeta(connectionId, clientIp, remoteIp, remotePort, localPort,
                ConnectionType.GRPC.getType(), setUpRequest.getClientVersion(), appName, labels);
        metaInfo.setNamespaceId(setUpRequest.getTenant());
        // Look the channel up once and reuse it for both the TLS flag and the connection.
        Channel channel = GrpcServerConstants.CONTEXT_KEY_CHANNEL.get();
        Attribute<Boolean> tlsProtected = channel.attr(AttributeKey.valueOf("TLS_PROTECTED"));
        // Read the attribute a single time; an unset attribute counts as "not protected".
        metaInfo.setTlsProtected(tlsProtected != null && Boolean.TRUE.equals(tlsProtected.get()));
        GrpcConnection connection = new GrpcConnection(metaInfo, responseObserver, channel);
        // The ability table is copied to the connection only when the request carries one.
        if (setUpRequest.getAbilityTable() != null) {
            connection.setAbilityTable(setUpRequest.getAbilityTable());
        }
        boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
        if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
            // Not registered to the connection manager if current server is over limit or server is starting.
            try {
                Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                        rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                connection.close();
            } catch (Exception e) {
                // Closing is best effort; the failure is only reported for traced client ips.
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST.warn("[{}]Send connect reset request error,error={}", connectionId, e);
                }
            }
        } else {
            try {
                // server sends abilities only when:
                // 1. client sends setUpRequest with its abilities table
                // 2. client sends setUpRequest with empty table
                if (setUpRequest.getAbilityTable() != null) {
                    // finish register, tell client has set up successfully
                    // async response without client ack
                    connection.sendRequestNoAck(new SetupAckRequest(
                            NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(AbilityMode.SERVER)));
                }
            } catch (Exception e) {
                // The connection stays registered, but the failure must not disappear silently.
                Loggers.REMOTE_DIGEST.warn("[{}]Send setup ack request error", connectionId, e);
            }
        }
    } else if (parseObj instanceof Response) {
        Response response = (Response) parseObj;
        if (connectionManager.traced(clientIp)) {
            Loggers.REMOTE_DIGEST.warn("[{}]Receive response of server request ,response={}", connectionId,
                    response);
        }
        RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
        connectionManager.refreshActiveTime(connectionId);
    } else {
        Loggers.REMOTE_DIGEST.warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
                parseObj);
    }
}