in transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/WebSocketClientCodecFilter.java [68:132]
public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
invocation.getInvocationStageTrace().startConsumerConnection();
CompletableFuture<Response> createWebSocket = new CompletableFuture<>();
URIEndpointObject endpoint = (URIEndpointObject) invocation.getEndpoint().getAddress();
HttpClientOptionsSPI optionsSPI;
if (endpoint.isHttp2Enabled()) {
optionsSPI = SPIServiceUtils.getTargetService(HttpClientOptionsSPI.class,
HttpTransportHttpClientOptionsSPI.class);
} else {
optionsSPI = SPIServiceUtils.getTargetService(HttpClientOptionsSPI.class,
Http2TransportHttpClientOptionsSPI.class);
}
WebSocketClient webSocketClient = HttpClients.createWebSocketClient(optionsSPI, endpoint.isSslEnabled());
try {
webSocketClient.connect(endpoint.getPort(), endpoint.getHostOrIp(), createRequestPath(invocation,
RestMetaUtils.getRestOperationMeta(invocation.getOperationMeta())))
.onComplete(asyncResult -> {
invocation.getInvocationStageTrace().finishConsumerConnection();
if (asyncResult.failed()) {
createWebSocket.completeExceptionally(asyncResult.cause());
return;
}
if (invocation.isEdge()) {
WebSocketTransportContext parentContext = invocation.getTransportContext();
ServerWebSocket serverWebSocket = parentContext.getServerWebSocket();
WebSocket clientWebSocket = asyncResult.result();
serverWebSocket.closeHandler(v -> {
if (!clientWebSocket.isClosed()) {
clientWebSocket.close();
}
});
serverWebSocket.textMessageHandler(clientWebSocket::writeTextMessage);
serverWebSocket.binaryMessageHandler(clientWebSocket::writeBinaryMessage);
serverWebSocket.exceptionHandler(e -> {
LOGGER.warn("consumer exception.", e);
if (!serverWebSocket.isClosed()) {
serverWebSocket.close();
}
});
clientWebSocket.closeHandler(v -> {
if (!serverWebSocket.isClosed()) {
serverWebSocket.close();
}
});
clientWebSocket.textMessageHandler(serverWebSocket::writeTextMessage);
clientWebSocket.binaryMessageHandler(serverWebSocket::writeBinaryMessage);
clientWebSocket.exceptionHandler(e -> {
LOGGER.warn("producer exception.", e);
if (!clientWebSocket.isClosed()) {
clientWebSocket.close();
}
});
}
invocation.setTransportContext(new WebSocketClientTransportContext(
asyncResult.result()));
createWebSocket.complete(Response.createSuccess(asyncResult.result()));
});
} catch (Exception e) {
createWebSocket.completeExceptionally(e);
}
return createWebSocket;
}