in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java [191:332]
public StringBuilder getConsumeGroupDetailInfo(HttpServletRequest req,
StringBuilder sBuffer,
ProcessResult result) {
// get group name
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.GROUPNAME, true, null, sBuffer, result)) {
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.CONSUMEGROUP, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
}
String strConsumeGroup = (String) result.getRetData();
try {
ConsumeType consumeType = ConsumeType.CONSUME_NORMAL;
boolean isNotAllocate = false;
boolean isSelectBig = true;
String sessionKey = "";
int reqSourceCount = -1;
int curSourceCount = -1;
long rebalanceCheckTime = -1;
int defBClientRate = -2;
int confBClientRate = -2;
int curBClientRate = -2;
int minRequireClientCnt = -2;
int balanceStatus = -2;
Set<String> topicSet = new HashSet<>();
List<ConsumerInfo> consumerList = new ArrayList<>();
Map<String, NodeRebInfo> nodeRebInfoMap = new ConcurrentHashMap<>();
Map<String, TreeSet<String>> existedTopicConditions = new HashMap<>();
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(strConsumeGroup);
if (consumeGroupInfo != null) {
if (consumeGroupInfo.getTopicSet() != null) {
topicSet = consumeGroupInfo.getTopicSet();
}
if (consumeGroupInfo.getConsumerInfoList() != null) {
consumerList = consumeGroupInfo.getConsumerInfoList();
}
if (consumeGroupInfo.getTopicConditions() != null) {
existedTopicConditions = consumeGroupInfo.getTopicConditions();
}
nodeRebInfoMap = consumeGroupInfo.getBalanceMap();
consumeType = consumeGroupInfo.getConsumeType();
balanceStatus = consumeGroupInfo.getBalanceChkStatus();
defBClientRate = consumerHolder.getDefResourceRate();
confBClientRate = consumeGroupInfo.getConfResourceRate();
curBClientRate = consumeGroupInfo.getCurResourceRate();
minRequireClientCnt = consumeGroupInfo.getMinReqClientCnt();
if (consumeType == ConsumeType.CONSUME_BAND) {
isNotAllocate = consumeGroupInfo.isNotAllocate();
isSelectBig = consumeGroupInfo.isSelectedBig();
sessionKey = consumeGroupInfo.getSessionKey();
reqSourceCount = consumeGroupInfo.getSourceCount();
curSourceCount = consumeGroupInfo.getGroupCnt();
rebalanceCheckTime = consumeGroupInfo.getCurCheckCycle();
}
}
sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"")
.append(",\"count\":").append(consumerList.size()).append(",\"topicSet\":[");
int itemCnt = 0;
for (String topicItem : topicSet) {
if (itemCnt++ > 0) {
sBuffer.append(",");
}
sBuffer.append("\"").append(topicItem).append("\"");
}
sBuffer.append("],\"consumeGroup\":\"").append(strConsumeGroup).append("\",\"re-rebalance\":{");
itemCnt = 0;
for (Map.Entry<String, NodeRebInfo> entry : nodeRebInfoMap.entrySet()) {
if (itemCnt++ > 0) {
sBuffer.append(",");
}
sBuffer.append("\"").append(entry.getKey()).append("\":");
sBuffer = entry.getValue().toJsonString(sBuffer);
}
sBuffer.append("},\"isBandConsume\":\"").append(consumeType.getName()).append("\"");
// Append band consume info
if (consumeType == ConsumeType.CONSUME_BAND) {
sBuffer.append(",\"isNotAllocate\":").append(isNotAllocate)
.append(",\"sessionKey\":\"").append(sessionKey)
.append("\",\"isSelectBig\":").append(isSelectBig)
.append(",\"reqSourceCount\":").append(reqSourceCount)
.append(",\"curSourceCount\":").append(curSourceCount)
.append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime);
} else if (consumeType == ConsumeType.CONSUME_CLIENT_REB) {
Tuple2<Long, List<String>> metaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
sBuffer.append(",\"topicMetaId\":").append(metaInfoTuple.getF0())
.append(",\"metaDetails\":[");
for (String itemInfo : metaInfoTuple.getF1()) {
if (itemCnt++ > 0) {
sBuffer.append(",");
}
sBuffer.append("\"").append(itemInfo).append("\"");
}
sBuffer.append("]");
}
sBuffer.append(",\"rebInfo\":{");
if (balanceStatus == -2) {
sBuffer.append("\"isRebalanced\":false");
} else if (balanceStatus == 0) {
sBuffer.append("\"isRebalanced\":true,\"checkPasted\":false")
.append(",\"defBClientRate\":").append(defBClientRate)
.append(",\"confBClientRate\":").append(confBClientRate)
.append(",\"curBClientRate\":").append(curBClientRate)
.append(",\"minRequireClientCnt\":").append(minRequireClientCnt);
} else {
sBuffer.append("\"isRebalanced\":true,\"checkPasted\":true")
.append(",\"defBClientRate\":").append(defBClientRate)
.append(",\"confBClientRate\":").append(confBClientRate)
.append(",\"curBClientRate\":").append(curBClientRate);
}
sBuffer.append("},\"filterConds\":{");
if (existedTopicConditions != null) {
int keyCount = 0;
for (Map.Entry<String, TreeSet<String>> entry : existedTopicConditions.entrySet()) {
if (keyCount++ > 0) {
sBuffer.append(",");
}
sBuffer.append("\"").append(entry.getKey()).append("\":[");
if (entry.getValue() != null) {
int itemCount = 0;
for (String filterCond : entry.getValue()) {
if (itemCount++ > 0) {
sBuffer.append(",");
}
sBuffer.append("\"").append(filterCond).append("\"");
}
}
sBuffer.append("]");
}
}
sBuffer.append("}");
// Append consumer info of the group
getConsumerInfoList(consumerList, consumeType, sBuffer);
sBuffer.append("}");
} catch (Exception e) {
sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
}
return sBuffer;
}