in core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicSubscription.scala [43:70]
protected def configureSubscription(partitionAssignedCB: AsyncCallback[Set[TopicPartition]],
partitionRevokedCB: AsyncCallback[Set[TopicPartition]]): Unit = {
def rebalanceListener(autoSubscription: AutoSubscription): PartitionAssignmentHandler = {
PartitionAssignmentHelpers.chain(
addToPartitionAssignmentHandler(autoSubscription.partitionAssignmentHandler),
new PartitionAssignmentHelpers.AsyncCallbacks(autoSubscription,
sourceActor.ref,
partitionAssignedCB,
partitionRevokedCB))
}
subscription match {
case sub @ TopicSubscription(topics, _, _) =>
consumerActor.tell(
KafkaConsumerActor.Internal.Subscribe(
topics,
addToPartitionAssignmentHandler(rebalanceListener(sub))),
sourceActor.ref)
case sub @ TopicSubscriptionPattern(topics, _, _) =>
consumerActor.tell(
KafkaConsumerActor.Internal.SubscribePattern(
topics,
addToPartitionAssignmentHandler(rebalanceListener(sub))),
sourceActor.ref)
case s: ManualSubscription => configureManualSubscription(s)
}
}