in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java [65:102]
/**
 * Builds a {@link DefaultHttpRequestReplyClient} for the given endpoint.
 *
 * <p>The {@code sharedClient} field is populated on demand via {@code OkHttpUtils.newClient()}.
 * Every returned client is built from a builder derived from that shared instance, with the
 * call, connect, read and write timeouts taken from the parsed transport properties.
 *
 * <p>Endpoints accepted by {@code UnixDomainHttpEndpoint.validate} are addressed through a
 * placeholder {@code http://unused/<pathSegment>} URL, and the builder is additionally
 * configured with the endpoint's Unix domain socket file. Any other endpoint URI is converted
 * to an {@link HttpUrl} directly.
 *
 * @param transportProperties transport configuration, interpreted by
 *     {@code parseTransportProperties}
 * @param endpointUrl URI of the endpoint to create the client for
 * @return a client bound to the resolved URL; its shutdown check re-reads the
 *     {@code sharedClient} field each time it is evaluated
 */
private DefaultHttpRequestReplyClient createClient(
    ObjectNode transportProperties, URI endpointUrl) {
  // NOTE(review): presumably swaps the thread's context class loader while the client is
  // assembled and restores it on close -- confirm against SetContextClassLoader.
  try (SetContextClassLoader ignored = new SetContextClassLoader(this)) {
    OkHttpClient baseClient = this.sharedClient;
    if (baseClient == null) {
      // No shared client yet: create one and store it in the field for later calls.
      baseClient = OkHttpUtils.newClient();
      this.sharedClient = baseClient;
    }

    final OkHttpClient.Builder builder = baseClient.newBuilder();
    final DefaultHttpRequestReplyClientSpec spec = parseTransportProperties(transportProperties);
    builder
        .callTimeout(spec.getTimeouts().getCallTimeout())
        .connectTimeout(spec.getTimeouts().getConnectTimeout())
        .readTimeout(spec.getTimeouts().getReadTimeout())
        .writeTimeout(spec.getTimeouts().getWriteTimeout());

    final HttpUrl targetUrl;
    if (!UnixDomainHttpEndpoint.validate(endpointUrl)) {
      targetUrl = HttpUrl.get(endpointUrl);
    } else {
      final UnixDomainHttpEndpoint socketEndpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
      // The host is a fixed placeholder; only the path segment comes from the endpoint.
      // NOTE(review): presumably the host is never resolved because traffic goes through the
      // socket file configured below -- confirm against configureUnixDomainSocket.
      final HttpUrl.Builder placeholder = new HttpUrl.Builder().scheme("http").host("unused");
      targetUrl = placeholder.addPathSegment(socketEndpoint.pathSegment).build();
      configureUnixDomainSocket(builder, socketEndpoint.unixDomainFile);
    }

    final OkHttpClient endpointClient = builder.build();
    // The lambda dereferences the field on every invocation rather than capturing a snapshot.
    return new DefaultHttpRequestReplyClient(
        targetUrl, endpointClient, () -> isShutdown(this.sharedClient));
  }
}