public CompletableFuture onFilter()

in transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/WebSocketClientCodecFilter.java [68:132]


  public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
    invocation.getInvocationStageTrace().startConsumerConnection();

    CompletableFuture<Response> createWebSocket = new CompletableFuture<>();
    URIEndpointObject endpoint = (URIEndpointObject) invocation.getEndpoint().getAddress();
    HttpClientOptionsSPI optionsSPI;
    if (endpoint.isHttp2Enabled()) {
      optionsSPI = SPIServiceUtils.getTargetService(HttpClientOptionsSPI.class,
          HttpTransportHttpClientOptionsSPI.class);
    } else {
      optionsSPI = SPIServiceUtils.getTargetService(HttpClientOptionsSPI.class,
          Http2TransportHttpClientOptionsSPI.class);
    }
    WebSocketClient webSocketClient = HttpClients.createWebSocketClient(optionsSPI, endpoint.isSslEnabled());

    try {
      webSocketClient.connect(endpoint.getPort(), endpoint.getHostOrIp(), createRequestPath(invocation,
              RestMetaUtils.getRestOperationMeta(invocation.getOperationMeta())))
          .onComplete(asyncResult -> {
            invocation.getInvocationStageTrace().finishConsumerConnection();
            if (asyncResult.failed()) {
              createWebSocket.completeExceptionally(asyncResult.cause());
              return;
            }
            if (invocation.isEdge()) {
              WebSocketTransportContext parentContext = invocation.getTransportContext();
              ServerWebSocket serverWebSocket = parentContext.getServerWebSocket();
              WebSocket clientWebSocket = asyncResult.result();
              serverWebSocket.closeHandler(v -> {
                if (!clientWebSocket.isClosed()) {
                  clientWebSocket.close();
                }
              });
              serverWebSocket.textMessageHandler(clientWebSocket::writeTextMessage);
              serverWebSocket.binaryMessageHandler(clientWebSocket::writeBinaryMessage);
              serverWebSocket.exceptionHandler(e -> {
                LOGGER.warn("consumer exception.", e);
                if (!serverWebSocket.isClosed()) {
                  serverWebSocket.close();
                }
              });
              clientWebSocket.closeHandler(v -> {
                if (!serverWebSocket.isClosed()) {
                  serverWebSocket.close();
                }
              });
              clientWebSocket.textMessageHandler(serverWebSocket::writeTextMessage);
              clientWebSocket.binaryMessageHandler(serverWebSocket::writeBinaryMessage);
              clientWebSocket.exceptionHandler(e -> {
                LOGGER.warn("producer exception.", e);
                if (!clientWebSocket.isClosed()) {
                  clientWebSocket.close();
                }
              });
            }
            invocation.setTransportContext(new WebSocketClientTransportContext(
                asyncResult.result()));
            createWebSocket.complete(Response.createSuccess(asyncResult.result()));
          });
    } catch (Exception e) {
      createWebSocket.completeExceptionally(e);
    }

    return createWebSocket;
  }