in runtime/src/main/scala/org/apache/pekko/grpc/internal/NettyClientUtils.scala [46:176]
def createChannel(settings: GrpcClientSettings, log: LoggingAdapter)(
    implicit ec: ExecutionContext): InternalChannel = {
  // Overview: assemble a NettyChannelBuilder from `settings`, build the channel,
  // hand it to ChannelUtils.monitorChannel together with a "ready" and a
  // "closed" promise, and wrap the result in an InternalChannel facade.

  // Netty wants the target in URI form, so the service name is encoded as the
  // URI 'authority' ("//<serviceName>"). Name resolution is delegated to the
  // discovery-backed resolver provider.
  // TODO avoid nameResolverFactory #1092, then the 'nowarn' below can be removed
  @nowarn("cat=deprecation")
  val discoveryBuilder =
    NettyChannelBuilder
      .forTarget("//" + settings.serviceName)
      .flowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW)
      .nameResolverFactory(
        new PekkoDiscoveryNameResolverProvider(
          settings.serviceDiscovery,
          settings.defaultPort,
          settings.serviceName,
          settings.servicePortName,
          settings.serviceProtocol,
          settings.resolveTimeout))

  // Transport security: plaintext unless TLS is requested. With TLS, an
  // explicitly supplied SSL context takes precedence; otherwise a Netty
  // context is only built when a trust manager and/or SSL provider is set.
  val transportBuilder =
    if (settings.useTls) {
      val tlsBuilder = discoveryBuilder.negotiationType(NegotiationType.TLS)
      settings.sslContext match {
        case Some(suppliedContext) =>
          tlsBuilder.sslContext(createNettySslContext(suppliedContext))
        case None =>
          val customTrust = settings.trustManager
          val customProvider = settings.sslProvider
          if (customTrust.isEmpty && customProvider.isEmpty) {
            // Nothing customised: leave the builder's own TLS setup in place.
            tlsBuilder
          } else {
            val nettyContextBuilder = customProvider match {
              case Some(chosenProvider) =>
                GrpcSslContexts.configure(SslContextBuilder.forClient(), chosenProvider)
              case None =>
                GrpcSslContexts.configure(SslContextBuilder.forClient())
            }
            val trustingContextBuilder =
              customTrust.fold(nettyContextBuilder)(nettyContextBuilder.trustManager(_))
            tlsBuilder.sslContext(trustingContextBuilder.build())
          }
      }
    } else {
      discoveryBuilder.usePlaintext()
    }

  // Optional knobs are applied only when present, in this fixed order; the
  // user-provided override hook always runs last.
  val balancedBuilder =
    settings.loadBalancingPolicy.fold(transportBuilder)(transportBuilder.defaultLoadBalancingPolicy(_))
  val authorityBuilder =
    settings.overrideAuthority.fold(balancedBuilder)(balancedBuilder.overrideAuthority(_))
  val agentBuilder =
    settings.userAgent.fold(authorityBuilder)(authorityBuilder.userAgent(_))
  val finalBuilder = settings.channelBuilderOverrides(agentBuilder)

  // Individual attempts can only be counted without load balancing or with
  // 'pick_first'. For any other policy the sole meaningful choices are
  // '1' (don't retry) or unlimited retries (None).
  val attemptsAreCountable = settings.loadBalancingPolicy.forall(_ == "pick_first")
  val effectiveAttempts =
    if (attemptsAreCountable) settings.connectionAttempts
    else settings.connectionAttempts.filter(_ == 1)

  val channel = finalBuilder.build()

  val readyPromise = Promise[Unit]()
  val closedPromise = Promise[Done]()
  ChannelUtils.monitorChannel(readyPromise, closedPromise, channel, effectiveAttempts, log)

  // If the channel never becomes ready, shut it down and surface the cause
  // through the 'closed' future as well.
  readyPromise.future.onComplete {
    case Failure(cause) =>
      // shutdown is idempotent in ManagedChannelImpl
      channel.shutdown()
      closedPromise.tryFailure(cause)
    case Success(_) =>
      ()
  }

  new InternalChannel {
    override def shutdown() = channel.shutdown()

    override def done = closedPromise.future

    // Unary request, response message only.
    override def invoke[I, O](
        request: I,
        headers: MetadataImpl,
        descriptor: MethodDescriptor[I, O],
        options: CallOptions): Future[O] = {
      val responseListener = new UnaryCallAdapter[O]
      val unaryCall = channel.newCall(descriptor, callOptionsWithDeadline(options, settings))
      unaryCall.start(responseListener, headers.toGoogleGrpcMetadata())
      unaryCall.sendMessage(request)
      unaryCall.halfClose()
      // NOTE(review): two messages are requested for a unary call, presumably so the
      // listener can notice an unexpected second response - confirm in UnaryCallAdapter.
      unaryCall.request(2)
      responseListener.future
    }

    // Unary request, response message wrapped in a GrpcSingleResponse.
    override def invokeWithMetadata[I, O](
        request: I,
        headers: MetadataImpl,
        descriptor: MethodDescriptor[I, O],
        options: CallOptions): Future[GrpcSingleResponse[O]] = {
      val responseListener = new UnaryCallWithMetadataAdapter[O]
      val unaryCall = channel.newCall(descriptor, callOptionsWithDeadline(options, settings))
      unaryCall.start(responseListener, headers.toGoogleGrpcMetadata())
      unaryCall.sendMessage(request)
      unaryCall.halfClose()
      // NOTE(review): same two-message request as in `invoke` - confirm the intent
      // in UnaryCallWithMetadataAdapter.
      unaryCall.request(2)
      responseListener.future
    }

    // Streaming request: the source is run through a flow backed by the gRPC
    // client graph stage, keeping the flow's materialized response metadata.
    override def invokeWithMetadata[I, O](
        source: Source[I, NotUsed],
        headers: MetadataImpl,
        descriptor: MethodDescriptor[I, O],
        streamingResponse: Boolean,
        options: CallOptions) = {
      val deadlineOptions = callOptionsWithDeadline(options, settings)
      val grpcFlow: Flow[I, O, Future[GrpcResponseMetadata]] =
        Flow.fromGraph(
          new PekkoNettyGrpcClientGraphStage(descriptor, channel, deadlineOptions, streamingResponse, headers))
      source.viaMat(grpcFlow)(Keep.right)
    }
  }
}