in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala [79:141]
def bindAndHandleAsync(
handler: HttpRequest => Future[HttpResponse],
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext,
settings: ServerSettings = ServerSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = {
val httpPlusSwitching: HttpPlusSwitching =
if (connectionContext.isSecure) httpsWithAlpn(connectionContext.asInstanceOf[HttpsConnectionContext])
else priorKnowledge
val effectivePort =
if (port >= 0) port
else if (connectionContext.isSecure) settings.defaultHttpsPort
else settings.defaultHttpPort
val http1: HttpImplementation =
Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler, settings, log))
.joinMat(GracefulTerminatorStage(system, settings).atop(http.serverLayer(settings, log = log)))(Keep.right)
val http2: HttpImplementation =
Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(system.dispatcher)
.joinMat(Http2Blueprint.serverStackTls(settings, log, telemetry, Http().dateHeaderRendering))(Keep.right)
val masterTerminator = new MasterServerTerminator(log)
Tcp(system).bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false,
Duration.Inf) // we knowingly disable idle-timeout on TCP level, as we handle it explicitly in Pekko HTTP itself
.via(if (telemetry == NoOpTelemetry) Flow[Tcp.IncomingConnection] else telemetry.serverBinding)
.mapAsyncUnordered(settings.maxConnections) {
(incoming: Tcp.IncomingConnection) =>
try {
httpPlusSwitching(http1, http2).addAttributes(prepareServerAttributes(settings, incoming))
.watchTermination() {
case (connectionTerminatorF, future) =>
connectionTerminatorF.foreach { connectionTerminator =>
masterTerminator.registerConnection(connectionTerminator)(fm.executionContext)
future.onComplete(_ => masterTerminator.removeConnection(connectionTerminator))(fm.executionContext)
}(fm.executionContext)
future // drop the terminator matValue, we already registered is which is all we need to do here
}
.join(HttpConnectionIdleTimeoutBidi(settings.idleTimeout, Some(incoming.remoteAddress)).join(
incoming.flow))
.addAttributes(Http.cancellationStrategyAttributeForDelay(settings.streamCancellationDelay))
.run().recover {
// Ignore incoming errors from the connection as they will cancel the binding.
// As far as it is known currently, these errors can only happen if a TCP error bubbles up
// from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) =>
Done
}(ExecutionContexts.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
throw e
}
}.mapMaterializedValue {
_.map(tcpBinding =>
ServerBinding(tcpBinding.localAddress)(
() => tcpBinding.unbind(),
timeout => masterTerminator.terminate(timeout)(fm.executionContext)))(fm.executionContext)
}.to(Sink.ignore).run()
}