override def createLogic()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala [186:288]


    /**
     * Creates the stage logic that consumes parser output from `responseOutputIn` and emits
     * `HttpResponse` instances on `httpResponseOut`.
     *
     * The logic works as a small state machine that is switched by swapping port handlers:
     *  - idle: the logic object itself is the handler; it expects a `ResponseStart`
     *  - `waitForMessageEnd`: installed after a strict entity, it expects the trailing `MessageEnd`
     *  - `substreamHandler`: installed for a streamed entity, it forwards message parts into
     *    `entitySource` until `MessageEnd` is seen
     *
     * @param effectiveAttributes not used by this implementation
     */
    override def createLogic(effectiveAttributes: Attributes) =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        // sub-outlet feeding the currently streamed entity; null while no entity is being streamed
        private var entitySource: SubSourceOutlet[ResponseOutput] = _
        private def entitySubstreamStarted = entitySource ne null
        // the logic object itself acts as the handler for the idle state
        private def idle = this
        // set when downstream finished while an entity substream was still active
        private var completionDeferred = false
        // taken from `closeRequested` of the most recent ResponseStart
        private var completeOnMessageEnd = false

        // Called when a message has ended (and once during construction): completes the stage if
        // completion was requested or deferred, otherwise (re-)installs the idle handlers on both ports.
        def setIdleHandlers(): Unit =
          if (completeOnMessageEnd || completionDeferred) completeStage()
          else setHandlers(responseOutputIn, httpResponseOut, idle)

        // Idle-state push: only a ResponseStart is acceptable here.
        def onPush(): Unit = grab(responseOutputIn) match {
          case ResponseStart(statusCode, protocol, attributes, headers, entityCreator, closeRequested) =>
            // createEntity runs before the push below; it switches handlers (and, for a strict
            // entity, pulls again) as a side effect
            val entity = createEntity(entityCreator).withSizeLimit(parserSettings.maxContentLength)
            push(httpResponseOut, new HttpResponse(statusCode, headers, attributes, entity, protocol))
            completeOnMessageEnd = closeRequested

          case MessageStartError(_, info) =>
            throw IllegalResponseException(info)

          case other =>
            throw new IllegalStateException(s"ResponseStart expected but $other received.")
        }

        // Idle-state pull: while an entity substream is active the pull is not forwarded;
        // substreamHandler pulls on MessageEnd if `httpResponseOut` is available then.
        def onPull(): Unit = {
          if (!entitySubstreamStarted) pull(responseOutputIn)
        }

        override def onDownstreamFinish(): Unit = {
          // if downstream cancels while streaming entity,
          // make sure we also cancel the entity source, but
          // after being done with streaming the entity
          if (entitySubstreamStarted) {
            completionDeferred = true
          } else {
            completeStage()
          }
        }

        // initial state: both flags are false at this point, so this installs the idle handlers
        setIdleHandlers()

        // with a strict message there still is a MessageEnd to wait for
        lazy val waitForMessageEnd = new InHandler with OutHandler {
          def onPush(): Unit = grab(responseOutputIn) match {
            case MessageEnd =>
              // pull for the next message if downstream demand is already pending
              if (isAvailable(httpResponseOut)) pull(responseOutputIn)
              setIdleHandlers()
            case other => throw new IllegalStateException(s"MessageEnd expected but $other received.")
          }

          override def onPull(): Unit = {
            // ignore pull as we will anyways pull when we get MessageEnd
          }
        }

        // with a streamed entity we push the chunks into the substream
        // until we reach MessageEnd
        // (this handler is installed both on `responseOutputIn` and on `entitySource`, see createEntity)
        private lazy val substreamHandler = new InHandler with OutHandler {
          override def onPush(): Unit = grab(responseOutputIn) match {
            case MessageEnd =>
              entitySource.complete()
              // clearing the reference marks the substream as no longer started
              entitySource = null
              // there was a deferred pull from upstream
              // while we were streaming the entity
              if (isAvailable(httpResponseOut)) pull(responseOutputIn)
              setIdleHandlers()

            case messagePart =>
              // any element other than MessageEnd is handed to the entity substream as-is
              entitySource.push(messagePart)
          }

          // demand signalled on `entitySource` is forwarded to the stage's inlet
          override def onPull(): Unit = pull(responseOutputIn)

          // upstream finished while the entity was being streamed: complete the substream and the stage
          override def onUpstreamFinish(): Unit = {
            entitySource.complete()
            completeStage()
          }

          // upstream failed while the entity was being streamed: fail the substream and the stage
          override def onUpstreamFailure(reason: Throwable): Unit = {
            entitySource.fail(reason)
            failStage(reason)
          }
        }

        // Produces the response entity for a ResponseStart and switches the stage into the
        // matching state (waiting for MessageEnd, or streaming into a fresh substream).
        private def createEntity(creator: EntityCreator[ResponseOutput, ResponseEntity]): ResponseEntity = {
          creator match {
            case StrictEntityCreator(entity) =>
              // upstream demanded one element, which it just got
              // but we want MessageEnd as well
              pull(responseOutputIn)
              setHandler(responseOutputIn, waitForMessageEnd)
              setHandler(httpResponseOut, waitForMessageEnd)
              entity

            case StreamedEntityCreator(creator) =>
              // only the inlet handler is swapped here; the handler of `httpResponseOut` is left unchanged
              entitySource = new SubSourceOutlet[ResponseOutput]("EntitySource")
              entitySource.setHandler(substreamHandler)
              setHandler(responseOutputIn, substreamHandler)
              creator(Source.fromGraph(entitySource.source))
          }
        }
      }