private void streamObserverOnNext()

in core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java [134:211]


    private void streamObserverOnNext(Payload payload, String connectionId, Integer localPort, int remotePort,
            String remoteIp, StreamObserver<Payload> responseObserver) {
        String clientIp = payload.getMetadata().getClientIp();
        traceDetailIfNecessary(payload);
        Object parseObj;
        try {
            parseObj = GrpcUtils.parse(payload);
        } catch (Throwable throwable) {
            Loggers.REMOTE_DIGEST.warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
            return;
        }
        if (parseObj == null) {
            Loggers.REMOTE_DIGEST.warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                    payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
            return;
        }
        if (parseObj instanceof ConnectionSetupRequest) {
            ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
            Map<String, String> labels = setUpRequest.getLabels();
            String appName = "-";
            if (labels != null && labels.containsKey(Constants.APPNAME)) {
                appName = labels.get(Constants.APPNAME);
            }
            ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(), remoteIp,
                    remotePort, localPort, ConnectionType.GRPC.getType(), setUpRequest.getClientVersion(), appName,
                    setUpRequest.getLabels());
            metaInfo.setNamespaceId(setUpRequest.getTenant());
            Channel channel = GrpcServerConstants.CONTEXT_KEY_CHANNEL.get();
            Attribute<Boolean> tlsProtected = channel.attr(AttributeKey.valueOf("TLS_PROTECTED"));
            metaInfo.setTlsProtected(tlsProtected != null && tlsProtected.get() != null && tlsProtected.get());
            GrpcConnection connection = new GrpcConnection(metaInfo, responseObserver,
                    GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
            // null if supported
            if (setUpRequest.getAbilityTable() != null) {
                // map to table
                connection.setAbilityTable(setUpRequest.getAbilityTable());
            }
            boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
            if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                //Not register to the connection manager if current server is over limit or server is starting.
                try {
                    Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                            rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                    connection.close();
                } catch (Exception e) {
                    //Do nothing.
                    if (connectionManager.traced(clientIp)) {
                        Loggers.REMOTE_DIGEST.warn("[{}]Send connect reset request error,error={}", connectionId, e);
                    }
                }
            } else {
                try {
                    // server sends abilities only when:
                    //      1. client sends setUpRequest with its abilities table
                    //      2. client sends setUpRequest with empty table
                    if (setUpRequest.getAbilityTable() != null) {
                        // finish register, tell client has set up successfully
                        // async response without client ack
                        connection.sendRequestNoAck(new SetupAckRequest(
                                NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(AbilityMode.SERVER)));
                    }
                } catch (Exception e) {
                    // nothing to do
                }
            }
        } else if (parseObj instanceof Response) {
            Response response = (Response) parseObj;
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST.warn("[{}]Receive response of server request  ,response={}", connectionId,
                        response);
            }
            RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
            connectionManager.refreshActiveTime(connectionId);
        } else {
            Loggers.REMOTE_DIGEST.warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
                    parseObj);
        }
    }