def sink()

in src/it/scala/com/gu/kinesis/KinesisTestProducer.scala [29:53]


  /**
    * Builds a sink that publishes every incoming `(key, message)` pair through a
    * [[KinesisTestProducer]] and collects the pairs that were sent.
    *
    * Elements are split into one substream per key, sent one at a time within each
    * substream, and then merged back into a single stream feeding `Sink.seq`.
    *
    * The producer is created when this method is called (not when the sink is
    * materialized) and is shut down once the stream terminates, whether it completed
    * or failed. NOTE(review): this suggests the returned sink is meant to be
    * materialized at most once -- confirm against callers.
    *
    * @param streamName     name of the stream handed to the test producer
    * @param producerConfig configuration handed to the test producer
    * @return a sink whose materialized future holds every pair emitted downstream of
    *         the send stage
    */
  def sink(
      streamName: String,
      producerConfig: KinesisProducerConfiguration
  ): Sink[(String, String), Future[Seq[(String, String)]]] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val testProducer = KinesisTestProducer(streamName, producerConfig)

    val sendPerKey =
      Flow[(String, String)]
        // One substream per key, so the sequential send below applies per key.
        .groupBy(maxSubstreams = Int.MaxValue, { case (key, _) => key })
        .detach
        // A parallelism of 1 keeps messages in order, which suits tests but would be
        // too slow for regular use.
        .mapAsync(parallelism = 1) {
          case pair @ (partitionKey, payload) =>
            testProducer.send(partitionKey, payload).map(_ => pair)
        }
        .mergeSubstreams

    sendPerKey
      .watchTermination() { (_, terminated) =>
        // Release the producer as soon as the stream is done, on success or failure.
        terminated.onComplete(_ => testProducer.shutdown())
        NotUsed
      }
      .toMat(Sink.seq)(Keep.right)
  }