in cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala [302:347]
def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
Behaviors.setup { ctx =>
import ctx.executionContext
val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
val address = Cluster(ctx.system).selfMember.address
Behaviors.receiveMessage[ConsumerRebalanceEvent] {
case TopicPartitionsAssigned(_, partitions) =>
if (log.isInfoEnabled) {
log.info("Consumer group '{}' assigned topic partitions to cluster member '{}': [{}]",
typeKey.name,
address,
partitions.mkString(","))
}
val updates = shardAllocationClient.updateShardLocations(partitions.map { tp =>
val shardId = tp.partition().toString
// the Kafka partition number becomes the Apache Pekko shard id
(shardId, address)
}.toMap)
// There's no point blocking here because the rebalance listener is triggered asynchronously. If we want to block during
// rebalance then we should provide an implementation using the `PartitionAssignmentHandler` instead
updates
.onComplete {
case Success(_) =>
if (log.isInfoEnabled) {
log.info(
"Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
typeKey.name,
address,
partitions.mkString(","))
}
case Failure(ex) =>
log.error("A failure occurred while updating cluster shards", ex)
}
Behaviors.same
case TopicPartitionsRevoked(_, partitions) =>
val partitionsList = partitions.mkString(",")
log.info("Consumer group '{}' revoked topic partitions from cluster member '{}': [{}]",
typeKey.name,
address,
partitionsList)
Behaviors.same
}
}