public void request()

in core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java [82:226]


    /**
     * Handles one gRPC request payload: validates it, dispatches it to the handler registered for its type,
     * and writes the resulting response payload to the observer before completing it.
     *
     * <p>The checks run in this order, each one answering with an error response and recording a failed
     * metrics event when it does not pass:
     * <ol>
     *   <li>the server must have finished starting;</li>
     *   <li>a {@code ServerCheckRequest} is answered directly, without a handler lookup;</li>
     *   <li>a handler must be registered for the request type;</li>
     *   <li>the connection id read from the gRPC context must be valid;</li>
     *   <li>the payload must parse into a non-null {@link Request}.</li>
     * </ol>
     * A response whose error code is {@code NacosException.OVER_THRESHOLD} is written back after a
     * 1000 ms delay instead of immediately. The request context is always removed on exit from the
     * dispatch step.
     *
     * @param grpcRequest      incoming payload; its metadata type selects the handler.
     * @param responseObserver observer that receives the response payload and is then completed.
     */
    public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
        
        traceIfNecessary(grpcRequest, true);
        String type = grpcRequest.getMetadata().getType();
        long startTime = System.nanoTime();
        
        //server is on starting.
        if (!ApplicationUtils.isStarted()) {
            Payload payloadResponse = GrpcUtils.convert(
                    ErrorResponse.build(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
            rejectAndRecord(responseObserver, payloadResponse, type, NacosException.INVALID_SERVER_STATUS, null, null,
                    startTime);
            return;
        }
        
        // Read the connection id once; it is needed by the server check and by every later step.
        String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
        
        // server check.
        if (ServerCheckRequest.class.getSimpleName().equals(type)) {
            Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(connectionId, true));
            traceAndComplete(responseObserver, serverCheckResponseP);
            MetricsMonitor.recordGrpcRequestEvent(type, true, 0, null, null, System.nanoTime() - startTime);
            return;
        }
        
        RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
        //no handler found.
        if (requestHandler == null) {
            Loggers.REMOTE_DIGEST.warn("[{}] No handler for request type : {} :", "grpc", type);
            Payload payloadResponse = GrpcUtils
                    .convert(ErrorResponse.build(NacosException.NO_HANDLER, "RequestHandler Not Found"));
            rejectAndRecord(responseObserver, payloadResponse, type, NacosException.NO_HANDLER, null, null,
                    startTime);
            return;
        }
        
        //check connection status.
        if (!connectionManager.checkValid(connectionId)) {
            Loggers.REMOTE_DIGEST
                    .warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
            Payload payloadResponse = GrpcUtils
                    .convert(ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."));
            rejectAndRecord(responseObserver, payloadResponse, type, NacosException.UN_REGISTER, null, null,
                    startTime);
            return;
        }
        
        Object parseObj;
        try {
            parseObj = GrpcUtils.parse(grpcRequest);
        } catch (Exception e) {
            Loggers.REMOTE_DIGEST
                    .warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
            Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, e.getMessage()));
            rejectAndRecord(responseObserver, payloadResponse, type, NacosException.BAD_GATEWAY,
                    e.getClass().getSimpleName(), null, startTime);
            return;
        }
        
        if (parseObj == null) {
            Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive  ,parse request is null", connectionId);
            Payload payloadResponse = GrpcUtils
                    .convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
            rejectAndRecord(responseObserver, payloadResponse, type, NacosException.BAD_GATEWAY, null, null,
                    startTime);
            return;
        }
        
        if (!(parseObj instanceof Request)) {
            Loggers.REMOTE_DIGEST
                    .warn("[{}] Invalid request receive  ,parsed payload is not a request,parseObj={}", connectionId,
                            parseObj);
            Payload payloadResponse = GrpcUtils
                    .convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
            rejectAndRecord(responseObserver, payloadResponse, type, NacosException.BAD_GATEWAY, null, null,
                    startTime);
            return;
        }
        
        Request request = (Request) parseObj;
        try {
            Connection connection = connectionManager.getConnection(connectionId);
            if (connection == null) {
                // The lookup can come back empty even though checkValid passed a moment ago
                // (presumably the connection was dropped in between - verify against ConnectionManager).
                // Answer like the invalid-connection branch instead of relying on a caught NPE.
                Loggers.REMOTE_DIGEST
                        .warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
                Payload payloadResponse = GrpcUtils
                        .convert(ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."));
                rejectAndRecord(responseObserver, payloadResponse, type, NacosException.UN_REGISTER, null,
                        request.getModule(), startTime);
                return;
            }
            RequestMeta requestMeta = new RequestMeta();
            requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
            requestMeta.setConnectionId(connectionId);
            requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
            requestMeta.setLabels(connection.getMetaInfo().getLabels());
            requestMeta.setAbilityTable(connection.getAbilityTable());
            connectionManager.refreshActiveTime(requestMeta.getConnectionId());
            prepareRequestContext(request, requestMeta, connection);
            Response response = requestHandler.handleRequest(request, requestMeta);
            Payload payloadResponse = GrpcUtils.convert(response);
            // The response is traced once, at the moment it is actually written to the observer.
            if (response.getErrorCode() == NacosException.OVER_THRESHOLD) {
                // Over-threshold responses are written back after a 1000 ms delay.
                RpcScheduledExecutor.CONTROL_SCHEDULER
                        .schedule(() -> traceAndComplete(responseObserver, payloadResponse), 1000L,
                                TimeUnit.MILLISECONDS);
            } else {
                traceAndComplete(responseObserver, payloadResponse);
            }
            MetricsMonitor.recordGrpcRequestEvent(type, response.isSuccess(),
                    response.getErrorCode(), null, request.getModule(), System.nanoTime() - startTime);
        } catch (Throwable e) {
            Loggers.REMOTE_DIGEST
                    .error("[{}] Fail to handle request from connection [{}], error message :{}", "grpc", connectionId,
                            e);
            Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(e));
            rejectAndRecord(responseObserver, payloadResponse, type, ResponseCode.FAIL.getCode(),
                    e.getClass().getSimpleName(), request.getModule(), startTime);
        } finally {
            RequestContextHolder.removeContext();
        }
        
    }
    
    /**
     * Traces the outgoing payload, writes it to the observer and completes the observer.
     *
     * @param responseObserver observer to write to.
     * @param payloadResponse  payload to send.
     */
    private void traceAndComplete(StreamObserver<Payload> responseObserver, Payload payloadResponse) {
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
    
    /**
     * Sends an error payload and then records a failed request event with the elapsed time.
     *
     * @param responseObserver observer to write to.
     * @param payloadResponse  already converted error payload.
     * @param type             request type taken from the payload metadata.
     * @param errorCode        error code reported to the metrics monitor.
     * @param exceptionName    simple class name of the causing exception, or {@code null} if there is none.
     * @param module           request module, or {@code null} when the request was never parsed.
     * @param startTime        {@link System#nanoTime()} value captured when handling started.
     */
    private void rejectAndRecord(StreamObserver<Payload> responseObserver, Payload payloadResponse, String type,
            int errorCode, String exceptionName, String module, long startTime) {
        traceAndComplete(responseObserver, payloadResponse);
        MetricsMonitor.recordGrpcRequestEvent(type, false, errorCode, exceptionName, module,
                System.nanoTime() - startTime);
    }