protected Result doInvoke()

in dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/RocketMQInvoker.java [92:157]


    protected Result doInvoke(Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(CommonConstants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(CommonConstants.VERSION_KEY, version);
        try {

            RocketMQChannel channel = new RocketMQChannel();

            channel.setUrl(getUrl());

            RpcContext.getContext().setLocalAddress(RocketMQProtocolConstant.LOCAL_ADDRESS);

            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(CommonConstants.TIMEOUT_KEY, timeout);

            Request request = new Request();
            request.setData(inv);
            DynamicChannelBuffer buffer = new DynamicChannelBuffer(2048);

            rocketMQCountCodec.encode(channel, buffer, request);

            Message message = new Message(topic, null, buffer.array());
            //message.putUserProperty(MessageConst.PROPERTY_MESSAGE_TYPE, "MixAll.REPLY_MESSAGE_FLAG");

            if (!Objects.equals(this.groupModel, "topic")) {
                message.putUserProperty(CommonConstants.GENERIC_KEY, this.group);
                message.putUserProperty(CommonConstants.VERSION_KEY, this.version);
            }
            message.putUserProperty(RocketMQProtocolConstant.SEND_ADDRESS, NetUtils.getLocalHost());
            Long messageTimeout = System.currentTimeMillis() + timeout;
            message.putUserProperty(CommonConstants.TIMEOUT_KEY, messageTimeout.toString());
            message.putUserProperty(RocketMQProtocolConstant.URL_STRING, getUrl().toString());
            if (isOneway) {
                if (Objects.isNull(messageQueue)) {
                    defaultMQProducer.sendOneway(message);
                } else {
                    defaultMQProducer.sendOneway(message, messageQueue);
                }
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                CompletableFuture<AppResponse> appResponseFuture =
                    DefaultFuture.newFuture(channel, request, timeout, this.getCallbackExecutor(getUrl(), inv))
                        .thenApply(obj -> (AppResponse) obj);
                RequestCallback dubboRequestCallback = this.getRequestCallback();
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                if (Objects.isNull(messageQueue)) {
                    defaultMQProducer.request(message, dubboRequestCallback, timeout);
                } else {
                    defaultMQProducer.request(message, messageQueue, dubboRequestCallback, timeout);
                }
                return result;
            }
        } catch (RemotingTooMuchRequestException e) {
            String exceptionInfo = "Invoke remote method timeout. method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage();
            logger.error(exceptionInfo, e);
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, exceptionInfo, e);
        } catch (Exception e) {
            String exceptionInfo = "Failed to invoke remote method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage();
            logger.error(exceptionInfo, e);
            throw new RpcException(RpcException.NETWORK_EXCEPTION, exceptionInfo, e);
        }
    }