def sink[A]: Sink[A,()

in src/it/scala/com/gu/kinesis/Inspectable.scala [19:53]


  /** Creates a sink that records every element pushed into it and exposes what
    * has been recorded so far through its materialized value.
    *
    * The materialized value is a zero-argument function. Each call copies the
    * current contents of the backing queue into a new `IndexedSeq`, in the
    * queue's iteration order.
    *
    * @tparam A the element type accepted by the sink
    * @return a sink whose materialized value yields the elements collected so far
    */
  def sink[A]: Sink[A, () => IndexedSeq[A]] = {
    val collectingStage =
      new GraphStageWithMaterializedValue[SinkShape[A], ConcurrentLinkedQueue[A]] {
        val in: Inlet[A] = Inlet("InspectableSink")
        override val shape: SinkShape[A] = SinkShape(in)

        override def createLogicAndMaterializedValue(
            inheritedAttributes: Attributes
        ): (GraphStageLogic, ConcurrentLinkedQueue[A]) = {
          // A new queue is allocated on every invocation, so the buffer is
          // never shared between the logic instances created here.
          val collected = new ConcurrentLinkedQueue[A]

          // The logic doubles as its own inlet handler.
          val stageLogic: GraphStageLogic = new GraphStageLogic(shape) with InHandler {
            setHandler(in, this)

            // Request the first element as soon as the stage starts.
            override def preStart(): Unit = pull(in)

            // Store the incoming element, then immediately ask for the next one.
            override def onPush(): Unit = {
              val element = grab(in)
              collected.add(element)
              pull(in)
            }
          }

          stageLogic -> collected
        }
      }

    // Hide the queue behind a thunk that copies its contents on demand.
    Sink
      .fromGraph(collectingStage)
      .mapMaterializedValue(collected => () => collected.asScala.toIndexedSeq)
  }