private void onRequestMessage()

in core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java [146:214]


    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (!(message instanceof AbstractMessage)) {
            LOGGER.error("unrecognized message:{}", message);
            return;
        }
        // the batch send request message
        if (message instanceof MergedWarpMessage) {
            if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
                && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
                List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
                List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
                for (int i = 0; i < msgs.size(); i++) {
                    AbstractMessage msg = msgs.get(i);
                    int msgId = msgIds.get(i);
                    if (PARALLEL_REQUEST_HANDLE) {
                        CompletableFuture.runAsync(
                            () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                    } else {
                        handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                    }
                }
            } else {
                List<AbstractResultMessage> results = new ArrayList<>();
                List<CompletableFuture<AbstractResultMessage>> completableFutures = null;
                for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
                    if (PARALLEL_REQUEST_HANDLE) {
                        if (completableFutures == null) {
                            completableFutures = new ArrayList<>();
                        }
                        int finalI = i;
                        completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(
                            ((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));
                    } else {
                        results.add(i,
                            handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
                    }
                }
                if (CollectionUtils.isNotEmpty(completableFutures)) {
                    try {
                        for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
                            results.add(completableFuture.get());
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        LOGGER.error("handle request error: {}", e.getMessage(), e);
                    }
                }
                MergeResultMessage resultMessage = new MergeResultMessage();
                resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
            }
        } else {
            // the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            if (LOGGER.isInfoEnabled()) {
                String receiveMsgLog = String.format("receive msg[single]: %s, clientIp: %s, vgroup: %s", message,
                    NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
                BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
            }
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
            if (LOGGER.isInfoEnabled()) {
                String resultMsgLog = String.format("result msg[single]: %s, clientIp: %s, vgroup: %s", result,
                    NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
                BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
            }
        }
    }