in core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala [129:268]
def apply[K, V](
system: pekko.actor.ActorSystem,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]): ConsumerSettings[K, V] =
apply(system, Option(keyDeserializer), Option(valueDeserializer))
/**
* Create settings from the default configuration
* `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*
* For use with the `org.apache.pekko.actor.typed` API.
*/
def apply[K, V](
system: pekko.actor.ClassicActorSystemProvider,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]): ConsumerSettings[K, V] =
apply(system, Option(keyDeserializer), Option(valueDeserializer))
/**
* Create settings from a configuration with the same layout as
* the default configuration `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
config: Config,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]): ConsumerSettings[K, V] =
apply(config, Option(keyDeserializer), Option(valueDeserializer))
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
system: pekko.actor.ActorSystem,
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]): ConsumerSettings[K, V] =
apply(system, keyDeserializer.toScala, valueDeserializer.toScala)
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*
* For use with the `org.apache.pekko.actor.typed` API.
*/
def create[K, V](
system: pekko.actor.ClassicActorSystemProvider,
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]): ConsumerSettings[K, V] =
apply(system, keyDeserializer.toScala, valueDeserializer.toScala)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
config: Config,
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]): ConsumerSettings[K, V] =
apply(config, keyDeserializer.toScala, valueDeserializer.toScala)
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
system: pekko.actor.ActorSystem,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]): ConsumerSettings[K, V] =
apply(system, keyDeserializer, valueDeserializer)
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*
* For use with the `org.apache.pekko.actor.typed` API.
*/
def create[K, V](
system: pekko.actor.ClassicActorSystemProvider,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]): ConsumerSettings[K, V] =
apply(system, keyDeserializer, valueDeserializer)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
config: Config,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]): ConsumerSettings[K, V] =
apply(config, keyDeserializer, valueDeserializer)
/**
* Create a [[org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer]] instance from the settings.
*/
def createKafkaConsumer[K, V](settings: ConsumerSettings[K, V]): Consumer[K, V] =
new KafkaConsumer[K, V](settings.getProperties,
settings.keyDeserializerOpt.orNull,
settings.valueDeserializerOpt.orNull)
}
/**
* Settings for consumers. See `pekko.kafka.consumer` section in
* `reference.conf`. Note that the [[pekko.kafka.ConsumerSettings companion]] object provides
* `apply` and `create` functions for convenient construction of the settings, together with
* the `with` methods.
*
* The constructor is Internal API.
*/
class ConsumerSettings[K, V] @InternalApi private[kafka] (
val properties: Map[String, String],
val keyDeserializerOpt: Option[Deserializer[K]],
val valueDeserializerOpt: Option[Deserializer[V]],
val pollInterval: FiniteDuration,
val pollTimeout: FiniteDuration,
val stopTimeout: FiniteDuration,
val closeTimeout: FiniteDuration,
val commitTimeout: FiniteDuration,
val commitRefreshInterval: Duration,
val dispatcher: String,
val commitTimeWarning: FiniteDuration,
val waitClosePartition: FiniteDuration,
val positionTimeout: FiniteDuration,
val offsetForTimesTimeout: FiniteDuration,
val metadataRequestTimeout: FiniteDuration,
val drainingCheckInterval: FiniteDuration,
val enrichAsync: Option[ConsumerSettings[K, V] => Future[ConsumerSettings[K, V]]],
val consumerFactory: ConsumerSettings[K, V] => Consumer[K, V],
val connectionCheckerSettings: ConnectionCheckerSettings,
val partitionHandlerWarning: FiniteDuration,
val resetProtectionSettings: OffsetResetProtectionSettings) {