in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java [529:708]
/**
 * Handles a consumer register request sent to the master.
 *
 * <p>The request is validated step by step: identity, client id, host name, group name,
 * subscribed topic list, offset-reset settings, consumer input info, and consume
 * authorization. The first failing check ends the call with a response whose
 * {@code success} flag is false and whose error code and message are copied from the
 * check result. When every check passes, the consumer is added to the consumer holder
 * while a row lock keyed by the consumer id is held, any partitions reported by the
 * client are recorded in {@code currentSubInfo}, and the consumer is registered with the
 * heartbeat manager. Flow-control settings are attached to the response only when the
 * request carries a default or group flow-check id.
 *
 * @param request    the register request sent by the consumer
 * @param rmtAddress the remote address of the client; stored in the consumer info and
 *                   passed to the authorization check
 * @param overtls    stored in the consumer info and logged as {@code isOverTLS}
 * @return the register response; {@code success} is true only if the whole procedure
 *         completed
 * @throws Exception if the row lock fails with an {@link IOException} before the
 *                   consumer has been recorded, or if any invoked component throws
 */
public RegisterResponseM2C consumerRegisterC2M(RegisterRequestC2M request,
        String rmtAddress,
        boolean overtls) throws Exception {
    // #lizard forgives
    ProcessResult result = new ProcessResult();
    final StringBuilder strBuff = new StringBuilder(512);
    RegisterResponseM2C.Builder builder = RegisterResponseM2C.newBuilder();
    // Pessimistic default: every early return below reports a failure.
    builder.setSuccess(false);
    // validate the caller's identity
    if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    final CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
    // validate the client id
    if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    final String consumerId = (String) result.getRetData();
    // The host name is only validated here; the checked value is not used afterwards.
    if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    // validate the group name
    if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    final String groupName = (String) result.getRetData();
    // NOTE(review): presumably throws when this node cannot serve the request - confirm
    checkNodeStatus(consumerId, strBuff);
    // validate the requested topics against the deployed topic set
    if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
            request.getTopicListList(), strBuff, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    final Set<String> reqTopicSet = (Set<String>) result.getRetData();
    final Map<String, TreeSet<String>> reqTopicConditions =
            DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
    String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : "";
    ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound())
            ? ConsumeType.CONSUME_BAND
            : ConsumeType.CONSUME_NORMAL;
    final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
    // validate the offset-reset settings for the selected consume type
    if (!PBParameterUtils.checkConsumerOffsetSetInfo(
            csmType, reqTopicSet, requiredParts, strBuff, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    Map<String, Long> requiredPartMap = (Map<String, Long>) result.getRetData();
    // optional request fields fall back to defaults when absent
    String sessionKey = request.hasSessionKey() ? request.getSessionKey() : "";
    long sessionTime = request.hasSessionTime()
            ? request.getSessionTime()
            : System.currentTimeMillis();
    int sourceCount = request.hasTotalCount()
            ? request.getTotalCount()
            : -1;
    List<SubscribeInfo> subscribeList =
            DataConverterUtil.convertSubInfo(request.getSubscribeInfoList());
    // Treated as "not allocated" unless the client reports subscribed partitions
    // or explicitly sends notAllocated=false.
    boolean isNotAllocated = true;
    if (CollectionUtils.isNotEmpty(subscribeList)
            || ((request.hasNotAllocated() && !request.getNotAllocated()))) {
        isNotAllocated = false;
    }
    // selectBig defaults to true when the field is absent
    boolean isSelectBig = (!request.hasSelectBig() || request.getSelectBig());
    // build consumer object
    ConsumerInfo inConsumerInfo =
            new ConsumerInfo(consumerId, overtls, groupName,
                    reqTopicSet, reqTopicConditions, csmType,
                    sessionKey, sessionTime, sourceCount,
                    isSelectBig, requiredPartMap, rmtAddress);
    if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
            masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    // the check hands back the consumer object that is registered below
    ConsumerInfo inConsumerInfo2 = (ConsumerInfo) result.getRetData();
    if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
            groupName, reqTopicSet, reqTopicConditions, rmtAddress, result)) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    // need removed for authorize center begin
    if (!this.defMetaDataService
            .isConsumeTargetAuthorized(consumerId, groupName,
                    reqTopicSet, reqTopicConditions, strBuff, result)) {
        if (strBuff.length() > 0) {
            logger.warn(strBuff.toString());
        }
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }
    // need removed for authorize center end
    Integer lid = null;
    ConsumeGroupInfo consumeGroupInfo = null;
    try {
        lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true);
        if (!consumerHolder.addConsumer(inConsumerInfo2,
                isNotAllocated, strBuff, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        consumeGroupInfo = (ConsumeGroupInfo) result.getRetData();
        topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
        if (CollectionUtils.isNotEmpty(subscribeList)) {
            // record the partitions the client reports, grouped by topic
            int reportCnt = 0;
            Map<String, Partition> partMap;
            Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
            currentSubInfo.put(consumerId, topicPartSubMap);
            strBuff.append("[SubInfo Report] client=").append(consumerId)
                    .append(", subscribed partitions=[");
            for (SubscribeInfo info : subscribeList) {
                partMap = topicPartSubMap.computeIfAbsent(
                        info.getTopic(), k -> new HashMap<>());
                partMap.put(info.getPartition().getPartitionKey(), info.getPartition());
                if (reportCnt++ > 0) {
                    strBuff.append(",");
                }
                strBuff.append(info.getPartitionStr());
            }
            strBuff.append("]");
            logger.info(strBuff.toString());
            strBuff.delete(0, strBuff.length());
        }
        heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
    } catch (IOException e) {
        logger.warn("Failed to lock.", e);
        if (consumeGroupInfo == null) {
            // The consumer was never recorded, so there is no group info to report.
            // Propagate the failure instead of logging a successful registration and
            // then hitting a NullPointerException when building the response.
            throw e;
        }
    } finally {
        if (lid != null) {
            this.masterRowLock.releaseRowLock(lid);
        }
    }
    logger.info(strBuff.append("[Consumer Register] ")
            .append(consumerId).append(", isOverTLS=").append(overtls)
            .append(", clientJDKVer=").append(clientJdkVer).toString());
    strBuff.delete(0, strBuff.length());
    // flow-control settings are returned only when the client sent a flow-check id
    if (request.hasDefFlowCheckId() || request.hasGroupFlowCheckId()) {
        builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
        builder.setDefFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
        builder.setGroupFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
        builder.setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED);
        builder.setDefFlowControlInfo(" ");
        builder.setGroupFlowControlInfo(" ");
        ClusterSettingEntity defSetting =
                defMetaDataService.getClusterDefSetting(false);
        GroupResCtrlEntity groupResCtrlConf =
                defMetaDataService.getGroupCtrlConf(groupName);
        // defensive null guard, mirroring the group configuration check below
        if (defSetting != null && defSetting.enableFlowCtrl()) {
            builder.setDefFlowCheckId(defSetting.getSerialId());
            // send the rule text only when the client's id differs from the current one
            if (request.getDefFlowCheckId() != defSetting.getSerialId()) {
                builder.setDefFlowControlInfo(defSetting.getGloFlowCtrlRuleInfo());
            }
        }
        if (groupResCtrlConf != null
                && groupResCtrlConf.isFlowCtrlEnable()) {
            builder.setGroupFlowCheckId(groupResCtrlConf.getSerialId());
            builder.setQryPriorityId(groupResCtrlConf.getQryPriorityId());
            if (request.getGroupFlowCheckId() != groupResCtrlConf.getSerialId()) {
                builder.setGroupFlowControlInfo(groupResCtrlConf.getFlowCtrlInfo());
            }
        }
    }
    builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false));
    builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
    builder.setSuccess(true);
    builder.setErrCode(TErrCodeConstants.SUCCESS);
    builder.setErrMsg("OK!");
    return builder.build();
}