in dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java [62:199]
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
throws RpcException {
String merger = getUrl().getMethodParameter(invocation.getMethodName(), MERGER_KEY);
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
for (final Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
try {
return invokeWithContext(invoker, invocation);
} catch (RpcException e) {
if (e.isNoInvokerAvailableAfterFilter()) {
log.debug(
"No available provider for service" + getUrl().getServiceKey() + " on group "
+ invoker.getUrl().getGroup() + ", will continue to try another group.",
e);
} else {
throw e;
}
}
}
}
return invokeWithContext(invokers.iterator().next(), invocation);
}
Class<?> returnType;
try {
returnType = getInterface()
.getMethod(invocation.getMethodName(), invocation.getParameterTypes())
.getReturnType();
} catch (NoSuchMethodException e) {
returnType = null;
}
Map<String, Result> results = new HashMap<>();
for (final Invoker<T> invoker : invokers) {
RpcInvocation subInvocation = new RpcInvocation(invocation, invoker);
subInvocation.setAttachment(Constants.ASYNC_KEY, "true");
try {
results.put(invoker.getUrl().getServiceKey(), invokeWithContext(invoker, subInvocation));
} catch (RpcException e) {
if (e.isNoInvokerAvailableAfterFilter()) {
log.warn(
LoggerCodeConstants.CLUSTER_NO_VALID_PROVIDER,
e.getCause().getMessage(),
"",
"No available provider for service" + getUrl().getServiceKey() + " on group "
+ invoker.getUrl().getGroup() + ", will continue to try another group.",
e);
} else {
throw e;
}
}
}
Object result;
List<Result> resultList = new ArrayList<>(results.size());
for (Map.Entry<String, Result> entry : results.entrySet()) {
Result asyncResult = entry.getValue();
try {
Result r = asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
if (r.hasException()) {
log.error(
LoggerCodeConstants.CLUSTER_FAILED_GROUP_MERGE,
"Invoke " + getGroupDescFromServiceKey(entry.getKey()) + " failed: "
+ r.getException().getMessage(),
"",
r.getException().getMessage());
} else {
resultList.add(r);
}
} catch (Exception e) {
throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
}
}
if (resultList.isEmpty()) {
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else if (resultList.size() == 1) {
return AsyncRpcResult.newDefaultAsyncResult(resultList.get(0).getValue(), invocation);
}
if (returnType == void.class) {
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}
if (merger.startsWith(".")) {
merger = merger.substring(1);
Method method;
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException | NullPointerException e) {
throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ "
+ returnType.getName() + " ]");
}
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
result = resultList.remove(0).getValue();
try {
if (method.getReturnType() != void.class
&& method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
result = method.invoke(result, r.getValue());
}
} else {
for (Result r : resultList) {
method.invoke(result, r.getValue());
}
}
} catch (Exception e) {
throw new RpcException("Can not merge result: " + e.getMessage(), e);
}
} else {
Merger resultMerger;
ApplicationModel applicationModel = ScopeModelUtil.getApplicationModel(
invocation.getModuleModel().getApplicationModel());
if (ConfigUtils.isDefault(merger)) {
resultMerger = applicationModel
.getBeanFactory()
.getBean(MergerFactory.class)
.getMerger(returnType);
} else {
resultMerger = applicationModel.getExtensionLoader(Merger.class).getExtension(merger);
}
if (resultMerger != null) {
List<Object> rets = new ArrayList<>(resultList.size());
for (Result r : resultList) {
rets.add(r.getValue());
}
result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
} else {
throw new RpcException("There is no merger to merge result.");
}
}
return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
}