in aios/apps/facility/swift/client/SingleSwiftWriter.cpp [441:597]
void SingleSwiftWriter::handleSendMessageResponse() {
MessageResponse *response = NULL;
ErrorCode ec = _transportAdapter->stealResponse(response);
std::unique_ptr<MessageResponse> responsePtr(response);
if (response) {
AUTIL_LOG(DEBUG,
"handle [%s %d] response[%s]",
_config.topicName.c_str(),
_partitionId,
response->ShortDebugString().c_str());
}
setLastErrorCode(ec);
// invalid response
if (!response) {
AUTIL_LOG(WARN, "[%s %d] Invalid broker response, response is empty", _config.topicName.c_str(), _partitionId);
return;
}
if (ERROR_BROKER_INVALID_USER == ec) {
AUTIL_LOG(WARN,
"username[%s] not permitter to write[%s %d]",
_transportAdapter->getUsername().c_str(),
_config.topicName.c_str(),
_partitionId);
return;
}
if (_lastTopicVersion == -1 && response->has_topicversion()) {
_lastTopicVersion = response->topicversion();
}
if (ec == ERROR_BROKER_SESSION_CHANGED || ec == ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND) {
_writeBuffer.resetUnsendCursor();
if (_topicChangeFunc) {
int64_t topicVersion = 0;
ErrorCode funcEc;
if (response->has_topicversion()) {
topicVersion = response->topicversion();
}
if (topicVersion > 0 || _lastTopicVersion > 0) { // topicversion is empty when partiton not load
if (topicVersion != _lastTopicVersion) {
if (topicVersion != 0) {
funcEc = _topicChangeFunc(topicVersion);
} else {
funcEc = _topicChangeFunc(_lastTopicVersion);
}
if (funcEc != ERROR_NONE) {
AUTIL_LOG(WARN,
"[%s %d] set topic change failed, ec [%s]",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(funcEc).c_str());
return;
}
if (topicVersion != 0) {
AUTIL_LOG(INFO,
"[%s %d] topic version changed from [%ld] to [%ld]",
_config.topicName.c_str(),
_partitionId,
_lastTopicVersion,
topicVersion);
_lastTopicVersion = topicVersion;
}
}
} else { // compatible old server
AUTIL_LOG(INFO, "[%s %d] topic version may changed.", _config.topicName.c_str(), _partitionId);
funcEc = _topicChangeFunc(topicVersion);
if (funcEc != ERROR_NONE) {
AUTIL_LOG(WARN,
"[%s %d]set topic change failed, ec [%s]",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(funcEc).c_str());
return;
}
}
}
if (response->has_sessionid()) { // session changed
AUTIL_LOG(INFO,
"[%s %d] sessionid changed from [%ld] to [%ld]",
_config.topicName.c_str(),
_partitionId,
_sessionId,
response->sessionid());
_sessionId = response->sessionid();
}
return;
}
if (ec == ERROR_TOPIC_SEALED) {
_writeBuffer.setSealed(true);
return;
}
if (ERROR_BROKER_WRITE_VERSION_INVALID == ec) {
_hasVersionError = true;
AUTIL_LOG(ERROR,
"[%s %d] write version invalid, response[%s]",
_config.topicName.c_str(),
_partitionId,
response->ShortDebugString().c_str());
return;
}
if (!response->has_acceptedmsgcount()) {
AUTIL_LOG(WARN,
"[%s %d] Invalid response, miss field[acceptedmsgcount], ec [%s].",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(ec).c_str());
if (ec != ERROR_CLIENT_ARPC_ERROR) {
setLastErrorCode(ERROR_CLIENT_INVALID_RESPONSE);
}
return;
}
// other fatal error
if (ec != ERROR_NONE && ec != ERROR_BROKER_BUSY) {
AUTIL_LOG(WARN,
"[%s %u] sendMessage failed for [%s]!",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(ec).c_str());
_writeBuffer.resetUnsendCursor();
return;
}
// normal situation
setLastRefreshTime(_transportAdapter->getLastTransportClosureDoneTime());
if (ec == ERROR_BROKER_BUSY) {
AUTIL_LOG(WARN, "[%s %u] broker busy", _config.topicName.c_str(), _partitionId);
}
if (_sessionId == -1 && response->has_sessionid()) {
_sessionId = response->sessionid();
}
int64_t acceptedMsgCount = response->acceptedmsgcount();
int64_t checkpointId = -1;
if (!_config.safeMode || !response->has_sessionid() || !response->has_committedid() ||
!response->has_acceptedmsgbeginid()) {
_writeBuffer.updateBuffer(acceptedMsgCount, checkpointId);
} else {
vector<int64_t> msgTimestamps;
for (int i = 0; i < response->timestamps_size(); ++i) {
msgTimestamps.emplace_back(response->timestamps(i));
}
vector<pair<int64_t, int64_t>> committedCpTs;
_writeBuffer.updateBuffer(acceptedMsgCount,
response->committedid(),
response->acceptedmsgbeginid(),
checkpointId,
msgTimestamps,
committedCpTs);
if (_committedFunc && committedCpTs.size() > 0) {
if (_metricsReporter) {
_metricsReporter->reportCommitCallbackQps(&_metricsTags);
}
_committedFunc(committedCpTs);
}
}
if (checkpointId != -1) {
setCheckpointId(checkpointId);
}
}