in src/it/scala/com/gu/kinesis/Inspectable.scala [19:53]
def sink[A]: Sink[A, () => IndexedSeq[A]] = {
val stage: GraphStageWithMaterializedValue[SinkShape[A], ConcurrentLinkedQueue[A]] =
new GraphStageWithMaterializedValue[SinkShape[A], ConcurrentLinkedQueue[A]] {
val in: Inlet[A] = Inlet("InspectableSink")
override val shape: SinkShape[A] = SinkShape(in)
override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes
): (GraphStageLogic, ConcurrentLinkedQueue[A]) = {
val nonBlockingQueue = new ConcurrentLinkedQueue[A]
val logic = new GraphStageLogic(shape) {
override def preStart(): Unit = pull(in)
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
nonBlockingQueue.add(grab(in))
pull(in)
}
}
)
}
(logic, nonBlockingQueue)
}
}
Sink
.fromGraph(stage)
.mapMaterializedValue { nonBlockingQueue => () =>
nonBlockingQueue.asScala.toIndexedSeq
}
}