in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [325:376]
/**
 * Applies a subscription request received from a stage to the underlying consumer.
 *
 * The manual assignment requests (`Assign`, `AssignWithOffset`, `AssignOffsetsForTimes`) are
 * first passed through `checkOverlappingRequests`, then given to `consumer.assign` together
 * with the partitions the consumer already reports as assigned, and finally registered with
 * the `progressTracker`. `Subscribe` and `SubscribePattern` install a new
 * `RebalanceListenerImpl` as `partitionAssignmentHandler` and subscribe the consumer with it.
 *
 * When the request was applied, the first poll task is scheduled and the consumer's current
 * assignment is recorded in `stageActorsMap` against the sender. A non-fatal exception raised
 * anywhere along the way is handed to `sendFailure` for the sender instead of being rethrown.
 *
 * @param subscription the subscription request to apply
 */
def handleSubscription(subscription: SubscriptionRequest): Unit = {
  try {
    subscription match {
      case Assign(assignedTps) =>
        checkOverlappingRequests("Assign", sender(), assignedTps)
        // Re-submit the partitions the consumer already holds alongside the new ones.
        val alreadyAssigned = consumer.assignment()
        val combined = assignedTps.toSeq ++ alreadyAssigned.asScala
        consumer.assign(combined.asJava)
        progressTracker.assignedPositionsAndSeek(assignedTps, consumer, positionTimeout)

      case AssignWithOffset(assignedOffsets) =>
        checkOverlappingRequests("AssignWithOffset", sender(), assignedOffsets.keySet)
        val alreadyAssigned = consumer.assignment()
        val combined = assignedOffsets.keys.toSeq ++ alreadyAssigned.asScala
        consumer.assign(combined.asJava)
        // Position every newly assigned partition at its requested offset.
        for ((partition, startOffset) <- assignedOffsets) consumer.seek(partition, startOffset)
        progressTracker.assignedPositions(assignedOffsets.keySet, assignedOffsets)

      case AssignOffsetsForTimes(timestampsToSearch) =>
        checkOverlappingRequests("AssignOffsetsForTimes", sender(), timestampsToSearch.keySet)
        val alreadyAssigned = consumer.assignment()
        val combined = timestampsToSearch.keys.toSeq ++ alreadyAssigned.asScala
        consumer.assign(combined.asJava)
        // The lookup takes a Java map of boxed longs.
        val boxedTimestamps = timestampsToSearch.map {
          case (partition, timestamp) => partition -> long2Long(timestamp)
        }.toMap.asJava
        val lookupResult = consumer.offsetsForTimes(boxedTimestamps, offsetForTimesTimeout)
        // Partitions whose lookup result is null are skipped: they are neither seeked
        // nor reported to the progress tracker.
        val resolvedOffsets = lookupResult.asScala
          .filter { case (_, found) => found != null }
          .toMap
          .map {
            case (partition, found: OffsetAndTimestamp) =>
              val resolved = found.offset()
              val timestamp = found.timestamp()
              log.debug("Get offset {} from topic {} with timestamp {}", resolved, partition, timestamp)
              consumer.seek(partition, resolved)
              partition -> resolved
          }
        progressTracker.assignedPositions(resolvedOffsets.keySet, resolvedOffsets)

      case Subscribe(topics, rebalanceHandler) =>
        // Keep a reference to the listener that is registered with the consumer.
        val listener = new RebalanceListenerImpl(rebalanceHandler)
        partitionAssignmentHandler = listener
        consumer.subscribe(topics.toList.asJava, listener)

      case SubscribePattern(pattern, rebalanceHandler) =>
        val listener = new RebalanceListenerImpl(rebalanceHandler)
        partitionAssignmentHandler = listener
        consumer.subscribe(Pattern.compile(pattern), listener)
    }

    scheduleFirstPollTask()
    // Remember which stage the consumer's current assignment belongs to.
    stageActorsMap = stageActorsMap.updated(consumer.assignment().asScala.toSet, sender())
  } catch {
    case NonFatal(cause) => sendFailure(cause, sender())
  }
}