in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java [68:109]
static NettyClient from(
NettySharedResources shared,
NettyRequestReplySpec spec,
URI endpointUrl,
Supplier<ChannelDuplexHandler> nettyRequestReplyHandlerSupplier) {
Endpoint endpoint = new Endpoint(endpointUrl);
long totalRequestBudgetInNanos = spec.callTimeout.toNanos();
ReadOnlyHttpHeaders headers = NettyHeaders.defaultHeadersFor(endpoint.serviceAddress());
// prepare a customized bootstrap for this specific spec.
// this bootstrap reuses the select loop and io threads as other endpoints.
Bootstrap bootstrap = shared.bootstrap().clone();
bootstrap.option(CONNECT_TIMEOUT_MILLIS, (int) spec.connectTimeout.toMillis());
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.remoteAddress(endpoint.serviceAddress());
// setup tls
final SslContext sslContext = endpoint.useTls() ? getSslContext(spec) : null;
// setup a channel pool handler
ChannelPoolHandler poolHandler =
new HttpConnectionPoolManager(
sslContext,
spec,
endpoint.serviceAddress().getHostString(),
endpoint.serviceAddress().getPort(),
nettyRequestReplyHandlerSupplier);
// setup a fixed capacity channel pool
FixedChannelPool pool =
new FixedChannelPool(
bootstrap,
poolHandler,
ChannelHealthChecker.ACTIVE,
FixedChannelPool.AcquireTimeoutAction.FAIL,
spec.connectTimeout.toMillis(),
spec.connectionPoolMaxSize,
2147483647,
true,
true);
shared.registerClosable(pool::closeAsync);
// use a dedicated, event loop to execute timers and tasks. An event loop is backed by a single
// thread.
EventLoop eventLoop = bootstrap.config().group().next();
return new NettyClient(shared, eventLoop, pool, endpoint, headers, totalRequestBudgetInNanos);
}