in pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala [52:84]
private def startConsumingFromTopic(shardRegion: ActorRef[UserEvents.Command], processorSettings: ProcessorSettings)(
implicit actorSystem: TypedActorSystem[_]): Future[Done] = {
implicit val ec: ExecutionContext = actorSystem.executionContext
implicit val scheduler: Scheduler = actorSystem.toClassic.scheduler
val classic = actorSystem.toClassic
val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(processorSettings.entityTypeKey)
val subscription = Subscriptions
.topics(processorSettings.topics: _*)
.withRebalanceListener(rebalanceListener.toClassic)
Consumer.sourceWithOffsetContext(processorSettings.kafkaConsumerSettings(), subscription)
// MapAsync and Retries can be replaced by reliable delivery
.mapAsync(20) { record =>
logger.info(s"user id consumed kafka partition ${record.key()}->${record.partition()}")
retry(
() =>
shardRegion.ask[Done](replyTo => {
val purchaseProto = UserPurchaseProto.parseFrom(record.value())
UserEvents.UserPurchase(
purchaseProto.userId,
purchaseProto.product,
purchaseProto.quantity,
purchaseProto.price,
replyTo)
})(processorSettings.askTimeout, actorSystem.scheduler),
attempts = 5,
delay = 1.second)
}
.runWith(Committer.sinkWithOffsetContext(CommitterSettings(classic)))
}