def messageExtractor[M]()

in cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala [66:209]


  def messageExtractor[M](topic: String,
      timeout: FiniteDuration,
      settings: ConsumerSettings[_, _]): Future[KafkaShardingMessageExtractor[M]] =
    getPartitionCount(topic, timeout, settings).map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)

  /**
   * Java API
   *
   * API MAY CHANGE
   *
   * Asynchronously return a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
   * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
   *
   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
   * the Kafka Consumer connection required to retrieve the number of partitions. Each call to this method will result
   * in a round trip to Kafka. This method should only be called once per entity type [[M]], per local actor system.
   *
   * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
   * that entities are routed to the same Entity type.
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
  def messageExtractor[M](topic: String,
      timeout: java.time.Duration,
      settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingMessageExtractor[M]] =
    getPartitionCount(topic, timeout.asScala, settings)
      .map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
      .asJava

  /**
   * API MAY CHANGE
   *
   * Asynchronously return a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
   * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
   *
   * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
   *
   * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
   * that entities are routed to the same Entity type.
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
  def messageExtractor[M](kafkaPartitions: Int): KafkaShardingMessageExtractor[M] =
    new KafkaShardingMessageExtractor[M](kafkaPartitions)

  /**
   * API MAY CHANGE
   *
   * Asynchronously return a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
   * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
   *
   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
   * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
   * a field from the Entity to use as the entity id for the hashing strategy. Each call to this method will result
   * in a round trip to Kafka. This method should only be called once per entity type [[M]], per local actor system.
   *
   * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
   * that entities are routed to the same Entity type.
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
  def messageExtractorNoEnvelope[M](topic: String,
      timeout: FiniteDuration,
      entityIdExtractor: M => String,
      settings: ConsumerSettings[_, _]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
    getPartitionCount(topic, timeout, settings)
      .map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, entityIdExtractor))(system.dispatcher)

  /**
   * Java API
   *
   * API MAY CHANGE
   *
   * Asynchronously return a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
   * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
   *
   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
   * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
   * a field from the Entity to use as the entity id for the hashing strategy. Each call to this method will result
   * in a round trip to Kafka. This method should only be called once per entity type [[M]], per local actor system.
   *
   * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
   * that entities are routed to the same Entity type.
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
  def messageExtractorNoEnvelope[M](
      topic: String,
      timeout: java.time.Duration,
      entityIdExtractor: java.util.function.Function[M, String],
      settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
    getPartitionCount(topic, timeout.asScala, settings)
      .map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e => entityIdExtractor.apply(e)))(
        system.dispatcher)
      .asJava

  /**
   * API MAY CHANGE
   *
   * Asynchronously return a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
   * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
   *
   * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
   *
   * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
   * that entities are routed to the same Entity type.
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
  def messageExtractorNoEnvelope[M](kafkaPartitions: Int,
      entityIdExtractor: M => String): KafkaShardingNoEnvelopeExtractor[M] =
    new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)

  /**
   * API MAY CHANGE
   *
   * Asynchronously return a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
   * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
   *
   * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
   *
   * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
   * that entities are routed to the same Entity type.
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
  def messageExtractorNoEnvelope[M](
      kafkaPartitions: Int,
      entityIdExtractor: java.util.function.Function[M, String]): KafkaShardingNoEnvelopeExtractor[M] =
    new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, e => entityIdExtractor.apply(e))

  private val metadataConsumerActorNum = new AtomicInteger
  private def getPartitionCount[M](topic: String,
      timeout: FiniteDuration,
      settings: ConsumerSettings[_, _]): Future[Int] = {
    implicit val ec: ExecutionContextExecutor = system.dispatcher
    val num = metadataConsumerActorNum.getAndIncrement()
    val consumerActor = system
      .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$num")
    val metadataClient = MetadataClient.create(consumerActor, timeout)
    val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
    numPartitions.onComplete(_ => system.stop(consumerActor))
    numPartitions.map { count =>
      system.log.info("Retrieved {} partitions for topic '{}'", count, topic)
      count
    }
  }