in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java [698:867]
private FetchContext fetchMessage(PartitionSelectResult partSelectResult,
StringBuilder sBuffer) {
// Fetch task context based on selected partition
FetchContext taskContext =
new FetchContext(partSelectResult);
Partition partition = taskContext.getPartition();
String topic = partition.getTopic();
String partitionKey = partition.getPartitionKey();
long startTime = System.currentTimeMillis();
// Response from broker
ClientBroker.GetMessageResponseB2C msgRspB2C = null;
try {
msgRspB2C =
getBrokerService(partition.getBroker())
.getMessagesC2B(createBrokerGetMessageRequest(
partition, taskContext.isLastConsumed()),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
} catch (Throwable ee) {
// Process the exception
clientStatsInfo.bookFailRpcCall(TErrCodeConstants.BAD_REQUEST);
clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
taskContext.setFailProcessResult(400, sBuffer
.append("Get message error, reason is ")
.append(ee.toString()).toString());
sBuffer.delete(0, sBuffer.length());
return taskContext;
}
long dltTime = System.currentTimeMillis() - startTime;
if (msgRspB2C == null) {
clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
taskContext.setFailProcessResult(500, "Get message null");
return taskContext;
}
try {
// Process the response based on the return code
switch (msgRspB2C.getErrCode()) {
case TErrCodeConstants.SUCCESS: {
int msgSize = 0;
int msgCount = 0;
// Convert the message payload data
List<Message> tmpMessageList =
DataConverterUtil.convertMessage(topic, msgRspB2C.getMessagesList());
boolean isEscLimit =
(msgRspB2C.hasEscFlowCtrl() && msgRspB2C.getEscFlowCtrl());
// Filter the message based on its content
// Calculate the message size and do some flow control
boolean needFilter = false;
Set<String> topicFilterSet = null;
TopicProcessor topicProcessor = consumeSubInfo.getTopicProcessor(topic);
if (topicProcessor != null) {
topicFilterSet = topicProcessor.getFilterConds();
if (topicFilterSet != null && !topicFilterSet.isEmpty()) {
needFilter = true;
}
}
List<Message> messageList = new ArrayList<>();
for (Message message : tmpMessageList) {
if (message == null) {
continue;
}
if (needFilter && (TStringUtils.isBlank(message.getMsgType())
|| !topicFilterSet.contains(message.getMsgType()))) {
continue;
}
msgCount++;
messageList.add(message);
msgSize += message.getData().length;
}
// Set the process result of current stage. Process the result based on the response
long dataDltVal = msgRspB2C.hasCurrDataDlt()
? msgRspB2C.getCurrDataDlt()
: -1;
long currOffset = msgRspB2C.hasCurrOffset()
? msgRspB2C.getCurrOffset()
: TBaseConstants.META_VALUE_UNDEFINED;
long maxOffset = msgRspB2C.hasMaxOffset()
? msgRspB2C.getMaxOffset()
: TBaseConstants.META_VALUE_UNDEFINED;
boolean isRequireSlow =
(msgRspB2C.hasRequireSlow() && msgRspB2C.getRequireSlow());
clientRmtDataCache
.setPartitionContextInfo(partitionKey, currOffset, 1,
msgRspB2C.getErrCode(), isEscLimit, msgSize, 0,
dataDltVal, isRequireSlow, maxOffset);
taskContext.setSuccessProcessResult(currOffset,
sBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
.append(taskContext.getUsedToken()).toString(),
messageList, maxOffset);
sBuffer.delete(0, sBuffer.length());
clientStatsInfo.bookSuccGetMsg(dltTime,
topic, partitionKey, msgCount, msgSize);
break;
}
case TErrCodeConstants.HB_NO_NODE:
case TErrCodeConstants.CERTIFICATE_FAILURE:
case TErrCodeConstants.DUPLICATE_PARTITION: {
// Release the partitions when meeting these error codes
clientRmtDataCache.removePartition(partition);
taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
break;
}
case TErrCodeConstants.SERVER_CONSUME_SPEED_LIMIT: {
// Process with server side speed limit
long defDltTime =
msgRspB2C.hasMinLimitTime()
? msgRspB2C.getMinLimitTime()
: consumerConfig.getMsgNotFoundWaitPeriodMs();
clientRmtDataCache.errRspRelease(partitionKey, topic,
taskContext.getUsedToken(), false, TBaseConstants.META_VALUE_UNDEFINED,
0, msgRspB2C.getErrCode(), false, 0,
defDltTime, isFilterConsume(topic), TBaseConstants.META_VALUE_UNDEFINED,
TBaseConstants.META_VALUE_UNDEFINED);
taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
break;
}
case TErrCodeConstants.NOT_FOUND:
case TErrCodeConstants.FORBIDDEN:
case TErrCodeConstants.SERVICE_UNAVAILABLE:
case TErrCodeConstants.MOVED:
default: {
// Slow down the request based on the limitation configuration when meet these errors
long limitDlt = 300;
switch (msgRspB2C.getErrCode()) {
case TErrCodeConstants.FORBIDDEN: {
limitDlt = 2000;
break;
}
case TErrCodeConstants.SERVICE_UNAVAILABLE: {
limitDlt = 300;
break;
}
case TErrCodeConstants.MOVED: {
limitDlt = 200;
break;
}
case TErrCodeConstants.NOT_FOUND: {
limitDlt = consumerConfig.getMsgNotFoundWaitPeriodMs();
break;
}
default: {
//
}
}
clientRmtDataCache.errRspRelease(partitionKey, topic,
taskContext.getUsedToken(), false, TBaseConstants.META_VALUE_UNDEFINED,
0, msgRspB2C.getErrCode(), false, 0,
limitDlt, isFilterConsume(topic), -1, TBaseConstants.META_VALUE_UNDEFINED);
taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
break;
}
}
if (msgRspB2C.getErrCode() != TErrCodeConstants.SUCCESS) {
clientStatsInfo.bookFailRpcCall(msgRspB2C.getErrCode());
}
return taskContext;
} catch (Throwable ee) {
clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
logger.error("Process response code error", ee);
clientRmtDataCache.succRspRelease(partitionKey, topic,
taskContext.getUsedToken(), false, isFilterConsume(topic),
TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
taskContext.setFailProcessResult(500, sBuffer
.append("Get message failed,topic=")
.append(topic).append(",partition=").append(partition)
.append(", throw info is ").append(ee.toString()).toString());
sBuffer.delete(0, sBuffer.length());
}
return taskContext;
}