in flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java [215:369]
/**
 * Assembles a {@link FlightServer} from the options configured on this builder.
 *
 * <p>The steps are, in order: register the header/auth middleware, pick a gRPC Netty server
 * builder according to the URI scheme of {@code location}, configure TLS when a certificate chain
 * was supplied, choose the executor, register the Flight service with its interceptors, and
 * finally apply any entries found in {@code builderOptions}.
 *
 * @return a new {@link FlightServer} wrapping the configured gRPC server
 * @throws IllegalArgumentException if the location scheme is not supported, or if the scheme is
 *     gRPC-over-TLS and no certificate chain was provided
 * @throws UnsupportedOperationException if a domain socket location was requested and neither the
 *     epoll nor the kqueue Netty transport could be loaded and instantiated; the underlying
 *     failure is attached as the cause
 * @throws RuntimeException if building the SSL context fails with an {@link SSLException}
 */
public FlightServer build() {
  // Add the auth middleware if applicable.
  if (headerAuthenticator != CallHeaderAuthenticator.NO_OP) {
    this.middleware(
        FlightServerMiddleware.Key.of(Auth2Constants.AUTHORIZATION_HEADER),
        new ServerCallHeaderAuthMiddleware.Factory(headerAuthenticator));
  }
  this.middleware(FlightConstants.HEADER_KEY, new ServerHeaderMiddleware.Factory());

  final NettyServerBuilder builder;
  switch (location.getUri().getScheme()) {
    case LocationSchemes.GRPC_DOMAIN_SOCKET:
      {
        // The implementation is platform-specific, so we have to find the classes at runtime
        builder = NettyServerBuilder.forAddress(location.toSocketAddress());
        try {
          try {
            // Linux
            builder.channelType(
                Class.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel")
                    .asSubclass(ServerChannel.class));
            final EventLoopGroup elg =
                Class.forName("io.netty.channel.epoll.EpollEventLoopGroup")
                    .asSubclass(EventLoopGroup.class)
                    .getConstructor()
                    .newInstance();
            // The same group serves as both boss and worker.
            builder.bossEventLoopGroup(elg).workerEventLoopGroup(elg);
          } catch (ClassNotFoundException e) {
            // BSD: only attempted when the epoll classes are not on the classpath.
            builder.channelType(
                Class.forName("io.netty.channel.kqueue.KQueueServerDomainSocketChannel")
                    .asSubclass(ServerChannel.class));
            final EventLoopGroup elg =
                Class.forName("io.netty.channel.kqueue.KQueueEventLoopGroup")
                    .asSubclass(EventLoopGroup.class)
                    .getConstructor()
                    .newInstance();
            builder.bossEventLoopGroup(elg).workerEventLoopGroup(elg);
          }
        } catch (ClassNotFoundException
            | InstantiationException
            | IllegalAccessException
            | NoSuchMethodException
            | InvocationTargetException e) {
          // Keep the original failure as the cause: an instantiation error (as opposed to a
          // missing class) would otherwise be impossible to diagnose from the message alone.
          throw new UnsupportedOperationException(
              "Could not find suitable Netty native transport implementation for domain socket address.",
              e);
        }
        break;
      }
    case LocationSchemes.GRPC:
    case LocationSchemes.GRPC_INSECURE:
      {
        builder = NettyServerBuilder.forAddress(location.toSocketAddress());
        break;
      }
    case LocationSchemes.GRPC_TLS:
      {
        if (certChain == null) {
          throw new IllegalArgumentException(
              "Must provide a certificate and key to serve gRPC over TLS");
        }
        builder = NettyServerBuilder.forAddress(location.toSocketAddress());
        break;
      }
    default:
      throw new IllegalArgumentException(
          "Scheme is not supported: " + location.getUri().getScheme());
  }

  // TLS is configured whenever a certificate chain is present, regardless of the scheme
  // selected above.
  if (certChain != null) {
    SslContextBuilder sslContextBuilder = GrpcSslContexts.forServer(certChain, key);
    if (mTlsCACert != null) {
      // A CA certificate turns on mandatory client authentication (mTLS).
      sslContextBuilder.clientAuth(ClientAuth.REQUIRE).trustManager(mTlsCACert);
    }
    try {
      sslContext = sslContextBuilder.build();
    } catch (SSLException e) {
      throw new RuntimeException(e);
    } finally {
      // Invoke the close helpers for the credential inputs whether or not the build succeeded.
      closeMTlsCACert();
      closeCertChain();
      closeKey();
    }
    builder.sslContext(sslContext);
  }

  // Share one executor between the gRPC service, DoPut, and Handshake
  final ExecutorService exec;
  // We only want to have FlightServer close the gRPC executor if we created it here. We should
  // not close user-supplied executors.
  final ExecutorService grpcExecutor;
  if (executor != null) {
    exec = executor;
    grpcExecutor = null;
  } else {
    exec =
        Executors.newCachedThreadPool(
            // Name threads for better debuggability
            new ThreadFactoryBuilder()
                .setNameFormat("flight-server-default-executor-%d")
                .build());
    grpcExecutor = exec;
  }

  final FlightBindingService flightService =
      new FlightBindingService(allocator, producer, authHandler, exec);
  builder
      .executor(exec)
      .maxInboundMessageSize(maxInboundMessageSize)
      .maxInboundMetadataSize(maxHeaderListSize)
      .addService(
          ServerInterceptors.intercept(
              flightService,
              new ServerBackpressureThresholdInterceptor(backpressureThreshold),
              new ServerAuthInterceptor(authHandler)));

  // Allow hooking into the gRPC builder. This is not guaranteed to be available on all Arrow
  // versions or Flight implementations.
  // NOTE(review): each remapping function below returns null; assuming builderOptions is a
  // java.util.Map, computeIfPresent then removes the entry once it has been applied.
  builderOptions.computeIfPresent(
      "grpc.builderConsumer",
      (key, builderConsumer) -> {
        final Consumer<NettyServerBuilder> consumer =
            (Consumer<NettyServerBuilder>) builderConsumer;
        consumer.accept(builder);
        return null;
      });

  // Allow explicitly setting some Netty-specific options. These run after the scheme-specific
  // setup above, so they replace any channel type or event loop group chosen there.
  builderOptions.computeIfPresent(
      "netty.channelType",
      (key, channelType) -> {
        builder.channelType((Class<? extends ServerChannel>) channelType);
        return null;
      });
  builderOptions.computeIfPresent(
      "netty.bossEventLoopGroup",
      (key, elg) -> {
        builder.bossEventLoopGroup((EventLoopGroup) elg);
        return null;
      });
  builderOptions.computeIfPresent(
      "netty.workerEventLoopGroup",
      (key, elg) -> {
        builder.workerEventLoopGroup((EventLoopGroup) elg);
        return null;
      });

  builder.intercept(new ServerInterceptorAdapter(interceptors));
  // grpcExecutor is null when the caller supplied the executor, so the server will not own it.
  return new FlightServer(location, builder.build(), grpcExecutor);
}