override def initialAttributes = Attributes.name()

in stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala [58:213]


  override def initialAttributes = Attributes.name("ConnectionSource")
  val shape: SourceShape[StreamTcp.IncomingConnection] = SourceShape(out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[ServerBinding]) =
    throw new UnsupportedOperationException("Not used")

  // TODO: Timeout on bind
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes, eagerMaterialzer: Materializer) = {
    val bindingPromise = Promise[ServerBinding]()

    val logic = new TimerGraphStageLogic(shape) with StageLogging {
      implicit def self: ActorRef = stageActor.ref

      val connectionFlowsAwaitingInitialization = new AtomicLong()
      var listener: ActorRef = _
      val unbindPromise = Promise[Unit]()
      var unbindStarted = false

      override def preStart(): Unit = {
        getStageActor(receive)
        tcpManager ! Tcp.Bind(self, endpoint, backlog, options, pullMode = true)
      }

      private def receive(evt: (ActorRef, Any)): Unit = {
        val sender = evt._1
        val msg = evt._2
        msg match {
          case Bound(localAddress) =>
            listener = sender
            stageActor.watch(listener)
            if (isAvailable(out)) listener ! ResumeAccepting(1)
            val thisStage = self
            bindingPromise.success(ServerBinding(localAddress)(
              () => {
                // To allow unbind() to be invoked multiple times with minimal chance of dead letters, we check if
                // it's already unbound before sending the message.
                if (!unbindPromise.isCompleted) {
                  // Beware, sender must be explicit since stageActor.ref will be invalid to access after the stage
                  // stopped.
                  thisStage.tell(Unbind, thisStage)
                }
                unbindPromise.future
              }, unbindPromise.future.map(_ => Done)(ExecutionContexts.parasitic)))
          case f: CommandFailed =>
            val ex = new BindFailedException {
              // cannot modify the actual exception class for compatibility reasons
              override def getMessage: String = s"Bind failed${f.causedByString}"
            }
            f.cause.foreach(ex.initCause)
            bindingPromise.failure(ex)
            unbindPromise.tryFailure(ex)
            failStage(ex)
          case c: Connected =>
            push(out, connectionFor(c, sender))
          case Unbind =>
            if (!isClosed(out) && (listener ne null)) tryUnbind()
          case Unbound =>
            unbindCompleted()
          case Terminated(ref) if ref == listener =>
            if (unbindStarted) {
              unbindCompleted()
            } else {
              val ex = new IllegalStateException(
                "IO Listener actor terminated unexpectedly for remote endpoint [" +
                endpoint.getHostString + ":" + endpoint.getPort + "]")
              unbindPromise.tryFailure(ex)
              failStage(ex)
            }
          case other =>
            log.warning("Unexpected message to TcpStage: [{}]", other.getClass)
        }
      }

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            // Ignore if still binding
            if (listener ne null) listener ! ResumeAccepting(1)
          }

          override def onDownstreamFinish(cause: Throwable): Unit = {
            if (log.isDebugEnabled) {
              cause match {
                case _: SubscriptionWithCancelException.NonFailureCancellation =>
                  log.debug(
                    "Unbinding from {}:{} because downstream cancelled stream",
                    endpoint.getHostString,
                    endpoint.getPort)
                case ex =>
                  log.debug(
                    "Unbinding from {}:{} because of downstream failure: {}",
                    endpoint.getHostString,
                    endpoint.getPort,
                    ex)
              }
            }
            tryUnbind()
          }
        })

      private def connectionFor(connected: Connected, connection: ActorRef): StreamTcp.IncomingConnection = {
        connectionFlowsAwaitingInitialization.incrementAndGet()

        val tcpFlow =
          Flow
            .fromGraph(
              new IncomingConnectionStage(
                connection,
                connected.remoteAddress,
                halfClose,
                () => connectionFlowsAwaitingInitialization.decrementAndGet()))
            .via(detacher[ByteString]) // must read ahead for proper completions

        // FIXME: Previous code was wrong, must add new tests
        val handler = idleTimeout match {
          case d: FiniteDuration => tcpFlow.join(TcpIdleTimeout(d, Some(connected.remoteAddress)))
          case _                 => tcpFlow
        }

        StreamTcp.IncomingConnection(connected.localAddress, connected.remoteAddress, handler)
      }

      private def tryUnbind(): Unit = {
        if ((listener ne null) && !unbindStarted) {
          unbindStarted = true
          setKeepGoing(true)
          listener ! Unbind
        }
      }

      private def unbindCompleted(): Unit = {
        stageActor.unwatch(listener)
        unbindPromise.trySuccess(())
        if (connectionFlowsAwaitingInitialization.get() == 0) completeStage()
        else scheduleOnce(BindShutdownTimer, bindShutdownTimeout)
      }

      override def onTimer(timerKey: Any): Unit = timerKey match {
        case BindShutdownTimer =>
          completeStage() // TODO need to manually shut down instead right?
        case other =>
          throw new IllegalArgumentException(s"Unknown timer key $other")
      }

      override def postStop(): Unit = {
        // a bit unexpected to succeed here rather than fail with abrupt stage termination
        // but there was an existing test case covering this behavior
        unbindPromise.trySuccess(())
        bindingPromise.tryFailure(new NoSuchElementException("Binding was unbound before it was completely finished"))
      }
    }

    (logic, bindingPromise.future)
  }