in testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala [33:77]
def waitUntilCluster(timeout: FiniteDuration,
sleepInBetween: FiniteDuration,
adminClient: Admin,
predicate: DescribeClusterResult => Boolean,
log: Logger): Unit =
periodicalCheck("cluster state", timeout, sleepInBetween)(() => adminClient.describeCluster())(predicate)(log)
def waitUntilConsumerGroup(groupId: String,
timeout: FiniteDuration,
sleepInBetween: FiniteDuration,
adminClient: Admin,
predicate: ConsumerGroupDescription => Boolean,
log: Logger): Unit =
periodicalCheck("consumer group state", timeout, sleepInBetween)(() =>
adminClient
.describeConsumerGroups(
Collections.singleton(groupId),
new DescribeConsumerGroupsOptions().timeoutMs(timeout.toMillis.toInt))
.describedGroups()
.get(groupId)
.get(timeout.toMillis, TimeUnit.MILLISECONDS))(predicate)(log)
def periodicalCheck[T](description: String, timeout: FiniteDuration, sleepInBetween: FiniteDuration)(
data: () => T)(predicate: T => Boolean)(log: Logger): Unit = {
val maxTries = (timeout / sleepInBetween).toInt
@tailrec def check(triesLeft: Int): Unit =
Try(predicate(data())).recover {
case ex =>
log.debug(s"Ignoring [${ex.getClass.getName}: ${ex.getMessage}] while waiting for desired state")
false
} match {
case Success(false) if triesLeft > 0 =>
Thread.sleep(sleepInBetween.toMillis)
check(triesLeft - 1)
case Success(false) =>
throw new Error(
s"Timeout while waiting for desired $description. Tried [$maxTries] times, slept [$sleepInBetween] in between.")
case Failure(ex) =>
throw ex
case Success(true) => // predicate has been fulfilled, stop checking
}
check(maxTries)
}