in dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/RocketMQInvoker.java [92:157]
protected Result doInvoke(Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(CommonConstants.PATH_KEY, getUrl().getPath());
inv.setAttachment(CommonConstants.VERSION_KEY, version);
try {
RocketMQChannel channel = new RocketMQChannel();
channel.setUrl(getUrl());
RpcContext.getContext().setLocalAddress(RocketMQProtocolConstant.LOCAL_ADDRESS);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
invocation.put(CommonConstants.TIMEOUT_KEY, timeout);
Request request = new Request();
request.setData(inv);
DynamicChannelBuffer buffer = new DynamicChannelBuffer(2048);
rocketMQCountCodec.encode(channel, buffer, request);
Message message = new Message(topic, null, buffer.array());
//message.putUserProperty(MessageConst.PROPERTY_MESSAGE_TYPE, "MixAll.REPLY_MESSAGE_FLAG");
if (!Objects.equals(this.groupModel, "topic")) {
message.putUserProperty(CommonConstants.GENERIC_KEY, this.group);
message.putUserProperty(CommonConstants.VERSION_KEY, this.version);
}
message.putUserProperty(RocketMQProtocolConstant.SEND_ADDRESS, NetUtils.getLocalHost());
Long messageTimeout = System.currentTimeMillis() + timeout;
message.putUserProperty(CommonConstants.TIMEOUT_KEY, messageTimeout.toString());
message.putUserProperty(RocketMQProtocolConstant.URL_STRING, getUrl().toString());
if (isOneway) {
if (Objects.isNull(messageQueue)) {
defaultMQProducer.sendOneway(message);
} else {
defaultMQProducer.sendOneway(message, messageQueue);
}
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
CompletableFuture<AppResponse> appResponseFuture =
DefaultFuture.newFuture(channel, request, timeout, this.getCallbackExecutor(getUrl(), inv))
.thenApply(obj -> (AppResponse) obj);
RequestCallback dubboRequestCallback = this.getRequestCallback();
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
if (Objects.isNull(messageQueue)) {
defaultMQProducer.request(message, dubboRequestCallback, timeout);
} else {
defaultMQProducer.request(message, messageQueue, dubboRequestCallback, timeout);
}
return result;
}
} catch (RemotingTooMuchRequestException e) {
String exceptionInfo = "Invoke remote method timeout. method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage();
logger.error(exceptionInfo, e);
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, exceptionInfo, e);
} catch (Exception e) {
String exceptionInfo = "Failed to invoke remote method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage();
logger.error(exceptionInfo, e);
throw new RpcException(RpcException.NETWORK_EXCEPTION, exceptionInfo, e);
}
}