in common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala [46:104]
/**
 * Creates a Kafka-backed [[MessageConsumer]] for the given consumer group and topic.
 *
 * @param config supplies the Kafka bootstrap servers via `kafkaHosts`
 * @param groupId consumer group handed to the connector
 * @param topic topic handed to the connector
 * @param maxPeek handed to the connector unchanged
 * @param maxPollInterval part of the signature, but not forwarded to the connector by this method
 * @return a new `KafkaConsumerConnector`
 */
def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
  implicit logging: Logging,
  actorSystem: ActorSystem): MessageConsumer = {
  val bootstrapServers = config.kafkaHosts
  new KafkaConsumerConnector(bootstrapServers, groupId, topic, maxPeek)
}
/**
 * Creates a Kafka-backed [[MessageProducer]].
 *
 * @param config supplies the Kafka bootstrap servers via `kafkaHosts`
 * @param maxRequestSize optional limit passed to the connector as its `maxRequestSize` argument
 * @return a new `KafkaProducerConnector`
 */
def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize] = None)(
  implicit logging: Logging,
  actorSystem: ActorSystem): MessageProducer = {
  val bootstrapServers = config.kafkaHosts
  new KafkaProducerConnector(bootstrapServers, maxRequestSize = maxRequestSize)
}
/**
 * Makes sure `topic` exists on the Kafka cluster, creating it when it is missing.
 *
 * Topic settings are loaded from configuration under `ConfigKeys.kafkaTopics + "." + topicConfigKey`.
 * The entry keyed by `topicPartitionsConfigKey` (defaulting to "1") determines the partition count and
 * is removed from the settings handed to Kafka. When `maxMessageBytes` is defined it is merged in as
 * `max.message.bytes`, taking precedence over a configured value of the same key.
 *
 * Configuration is read with `loadConfigOrThrow` before any `Try` is entered, so configuration errors
 * are thrown rather than returned as a `Failure`.
 *
 * @param config supplies the Kafka bootstrap servers via `kafkaHosts`
 * @param topic name of the topic to ensure
 * @param topicConfigKey configuration sub-key under `ConfigKeys.kafkaTopics` holding this topic's settings
 * @param maxMessageBytes optional value for the topic's `max.message.bytes` setting
 * @return `Success` when the topic already exists or was created, otherwise a `Failure` with the cause
 */
def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String, maxMessageBytes: Option[ByteSize] = None)(
  implicit logging: Logging): Try[Unit] = {
  val kafkaConfig = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
  val topicConfig = KafkaConfiguration.configMapToKafkaConfig(
    loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + "." + topicConfigKey)) ++
    (maxMessageBytes.map { max =>
      // NOTE(review): max.size is used verbatim; confirm it is expressed in bytes.
      Map("max.message.bytes" -> max.size.toString)
    } getOrElse Map.empty)

  val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
  Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
    .flatMap(client => {
      // try/finally guarantees the admin client is released even when preparing the topic
      // description or a creation attempt throws (e.g. a non-numeric partition count).
      try {
        val partitions = topicConfig.getOrElse(topicPartitionsConfigKey, "1").toInt
        val safeTopicConfig = topicConfig - topicPartitionsConfigKey
        val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava)

        // Lists the visible topics first and only attempts creation when `topic` is absent.
        // Retriable failures are retried after a one second pause until `retries` is exhausted.
        def createTopic(retries: Int = 5): Try[Unit] = {
          Try(client.listTopics().names().get())
            .flatMap(topics =>
              if (topics.contains(topic)) {
                Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation."))
              } else {
                Try(client.createTopics(List(nt).asJava).values().get(topic).get())
                  .map(_ => logging.info(this, s"created topic $topic"))
                  .recoverWith {
                    // a concurrent creator won the race; the topic exists, which is all that is required
                    case CausedBy(_: TopicExistsException) =>
                      Success(logging.info(this, s"topic $topic already existed"))
                    case CausedBy(t: RetriableException) if retries > 0 =>
                      logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
                      Thread.sleep(1.second.toMillis)
                      createTopic(retries - 1)
                    // Any other failure passes through unchanged and is logged exactly once by the
                    // handler at the end of this method.
                  }
              })
        }

        createTopic()
      } finally {
        client.close()
      }
    })
    .recoverWith {
      case e =>
        logging.error(this, s"ensureTopic for $topic failed due to $e")
        Failure(e)
    }
}