in stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala [59:214]
/** Default attributes of this stage: it is named "ConnectionSource". */
override def initialAttributes: Attributes = Attributes.name("ConnectionSource")

/** Source shape with `out` as its single outlet, emitting incoming connections. */
val shape: SourceShape[StreamTcp.IncomingConnection] = SourceShape(out)
/**
 * Single-argument variant; not supported by this stage.
 *
 * The overload that additionally takes a `Materializer` is the one that builds the logic and
 * the `ServerBinding` future.
 *
 * @throws java.lang.UnsupportedOperationException always
 */
override def createLogicAndMaterializedValue(
    inheritedAttributes: Attributes): (GraphStageLogic, Future[ServerBinding]) = {
  throw new UnsupportedOperationException("Not used")
}
// TODO: Timeout on bind
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes, eagerMaterialzer: Materializer) = {
val bindingPromise = Promise[ServerBinding]()
val logic = new TimerGraphStageLogic(shape) with StageLogging {
// Implicit sender for every `!` send made from this logic; resolves to the stage actor's ref.
// It is a `def`, so the ref is looked up on each use.
implicit def self: ActorRef = stageActor.ref
// Incremented in connectionFor for every connection handed downstream and decremented by the callback
// given to IncomingConnectionStage; unbindCompleted only completes the stage immediately when it is 0.
val connectionFlowsAwaitingInitialization = new AtomicLong()
// Sender of the Bound message; stays null until Bound has been received.
var listener: ActorRef = _
// Completed when unbinding has finished (Unbound / listener terminated), failed on bind failure or
// unexpected listener termination, and completed successfully as a fallback in postStop.
val unbindPromise = Promise[Unit]()
// Set by tryUnbind once Unbind has been sent to the listener, so that it is sent at most once.
var unbindStarted = false
/**
 * Registers `receive` as the stage actor callback and asks the TCP manager to bind `endpoint`,
 * with the stage actor (`self`) as the handler of the resulting messages.
 */
override def preStart(): Unit = {
  // Must come first: `self` below is `stageActor.ref`.
  getStageActor(receive)
  // pullMode = true: connections are requested one at a time via ResumeAccepting(1).
  val bindCommand = Tcp.Bind(self, endpoint, backlog, options, pullMode = true)
  tcpManager ! bindCommand
}
/**
 * Stage actor callback: handles the messages exchanged with the TCP listener and the
 * `Unbind` request sent by the materialized `ServerBinding`.
 *
 * @param evt pair of (sender of the message, the message itself)
 */
private def receive(evt: (ActorRef, Any)): Unit = {
  val from = evt._1
  evt._2 match {
    case Bound(localAddress) =>
      // The sender of Bound is the listener; watch it so that its termination is noticed.
      listener = from
      stageActor.watch(listener)
      // Downstream may already have pulled while the bind was still in progress.
      if (isAvailable(out)) listener ! ResumeAccepting(1)
      val stageRef = self
      val requestUnbind: () => Future[Unit] = () => {
        // To allow unbind() to be invoked multiple times with minimal chance of dead letters, we check if
        // it's already unbound before sending the message.
        if (!unbindPromise.isCompleted) {
          // Beware, sender must be explicit since stageActor.ref will be invalid to access after the stage
          // stopped.
          stageRef.tell(Unbind, stageRef)
        }
        unbindPromise.future
      }
      bindingPromise.success(
        ServerBinding(localAddress)(
          requestUnbind,
          unbindPromise.future.map(_ => Done)(ExecutionContexts.parasitic)))

    case f: CommandFailed =>
      val bindFailure = new BindFailedException {
        // cannot modify the actual exception class for compatibility reasons
        override def getMessage: String = s"Bind failed${f.causedByString}"
      }
      f.cause.foreach(bindFailure.initCause)
      bindingPromise.failure(bindFailure)
      unbindPromise.tryFailure(bindFailure)
      failStage(bindFailure)

    case c: Connected =>
      // The sender of Connected is passed on as the connection actor of the new flow.
      push(out, connectionFor(c, from))

    case Unbind =>
      val canUnbind = !isClosed(out) && (listener ne null)
      if (canUnbind) tryUnbind()

    case Unbound =>
      unbindCompleted()

    case Terminated(ref) if ref == listener && unbindStarted =>
      // The listener stopped while we were unbinding: treat it as a completed unbind.
      unbindCompleted()

    case Terminated(ref) if ref == listener =>
      // NOTE(review): `endpoint` is the address handed to Tcp.Bind in preStart, so "remote endpoint"
      // in this message looks like it refers to the bind address - confirm before rewording.
      val unexpected = new IllegalStateException(
        "IO Listener actor terminated unexpectedly for remote endpoint [" +
        endpoint.getHostString + ":" + endpoint.getPort + "]")
      unbindPromise.tryFailure(unexpected)
      failStage(unexpected)

    case other =>
      log.warning("Unexpected message to TcpStage: [{}]", other.getClass)
  }
}
// Outlet handler: every pull asks the listener for one more connection, and a downstream
// cancellation or failure starts unbinding.
setHandler(
  out,
  new OutHandler {
    override def onPull(): Unit =
      // Ignore if still binding
      if (listener ne null) listener ! ResumeAccepting(1)

    override def onDownstreamFinish(cause: Throwable): Unit = {
      logUnbindReason(cause)
      tryUnbind()
    }

    // Debug-logs why the stage is unbinding; does nothing unless debug logging is enabled.
    private def logUnbindReason(cause: Throwable): Unit =
      if (log.isDebugEnabled) {
        cause match {
          case _: SubscriptionWithCancelException.NonFailureCancellation =>
            log.debug(
              "Unbinding from {}:{} because downstream cancelled stream",
              endpoint.getHostString,
              endpoint.getPort)
          case failure =>
            log.debug(
              "Unbinding from {}:{} because of downstream failure: {}",
              endpoint.getHostString,
              endpoint.getPort,
              failure)
        }
      }
  })
/**
 * Builds the `IncomingConnection` emitted downstream for a freshly accepted connection.
 *
 * Increments `connectionFlowsAwaitingInitialization`; the callback handed to the
 * `IncomingConnectionStage` decrements it again.
 *
 * @param connected  the Connected message carrying the local and remote addresses
 * @param connection the actor that sent the Connected message
 * @return the connection descriptor, with the idle timeout joined in when `idleTimeout` is finite
 */
private def connectionFor(connected: Connected, connection: ActorRef): StreamTcp.IncomingConnection = {
  connectionFlowsAwaitingInitialization.incrementAndGet()
  val peer = connected.remoteAddress
  val connectionStage =
    new IncomingConnectionStage(
      connection,
      peer,
      halfClose,
      () => connectionFlowsAwaitingInitialization.decrementAndGet())
  // must read ahead for proper completions
  val detached = Flow.fromGraph(connectionStage).via(detacher[ByteString])

  // FIXME: Previous code was wrong, must add new tests
  val connectionFlow = idleTimeout match {
    case finite: FiniteDuration => detached.join(TcpIdleTimeout(finite, Some(peer)))
    case _                      => detached
  }

  StreamTcp.IncomingConnection(connected.localAddress, peer, connectionFlow)
}
/**
 * Sends `Unbind` to the listener, at most once, and only after the listener is known.
 * Keeps the stage alive so the listener's reply can still be processed.
 */
private def tryUnbind(): Unit = {
  val bound = listener ne null
  if (bound && !unbindStarted) {
    unbindStarted = true
    setKeepGoing(true)
    listener ! Unbind
  }
}
/**
 * Called once the listener has unbound (or terminated during unbinding): stops watching it,
 * completes `unbindPromise`, and completes the stage. If connection flows are still awaiting
 * initialization, completion is deferred to the `BindShutdownTimer`.
 */
private def unbindCompleted(): Unit = {
  stageActor.unwatch(listener)
  unbindPromise.trySuccess(())
  val awaitingInitialization = connectionFlowsAwaitingInitialization.get()
  if (awaitingInitialization != 0) scheduleOnce(BindShutdownTimer, bindShutdownTimeout)
  else completeStage()
}
/**
 * Timer callback. The only expected key is `BindShutdownTimer`, scheduled by `unbindCompleted`,
 * which completes the stage.
 *
 * @throws java.lang.IllegalArgumentException for any other timer key
 */
override def onTimer(timerKey: Any): Unit =
  if (BindShutdownTimer == timerKey) {
    completeStage() // TODO need to manually shut down instead right?
  } else {
    val other = timerKey
    throw new IllegalArgumentException(s"Unknown timer key $other")
  }
/**
 * Final cleanup: makes sure both promises are completed once the stage has stopped.
 * Both calls are no-ops for a promise that has already been completed.
 */
override def postStop(): Unit = {
  // a bit unexpected to succeed here rather than fail with abrupt stage termination
  // but there was an existing test case covering this behavior
  unbindPromise.trySuccess(())
  // Only takes effect when the stage stopped before Bound (or a bind failure) arrived.
  val neverBound = new NoSuchElementException("Binding was unbound before it was completely finished")
  bindingPromise.tryFailure(neverBound)
}
}
(logic, bindingPromise.future)
}