in dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java [102:214]
/*
 * Invokes the in-JVM exported service that matches this invoker's URL.
 *
 * Steps, in order:
 *   1. Resolve (and cache in the `exporter` field) the exporter for getUrl().
 *   2. If the server URL carries a token parameter, copy it onto the invocation.
 *   3. Build the consumer URL on first use and compute the timeout; a
 *      non-positive timeout short-circuits with a TIMEOUT_TERMINATE RpcException result.
 *   4. Recreate the invocation (parameters are copied) and dispatch it either
 *      on the executor (async) or on the calling thread (sync).
 *
 * In both modes the thread-local map is removed before the target invoker runs
 * and restored in a finally block afterwards.
 *
 * @param invocation the consumer-side invocation; its attachments are modified
 *                   (token, timeout) by this method
 * @return an AsyncRpcResult wrapping the AppResponse future
 * @throws RpcException if no exporter can be found for this invoker's URL
 * @throws Throwable    anything thrown by the target invoker on the sync path
 */
public Result doInvoke(Invocation invocation) throws Throwable {
    if (exporter == null) {
        exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
        if (exporter == null) {
            throw new RpcException("Service [" + key + "] not found.");
        }
    }
    // Solve local exposure, the server opens the token, and the client call fails.
    Invoker<?> invoker = exporter.getInvoker();
    URL serverURL = invoker.getUrl();
    boolean serverHasToken = serverURL.hasParameter(Constants.TOKEN_KEY);
    if (serverHasToken) {
        invocation.setAttachment(Constants.TOKEN_KEY, serverURL.getParameter(Constants.TOKEN_KEY));
    }
    if (consumerUrl == null) {
        // no need to sync, multi-objects is acceptable and will be gc-ed.
        consumerUrl =
                new DubboServiceAddressURL(serverURL.getUrlAddress(), serverURL.getUrlParam(), getUrl(), null);
    }
    int timeout =
            RpcUtils.calculateTimeout(consumerUrl, invocation, RpcUtils.getMethodName(invocation), DEFAULT_TIMEOUT);
    if (timeout <= 0) {
        // No time budget left: fail fast without touching the target invoker.
        return AsyncRpcResult.newDefaultAsyncResult(
                new RpcException(
                        RpcException.TIMEOUT_TERMINATE,
                        "No time left for making the following call: " + invocation.getServiceName() + "."
                                + RpcUtils.getMethodName(invocation) + ", terminate directly."),
                invocation);
    }
    invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
    String desc = ReflectUtils.getDesc(invocation.getParameterTypes());
    // recreate invocation ---> deep copy parameters
    Invocation copiedInvocation = recreateInvocation(invocation, invoker, desc);
    if (isAsync(invoker.getUrl(), getUrl())) {
        ((RpcInvocation) copiedInvocation).setInvokeMode(InvokeMode.ASYNC);
        // use consumer executor
        ExecutorService executor = executorRepository.createExecutorIfAbsent(
                ExecutorUtil.setThreadName(getUrl(), SERVER_THREAD_POOL_NAME));
        CompletableFuture<AppResponse> appResponseFuture = CompletableFuture.supplyAsync(
                () -> {
                    // clear thread local before child invocation, prevent context pollution
                    InternalThreadLocalMap originTL = InternalThreadLocalMap.getAndRemove();
                    try {
                        RpcContext.getServiceContext().setRemoteAddress(LOCALHOST_VALUE, 0);
                        RpcContext.getServiceContext().setRemoteApplicationName(getUrl().getApplication());
                        Result result = invoker.invoke(copiedInvocation);
                        AppResponse appResponse;
                        if (result.hasException()) {
                            appResponse = new AppResponse(result.getException());
                        } else {
                            // Rebuild the returned VALUE (not the Result wrapper) and hand the
                            // rebuilt object to the caller, exactly as the sync branches do.
                            Object rebuiltValue = rebuildValue(invocation, invoker, result.getValue());
                            appResponse = new AppResponse(rebuiltValue);
                        }
                        appResponse.setObjectAttachments(new HashMap<>(result.getObjectAttachments()));
                        return appResponse;
                    } finally {
                        InternalThreadLocalMap.set(originTL);
                    }
                },
                executor);
        // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
        if (setFutureWhenSync || ((RpcInvocation) invocation).getInvokeMode() != InvokeMode.SYNC) {
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
        }
        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, copiedInvocation);
        result.setExecutor(executor);
        return result;
    } else {
        Result result;
        // clear thread local before child invocation, prevent context pollution
        InternalThreadLocalMap originTL = InternalThreadLocalMap.getAndRemove();
        try {
            RpcContext.getServiceContext().setRemoteAddress(LOCALHOST_VALUE, 0);
            RpcContext.getServiceContext().setRemoteApplicationName(getUrl().getApplication());
            result = invoker.invoke(copiedInvocation);
        } finally {
            InternalThreadLocalMap.set(originTL);
        }
        CompletableFuture<AppResponse> future = new CompletableFuture<>();
        AppResponse rpcResult = new AppResponse(copiedInvocation);
        if (result instanceof AsyncRpcResult) {
            // The provider answered asynchronously: translate its outcome once it completes.
            result.whenCompleteWithContext((r, t) -> {
                if (t != null) {
                    rpcResult.setException(t);
                } else if (r.hasException()) {
                    rpcResult.setException(r.getException());
                } else {
                    Object rebuiltValue = rebuildValue(invocation, invoker, r.getValue());
                    rpcResult.setValue(rebuiltValue);
                }
                // NOTE(review): r is presumably null when t is non-null (as with
                // CompletableFuture.whenComplete). Guard the dereference so an NPE here
                // cannot prevent `future` from being completed.
                if (r != null) {
                    rpcResult.setObjectAttachments(new HashMap<>(r.getObjectAttachments()));
                }
                future.complete(rpcResult);
            });
        } else {
            if (result.hasException()) {
                rpcResult.setException(result.getException());
            } else {
                Object rebuiltValue = rebuildValue(invocation, invoker, result.getValue());
                rpcResult.setValue(rebuiltValue);
            }
            rpcResult.setObjectAttachments(new HashMap<>(result.getObjectAttachments()));
            future.complete(rpcResult);
        }
        return new AsyncRpcResult(future, invocation);
    }
}