in dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java [54:146]
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
throws RpcException {
RpcContext.getServiceContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
URL url = getUrl();
// The value range of broadcast.fail.threshold must be 0~100.
// 100 means that an exception will be thrown last, and 0 means that as long as an exception occurs, it will be
// thrown.
// see https://github.com/apache/dubbo/pull/7174
int broadcastFailPercent = url.getParameter(BROADCAST_FAIL_PERCENT_KEY, MAX_BROADCAST_FAIL_PERCENT);
if (broadcastFailPercent < MIN_BROADCAST_FAIL_PERCENT || broadcastFailPercent > MAX_BROADCAST_FAIL_PERCENT) {
logger.info(String.format(
"The value corresponding to the broadcast.fail.percent parameter must be between 0 and 100. "
+ "The current setting is %s, which is reset to 100.",
broadcastFailPercent));
broadcastFailPercent = MAX_BROADCAST_FAIL_PERCENT;
}
int failThresholdIndex = invokers.size() * broadcastFailPercent / MAX_BROADCAST_FAIL_PERCENT;
int failIndex = 0;
for (int i = 0, invokersSize = invokers.size(); i < invokersSize; i++) {
Invoker<T> invoker = invokers.get(i);
RpcContext.RestoreContext restoreContext = new RpcContext.RestoreContext();
try {
RpcInvocation subInvocation = new RpcInvocation(
invocation.getTargetServiceUniqueName(),
invocation.getServiceModel(),
invocation.getMethodName(),
invocation.getServiceName(),
invocation.getProtocolServiceKey(),
invocation.getParameterTypes(),
invocation.getArguments(),
invocation.copyObjectAttachments(),
invocation.getInvoker(),
Collections.synchronizedMap(new HashMap<>(invocation.getAttributes())),
invocation instanceof RpcInvocation ? ((RpcInvocation) invocation).getInvokeMode() : null);
result = invokeWithContext(invoker, subInvocation);
if (null != result && result.hasException()) {
Throwable resultException = result.getException();
if (null != resultException) {
exception = getRpcException(result.getException());
logger.warn(
CLUSTER_ERROR_RESPONSE,
"provider return error response",
"",
exception.getMessage(),
exception);
failIndex++;
if (failIndex == failThresholdIndex) {
break;
}
}
}
} catch (Throwable e) {
exception = getRpcException(e);
logger.warn(
CLUSTER_ERROR_RESPONSE,
"provider return error response",
"",
exception.getMessage(),
exception);
failIndex++;
if (failIndex == failThresholdIndex) {
break;
}
} finally {
if (i != invokersSize - 1) {
restoreContext.restore();
}
}
}
if (exception != null) {
if (failIndex == failThresholdIndex) {
if (logger.isDebugEnabled()) {
logger.debug(String.format(
"The number of BroadcastCluster call failures has reached the threshold %s",
failThresholdIndex));
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(String.format(
"The number of BroadcastCluster call failures has not reached the threshold %s, fail size is %s",
failThresholdIndex, failIndex));
}
}
throw exception;
}
return result;
}