def getConsumer()

in common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala [46:104]


  def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
    implicit logging: Logging,
    actorSystem: ActorSystem): MessageConsumer =
    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek)

  def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize] = None)(
    implicit logging: Logging,
    actorSystem: ActorSystem): MessageProducer =
    new KafkaProducerConnector(config.kafkaHosts, maxRequestSize = maxRequestSize)

  def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String, maxMessageBytes: Option[ByteSize] = None)(
    implicit logging: Logging): Try[Unit] = {
    val kafkaConfig = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
    val topicConfig = KafkaConfiguration.configMapToKafkaConfig(
      loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + "." + topicConfigKey)) ++
      (maxMessageBytes.map { max =>
        Map(s"max.message.bytes" -> max.size.toString)
      } getOrElse Map.empty)

    val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))

    Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
      .flatMap(client => {
        val partitions = topicConfig.getOrElse(topicPartitionsConfigKey, "1").toInt
        val safeTopicConfig = topicConfig - topicPartitionsConfigKey
        val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava)

        def createTopic(retries: Int = 5): Try[Unit] = {
          Try(client.listTopics().names().get())
            .flatMap(topics =>
              if (topics.contains(topic)) {
                Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation."))
              } else {
                Try(client.createTopics(List(nt).asJava).values().get(topic).get())
                  .map(_ => logging.info(this, s"created topic $topic"))
                  .recoverWith {
                    case CausedBy(_: TopicExistsException) =>
                      Success(logging.info(this, s"topic $topic already existed"))
                    case CausedBy(t: RetriableException) if retries > 0 =>
                      logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
                      Thread.sleep(1.second.toMillis)
                      createTopic(retries - 1)
                    case t =>
                      logging.error(this, s"ensureTopic for $topic failed due to $t")
                      Failure(t)
                  }
            })
        }

        val result = createTopic()
        client.close()
        result
      })
      .recoverWith {
        case e =>
          logging.error(this, s"ensureTopic for $topic failed due to $e")
          Failure(e)
      }
  }