in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java [279:422]
public GetMessageResponseB2C getMessagesC2B(GetMessageRequestC2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
final long startTime = System.currentTimeMillis();
final GetMessageResponseB2C.Builder builder =
GetMessageResponseB2C.newBuilder();
builder.setSuccess(false);
builder.setCurrOffset(-1);
builder.setEscFlowCtrl(false);
builder.setCurrDataDlt(-1);
builder.setMinLimitTime(0);
if (!this.started.get()
|| ServiceStatusHolder.isReadServiceStop()) {
builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
builder.setErrMsg("Read StoreService temporary unavailable!");
return builder.build();
}
ProcessResult result = new ProcessResult();
StringBuilder strBuffer = new StringBuilder(512);
// get and check clientId field
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String clientId = (String) result.getRetData();
// get and check groupName field
if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String groupName = (String) result.getRetData();
// get and check topicName field
if (!PBParameterUtils.getTopicNameParameter(request.getTopicName(),
this.metadataManager, strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// get consumer info
final String topicName = (String) result.getRetData();
final int partitionId = request.getPartitionId();
boolean isEscFlowCtrl = request.hasEscFlowCtrl() && request.getEscFlowCtrl();
String partStr = getPartStr(groupName, topicName, partitionId);
String consumerId = null;
ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
if (consumerNodeInfo != null) {
consumerId = consumerNodeInfo.getConsumerId();
}
if (consumerId == null) {
logger.warn(strBuffer.append("[UnRegistered Consumer]").append(clientId)
.append(TokenConstants.SEGMENT_SEP).append(partStr).toString());
strBuffer.delete(0, strBuffer.length());
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
builder.setErrMsg(strBuffer.append("UnRegistered Consumer:")
.append(clientId)
.append(", you have to register firstly!").toString());
return builder.build();
}
if (!clientId.equals(consumerId)) {
strBuffer.append("[Duplicated Request] Partition=").append(partStr)
.append(" of Broker=").append(tubeConfig.getBrokerId())
.append(" has been consumed by ").append(consumerId)
.append(";Current consumer ").append(clientId);
logger.warn(strBuffer.toString());
builder.setErrCode(TErrCodeConstants.DUPLICATE_PARTITION);
builder.setErrMsg(strBuffer.toString());
return builder.build();
}
String rmtAddrInfo = consumerNodeInfo.getRmtAddrInfo();
try {
heartbeatManager.updConsumerNode(getHeartbeatNodeId(clientId, partStr));
} catch (HeartbeatException e) {
logger.warn(strBuffer.append("[Invalid Request]").append(clientId)
.append(TokenConstants.SEGMENT_SEP).append(topicName)
.append(TokenConstants.ATTR_SEP).append(partitionId).toString());
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
builder.setErrMsg(e.getMessage());
return builder.build();
}
Integer topicStatusId = this.metadataManager.getClosedTopicStatusId(topicName);
if ((topicStatusId != null)
&& (topicStatusId > TStatusConstants.STATUS_TOPIC_SOFT_DELETE)) {
strBuffer.append("[Partition Closed] Partition has been closed, for topic=")
.append(topicName).append(",partitionId=").append(partitionId)
.append(" of Broker=").append(tubeConfig.getBrokerId());
logger.warn(strBuffer.toString());
builder.setErrCode(TErrCodeConstants.FORBIDDEN);
builder.setErrMsg(strBuffer.toString());
return builder.build();
}
// query data from store manager.
boolean isGetStore = false;
MessageStore dataStore = null;
try {
dataStore = this.storeManager.getOrCreateMessageStore(topicName, partitionId);
isGetStore = true;
GetMessageResult msgResult =
getMessages(dataStore, consumerNodeInfo, groupName, topicName, partitionId,
request.getLastPackConsumed(), request.getManualCommitOffset(),
clientId, this.tubeConfig.getHostName(), rmtAddrInfo, isEscFlowCtrl, strBuffer);
if (msgResult.isSuccess) {
long endTime = System.currentTimeMillis();
consumerNodeInfo.setLastProcInfo(endTime,
msgResult.lastRdDataOffset, msgResult.totalMsgSize);
getCounterGroup.add(msgResult.tmpCounters);
AuditUtils.addConsumeRecord(msgResult.tmpCounters);
builder.setEscFlowCtrl(false);
builder.setRequireSlow(msgResult.isSlowFreq);
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setCurrOffset(msgResult.reqOffset);
builder.setCurrDataDlt(msgResult.waitTime);
builder.setErrMsg("OK!");
builder.addAllMessages(msgResult.transferedMessageList);
builder.setMaxOffset(msgResult.getMaxOffset());
BrokerSrvStatsHolder.updGetMsgLatency(endTime - startTime);
return builder.build();
} else {
builder.setErrCode(msgResult.getRetCode());
builder.setErrMsg(msgResult.getErrInfo());
builder.setMinLimitTime((int) msgResult.waitTime);
return builder.build();
}
} catch (Throwable ee) {
strBuffer.delete(0, strBuffer.length());
builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
if (isGetStore) {
strBuffer.append("[GetMessage] Throwable error while getMessage,")
.append(ee.getMessage()).append(", position is")
.append(this.tubeConfig.getBrokerId())
.append(TokenConstants.ATTR_SEP).append(topicName)
.append(TokenConstants.ATTR_SEP).append(partitionId);
logger.error(strBuffer.toString(), ee);
builder.setErrMsg(ee.getMessage() == null ? strBuffer.toString() : ee.getMessage());
} else {
builder.setErrMsg(strBuffer.append("Get the store of topic ")
.append(topicName).append(" in partition ")
.append(partitionId).append(" failure!").toString());
}
return builder.build();
}
}