in http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala [223:299]
private[http] def bindImpl(
    interface: String,
    port: Int,
    connectionContext: ConnectionContext,
    settings: ServerSettings,
    log: LoggingAdapter): Source[Http.IncomingConnection, Future[ServerBinding]] = {
  // Pure forwarder: every argument is handed to `bind` unchanged and its result is returned as-is.
  val connections: Source[Http.IncomingConnection, Future[ServerBinding]] =
    bind(interface, port, connectionContext, settings, log)
  connections
}
/**
 * Convenience method which starts a new HTTP server at the given endpoint and runs every incoming
 * connection through the given `handler` [[pekko.stream.scaladsl.Flow]].
 *
 * The number of concurrently accepted connections is bounded by `settings.maxConnections`, which can be
 * configured by overriding the `pekko.http.server.max-connections` setting. Please see the documentation
 * in the reference.conf for more information about what kind of guarantees to expect.
 *
 * To configure additional settings for a server started using this method, use the `pekko.http.server`
 * config section or pass in a [[pekko.http.scaladsl.settings.ServerSettings]] explicitly.
 *
 * If HTTP/2 is enabled in the preview server settings, a warning is logged stating that this way of
 * binding falls back to HTTP/1.1.
 *
 * @param handler flow that is fused with the server layer and applied to each accepted connection
 * @param interface interface passed to the TCP bind
 * @param port requested port; the port actually bound is derived via `choosePort`
 * @param connectionContext context used when building the server layer and when choosing the port
 * @param settings server settings (connection limit, cancellation delay, preview flags, ...)
 * @param log logging adapter used for the HTTP/2 warning and for materialization failures
 * @param fm materializer used to run the streams; its execution context runs the bookkeeping callbacks
 * @return a future of the `ServerBinding`, whose unbind action delegates to the TCP binding and whose
 *         terminate action delegates to the terminator tracking all connections of this binding
 */
@deprecated("Use Http().newServerAt(...)...bindFlow() to create server bindings.", since = "Akka HTTP 10.2.0")
@nowarn("msg=deprecated")
def bindAndHandle(
    handler: Flow[HttpRequest, HttpResponse, Any],
    interface: String, port: Int = DefaultPortForProtocol,
    connectionContext: ConnectionContext = defaultServerHttpContext,
    settings: ServerSettings = ServerSettings(system),
    log: LoggingAdapter = system.log)(implicit fm: Materializer = systemMaterializer): Future[ServerBinding] = {
  if (settings.previewServerSettings.enableHttp2)
    log.warning(
      s"Binding with a connection source not supported with HTTP/2. Falling back to HTTP/1.1 for port [$port].")

  // Execution context used for all registration / de-registration / termination callbacks below.
  val callbackEc = fm.executionContext

  // Server layer fused with the user handler; materializes a completion future and a per-connection terminator.
  val connectionLayer: Flow[ByteString, ByteString, (Future[Done], ServerTerminator)] = {
    val serverLayer = fuseServerBidiFlow(settings, connectionContext, log)
    fuseServerFlow(serverLayer, handler)
  }

  // Tracks the terminators of all live connections of this binding.
  val terminator = new MasterServerTerminator(log)
  val effectivePort = choosePort(port, connectionContext, settings)

  val connections =
    tcpBind(interface, effectivePort, settings)
      .mapAsyncUnordered(settings.maxConnections) { incoming =>
        try {
          // De-register the connection's terminator once the connection stream has terminated.
          val watchedLayer = connectionLayer.watchTermination() { (layerMat, whenTerminates) =>
            val (done, connectionTerminator) = layerMat
            whenTerminates.onComplete { _ =>
              terminator.removeConnection(connectionTerminator)
            }(callbackEc)
            (done, connectionTerminator)
          }

          val connectionGraph = watchedLayer
            .addAttributes(prepareAttributes(settings, incoming))
            .join(incoming.flow)
            .mapMaterializedValue {
              case (connectionDone, connectionTerminator) =>
                terminator.registerConnection(connectionTerminator)(callbackEc)
                // drop the terminator matValue, we already registered it, which is all we need to do here
                connectionDone
            }
            .addAttributes(cancellationStrategyAttributeForDelay(settings.streamCancellationDelay))

          connectionGraph.run().recover {
            // Ignore incoming errors from the connection as they will cancel the binding.
            // As far as it is known currently, these errors can only happen if a TCP error bubbles up
            // from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
            // See https://github.com/akka/akka/issues/17992
            case NonFatal(_) => Done
          }(ExecutionContexts.sameThreadExecutionContext)
        } catch {
          case NonFatal(e) =>
            log.error(e, "Could not materialize handling flow for {}", incoming)
            throw e
        }
      }
      .mapMaterializedValue { tcpBindingFuture =>
        // Wrap the TCP binding into the HTTP-level ServerBinding once the bind has completed.
        tcpBindingFuture.map { tcpBinding =>
          ServerBinding(tcpBinding.localAddress)(
            () => tcpBinding.unbind(),
            timeout => terminator.terminate(timeout)(callbackEc))
        }(callbackEc)
      }

  connections.to(Sink.ignore).run()
}