in java/e2e-spring/src/main/java/org/apache/rocketmq/utils/MQAdmin.java [109:141]
/**
 * Creates (or updates) a consumer group on every master broker of the given cluster.
 *
 * <p>The subscription group is configured with broadcast consumption disabled,
 * consume-from-min disabled and orderly consumption enabled, and is then pushed to each
 * master address returned by {@code CommandUtil.fetchMasterAddrByClusterName}.
 *
 * <p>If any step throws, the exception is logged and the calling test is failed through
 * {@code Assertions.fail}, with the original exception attached as the cause.
 *
 * @param clusterName name of the cluster whose master brokers receive the group config
 * @param groupId     name of the consumer group to create
 * @param waitTimeSec currently unused; kept so existing callers remain source-compatible
 * @return {@code true} if the config was pushed to at least one master address,
 *         {@code false} if no master address was found for the cluster
 */
public static boolean createConsumerGroup(String clusterName, String groupId, int waitTimeSec) {
    boolean createResult = false;
    try {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setConsumeBroadcastEnable(false);
        subscriptionGroupConfig.setConsumeFromMinEnable(false);
        subscriptionGroupConfig.setGroupName(groupId);
        subscriptionGroupConfig.setConsumeMessageOrderly(true);

        Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName);
        for (String addr : masterSet) {
            mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
            log.info("create subscription group {} to {} success.\n", groupId, addr);
        }

        // Reaching this point means every broker call returned without throwing; the group
        // only counts as created if there was at least one master to push the config to.
        createResult = !masterSet.isEmpty();
        if (!createResult) {
            log.warn("no master address found for cluster {}, consumer group {} was not created",
                clusterName, groupId);
        }
    } catch (Exception e) {
        // Route the failure through the logger instead of stderr and keep the cause on the
        // assertion failure so the test report shows why creation failed.
        log.error("create groupId:{} failed", groupId, e);
        Assertions.fail(String.format("create groupId:%s failed", groupId), e);
    }
    return createResult;
}