in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java [372:504]
public StringBuilder adminQueryTopicBrokerCfgAndRunInfo(HttpServletRequest req,
        StringBuilder sBuffer,
        ProcessResult result) {
    // Summarises, per matched topic, the configured broker/store/partition totals
    // next to the totals currently reported by the broker run manager as accepting
    // publish and/or subscribe, together with the topic's auth-control switch.
    // Everything is appended to sBuffer, which is also the return value.

    // parse the brokerId parameter; on failure emit the error and stop
    if (!WebParameterUtils.getIntParamValue(req,
            WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
        WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
        return sBuffer;
    }
    Set<Integer> brokerIdFilter = (Set<Integer>) result.getRetData();

    // parse the topicName parameter; on failure emit the error and stop
    if (!WebParameterUtils.getStringParamValue(req,
            WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
        WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
        return sBuffer;
    }
    Set<String> topicNameFilter = (Set<String>) result.getRetData();

    // load the deploy records grouped by topic name
    Map<String, List<TopicDeployEntity>> deployByTopic =
            defMetaDataService.getTopicConfMapByTopicAndBrokerIds(topicNameFilter, brokerIdFilter);
    BrokerRunManager brokerRunManager = master.getBrokerRunManager();

    // a single tuple instance is handed to the run manager for every lookup
    Tuple3<Boolean, Boolean, TopicInfo> runInfoTuple = new Tuple3<>();

    int recordCount = 0;
    WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
    for (Map.Entry<String, List<TopicDeployEntity>> topicEntry : deployByTopic.entrySet()) {
        // records are comma separated; no separator before the first one
        if (recordCount > 0) {
            sBuffer.append(",");
        }
        recordCount++;

        // per-topic accumulators: configured totals
        int cfgBrokers = 0;
        int cfgStores = 0;
        int cfgParts = 0;
        // per-topic accumulators: totals for brokers serving publish or subscribe
        int runBrokers = 0;
        int runStores = 0;
        int runParts = 0;
        // per-topic accumulators: publish-accepting totals
        int pubBrokers = 0;
        int pubStores = 0;
        int pubParts = 0;
        // per-topic accumulators: subscribe-accepting totals
        int subBrokers = 0;
        int subStores = 0;
        int subParts = 0;
        boolean anyPubServing = false;
        boolean anySubServing = false;

        for (TopicDeployEntity deployEntity : topicEntry.getValue()) {
            // deploy records whose broker has no configuration entry are not counted
            if (defMetaDataService.getBrokerConfByBrokerId(deployEntity.getBrokerId()) == null) {
                continue;
            }
            TopicPropGroup cfgProps = deployEntity.getTopicProps();
            brokerRunManager.getPubBrokerTopicInfo(
                    deployEntity.getBrokerId(), deployEntity.getTopicName(), runInfoTuple);

            // configured side
            cfgBrokers++;
            cfgStores += cfgProps.getNumTopicStores();
            cfgParts += cfgProps.getNumPartitions() * cfgProps.getNumTopicStores();

            // running side: nothing to add when the tuple carries no topic info
            TopicInfo runInfo = runInfoTuple.getF2();
            if (runInfo == null) {
                continue;
            }
            int runPartsOnBroker = runInfo.getPartitionNum() * runInfo.getTopicStoreNum();

            boolean pubServing = runInfoTuple.getF0() && runInfo.isAcceptPublish();
            if (pubServing) {
                anyPubServing = true;
                pubBrokers++;
                pubStores += runInfo.getTopicStoreNum();
                pubParts += runPartsOnBroker;
            }

            boolean subServing = runInfoTuple.getF1() && runInfo.isAcceptSubscribe();
            if (subServing) {
                anySubServing = true;
                subBrokers++;
                subStores += runInfo.getTopicStoreNum();
                subParts += runPartsOnBroker;
            }

            // a broker contributes to the run totals once, whichever direction it serves
            if (pubServing || subServing) {
                runBrokers++;
                runStores += runInfo.getTopicStoreNum();
                runParts += runPartsOnBroker;
            }
        }

        // auth control stays false when the topic has no control record
        TopicCtrlEntity ctrlEntity =
                defMetaDataService.getTopicCtrlByTopicName(topicEntry.getKey());
        boolean authEnabled = ctrlEntity != null && ctrlEntity.isAuthCtrlEnable();

        // emit the JSON object for this topic
        sBuffer.append("{\"topicName\":\"").append(topicEntry.getKey());
        sBuffer.append("\",\"brokerTotalCfgCnt\":").append(cfgBrokers);
        sBuffer.append(",\"brokerTotalRunCnt\":").append(runBrokers);
        sBuffer.append(",\"storeTotalCfgCnt\":").append(cfgStores);
        sBuffer.append(",\"storeTotalRunCnt\":").append(runStores);
        sBuffer.append(",\"partTotalCfgCnt\":").append(cfgParts);
        sBuffer.append(",\"partTotalRunCnt\":").append(runParts);
        sBuffer.append(",\"brokerAccPubTotalCnt\":").append(pubBrokers);
        sBuffer.append(",\"brokerAccSubTotalCnt\":").append(subBrokers);
        sBuffer.append(",\"storeAccPubTotalCnt\":").append(pubStores);
        sBuffer.append(",\"storeAccSubTotalCnt\":").append(subStores);
        sBuffer.append(",\"partAccPubTotalCnt\":").append(pubParts);
        sBuffer.append(",\"partAccSubTotalCnt\":").append(subParts);
        sBuffer.append(",\"topicSrvAccPubStatus\":").append(anyPubServing);
        sBuffer.append(",\"topicSrvAccSubStatus\":").append(anySubServing);
        sBuffer.append(",\"enableAuthControl\":").append(authEnabled);
        sBuffer.append("}");
    }
    WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, recordCount);
    return sBuffer;
}