private def startConsumingFromTopic()

in pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala [52:84]


  private def startConsumingFromTopic(shardRegion: ActorRef[UserEvents.Command], processorSettings: ProcessorSettings)(
      implicit actorSystem: TypedActorSystem[_]): Future[Done] = {

    implicit val ec: ExecutionContext = actorSystem.executionContext
    implicit val scheduler: Scheduler = actorSystem.toClassic.scheduler
    val classic = actorSystem.toClassic

    val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(processorSettings.entityTypeKey)

    val subscription = Subscriptions
      .topics(processorSettings.topics: _*)
      .withRebalanceListener(rebalanceListener.toClassic)

    Consumer.sourceWithOffsetContext(processorSettings.kafkaConsumerSettings(), subscription)
      // MapAsync and Retries can be replaced by reliable delivery
      .mapAsync(20) { record =>
        logger.info(s"user id consumed kafka partition ${record.key()}->${record.partition()}")
        retry(
          () =>
            shardRegion.ask[Done](replyTo => {
              val purchaseProto = UserPurchaseProto.parseFrom(record.value())
              UserEvents.UserPurchase(
                purchaseProto.userId,
                purchaseProto.product,
                purchaseProto.quantity,
                purchaseProto.price,
                replyTo)
            })(processorSettings.askTimeout, actorSystem.scheduler),
          attempts = 5,
          delay = 1.second)
      }
      .runWith(Committer.sinkWithOffsetContext(CommitterSettings(classic)))
  }