in runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala [54:190]
/**
 * Creates an [[InternalChannel]] that sends gRPC calls over a persistent HTTP/2 client connection.
 *
 * Target addresses are obtained from `settings.serviceDiscovery` every time the transport needs to
 * resolve the host, and one of the resolved addresses is picked in a quasi-round-robin fashion.
 * Requests are pushed through a bounded queue into the HTTP/2 client flow; each response completes
 * the promise that was attached to its request.
 *
 * @param settings client settings (service name, discovery, TLS configuration, default port, ...)
 * @param log      logging adapter used for diagnostics
 * @param sys      provider of the classic actor system whose dispatcher runs the asynchronous callbacks
 * @return a channel whose `shutdown()` completes the request queue and whose `done` future is the
 *         materialized completion of the response sink
 */
def createChannel(settings: GrpcClientSettings, log: LoggingAdapter)(
    implicit sys: ClassicActorSystemProvider): InternalChannel = {
  import java.util.concurrent.atomic.AtomicInteger
  import org.apache.pekko.stream.QueueOfferResult

  implicit val ec = sys.classicSystem.dispatcher

  log.debug("Creating gRPC client channel")

  // https://github.com/grpc/grpc/blob/master/doc/compression.md
  // since a client can't assume what algorithms a server supports, we
  // must default to no compression.
  // Configuring a different default could be a future feature.
  // Configuring compression per call could be a future power API feature.
  implicit val writer = GrpcProtocolNative.newWriter(Identity)

  // Authority used both for the connection target and for the request URIs.
  def targetAuthority: String = settings.overrideAuthority.getOrElse(settings.serviceName)

  // TODO FIXME adapt to new API's for discovery, loadbalancing etc
  // https://github.com/akka/akka-grpc/issues/1196
  // https://github.com/akka/akka-grpc/issues/1197
  //
  // The counter is advanced from a Future callback running on the dispatcher, so it must be
  // safe to update from several threads at once.
  val roundRobin = new AtomicInteger(0)

  val clientConnectionSettings =
    ClientConnectionSettings(sys).withTransport(ClientTransport.withCustomResolver((host, _) => {
      settings.overrideAuthority.foreach { authority =>
        assert(host == authority)
      }
      settings.serviceDiscovery.lookup(settings.serviceName, 10.seconds).map { resolved =>
        val addresses = resolved.addresses
        if (addresses.isEmpty)
          throw new IllegalStateException(
            s"Service discovery returned no addresses for service [${settings.serviceName}]")
        // quasi-roundrobin is nicer than random selection: somewhat lower chance of making
        // an 'unlucky choice' multiple times in a row.
        // floorMod keeps the index non-negative even after the counter wraps around Int.MaxValue.
        val target = addresses(Math.floorMod(roundRobin.incrementAndGet(), addresses.size))
        val port = target.port.getOrElse(settings.defaultPort)
        target.address match {
          case Some(address) => new InetSocketAddress(address, port)
          case None          => new InetSocketAddress(target.host, port)
        }
      }
    }))

  val builder = Http()
    .connectionTo(targetAuthority)
    .withClientConnectionSettings(clientConnectionSettings)

  val http2client =
    if (settings.useTls) {
      val connectionContext =
        ConnectionContext.httpsClient {
          // Precedence: explicit SSLContext, then a context built from the configured trust
          // manager, then the JVM default context.
          settings.sslContext.getOrElse {
            settings.trustManager match {
              case None => SSLContext.getDefault
              case Some(trustManager) =>
                val sslContext: SSLContext = SSLContext.getInstance("TLS")
                sslContext.init(Array[KeyManager](), Array[TrustManager](trustManager), new SecureRandom)
                sslContext
            }
          }
        }
      builder.withCustomHttpsConnectionContext(connectionContext).managedPersistentHttp2()
    } else {
      builder.managedPersistentHttp2WithPriorKnowledge()
    }

  val (queue, doneFuture) =
    Source
      .queue[HttpRequest](4242, OverflowStrategy.fail)
      .via(http2client)
      .toMat(Sink.foreach { res =>
        // Every request sent by singleRequest carries a ResponsePromise attribute.
        // NOTE(review): this relies on the HTTP/2 client copying that attribute onto the
        // matching response - confirm against the HTTP client documentation.
        res.attribute(ResponsePromise.Key).get.promise.trySuccess(res)
      })(Keep.both)
      .run()

  // Enqueues the request and returns a future for its response. If the queue did not accept the
  // request, the returned future is failed instead of waiting on a promise nobody will complete.
  def singleRequest(request: HttpRequest): Future[HttpResponse] = {
    val p = Promise[HttpResponse]()
    queue.offer(request.addAttribute(ResponsePromise.Key, ResponsePromise(p))).flatMap {
      case QueueOfferResult.Enqueued =>
        p.future
      case QueueOfferResult.Failure(cause) =>
        Future.failed(cause)
      case QueueOfferResult.Dropped =>
        Future.failed(new IllegalStateException("gRPC request was dropped by the client request queue"))
      case QueueOfferResult.QueueClosed =>
        Future.failed(new IllegalStateException("gRPC client channel has been shut down"))
    }
  }

  // The casts assume the marshallers of the descriptor implement WithProtobufSerializer.
  implicit def serializerFromMethodDescriptor[I, O](descriptor: MethodDescriptor[I, O]): ProtobufSerializer[I] =
    descriptor.getRequestMarshaller.asInstanceOf[WithProtobufSerializer[I]].protobufSerializer

  implicit def deserializerFromMethodDescriptor[I, O](descriptor: MethodDescriptor[I, O]): ProtobufSerializer[O] =
    descriptor.getResponseMarshaller.asInstanceOf[WithProtobufSerializer[O]].protobufSerializer

  new InternalChannel() {
    override def shutdown(): Unit = queue.complete()

    override def done: Future[Done] = doneFuture

    override def invoke[I, O](
        request: I,
        headers: MetadataImpl,
        descriptor: MethodDescriptor[I, O],
        options: CallOptions): Future[O] =
      invokeWithMetadata(request, headers, descriptor, options).map(_.value)

    override def invokeWithMetadata[I, O](
        request: I,
        headers: MetadataImpl,
        descriptor: MethodDescriptor[I, O],
        options: CallOptions): Future[GrpcSingleResponse[O]] = {
      // A unary call is a single-element request stream whose first response element is the result.
      val src =
        invokeWithMetadata(Source.single(request), headers, descriptor, streamingResponse = false, options)
      val (metadataFuture, resultFuture) = src.toMat(Sink.head)(Keep.both).run()
      metadataFuture.zip(resultFuture).map {
        case (metadata, result) =>
          new GrpcSingleResponse[O] {
            def value: O = result
            def getValue(): O = result
            def headers = metadata.headers
            def getHeaders() = metadata.getHeaders()
            def trailers = metadata.trailers
            def getTrailers() = metadata.getTrailers()
          }
      }
    }

    override def invokeWithMetadata[I, O](
        source: Source[I, NotUsed],
        headers: MetadataImpl,
        descriptor: MethodDescriptor[I, O],
        streamingResponse: Boolean,
        options: CallOptions): Source[O, Future[GrpcResponseMetadata]] = {
      // Both serializers are obtained through the implicit descriptor conversions defined above.
      implicit val serializer: ProtobufSerializer[I] = descriptor
      val deserializer: ProtobufSerializer[O] = descriptor

      val scheme = if (settings.useTls) "https" else "http"
      val httpRequest = GrpcRequestHelpers(
        Uri(s"$scheme://$targetAuthority/${descriptor.getFullMethodName}"),
        GrpcEntityHelpers.metadataHeaders(headers.entries),
        source)
      responseToSource(singleRequest(httpRequest), deserializer)
    }
  }
}