in core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java [82:226]
/**
 * Handles a single gRPC request payload and writes exactly one response payload
 * to {@code responseObserver} before completing it.
 *
 * <p>The request passes through the following gates, in order; the first one that
 * fails answers with an {@code ErrorResponse} and returns:
 * <ol>
 *   <li>the server must have finished starting ({@code INVALID_SERVER_STATUS});</li>
 *   <li>a {@code ServerCheckRequest} is answered directly, without a handler;</li>
 *   <li>a handler must be registered for the request type ({@code NO_HANDLER});</li>
 *   <li>the connection id from the gRPC context must be valid ({@code UN_REGISTER});</li>
 *   <li>the payload must parse to a non-null {@code Request} ({@code BAD_GATEWAY}).</li>
 * </ol>
 * Every outcome is reported through {@code MetricsMonitor.recordGrpcRequestEvent}
 * together with the elapsed time since this method was entered.
 *
 * @param grpcRequest      the incoming payload; its metadata type selects the handler
 * @param responseObserver the stream the single response is written to
 */
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
    traceIfNecessary(grpcRequest, true);
    String type = grpcRequest.getMetadata().getType();
    long startTime = System.nanoTime();

    // Server is still starting.
    if (!ApplicationUtils.isStarted()) {
        completeWithError(responseObserver,
                ErrorResponse.build(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."),
                type, NacosException.INVALID_SERVER_STATUS, null, startTime);
        return;
    }

    // Server check: answered here, before any handler lookup or connection validation.
    if (ServerCheckRequest.class.getSimpleName().equals(type)) {
        Payload serverCheckResponseP = GrpcUtils
                .convert(new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get(), true));
        sendAndComplete(responseObserver, serverCheckResponseP);
        MetricsMonitor.recordGrpcRequestEvent(type, true,
                0, null, null, System.nanoTime() - startTime);
        return;
    }

    // No handler found.
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    if (requestHandler == null) {
        Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
        completeWithError(responseObserver,
                ErrorResponse.build(NacosException.NO_HANDLER, "RequestHandler Not Found"),
                type, NacosException.NO_HANDLER, null, startTime);
        return;
    }

    // Check connection status.
    String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
    if (!connectionManager.checkValid(connectionId)) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
        completeWithError(responseObserver,
                ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."),
                type, NacosException.UN_REGISTER, null, startTime);
        return;
    }

    // Parse the payload body into a request object.
    Object parseObj;
    try {
        parseObj = GrpcUtils.parse(grpcRequest);
    } catch (Exception e) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
        completeWithError(responseObserver,
                ErrorResponse.build(NacosException.BAD_GATEWAY, e.getMessage()),
                type, NacosException.BAD_GATEWAY, e.getClass().getSimpleName(), startTime);
        return;
    }
    if (parseObj == null) {
        Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive ,parse request is null", connectionId);
        completeWithError(responseObserver,
                ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"),
                type, NacosException.BAD_GATEWAY, null, startTime);
        return;
    }
    if (!(parseObj instanceof Request)) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid request receive ,parsed payload is not a request,parseObj={}", connectionId,
                        parseObj);
        completeWithError(responseObserver,
                ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"),
                type, NacosException.BAD_GATEWAY, null, startTime);
        return;
    }

    Request request = (Request) parseObj;
    try {
        // connectionId was read from the gRPC context above; reuse it instead of re-reading the key.
        Connection connection = connectionManager.getConnection(connectionId);
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(connectionId);
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        requestMeta.setAbilityTable(connection.getAbilityTable());
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        prepareRequestContext(request, requestMeta, connection);

        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        // The payload is traced once, inside sendAndComplete, at the moment it is written.
        if (response.getErrorCode() == NacosException.OVER_THRESHOLD) {
            // Over-threshold replies are held back for one second before being written
            // (presumably to slow down the offending client - confirm with flow-control owners).
            RpcScheduledExecutor.CONTROL_SCHEDULER.schedule(() -> {
                sendAndComplete(responseObserver, payloadResponse);
            }, 1000L, TimeUnit.MILLISECONDS);
        } else {
            sendAndComplete(responseObserver, payloadResponse);
        }
        MetricsMonitor.recordGrpcRequestEvent(type, response.isSuccess(),
                response.getErrorCode(), null, request.getModule(), System.nanoTime() - startTime);
    } catch (Throwable e) {
        // Boundary catch: any failure while building the meta or running the handler
        // is turned into an error response so the client is never left waiting.
        Loggers.REMOTE_DIGEST
                .error("[{}] Fail to handle request from connection [{}], error message :{}", "grpc", connectionId,
                        e);
        sendAndComplete(responseObserver, GrpcUtils.convert(ErrorResponse.build(e)));
        MetricsMonitor.recordGrpcRequestEvent(type, false,
                ResponseCode.FAIL.getCode(), e.getClass().getSimpleName(), request.getModule(),
                System.nanoTime() - startTime);
    } finally {
        RequestContextHolder.removeContext();
    }
}

/**
 * Traces the outgoing payload, writes it to the observer and completes the stream.
 *
 * @param responseObserver the stream to write to
 * @param payloadResponse  the already converted response payload
 */
private void sendAndComplete(StreamObserver<Payload> responseObserver, Payload payloadResponse) {
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
}

/**
 * Sends an error response and records the request as failed, with no module attached.
 *
 * @param responseObserver the stream to write to
 * @param errorResponse    the error response to convert and send
 * @param type             the request type taken from the payload metadata
 * @param errorCode        the error code reported to the metrics monitor
 * @param throwableName    simple class name of the causing exception, or {@code null}
 * @param startTime        {@code System.nanoTime()} captured when the request arrived
 */
private void completeWithError(StreamObserver<Payload> responseObserver, Response errorResponse, String type,
        int errorCode, String throwableName, long startTime) {
    sendAndComplete(responseObserver, GrpcUtils.convert(errorResponse));
    MetricsMonitor.recordGrpcRequestEvent(type, false,
            errorCode, throwableName, null, System.nanoTime() - startTime);
}