void SingleSwiftWriter::handleSendMessageResponse()

in aios/apps/facility/swift/client/SingleSwiftWriter.cpp [441:597]


void SingleSwiftWriter::handleSendMessageResponse() {
    MessageResponse *response = NULL;
    ErrorCode ec = _transportAdapter->stealResponse(response);
    std::unique_ptr<MessageResponse> responsePtr(response);
    if (response) {
        AUTIL_LOG(DEBUG,
                  "handle [%s %d] response[%s]",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->ShortDebugString().c_str());
    }
    setLastErrorCode(ec);
    // invalid response
    if (!response) {
        AUTIL_LOG(WARN, "[%s %d] Invalid broker response, response is empty", _config.topicName.c_str(), _partitionId);
        return;
    }
    if (ERROR_BROKER_INVALID_USER == ec) {
        AUTIL_LOG(WARN,
                  "username[%s] not permitter to write[%s %d]",
                  _transportAdapter->getUsername().c_str(),
                  _config.topicName.c_str(),
                  _partitionId);
        return;
    }
    if (_lastTopicVersion == -1 && response->has_topicversion()) {
        _lastTopicVersion = response->topicversion();
    }
    if (ec == ERROR_BROKER_SESSION_CHANGED || ec == ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND) {
        _writeBuffer.resetUnsendCursor();
        if (_topicChangeFunc) {
            int64_t topicVersion = 0;
            ErrorCode funcEc;
            if (response->has_topicversion()) {
                topicVersion = response->topicversion();
            }
            if (topicVersion > 0 || _lastTopicVersion > 0) { // topicversion is empty when partiton not load
                if (topicVersion != _lastTopicVersion) {
                    if (topicVersion != 0) {
                        funcEc = _topicChangeFunc(topicVersion);
                    } else {
                        funcEc = _topicChangeFunc(_lastTopicVersion);
                    }
                    if (funcEc != ERROR_NONE) {
                        AUTIL_LOG(WARN,
                                  "[%s %d] set topic change failed, ec [%s]",
                                  _config.topicName.c_str(),
                                  _partitionId,
                                  ErrorCode_Name(funcEc).c_str());
                        return;
                    }
                    if (topicVersion != 0) {
                        AUTIL_LOG(INFO,
                                  "[%s %d] topic version changed from [%ld] to [%ld]",
                                  _config.topicName.c_str(),
                                  _partitionId,
                                  _lastTopicVersion,
                                  topicVersion);
                        _lastTopicVersion = topicVersion;
                    }
                }
            } else { // compatible old server
                AUTIL_LOG(INFO, "[%s %d] topic version may changed.", _config.topicName.c_str(), _partitionId);
                funcEc = _topicChangeFunc(topicVersion);
                if (funcEc != ERROR_NONE) {
                    AUTIL_LOG(WARN,
                              "[%s %d]set topic change failed, ec [%s]",
                              _config.topicName.c_str(),
                              _partitionId,
                              ErrorCode_Name(funcEc).c_str());
                    return;
                }
            }
        }
        if (response->has_sessionid()) { // session changed
            AUTIL_LOG(INFO,
                      "[%s %d] sessionid changed from [%ld] to [%ld]",
                      _config.topicName.c_str(),
                      _partitionId,
                      _sessionId,
                      response->sessionid());
            _sessionId = response->sessionid();
        }
        return;
    }
    if (ec == ERROR_TOPIC_SEALED) {
        _writeBuffer.setSealed(true);
        return;
    }
    if (ERROR_BROKER_WRITE_VERSION_INVALID == ec) {
        _hasVersionError = true;
        AUTIL_LOG(ERROR,
                  "[%s %d] write version invalid, response[%s]",
                  _config.topicName.c_str(),
                  _partitionId,
                  response->ShortDebugString().c_str());
        return;
    }
    if (!response->has_acceptedmsgcount()) {
        AUTIL_LOG(WARN,
                  "[%s %d] Invalid response, miss field[acceptedmsgcount], ec [%s].",
                  _config.topicName.c_str(),
                  _partitionId,
                  ErrorCode_Name(ec).c_str());
        if (ec != ERROR_CLIENT_ARPC_ERROR) {
            setLastErrorCode(ERROR_CLIENT_INVALID_RESPONSE);
        }
        return;
    }

    // other fatal error
    if (ec != ERROR_NONE && ec != ERROR_BROKER_BUSY) {
        AUTIL_LOG(WARN,
                  "[%s %u] sendMessage failed for [%s]!",
                  _config.topicName.c_str(),
                  _partitionId,
                  ErrorCode_Name(ec).c_str());
        _writeBuffer.resetUnsendCursor();
        return;
    }

    // normal situation
    setLastRefreshTime(_transportAdapter->getLastTransportClosureDoneTime());
    if (ec == ERROR_BROKER_BUSY) {
        AUTIL_LOG(WARN, "[%s %u] broker busy", _config.topicName.c_str(), _partitionId);
    }
    if (_sessionId == -1 && response->has_sessionid()) {
        _sessionId = response->sessionid();
    }
    int64_t acceptedMsgCount = response->acceptedmsgcount();
    int64_t checkpointId = -1;
    if (!_config.safeMode || !response->has_sessionid() || !response->has_committedid() ||
        !response->has_acceptedmsgbeginid()) {
        _writeBuffer.updateBuffer(acceptedMsgCount, checkpointId);
    } else {
        vector<int64_t> msgTimestamps;
        for (int i = 0; i < response->timestamps_size(); ++i) {
            msgTimestamps.emplace_back(response->timestamps(i));
        }
        vector<pair<int64_t, int64_t>> committedCpTs;
        _writeBuffer.updateBuffer(acceptedMsgCount,
                                  response->committedid(),
                                  response->acceptedmsgbeginid(),
                                  checkpointId,
                                  msgTimestamps,
                                  committedCpTs);
        if (_committedFunc && committedCpTs.size() > 0) {
            if (_metricsReporter) {
                _metricsReporter->reportCommitCallbackQps(&_metricsTags);
            }
            _committedFunc(committedCpTs);
        }
    }
    if (checkpointId != -1) {
        setCheckpointId(checkpointId);
    }
}