protected def configureSubscription()

in core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicSubscription.scala [43:70]


  /**
   * Registers this stage's `subscription` with the consumer actor.
   *
   * Topic and topic-pattern subscriptions are sent to `consumerActor` as
   * `Subscribe` / `SubscribePattern` messages with `sourceActor.ref` as the sender,
   * each carrying a partition assignment handler built from the subscription.
   * Manual subscriptions are handed to `configureManualSubscription`.
   *
   * @param partitionAssignedCB callback passed to the `AsyncCallbacks` handler for assigned partitions
   * @param partitionRevokedCB callback passed to the `AsyncCallbacks` handler for revoked partitions
   */
  protected def configureSubscription(partitionAssignedCB: AsyncCallback[Set[TopicPartition]],
      partitionRevokedCB: AsyncCallback[Set[TopicPartition]]): Unit = {

    // Chains the subscription's own handler (passed through the
    // `addToPartitionAssignmentHandler` hook) with the handler that wraps the two callbacks.
    def chainedHandler(auto: AutoSubscription): PartitionAssignmentHandler = {
      val hookedHandler = addToPartitionAssignmentHandler(auto.partitionAssignmentHandler)
      val callbackHandler = new PartitionAssignmentHelpers.AsyncCallbacks(
        auto,
        sourceActor.ref,
        partitionAssignedCB,
        partitionRevokedCB)
      PartitionAssignmentHelpers.chain(hookedHandler, callbackHandler)
    }

    // NOTE(review): the `addToPartitionAssignmentHandler` hook is applied here a second time,
    // around the already-chained handler. Kept exactly as before; confirm whether applying
    // the hook twice is intended.
    subscription match {
      case auto @ TopicSubscription(topicNames, _, _) =>
        val handler = addToPartitionAssignmentHandler(chainedHandler(auto))
        val subscribe = KafkaConsumerActor.Internal.Subscribe(topicNames, handler)
        consumerActor.tell(subscribe, sourceActor.ref)

      case auto @ TopicSubscriptionPattern(topicPattern, _, _) =>
        val handler = addToPartitionAssignmentHandler(chainedHandler(auto))
        val subscribe = KafkaConsumerActor.Internal.SubscribePattern(topicPattern, handler)
        consumerActor.tell(subscribe, sourceActor.ref)

      case manual: ManualSubscription =>
        configureManualSubscription(manual)
    }
  }