in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java [720:876]
public HeartResponseM2C consumerHeartbeatC2M(HeartRequestC2M request,
final String rmtAddress,
boolean overtls) throws Throwable {
// #lizard forgives
ProcessResult result = new ProcessResult();
final StringBuilder strBuff = new StringBuilder(512);
// response
HeartResponseM2C.Builder builder = HeartResponseM2C.newBuilder();
builder.setSuccess(false);
// identity valid
if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String clientId = (String) result.getRetData();
if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String groupName = (String) result.getRetData();
checkNodeStatus(clientId, strBuff);
ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName);
if (consumeGroupInfo == null) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
builder.setErrMsg(strBuff.append("Not found groupName ")
.append(groupName).append(" in holder!").toString());
return builder.build();
}
// authorize check
if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
groupName, consumeGroupInfo.getTopicSet(),
consumeGroupInfo.getTopicConditions(), rmtAddress, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// heartbeat check
try {
heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
} catch (HeartbeatException e) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
builder.setErrMsg(strBuff
.append("Update consumer node exception:")
.append(e.getMessage()).toString());
return builder.build();
}
//
Map<String, Map<String, Partition>> topicPartSubList =
currentSubInfo.get(clientId);
if (topicPartSubList == null) {
topicPartSubList = new HashMap<>();
Map<String, Map<String, Partition>> tmpTopicPartSubList =
currentSubInfo.putIfAbsent(clientId, topicPartSubList);
if (tmpTopicPartSubList != null) {
topicPartSubList = tmpTopicPartSubList;
}
}
long rebalanceId = request.hasEvent()
? request.getEvent().getRebalanceId()
: TBaseConstants.META_VALUE_UNDEFINED;
List<String> strSubInfoList = request.getSubscribeInfoList();
if (request.getReportSubscribeInfo()) {
List<SubscribeInfo> infoList = DataConverterUtil.convertSubInfo(strSubInfoList);
if (!checkIfConsist(topicPartSubList, infoList)) {
topicPartSubList.clear();
for (SubscribeInfo info : infoList) {
Map<String, Partition> partMap =
topicPartSubList.get(info.getTopic());
if (partMap == null) {
partMap = new HashMap<>();
topicPartSubList.put(info.getTopic(), partMap);
}
Partition regPart =
new Partition(info.getPartition().getBroker(),
info.getTopic(), info.getPartitionId());
partMap.put(regPart.getPartitionKey(), regPart);
}
if (rebalanceId <= 0) {
logger.warn(strBuff.append("[Consistent Warn]").append(clientId)
.append(" sub info is not consistent with master.").toString());
strBuff.delete(0, strBuff.length());
}
}
}
//
if (rebalanceId > 0) {
logger.info(strBuff.append("[Event Processed] rebalanceId=")
.append(request.getEvent().getRebalanceId())
.append(", clientId=").append(clientId).toString());
strBuff.delete(0, strBuff.length());
try {
consumeGroupInfo.settAllocated();
consumerEventManager.removeFirst(clientId, strBuff);
} catch (Throwable e) {
logger.warn("Unknown exception for remove first event:", e);
}
}
//
ConsumerEvent event =
consumerEventManager.peek(clientId);
if (event != null
&& event.getStatus() != EventStatus.PROCESSING) {
event.setStatus(EventStatus.PROCESSING);
strBuff.append("[Push Consumer Event]");
logger.info(event.toStrBuilder(clientId, strBuff).toString());
strBuff.delete(0, strBuff.length());
EventProto.Builder eventProtoBuilder =
EventProto.newBuilder();
eventProtoBuilder.setRebalanceId(event.getRebalanceId());
eventProtoBuilder.setOpType(event.getType().getValue());
eventProtoBuilder.addAllSubscribeInfo(
DataConverterUtil.formatSubInfo(event.getSubscribeInfoList()));
EventProto eventProto = eventProtoBuilder.build();
builder.setEvent(eventProto);
}
if (request.hasDefFlowCheckId()
|| request.hasGroupFlowCheckId()) {
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setDefFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setGroupFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setDefFlowControlInfo(" ");
builder.setGroupFlowControlInfo(" ");
ClusterSettingEntity defSetting =
defMetaDataService.getClusterDefSetting(false);
GroupResCtrlEntity groupResCtrlConf =
defMetaDataService.getGroupCtrlConf(groupName);
if (defSetting.enableFlowCtrl()) {
builder.setDefFlowCheckId(defSetting.getSerialId());
if (request.getDefFlowCheckId() != defSetting.getSerialId()) {
builder.setDefFlowControlInfo(defSetting.getGloFlowCtrlRuleInfo());
}
}
if (groupResCtrlConf != null
&& groupResCtrlConf.isFlowCtrlEnable()) {
builder.setGroupFlowCheckId(groupResCtrlConf.getSerialId());
builder.setQryPriorityId(groupResCtrlConf.getQryPriorityId());
if (request.getGroupFlowCheckId() != groupResCtrlConf.getSerialId()) {
builder.setGroupFlowControlInfo(groupResCtrlConf.getFlowCtrlInfo());
}
}
}
builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false));
builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setErrMsg("OK!");
return builder.build();
}