in cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala [66:209]
def messageExtractor[M](topic: String,
    timeout: FiniteDuration,
    settings: ConsumerSettings[_, _]): Future[KafkaShardingMessageExtractor[M]] = {
  // Resolve the partition count of `topic` first, then wrap it in an extractor on the system dispatcher.
  val partitionCount: Future[Int] = getPartitionCount(topic, timeout, settings)
  partitionCount.map(partitions => new KafkaShardingMessageExtractor[M](partitions))(system.dispatcher)
}
/**
 * Java API
 *
 * API MAY CHANGE
 *
 * Asynchronously creates a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] whose default hashing strategy
 * is based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
 *
 * The partition count used by the hashing strategy is discovered by asking the Kafka cluster how many partitions
 * the given `topic` has. The `settings` parameter configures the Kafka Consumer connection used for that lookup.
 * Every invocation costs a round trip to Kafka, so this method should only be called once per entity type `M`,
 * per local actor system.
 *
 * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
 * that entities are routed to the same Entity type.
 *
 * @param topic topic whose partition count is looked up
 * @param timeout converted to a Scala duration and handed to the partition count lookup
 * @param settings consumer settings used for the lookup connection
 * @return a `CompletionStage` that yields the extractor once the partition count has been retrieved
 */
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractor[M](topic: String,
    timeout: java.time.Duration,
    settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingMessageExtractor[M]] = {
  val extractor = getPartitionCount(topic, timeout.asScala, settings)
    .map(partitions => new KafkaShardingMessageExtractor[M](partitions))(system.dispatcher)
  extractor.asJava
}
/**
 * API MAY CHANGE
 *
 * Returns a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
 * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
 *
 * The number of partitions to use with the hashing strategy is provided explicitly with `kafkaPartitions`, so the
 * extractor is created immediately and this method does not query Kafka.
 *
 * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
 * that entities are routed to the same Entity type.
 *
 * @param kafkaPartitions number of partitions passed to the extractor's hashing strategy
 * @return the message extractor, available synchronously
 */
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractor[M](kafkaPartitions: Int): KafkaShardingMessageExtractor[M] =
  new KafkaShardingMessageExtractor[M](kafkaPartitions)
/**
 * API MAY CHANGE
 *
 * Asynchronously creates a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] whose default hashing strategy
 * is based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
 *
 * The partition count used by the hashing strategy is discovered by asking the Kafka cluster how many partitions
 * the given `topic` has. The `settings` parameter configures the Kafka Consumer connection used for that lookup,
 * and `entityIdExtractor` picks the field of the Entity that serves as the entity id for the hashing strategy.
 * Every invocation costs a round trip to Kafka, so this method should only be called once per entity type `M`,
 * per local actor system.
 *
 * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
 * that entities are routed to the same Entity type.
 *
 * @param topic topic whose partition count is looked up
 * @param timeout handed to the partition count lookup
 * @param entityIdExtractor derives the entity id from a message of type `M`
 * @param settings consumer settings used for the lookup connection
 * @return a `Future` that yields the extractor once the partition count has been retrieved
 */
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](topic: String,
    timeout: FiniteDuration,
    entityIdExtractor: M => String,
    settings: ConsumerSettings[_, _]): Future[KafkaShardingNoEnvelopeExtractor[M]] = {
  val partitionCount: Future[Int] = getPartitionCount(topic, timeout, settings)
  partitionCount.map(new KafkaShardingNoEnvelopeExtractor[M](_, entityIdExtractor))(system.dispatcher)
}
/**
 * Java API
 *
 * API MAY CHANGE
 *
 * Asynchronously creates a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] whose default hashing strategy
 * is based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
 *
 * The partition count used by the hashing strategy is discovered by asking the Kafka cluster how many partitions
 * the given `topic` has. The `settings` parameter configures the Kafka Consumer connection used for that lookup,
 * and `entityIdExtractor` picks the field of the Entity that serves as the entity id for the hashing strategy.
 * Every invocation costs a round trip to Kafka, so this method should only be called once per entity type `M`,
 * per local actor system.
 *
 * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
 * that entities are routed to the same Entity type.
 *
 * @param topic topic whose partition count is looked up
 * @param timeout converted to a Scala duration and handed to the partition count lookup
 * @param entityIdExtractor derives the entity id from a message of type `M`
 * @param settings consumer settings used for the lookup connection
 * @return a `CompletionStage` that yields the extractor once the partition count has been retrieved
 */
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](
    topic: String,
    timeout: java.time.Duration,
    entityIdExtractor: java.util.function.Function[M, String],
    settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] = {
  // Adapt the Java function to the Scala function type expected by the extractor.
  val toEntityId: M => String = message => entityIdExtractor.apply(message)
  getPartitionCount(topic, timeout.asScala, settings)
    .map(new KafkaShardingNoEnvelopeExtractor[M](_, toEntityId))(system.dispatcher)
    .asJava
}
/**
 * API MAY CHANGE
 *
 * Returns a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
 * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
 *
 * The number of partitions to use with the hashing strategy is provided explicitly with `kafkaPartitions`, so the
 * extractor is created immediately and this method does not query Kafka. Use the `entityIdExtractor` to pick a
 * field from the Entity to use as the entity id for the hashing strategy.
 *
 * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
 * that entities are routed to the same Entity type.
 *
 * @param kafkaPartitions number of partitions passed to the extractor's hashing strategy
 * @param entityIdExtractor derives the entity id from a message of type `M`
 * @return the message extractor, available synchronously
 */
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](kafkaPartitions: Int,
    entityIdExtractor: M => String): KafkaShardingNoEnvelopeExtractor[M] =
  new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)
/**
 * Java API
 *
 * API MAY CHANGE
 *
 * Returns a [[pekko.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
 * based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
 *
 * The number of partitions to use with the hashing strategy is provided explicitly with `kafkaPartitions`, so the
 * extractor is created immediately and this method does not query Kafka. Use the `entityIdExtractor` to pick a
 * field from the Entity to use as the entity id for the hashing strategy.
 *
 * All topics used in a Consumer [[pekko.kafka.Subscription]] must contain the same number of partitions to ensure
 * that entities are routed to the same Entity type.
 *
 * @param kafkaPartitions number of partitions passed to the extractor's hashing strategy
 * @param entityIdExtractor Java function deriving the entity id from a message of type `M`
 * @return the message extractor, available synchronously
 */
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](
    kafkaPartitions: Int,
    entityIdExtractor: java.util.function.Function[M, String]): KafkaShardingNoEnvelopeExtractor[M] =
  // Adapt the Java function to the Scala function type expected by the extractor.
  new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, e => entityIdExtractor.apply(e))
// Counter that gives every metadata consumer actor created by `getPartitionCount` a distinct name suffix.
private val metadataConsumerActorNum = new AtomicInteger

/**
 * Looks up how many partitions `topic` has.
 *
 * A dedicated consumer system actor is started from `settings` for each call, used through a
 * `MetadataClient` to fetch the topic's partitions, and stopped again as soon as the lookup completes,
 * whether it succeeded or failed.
 *
 * @param topic topic whose partitions are requested
 * @param timeout passed to the `MetadataClient` that performs the request
 * @param settings consumer settings used to create the consumer actor
 * @return the number of partitions reported for `topic`
 */
private def getPartitionCount[M](topic: String,
    timeout: FiniteDuration,
    settings: ConsumerSettings[_, _]): Future[Int] = {
  implicit val ec: ExecutionContextExecutor = system.dispatcher
  val actorName = s"metadata-consumer-actor-${metadataConsumerActorNum.getAndIncrement()}"
  val metadataActor = system.systemActorOf(KafkaConsumerActor.props(settings), actorName)
  val partitionCount = MetadataClient
    .create(metadataActor, timeout)
    .getPartitionsFor(topic)
    .map(_.length)
  // The helper actor is only needed for this single request; stop it on success and on failure alike.
  partitionCount.onComplete(_ => system.stop(metadataActor))
  for (count <- partitionCount) yield {
    system.log.info("Retrieved {} partitions for topic '{}'", count, topic)
    count
  }
}