in src/it/scala/com/gu/kinesis/KinesisTestProducer.scala [29:53]
/**
  * Builds a sink that passes every `(key, message)` pair to a `KinesisTestProducer` and
  * materializes the pairs whose send completed.
  *
  * The producer is constructed when this method is called, not per materialization. It is shut
  * down once the sending flow terminates, whether it completed normally or failed.
  *
  * @param streamName     stream name handed to the `KinesisTestProducer`
  * @param producerConfig producer configuration handed to the `KinesisTestProducer`
  * @return a sink whose materialized future yields all pairs emitted by the sending flow
  */
def sink(
    streamName: String,
    producerConfig: KinesisProducerConfiguration
): Sink[(String, String), Future[Seq[(String, String)]]] = {
  import scala.concurrent.ExecutionContext.Implicits.global

  val testProducer = KinesisTestProducer(streamName, producerConfig)

  // Split the input into one substream per key, send within each substream, then merge back.
  val sendingFlow =
    Flow[(String, String)]
      .groupBy(maxSubstreams = Int.MaxValue, { case (key, _) => key })
      .detach
      // `parallelism = 1` allows only one in-flight send per key substream. That enforces message
      // ordering, which is good for testing, but too slow for normal use.
      .mapAsync(parallelism = 1) { record =>
        val (key, payload) = record
        testProducer.send(key, payload).map(_ => record)
      }
      .mergeSubstreams

  sendingFlow
    .watchTermination() { (_, terminated) =>
      // Release the producer as soon as the flow is done, on success and on failure alike.
      terminated.onComplete(_ => testProducer.shutdown())
      NotUsed
    }
    .toMat(Sink.seq)(Keep.right)
}