in testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java [280:319]
private String readinessCheckScript() {
String connect = kafkaTopicConnectParam();
String command = "#!/bin/bash \n";
command += "set -e \n";
command +=
"[[ $(kafka-topics "
+ connect
+ " --describe --topic "
+ READINESS_CHECK_TOPIC
+ " | wc -l) > 1 ]] && "
+ "kafka-topics "
+ connect
+ " --delete --topic "
+ READINESS_CHECK_TOPIC
+ " \n";
command +=
"kafka-topics "
+ connect
+ " --topic "
+ READINESS_CHECK_TOPIC
+ " --create --partitions "
+ this.brokersNum
+ " --replication-factor "
+ this.brokersNum
+ " --config min.insync.replicas="
+ this.brokersNum
+ " \n";
command += "MESSAGE=\"`date -u`\" \n";
command +=
"echo \"$MESSAGE\" | kafka-console-producer --broker-list localhost:9092 --topic "
+ READINESS_CHECK_TOPIC
+ " --producer-property acks=all \n";
command +=
"kafka-console-consumer --bootstrap-server localhost:9092 --topic "
+ READINESS_CHECK_TOPIC
+ " --from-beginning --timeout-ms 2000 --max-messages 1 | grep \"$MESSAGE\" \n";
command += "kafka-topics " + connect + " --delete --topic " + READINESS_CHECK_TOPIC + " \n";
command += "echo \"test succeeded\" \n";
return command;
}