def bindAndHandleAsync()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala [79:141]


  /**
   * Binds a TCP listener on `interface` and the effective port, and serves every accepted connection
   * with `handler`.
   *
   * Both an HTTP/1.1 and an HTTP/2 handling flow are built once up front and handed, per connection, to a
   * protocol selector: `httpsWithAlpn` when `connectionContext.isSecure`, `priorKnowledge` otherwise.
   *
   * @param handler           asynchronous request handler shared by the HTTP/1.1 and HTTP/2 implementations
   * @param interface         interface to bind to
   * @param port              port to bind to; a negative value selects `settings.defaultHttpsPort` for secure
   *                          contexts and `settings.defaultHttpPort` otherwise
   * @param connectionContext decides (via `isSecure`) which protocol selector and which default port are used
   * @param settings          server settings (pipelining limit, backlog, socket options, max connections,
   *                          idle timeout, stream cancellation delay, HTTP/2 settings)
   * @param log               logging adapter used by the handling layers and for materialization failures
   * @param fm                materializer used to run the binding stream and each per-connection flow
   * @return a future `ServerBinding` whose unbind action delegates to the TCP binding and whose terminate
   *         action delegates to the terminator that tracks all open connections
   */
  def bindAndHandleAsync(
      handler: HttpRequest => Future[HttpResponse],
      interface: String, port: Int = DefaultPortForProtocol,
      connectionContext: ConnectionContext,
      settings: ServerSettings = ServerSettings(system),
      log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = {

    // Chooses between the two protocol implementations for each connection.
    val protocolSelector: HttpPlusSwitching =
      if (!connectionContext.isSecure) priorKnowledge
      else httpsWithAlpn(connectionContext.asInstanceOf[HttpsConnectionContext])

    // A negative port means "use the configured default for this kind of connection context".
    val effectivePort = port match {
      case requested if requested >= 0     => requested
      case _ if connectionContext.isSecure => settings.defaultHttpsPort
      case _                               => settings.defaultHttpPort
    }

    // HTTP/1.1: user handler (wrapped by handleUpgradeRequests) joined with the terminator stage atop the server layer.
    val http1Impl: HttpImplementation = {
      val requestHandling =
        Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler, settings, log))
      val serverStack = GracefulTerminatorStage(system, settings).atop(http.serverLayer(settings, log = log))
      requestHandling.joinMat(serverStack)(Keep.right)
    }

    // HTTP/2: user handler (with stream-id header handling) joined with the HTTP/2 server stack.
    val http2Impl: HttpImplementation = {
      val requestHandling =
        Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(
          system.dispatcher)
      val serverStack = Http2Blueprint.serverStackTls(settings, log, telemetry, Http().dateHeaderRendering)
      requestHandling.joinMat(serverStack)(Keep.right)
    }

    // Keeps track of the terminators of all currently open connections.
    val serverTerminator = new MasterServerTerminator(log)

    // Materializes and runs the handling flow for one accepted connection.
    def serveConnection(incoming: Tcp.IncomingConnection): Future[Done] =
      try {
        val connectionHandling =
          protocolSelector(http1Impl, http2Impl)
            .addAttributes(prepareServerAttributes(settings, incoming))
            .watchTermination() { (terminatorF, connectionDone) =>
              terminatorF.foreach { terminator =>
                // Register on arrival of the terminator, deregister once the connection has completed.
                serverTerminator.registerConnection(terminator)(fm.executionContext)
                connectionDone.onComplete(_ => serverTerminator.removeConnection(terminator))(fm.executionContext)
              }(fm.executionContext)
              // Only the completion future is kept as materialized value; the terminator has been
              // registered above, which is all that needs to happen with it here.
              connectionDone
            }

        // The idle timeout is enforced here, wrapped around the raw TCP connection flow.
        val transport =
          HttpConnectionIdleTimeoutBidi(settings.idleTimeout, Some(incoming.remoteAddress)).join(incoming.flow)

        connectionHandling
          .join(transport)
          .addAttributes(Http.cancellationStrategyAttributeForDelay(settings.streamCancellationDelay))
          .run()
          .recover {
            // Failures of a single connection are swallowed on purpose: letting them through would
            // cancel the binding. As far as currently known they can only occur when a TCP error
            // bubbles up from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
            // See https://github.com/akka/akka/issues/17992
            case NonFatal(_) => Done
          }(ExecutionContexts.parasitic)
      } catch {
        case NonFatal(e) =>
          log.error(e, "Could not materialize handling flow for {}", incoming)
          throw e
      }

    Tcp(system)
      // Duration.Inf: the TCP-level idle-timeout is deliberately disabled because the timeout is
      // handled explicitly in Pekko HTTP itself.
      .bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, Duration.Inf)
      .via(if (telemetry == NoOpTelemetry) Flow[Tcp.IncomingConnection] else telemetry.serverBinding)
      .mapAsyncUnordered(settings.maxConnections)(serveConnection)
      .mapMaterializedValue { tcpBindingF =>
        tcpBindingF.map { tcpBinding =>
          ServerBinding(tcpBinding.localAddress)(
            () => tcpBinding.unbind(),
            timeout => serverTerminator.terminate(timeout)(fm.executionContext))
        }(fm.executionContext)
      }
      .to(Sink.ignore)
      .run()
  }