protected Result doInvoke()

in dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java [62:199]


    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
            throws RpcException {
        String merger = getUrl().getMethodParameter(invocation.getMethodName(), MERGER_KEY);
        if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
            for (final Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    try {
                        return invokeWithContext(invoker, invocation);
                    } catch (RpcException e) {
                        if (e.isNoInvokerAvailableAfterFilter()) {
                            log.debug(
                                    "No available provider for service" + getUrl().getServiceKey() + " on group "
                                            + invoker.getUrl().getGroup() + ", will continue to try another group.",
                                    e);
                        } else {
                            throw e;
                        }
                    }
                }
            }
            return invokeWithContext(invokers.iterator().next(), invocation);
        }

        Class<?> returnType;
        try {
            returnType = getInterface()
                    .getMethod(invocation.getMethodName(), invocation.getParameterTypes())
                    .getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        Map<String, Result> results = new HashMap<>();
        for (final Invoker<T> invoker : invokers) {
            RpcInvocation subInvocation = new RpcInvocation(invocation, invoker);
            subInvocation.setAttachment(Constants.ASYNC_KEY, "true");
            try {
                results.put(invoker.getUrl().getServiceKey(), invokeWithContext(invoker, subInvocation));
            } catch (RpcException e) {
                if (e.isNoInvokerAvailableAfterFilter()) {
                    log.warn(
                            LoggerCodeConstants.CLUSTER_NO_VALID_PROVIDER,
                            e.getCause().getMessage(),
                            "",
                            "No available provider for service" + getUrl().getServiceKey() + " on group "
                                    + invoker.getUrl().getGroup() + ", will continue to try another group.",
                            e);
                } else {
                    throw e;
                }
            }
        }

        Object result;

        List<Result> resultList = new ArrayList<>(results.size());

        for (Map.Entry<String, Result> entry : results.entrySet()) {
            Result asyncResult = entry.getValue();
            try {
                Result r = asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    log.error(
                            LoggerCodeConstants.CLUSTER_FAILED_GROUP_MERGE,
                            "Invoke " + getGroupDescFromServiceKey(entry.getKey()) + " failed: "
                                    + r.getException().getMessage(),
                            "",
                            r.getException().getMessage());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
            }
        }

        if (resultList.isEmpty()) {
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else if (resultList.size() == 1) {
            return AsyncRpcResult.newDefaultAsyncResult(resultList.get(0).getValue(), invocation);
        }

        if (returnType == void.class) {
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        }

        if (merger.startsWith(".")) {
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException | NullPointerException e) {
                throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ "
                        + returnType.getName() + " ]");
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException("Can not merge result: " + e.getMessage(), e);
            }
        } else {
            Merger resultMerger;
            ApplicationModel applicationModel = ScopeModelUtil.getApplicationModel(
                    invocation.getModuleModel().getApplicationModel());

            if (ConfigUtils.isDefault(merger)) {
                resultMerger = applicationModel
                        .getBeanFactory()
                        .getBean(MergerFactory.class)
                        .getMerger(returnType);
            } else {
                resultMerger = applicationModel.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List<Object> rets = new ArrayList<>(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
    }