in dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocol.java [224:261]
private Response invoke(MessageExt messageExt, Channel channel, URL url) {
Response response = new Response();
try {
String timeoutString = messageExt.getUserProperty(CommonConstants.TIMEOUT_KEY);
Long timeout = Long.valueOf(timeoutString);
if (logger.isDebugEnabled()) {
logger.debug(String.format("reply message ext is : %s", messageExt));
}
if (Objects.isNull(messageExt.getProperty(MessageConst.PROPERTY_CLUSTER))) {
MQClientException exception = new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION,
"create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
response.setErrorMessage(exception.getMessage());
response.setStatus(Response.BAD_REQUEST);
logger.error(exception);
} else {
HeapChannelBuffer heapChannelBuffer = new HeapChannelBuffer(messageExt.getBody());
Object object = rocketmqCountCodec.decode(channel, heapChannelBuffer);
String topic = messageExt.getTopic();
Invocation inv = (Invocation) ((Request) object).getData();
if (timeout < System.currentTimeMillis()) {
logger.warn(String.format("message timeoute time is %d invocation is %s ", timeout, inv));
return null;
}
Invoker<?> invoker = exporterMap.get(topic).getInvoker();
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
response.setStatus(Response.OK);
response.setResult(result);
}
} catch (Exception e) {
String exceptionInfo = String.format("data decode or invoke fail, url is %s cause is %s", url, e.getMessage());
response.setErrorMessage(exceptionInfo);
response.setStatus(Response.BAD_REQUEST);
logger.error(exceptionInfo, e);
}
return response;
}