private[http] def bindImpl()

in http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala [224:300]


  private[http] def bindImpl(interface: String, port: Int,
      connectionContext: ConnectionContext,
      settings: ServerSettings,
      log: LoggingAdapter): Source[Http.IncomingConnection, Future[ServerBinding]] =
    bind(interface, port, connectionContext, settings, log)

  /**
   * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
   * [[pekko.stream.scaladsl.Flow]] for processing all incoming connections.
   *
   * The number of concurrently accepted connections can be configured by overriding
   * the `pekko.http.server.max-connections` setting. Please see the documentation in the reference.conf for more
   * information about what kind of guarantees to expect.
   *
   * To configure additional settings for a server started using this method,
   * use the `pekko.http.server` config section or pass in a [[pekko.http.scaladsl.settings.ServerSettings]] explicitly.
   */
  @deprecated("Use Http().newServerAt(...)...bindFlow() to create server bindings.", since = "Akka HTTP 10.2.0")
  @nowarn("msg=deprecated")
  def bindAndHandle(
      handler: Flow[HttpRequest, HttpResponse, Any],
      interface: String, port: Int = DefaultPortForProtocol,
      connectionContext: ConnectionContext = defaultServerHttpContext,
      settings: ServerSettings = ServerSettings(system),
      log: LoggingAdapter = system.log)(implicit fm: Materializer = systemMaterializer): Future[ServerBinding] = {
    if (settings.previewServerSettings.enableHttp2)
      log.warning(
        s"Binding with a connection source not supported with HTTP/2. Falling back to HTTP/1.1 for port [$port].")

    val fullLayer: Flow[ByteString, ByteString, (Future[Done], ServerTerminator)] =
      fuseServerFlow(fuseServerBidiFlow(settings, connectionContext, log), handler)

    val masterTerminator = new MasterServerTerminator(log)

    tcpBind(interface, choosePort(port, connectionContext, settings), settings)
      .mapAsyncUnordered(settings.maxConnections) { incoming =>
        try {
          fullLayer
            .watchTermination() {
              case ((done, connectionTerminator), whenTerminates) =>
                whenTerminates.onComplete { _ =>
                  masterTerminator.removeConnection(connectionTerminator)
                }(fm.executionContext)
                (done, connectionTerminator)
            }
            .addAttributes(prepareAttributes(settings, incoming))
            .join(incoming.flow)
            .mapMaterializedValue {
              case (future, connectionTerminator) =>
                masterTerminator.registerConnection(connectionTerminator)(fm.executionContext)
                future // drop the terminator matValue, we already registered is which is all we need to do here
            }
            .addAttributes(cancellationStrategyAttributeForDelay(settings.streamCancellationDelay))
            .run()
            .recover {
              // Ignore incoming errors from the connection as they will cancel the binding.
              // As far as it is known currently, these errors can only happen if a TCP error bubbles up
              // from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
              // See https://github.com/akka/akka/issues/17992
              case NonFatal(ex) => Done
            }(ExecutionContexts.parasitic)
        } catch {
          case NonFatal(e) =>
            log.error(e, "Could not materialize handling flow for {}", incoming)
            throw e
        }
      }
      .mapMaterializedValue { m =>
        m.map(tcpBinding =>
          ServerBinding(
            tcpBinding.localAddress)(
            () => tcpBinding.unbind(),
            timeout => masterTerminator.terminate(timeout)(fm.executionContext)))(fm.executionContext)
      }
      .to(Sink.ignore)
      .run()
  }