def handleSubscription()

in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [325:376]


  /**
   * Applies a subscription request to the underlying Kafka consumer.
   *
   * Manual assignments (`Assign`, `AssignWithOffset`, `AssignOffsetsForTimes`) are added on top of
   * whatever `consumer.assignment()` currently reports, and the resulting start positions are
   * handed to `progressTracker`. Topic and pattern subscriptions create a new
   * `RebalanceListenerImpl`, store it in `partitionAssignmentHandler` and pass it to
   * `consumer.subscribe`.
   *
   * Once the request has been applied, `scheduleFirstPollTask()` is called and the sender is
   * recorded in `stageActorsMap` under the consumer's current assignment. Any non-fatal exception
   * raised along the way is passed to `sendFailure` together with the sender instead of being
   * rethrown.
   *
   * @param subscription the subscription request to apply
   */
  def handleSubscription(subscription: SubscriptionRequest): Unit = {
    // Resolve the sender once and reuse it for every step below.
    val requester = sender()
    try {
      subscription match {
        case Assign(requestedTps) =>
          checkOverlappingRequests("Assign", requester, requestedTps)
          // assign(...) is given the full list, so the existing partitions are passed along again.
          val alreadyAssigned = consumer.assignment().asScala
          consumer.assign((requestedTps.toSeq ++ alreadyAssigned).asJava)
          progressTracker.assignedPositionsAndSeek(requestedTps, consumer, positionTimeout)

        case AssignWithOffset(startOffsets) =>
          checkOverlappingRequests("AssignWithOffset", requester, startOffsets.keySet)
          val alreadyAssigned = consumer.assignment().asScala
          consumer.assign((startOffsets.keys.toSeq ++ alreadyAssigned).asJava)
          // Move every newly assigned partition to the offset requested for it.
          for ((tp, offset) <- startOffsets) consumer.seek(tp, offset)
          progressTracker.assignedPositions(startOffsets.keySet, startOffsets)

        case AssignOffsetsForTimes(requestedTimestamps) =>
          checkOverlappingRequests("AssignOffsetsForTimes", requester, requestedTimestamps.keySet)
          val alreadyAssigned = consumer.assignment().asScala
          consumer.assign((requestedTimestamps.keys.toSeq ++ alreadyAssigned).asJava)
          // The Java consumer API takes boxed java.lang.Long timestamps.
          val boxedTimestamps =
            requestedTimestamps.map { case (tp, timestamp) => tp -> long2Long(timestamp) }.toMap.asJava
          val lookupResult = consumer.offsetsForTimes(boxedTimestamps, offsetForTimesTimeout)
          // Partitions whose lookup result is null are neither sought nor reported to the tracker.
          val resolved = lookupResult.asScala.filter { case (_, found) => found != null }.toMap
          val seekTargets = resolved.map {
            case (tp, found) =>
              val offset = found.offset()
              val ts = found.timestamp()
              log.debug("Get offset {} from topic {} with timestamp {}", offset, tp, ts)
              consumer.seek(tp, offset)
              tp -> offset
          }
          progressTracker.assignedPositions(seekTargets.keySet, seekTargets)

        case Subscribe(topics, rebalanceHandler) =>
          val listener = new RebalanceListenerImpl(rebalanceHandler)
          partitionAssignmentHandler = listener
          consumer.subscribe(topics.toList.asJava, listener)

        case SubscribePattern(pattern, rebalanceHandler) =>
          val listener = new RebalanceListenerImpl(rebalanceHandler)
          partitionAssignmentHandler = listener
          consumer.subscribe(Pattern.compile(pattern), listener)

      }
      scheduleFirstPollTask()
      // Remember which stage actor asked for the consumer's current assignment.
      // NOTE(review): right after subscribe(...) the assignment may not be populated yet — confirm
      // that the key recorded here is the intended one for topic/pattern subscriptions.
      stageActorsMap = stageActorsMap.updated(consumer.assignment().asScala.toSet, requester)
    } catch {
      case NonFatal(ex) => sendFailure(ex, requester)
    }
  }