in dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java [113:263]
public DubboProtocol(FrameworkModel frameworkModel) {
requestHandler = new ExchangeHandlerAdapter(frameworkModel) {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(
channel,
"Unsupported request: "
+ (message == null
? null
: (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: "
+ channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
// switch TCCL
if (invoker.getUrl().getServiceModel() != null) {
Thread.currentThread()
.setContextClassLoader(
invoker.getUrl().getServiceModel().getClassLoader());
}
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(
PROTOCOL_FAILED_REFER_INVOKER,
"",
"",
new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl())
+ " ,invocation is :" + inv);
return null;
}
}
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isDebugEnabled()) {
logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {
tryToGetStubService(channel, invocation);
}
received(channel, invocation);
} catch (Throwable t) {
logger.warn(
PROTOCOL_FAILED_REFER_INVOKER,
"",
"",
"Failed to invoke event method " + invocation.getMethodName() + "(), cause: "
+ t.getMessage(),
t);
}
}
}
private void tryToGetStubService(Channel channel, Invocation invocation) throws RemotingException {
try {
Invoker<?> invoker = getInvoker(channel, invocation);
} catch (RemotingException e) {
String serviceKey = serviceKey(
0,
(String) invocation.getObjectAttachmentWithoutConvert(PATH_KEY),
(String) invocation.getObjectAttachmentWithoutConvert(VERSION_KEY),
(String) invocation.getObjectAttachmentWithoutConvert(GROUP_KEY));
throw new RemotingException(
channel, "The stub service[" + serviceKey + "] is not found, it may not be exported yet");
}
}
/**
* FIXME channel.getUrl() always binds to a fixed service, and this service is random.
* we can choose to use a common service to carry onConnect event if there's no easy way to get the specific
* service this connection is binding to.
* @param channel
* @param url
* @param methodKey
* @return
*/
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(
url.getServiceModel(),
method,
url.getParameter(INTERFACE_KEY),
"",
new Class<?>[0],
new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getGroup());
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getVersion());
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
this.frameworkModel = frameworkModel;
this.frameworkModel.getBeanFactory().registerBean(new DubboGracefulShutdown(this));
}