in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java [1193:1350]
public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request,
String rmtAddress,
boolean overtls) throws Throwable {
ProcessResult result = new ProcessResult();
final StringBuilder strBuff = new StringBuilder(512);
RegisterResponseM2CV2.Builder builder = RegisterResponseM2CV2.newBuilder();
if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String consumerId = (String) result.getRetData();
if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// final String hostName = (String) result.getRetData();
if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String groupName = (String) result.getRetData();
// check master current status
checkNodeStatus(consumerId, strBuff);
if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
request.getTopicListList(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final Set<String> reqTopicSet = (Set<String>) result.getRetData();
final Map<String, TreeSet<String>> reqTopicConditions =
DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
int sourceCount = request.getSourceCount();
int nodeId = request.getNodeId();
if (sourceCount > 0) {
if (nodeId < 0 || nodeId > (sourceCount - 1)) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg("Request nodeId value must be between in [0, sourceCount-1]!");
return builder.build();
}
}
final String clientJdkVer = request.hasJdkVersion()
? request.getJdkVersion()
: "";
final ConsumeType csmType = ConsumeType.CONSUME_CLIENT_REB;
OpsSyncInfo opsTaskInfo = new OpsSyncInfo();
if (request.hasOpsTaskInfo()) {
opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
}
ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
if (request.hasSubRepInfo()) {
clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
}
// build consumer object
ConsumerInfo inConsumerInfo =
new ConsumerInfo(consumerId, overtls, groupName, csmType,
sourceCount, nodeId, reqTopicSet, reqTopicConditions,
opsTaskInfo.getCsmFromMaxOffsetCtrlId(), clientSyncInfo,
rmtAddress);
// need removed for authorize center begin
if (!this.defMetaDataService
.isConsumeTargetAuthorized(consumerId, groupName,
reqTopicSet, reqTopicConditions, strBuff, result)) {
if (strBuff.length() > 0) {
logger.warn(strBuff.toString());
}
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// need removed for authorize center end
// check resource require
if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
groupName, reqTopicSet, reqTopicConditions, rmtAddress, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
Integer lid = null;
try {
lid = masterRowLock.getLock(null,
StringUtils.getBytesUtf8(consumerId), true);
if (!consumerHolder.addConsumer(inConsumerInfo, false, strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
} catch (IOException e) {
logger.warn("Failed to lock.", e);
} finally {
if (lid != null) {
this.masterRowLock.releaseRowLock(lid);
}
}
ConsumeGroupInfo consumeGroupInfo =
consumerHolder.getConsumeGroupInfo(groupName);
if (consumeGroupInfo == null) {
logger.warn(strBuff.append("[Illegal Process] ").append(consumerId)
.append(" visit consume group(").append(groupName)
.append(" info failure, null information").toString());
builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
builder.setErrMsg(strBuff.toString());
strBuff.delete(0, strBuff.length());
return builder.build();
}
inConsumerInfo = consumeGroupInfo.getConsumerInfo(consumerId);
if (inConsumerInfo == null) {
logger.warn(strBuff.append("[Illegal Process] ").append(consumerId)
.append(" visit consume info failure, null information").toString());
builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
builder.setErrMsg(strBuff.toString());
strBuff.delete(0, strBuff.length());
return builder.build();
}
Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
currentSubInfo.put(consumerId, topicPartSubMap);
Tuple2<Boolean, Set<Partition>> reportInfo = clientSyncInfo.getRepSubInfo();
if (reportInfo.getF0()) {
for (Partition info : reportInfo.getF1()) {
Map<String, Partition> partMap =
topicPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
partMap.put(info.getPartitionKey(), info);
}
printReportInfo(consumerId, null, topicPartSubMap, strBuff);
}
logger.info(strBuff.append("[Consumer Register] ")
.append(consumerId).append(", isOverTLS=").append(overtls)
.append(", clientJDKVer=").append(clientJdkVer).toString());
strBuff.delete(0, strBuff.length());
Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
brokerRunManager.getBrokerStaticInfo(overtls);
builder.setBrokerConfigId(brokerStaticInfo.getF0());
if (clientSyncInfo.getBrokerConfigId() != brokerStaticInfo.getF0()) {
builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
}
builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo));
builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false));
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setErrMsg("OK!");
return builder.build();
}