in core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java [146:214]
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (!(message instanceof AbstractMessage)) {
LOGGER.error("unrecognized message:{}", message);
return;
}
// the batch send request message
if (message instanceof MergedWarpMessage) {
if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
for (int i = 0; i < msgs.size(); i++) {
AbstractMessage msg = msgs.get(i);
int msgId = msgIds.get(i);
if (PARALLEL_REQUEST_HANDLE) {
CompletableFuture.runAsync(
() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
} else {
handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
}
}
} else {
List<AbstractResultMessage> results = new ArrayList<>();
List<CompletableFuture<AbstractResultMessage>> completableFutures = null;
for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
if (PARALLEL_REQUEST_HANDLE) {
if (completableFutures == null) {
completableFutures = new ArrayList<>();
}
int finalI = i;
completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(
((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));
} else {
results.add(i,
handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
}
}
if (CollectionUtils.isNotEmpty(completableFutures)) {
try {
for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
results.add(completableFuture.get());
}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("handle request error: {}", e.getMessage(), e);
}
}
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
}
} else {
// the single send request message
final AbstractMessage msg = (AbstractMessage) message;
if (LOGGER.isInfoEnabled()) {
String receiveMsgLog = String.format("receive msg[single]: %s, clientIp: %s, vgroup: %s", message,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
}
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
if (LOGGER.isInfoEnabled()) {
String resultMsgLog = String.format("result msg[single]: %s, clientIp: %s, vgroup: %s", result,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
}
}
}