private DefaultHttpRequestReplyClient createClient()

in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java [65:102]


  /**
   * Builds a {@link DefaultHttpRequestReplyClient} for the given endpoint.
   *
   * <p>The HTTP client is derived from this factory's shared {@link OkHttpClient}, which is created
   * on first use and stored in {@code this.sharedClient}. The call, connect, read and write
   * timeouts are taken from the spec parsed out of {@code transportProperties}.
   *
   * <p>Endpoints accepted by {@code UnixDomainHttpEndpoint.validate} are routed through {@code
   * configureUnixDomainSocket}; for those the request URL carries only the endpoint's path segment
   * under a fixed {@code http://unused} prefix. Every other endpoint is converted with {@code
   * HttpUrl.get}.
   *
   * @param transportProperties transport settings, handed to {@code parseTransportProperties}.
   * @param endpointUrl the endpoint the returned client should talk to.
   * @return a request-reply client bound to the resolved URL and the OkHttp client built here.
   */
  private DefaultHttpRequestReplyClient createClient(
      ObjectNode transportProperties, URI endpointUrl) {
    try (SetContextClassLoader ignored = new SetContextClassLoader(this)) {
      // Create the shared client on first use; later calls derive their builder from it.
      OkHttpClient baseClient = this.sharedClient;
      if (baseClient == null) {
        baseClient = OkHttpUtils.newClient();
        this.sharedClient = baseClient;
      }
      final OkHttpClient.Builder builder = baseClient.newBuilder();

      final DefaultHttpRequestReplyClientSpec spec = parseTransportProperties(transportProperties);

      builder
          .callTimeout(spec.getTimeouts().getCallTimeout())
          .connectTimeout(spec.getTimeouts().getConnectTimeout())
          .readTimeout(spec.getTimeouts().getReadTimeout())
          .writeTimeout(spec.getTimeouts().getWriteTimeout());

      final HttpUrl targetUrl;
      if (!UnixDomainHttpEndpoint.validate(endpointUrl)) {
        targetUrl = HttpUrl.get(endpointUrl);
      } else {
        final UnixDomainHttpEndpoint unixEndpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);

        // Only the path segment comes from the endpoint; the host is a fixed placeholder.
        final HttpUrl.Builder placeholderUrl = new HttpUrl.Builder();
        placeholderUrl.scheme("http");
        placeholderUrl.host("unused");
        placeholderUrl.addPathSegment(unixEndpoint.pathSegment);
        targetUrl = placeholderUrl.build();

        configureUnixDomainSocket(builder, unixEndpoint.unixDomainFile);
      }

      final OkHttpClient endpointClient = builder.build();

      // NOTE(review): the callback reads this.sharedClient when it is invoked, not the instance
      // obtained at the top of this method - confirm against isShutdown that this is intended.
      return new DefaultHttpRequestReplyClient(
          targetUrl, endpointClient, () -> isShutdown(this.sharedClient));
    }
  }