ErrorCode SwiftSinglePartitionReader::handleGetMessageResponse()

in aios/apps/facility/swift/client/SwiftSinglePartitionReader.cpp [561:700]


// Takes the pending getMessage response from the transport adapter, validates it and
// folds it into the reader state (_sessionId, _nextMsgId, _nextTimestamp, _buffer,
// _lastSuccessResponseTime, _sealedTopicReadFinish).
//
// @param collector  metrics sink; requestMsgCount is set to the number of messages
//                   carried by the response once the response passed validation.
// @return
//   - the error code reported by the transport adapter when there is no response, when
//     the user is rejected, or when the code is not one of NONE / NO_DATA /
//     SOME_MESSAGE_LOST / SEALED_TOPIC_READ_FINISH;
//   - ERROR_SEALED_TOPIC_READ_FINISH when the broker reports it and the local buffer has
//     no unread message left;
//   - the error returned by fillPartitionInfo() when it is not ERROR_NONE;
//   - ERROR_CLIENT_INVALID_RESPONSE when the response fails validation, the FlatBuffers
//     payload cannot be initialised, or the buffer refuses the response;
//   - ERROR_CLIENT_NO_MORE_MESSAGE when the broker has no data, or when an empty response
//     arrives while _nextMsgId is already past the broker's max message id;
//   - otherwise the (non-fatal) code reported by the transport adapter.
ErrorCode SwiftSinglePartitionReader::handleGetMessageResponse(ClientMetricsCollector &collector) {
    MessageResponse *response = nullptr;
    ErrorCode ec = _transportAdapter->stealResponse(response);
    if (!response) {
        AUTIL_LOG(WARN, "[%s %d] Invalid broker response, response is empty", _config.topicName.c_str(), _partitionId);
        return ec;
    }
    // A zero uuid is stamped with the uuid of the current request so that later logs can
    // be correlated; a differing non-zero uuid is only reported, not rejected.
    if (response->requestuuid() == 0) {
        response->set_requestuuid(_curRequestUuid);
    } else if (response->requestuuid() != _curRequestUuid) {
        AUTIL_LOG(INFO,
                  "[%s %d] request uuid not match expect [%lu] actual [%lu]",
                  _config.topicName.c_str(),
                  _partitionId,
                  _curRequestUuid,
                  response->requestuuid());
    }

    // From here on every early return destroys the stolen response.
    unique_ptr<MessageResponse> auto_response(response);
    if (ERROR_BROKER_INVALID_USER == ec) {
        AUTIL_LOG(WARN,
                  "username[%s] not permitter to read[%s %d]",
                  _transportAdapter->getUsername().c_str(),
                  _config.topicName.c_str(),
                  _partitionId);
        return ec;
    }
    AUTIL_LOG(DEBUG, "handle [%s %d %ld] response", _config.topicName.c_str(), _partitionId, response->requestuuid());
    // handle session change or partition not found, maybe caused by partition changed
    // NOTE(review): the macro is defined outside this block and presumably refers to the
    // locals above by name -- keep `ec` / `response` / `auto_response` spelled as they are.
    HANDLE_SESSION_OR_PARTITION_CHANGE;

    // Only these four codes still carry a usable response; everything else is fatal here.
    if (ec != ERROR_NONE && ec != ERROR_BROKER_NO_DATA && ec != ERROR_BROKER_SOME_MESSAGE_LOST &&
        ec != ERROR_SEALED_TOPIC_READ_FINISH) {
        AUTIL_LOG(WARN,
                  "getMessage from [%s %d %ld] failed for [%s]!",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->requestuuid(),
                  ErrorCode_Name(ec).c_str());
        return ec;
    }
    if (ERROR_BROKER_SOME_MESSAGE_LOST == ec) {
        // Lost messages are reported but the remaining payload is still processed.
        AUTIL_LOG(WARN,
                  "some message lost in[%s %d, %ld]! maybe broker read file failed",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->requestuuid());
    }
    assert(response);
    if (response->has_sessionid()) {
        _sessionId = response->sessionid();
    }
    // The sealed topic is only considered finished once the local buffer is drained too.
    if (ERROR_SEALED_TOPIC_READ_FINISH == ec && 0 == _buffer.getUnReadMsgCount()) {
        _sealedTopicReadFinish = true;
        AUTIL_LOG(INFO,
                  "[%s:%d %ld] read finsh[%s]",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->requestuuid(),
                  ErrorCode_Name(ec).c_str());
        return ec;
    }
    ErrorCode ec2;
    ec2 = fillPartitionInfo(_transportAdapter->getLastTransportClosureDoneTime(), *response);
    if (ec2 != ERROR_NONE) {
        return ec2;
    }
    if (!isValidateResponse(response, ec)) {
        return ERROR_CLIENT_INVALID_RESPONSE;
    }
    _lastSuccessResponseTime = TimeUtility::currentTime();
    int64_t maxMsgId = response->maxmsgid();

    // Count the messages and pick up the timestamp of the last one; the payload is either
    // a FlatBuffers blob (MF_FB) or the repeated `msgs` field.
    uint32_t msgSize = 0;
    int64_t lastTimeStamp = -1;
    if (response->messageformat() == MF_FB) {
        protocol::FBMessageReader reader;
        if (!reader.init(response->fbmsgs(), false)) {
            return ERROR_CLIENT_INVALID_RESPONSE;
        }
        msgSize = reader.size();
        if (msgSize > 0) {
            lastTimeStamp = (reader.read(msgSize - 1))->timestamp();
        }
    } else {
        msgSize = response->msgs_size();
        if (msgSize > 0) {
            lastTimeStamp = (response->msgs()[msgSize - 1]).timestamp();
        }
    }
    // Delay metrics need both the broker's max timestamp and at least one message.
    if (response->has_maxtimestamp() && -1 != lastTimeStamp) {
        if (_metricsReporter) {
            ReaderDelayCollector delayCollector;
            delayCollector.readDelay = response->maxtimestamp() - lastTimeStamp;
            delayCollector.currentDelay = TimeUtility::currentTime() - lastTimeStamp;
            _metricsReporter->reportDelay(delayCollector, &_metricsTags);
        }
    }
    collector.requestMsgCount = msgSize;

    // Snapshot every field that is needed below *before* the response may be handed to
    // _buffer: once auto_response.release() has run this function no longer owns the
    // object and must not rely on it staying alive.
    const auto requestUuid = response->requestuuid();
    const auto respNextMsgId = response->nextmsgid();
    const bool hasNextTimestamp = response->has_nexttimestamp();
    const auto respNextTimestamp = response->nexttimestamp();

    if (ec == ERROR_BROKER_NO_DATA) {
        ec = ERROR_CLIENT_NO_MORE_MESSAGE;
    } else if (msgSize > 0) {
        if (!_buffer.addResponse(auto_response.release())) {
            string errInfo = "[" + _config.topicName + " " + StringUtil::toString(_partitionId) + " " +
                             StringUtil::toString(requestUuid) +
                             "] add response to read buffer failed. has msg count: " + StringUtil::toString(msgSize);
            AUTIL_LOG(ERROR, "%s", errInfo.c_str());
            ec = ERROR_CLIENT_INVALID_RESPONSE;
            auto *errCollector = ErrorCollectorSingleton::getInstance();
            errCollector->addRequestHasErrorDataInfo(errInfo);
            auto *responseCollector = ResponseCollectorSingleton::getInstance();
            // NOTE(review): ownership of `response` after a failed addResponse() is not
            // visible from here. If the buffer frees it on failure the dump below reads a
            // dangling pointer; if it does not, the response is never deleted. Confirm
            // against addResponse() and either dump before the hand-off or delete here.
            string content;
            if (response->SerializeToString(&content)) {
                responseCollector->logResponse(_readerInfo, MESSAGE_RESPONSE, content);
            } else {
                responseCollector->logResponse(_readerInfo, MESSAGE_RESPONSE_FBMSG_PART, response->fbmsgs());
            }
            // return ec; // TODO, retry or skip request bug
        }
        // The read position advances even when the buffer refused the response (see TODO).
        _nextMsgId = respNextMsgId;
    } else {
        // Empty response without NO_DATA.
        if (_nextMsgId <= maxMsgId) {
            _nextMsgId = respNextMsgId;
        } else {
            // We are already past the broker's max id: clamp to maxMsgId + 1 and report
            // that there is nothing more to read.
            int64_t oldNextId = _nextMsgId;
            _nextMsgId = maxMsgId + 1;
            if (_nextMsgId < oldNextId) {
                AUTIL_LOG(WARN,
                          "response [%ld], Next msgId rollback from [%ld] to [%ld]",
                          requestUuid,
                          oldNextId,
                          _nextMsgId);
            }
            ec = ERROR_CLIENT_NO_MORE_MESSAGE;
        }
    }
    if (hasNextTimestamp) {
        _nextTimestamp = respNextTimestamp;
    }
    return ec;
}