in atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/WebSocketSessionManager.scala [53:105]
/**
  * Creates the stage logic for this session.
  *
  * The logic registers the stream via `registerFunc` when the stage starts. Every element
  * grabbed from `in` is treated as a batch of subscription expressions that is handed to
  * `subscribeFunc`. On `out` it emits sources of messages: the registered data source exactly
  * once (on the first element received), and afterwards sources wrapping the pending high
  * priority messages (subscription results or error diagnostics).
  */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
  new GraphStageLogic(shape) with InHandler with OutHandler {

    // Set to true once `dataSource` has been pushed downstream; it is pushed only once.
    private var dataSourcePushed = false

    // Source returned by `registerFunc`; assigned in `preStart`.
    private var dataSource: Source[JsonSupport, NotUsed] = _

    // Messages that shouldn't get dropped and thus shouldn't go through the queue
    // with other data
    private var highPriorityMessages: List[JsonSupport] = Nil

    setHandlers(in, out, this)

    /** Registers the stream and keeps the resulting data source for the first push. */
    override def preStart(): Unit = {
      dataSource = registerFunc(streamMeta)
    }

    /**
      * Converts a parsed batch into subscription metadata.
      *
      * Each element is checked explicitly instead of casting the whole list: a cast to
      * `List[LwcExpression]` is erased at runtime and would only fail later with an
      * uninformative `ClassCastException`. Any unexpected element rejects the whole batch.
      */
    private def toMetadata(batch: List[Any]): List[ExpressionMetadata] = {
      batch.map {
        case expr: LwcExpression =>
          ExpressionMetadata(expr.expression, expr.exprType, expr.step)
        case other =>
          throw new IllegalArgumentException(
            s"invalid message type in subscription batch: ${other.getClass.getName}"
          )
      }
    }

    /**
      * Handles a batch from upstream: updates the subscriptions and queues the resulting
      * messages. Non-fatal failures are converted to an error diagnostic rather than
      * failing the stage.
      */
    override def onPush(): Unit = {
      try {
        val metadata = grab(in) match {
          case str: ByteString =>
            toMetadata(LwcMessages.parseBatch(str))
          case v =>
            // IllegalArgumentException rather than MatchError: MatchError expects the
            // unmatched object and would append "(of class java.lang.String)" to the text.
            throw new IllegalArgumentException(s"invalid type: ${v.getClass.getName}")
        }
        // Update subscription here
        val messages = subscribeFunc(streamMeta.streamId, metadata)
        highPriorityMessages = highPriorityMessages ::: messages
      } catch {
        case NonFatal(t) =>
          highPriorityMessages = DiagnosticMessage.error(t) :: highPriorityMessages
      } finally {
        // Push out dataSource only once
        if (!dataSourcePushed) {
          push(out, dataSource)
          dataSourcePushed = true
        } else {
          // Only pull when no push happened, because push should have triggered a pull
          // from downstream
          onPull()
        }
      }
    }

    /**
      * Emits all pending high priority messages as a single source and clears them; when
      * nothing is pending, requests the next element from upstream.
      */
    override def onPull(): Unit = {
      if (highPriorityMessages.nonEmpty) {
        push(out, Source(highPriorityMessages))
        highPriorityMessages = Nil
      } else {
        pull(in)
      }
    }
  }
}