override def createLogic()

in atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/WebSocketSessionManager.scala [53:105]


  /**
    * Builds the logic for this stage.
    *
    * The returned logic handles both ports of the stage:
    *  - each element grabbed from `in` is parsed as a batch of expressions and passed
    *    to `subscribeFunc`; the messages it returns (or a diagnostic error message if
    *    processing failed) are queued as high priority output.
    *  - the source obtained from `registerFunc` is pushed to `out` exactly once, when
    *    the first input element is handled.
    *  - on downstream demand, queued high priority messages are pushed as a source of
    *    their own; when nothing is queued, more input is requested instead.
    *
    * @param inheritedAttributes attributes supplied by the materializer; not read here
    * @return the stage logic, acting as the handler for both `in` and `out`
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) with InHandler with OutHandler {

      // Becomes true once `dataSource` has been pushed to `out`
      private var sourceEmitted = false

      // Assigned in preStart from registerFunc
      private var dataSource: Source[JsonSupport, NotUsed] = _

      // Messages that must not be dropped, so they bypass the queue used for
      // the other data and are pushed directly from this stage
      private var priorityBacklog: List[JsonSupport] = Nil

      setHandlers(in, out, this)

      override def preStart(): Unit = {
        dataSource = registerFunc(streamMeta)
      }

      override def onPush(): Unit = {
        try {
          val expressions = grab(in) match {
            case bytes: ByteString =>
              LwcMessages.parseBatch(bytes).asInstanceOf[List[LwcExpression]]
            case other =>
              throw new MatchError(s"invalid type: ${other.getClass.getName}")
          }
          val metadata = for (e <- expressions) yield {
            ExpressionMetadata(e.expression, e.exprType, e.step)
          }
          // Apply the new set of expressions to the subscription for this stream
          val responses = subscribeFunc(streamMeta.streamId, metadata)
          priorityBacklog = priorityBacklog ::: responses
        } catch {
          case NonFatal(t) =>
            // Report the failure to the client rather than failing the stage
            priorityBacklog = DiagnosticMessage.error(t) :: priorityBacklog
        } finally {
          emitSourceOrContinue()
        }
      }

      override def onPull(): Unit = {
        if (priorityBacklog.isEmpty) {
          // Nothing queued, request the next element from upstream
          pull(in)
        } else {
          push(out, Source(priorityBacklog))
          priorityBacklog = Nil
        }
      }

      // Runs after every input element, whether or not processing it succeeded
      private def emitSourceOrContinue(): Unit = {
        if (sourceEmitted) {
          // No push happened for this element, so drive the next step directly; in
          // the other branch the push is expected to trigger a pull from downstream
          onPull()
        } else {
          // The data source is pushed out only once
          push(out, dataSource)
          sourceEmitted = true
        }
      }
    }
  }