in aios/apps/facility/swift/client/SwiftSinglePartitionReader.cpp [561:700]
/**
 * Takes the pending getMessage response from the transport adapter and folds
 * it into this reader's state.
 *
 * Steps, in order:
 *   1. steal the response from _transportAdapter and take ownership of it;
 *   2. bail out on permission / session / partition / other broker errors;
 *   3. record the session id and detect "sealed topic fully read";
 *   4. update partition info and validate the response;
 *   5. report read-delay metrics when a metrics reporter is configured;
 *   6. hand a non-empty response to _buffer and advance _nextMsgId /
 *      _nextTimestamp.
 *
 * @param collector  requestMsgCount is set to the number of messages carried
 *                   by the response.
 * @return the error code reported by stealResponse() when it is passed
 *         through, the error from fillPartitionInfo(),
 *         ERROR_CLIENT_INVALID_RESPONSE when the response cannot be
 *         validated, decoded or buffered, or ERROR_CLIENT_NO_MORE_MESSAGE
 *         when the broker had nothing new to deliver.
 */
ErrorCode SwiftSinglePartitionReader::handleGetMessageResponse(ClientMetricsCollector &collector) {
    MessageResponse *response = nullptr;
    ErrorCode ec = _transportAdapter->stealResponse(response);
    if (!response) {
        AUTIL_LOG(WARN, "[%s %d] Invalid broker response, response is empty", _config.topicName.c_str(), _partitionId);
        return ec;
    }
    // Own the stolen response right away so that every early return below frees it.
    unique_ptr<MessageResponse> auto_response(response);

    // A response without a uuid is stamped with the current request's uuid;
    // a mismatching uuid is only logged, processing continues.
    if (response->requestuuid() == 0) {
        response->set_requestuuid(_curRequestUuid);
    } else if (response->requestuuid() != _curRequestUuid) {
        AUTIL_LOG(INFO,
                  "[%s %d] request uuid not match expect [%lu] actual [%lu]",
                  _config.topicName.c_str(),
                  _partitionId,
                  _curRequestUuid,
                  response->requestuuid());
    }

    if (ERROR_BROKER_INVALID_USER == ec) {
        AUTIL_LOG(WARN,
                  "username[%s] not permitter to read[%s %d]",
                  _transportAdapter->getUsername().c_str(),
                  _config.topicName.c_str(),
                  _partitionId);
        return ec;
    }
    AUTIL_LOG(DEBUG, "handle [%s %d %ld] response", _config.topicName.c_str(), _partitionId, response->requestuuid());

    // handle session change or partition not found, maybe caused by partition changed
    // NOTE(review): the macro is defined outside this function; it presumably reads the
    // locals `ec` / `response` and may return early, so those names are kept as they are.
    HANDLE_SESSION_OR_PARTITION_CHANGE;

    // Only these four codes still carry a response worth processing.
    if (ec != ERROR_NONE && ec != ERROR_BROKER_NO_DATA && ec != ERROR_BROKER_SOME_MESSAGE_LOST &&
        ec != ERROR_SEALED_TOPIC_READ_FINISH) {
        AUTIL_LOG(WARN,
                  "getMessage from [%s %d %ld] failed for [%s]!",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->requestuuid(),
                  ErrorCode_Name(ec).c_str());
        return ec;
    }
    if (ERROR_BROKER_SOME_MESSAGE_LOST == ec) {
        // Logged only; the response is still processed below.
        AUTIL_LOG(WARN,
                  "some message lost in[%s %d, %ld]! maybe broker read file failed",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->requestuuid());
    }
    assert(response);
    if (response->has_sessionid()) {
        _sessionId = response->sessionid();
    }

    // The sealed topic counts as finished only once the local buffer has been drained too.
    if (ERROR_SEALED_TOPIC_READ_FINISH == ec && 0 == _buffer.getUnReadMsgCount()) {
        _sealedTopicReadFinish = true;
        AUTIL_LOG(INFO,
                  "[%s:%d %ld] read finsh[%s]",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->requestuuid(),
                  ErrorCode_Name(ec).c_str());
        return ec;
    }

    const ErrorCode ec2 = fillPartitionInfo(_transportAdapter->getLastTransportClosureDoneTime(), *response);
    if (ec2 != ERROR_NONE) {
        return ec2;
    }
    if (!isValidateResponse(response, ec)) {
        return ERROR_CLIENT_INVALID_RESPONSE;
    }
    _lastSuccessResponseTime = TimeUtility::currentTime();

    // Count the messages and pick the timestamp of the last one, for either wire format.
    int64_t maxMsgId = response->maxmsgid();
    uint32_t msgSize = 0;
    int64_t lastTimeStamp = -1; // -1 means "the response carries no message"
    if (response->messageformat() == MF_FB) {
        protocol::FBMessageReader reader;
        if (!reader.init(response->fbmsgs(), false)) {
            return ERROR_CLIENT_INVALID_RESPONSE;
        }
        msgSize = reader.size();
        if (msgSize > 0) {
            lastTimeStamp = (reader.read(msgSize - 1))->timestamp();
        }
    } else {
        msgSize = response->msgs_size();
        if (msgSize > 0) {
            lastTimeStamp = (response->msgs()[msgSize - 1]).timestamp();
        }
    }

    // Delay metrics need both the response's max timestamp and at least one message.
    if (response->has_maxtimestamp() && -1 != lastTimeStamp && _metricsReporter) {
        ReaderDelayCollector delayCollector;
        delayCollector.readDelay = response->maxtimestamp() - lastTimeStamp;
        delayCollector.currentDelay = TimeUtility::currentTime() - lastTimeStamp;
        _metricsReporter->reportDelay(delayCollector, &_metricsTags);
    }
    collector.requestMsgCount = msgSize;

    // Snapshot every scalar that is needed after the response may have been handed to
    // _buffer: once auto_response.release() has run, the lifetime of *response is no
    // longer controlled by this function, so it must not be dereferenced for bookkeeping.
    const auto requestUuid = response->requestuuid();
    const auto respNextMsgId = response->nextmsgid();
    const bool hasNextTimestamp = response->has_nexttimestamp();
    const auto respNextTimestamp = response->nexttimestamp();

    if (ec == ERROR_BROKER_NO_DATA) {
        ec = ERROR_CLIENT_NO_MORE_MESSAGE;
    } else if (msgSize > 0) {
        if (!_buffer.addResponse(auto_response.release())) {
            string errInfo = "[" + _config.topicName + " " + StringUtil::toString(_partitionId) + " " +
                             StringUtil::toString(requestUuid) +
                             "] add response to read buffer failed. has msg count: " + StringUtil::toString(msgSize);
            AUTIL_LOG(ERROR, "%s", errInfo.c_str());
            ec = ERROR_CLIENT_INVALID_RESPONSE;
            auto *errCollector = ErrorCollectorSingleton::getInstance();
            errCollector->addRequestHasErrorDataInfo(errInfo);
            auto *responseCollector = ResponseCollectorSingleton::getInstance();
            // NOTE(review): these diagnostics still read *response after release();
            // confirm that addResponse() leaves the object alive when it returns false.
            string content;
            if (response->SerializeToString(&content)) {
                responseCollector->logResponse(_readerInfo, MESSAGE_RESPONSE, content);
            } else {
                responseCollector->logResponse(_readerInfo, MESSAGE_RESPONSE_FBMSG_PART, response->fbmsgs());
            }
            // return ec; // TODO, retry or skip request bug
        }
        // The read position advances even when buffering failed (see TODO above).
        _nextMsgId = respNextMsgId;
    } else {
        // Empty response without ERROR_BROKER_NO_DATA.
        if (_nextMsgId <= maxMsgId) {
            _nextMsgId = respNextMsgId;
        } else {
            // The reader is already past the response's max message id: clamp to
            // maxMsgId + 1 and report that nothing more is available.
            int64_t oldNextId = _nextMsgId;
            _nextMsgId = maxMsgId + 1;
            if (_nextMsgId < oldNextId) {
                AUTIL_LOG(WARN,
                          "response [%ld], Next msgId rollback from [%ld] to [%ld]",
                          requestUuid,
                          oldNextId,
                          _nextMsgId);
            }
            ec = ERROR_CLIENT_NO_MORE_MESSAGE;
        }
    }
    if (hasNextTimestamp) {
        _nextTimestamp = respNextTimestamp;
    }
    return ec;
}