def apply()

in cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala [302:347]


    /**
     * Creates the behavior of the rebalance listener for the given entity type.
     *
     * On `TopicPartitionsAssigned` every assigned Kafka partition is reported to the external shard
     * allocation client (looked up by `typeKey.name`) as a shard located at this cluster member's
     * address. On `TopicPartitionsRevoked` the revocation is only logged; no shard locations are updated.
     *
     * @param typeKey entity type key; its `name` selects the external shard allocation client and is
     *                the value logged as the consumer group
     * @return a behavior handling `ConsumerRebalanceEvent` messages; it never changes (`Behaviors.same`)
     */
    def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
      Behaviors.setup { ctx =>
        // Execution context used for the `onComplete` callback on the shard location update below.
        import ctx.executionContext
        val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
        // Address of this cluster member; every assigned partition's shard is pointed at it.
        val address = Cluster(ctx.system).selfMember.address
        Behaviors.receiveMessage[ConsumerRebalanceEvent] {
          case TopicPartitionsAssigned(_, partitions) =>
            // Guarded so the partition list is only rendered when INFO logging is enabled.
            if (log.isInfoEnabled) {
              log.info("Consumer group '{}' assigned topic partitions to cluster member '{}': [{}]",
                typeKey.name,
                address,
                partitions.mkString(","))
            }

            val updates = shardAllocationClient.updateShardLocations(partitions.map { tp =>
              val shardId = tp.partition().toString
              // the Kafka partition number becomes the Apache Pekko shard id
              (shardId, address)
            }.toMap)

            // There's no point blocking here because the rebalance listener is triggered asynchronously. If we want to block during
            // rebalance then we should provide an implementation using the `PartitionAssignmentHandler` instead
            updates
              .onComplete {
                case Success(_) =>
                  if (log.isInfoEnabled) {
                    log.info(
                      "Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
                      typeKey.name,
                      address,
                      partitions.mkString(","))
                  }

                case Failure(ex) =>
                  // The failure is only logged; the behavior keeps running and handles later events.
                  log.error("A failure occurred while updating cluster shards", ex)
              }
            Behaviors.same
          case TopicPartitionsRevoked(_, partitions) =>
            // Same guard as the assignment branch: avoid building the partition list when INFO is disabled.
            if (log.isInfoEnabled) {
              log.info("Consumer group '{}' revoked topic partitions from cluster member '{}': [{}]",
                typeKey.name,
                address,
                partitions.mkString(","))
            }
            Behaviors.same
        }
      }