in src/common/AsyncCallbackWrap.cpp [59:129]
void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
if (m_pAsyncCallBack == NULL) {
return;
}
int opaque = pResponseFuture->getOpaque();
SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
if (!pResponse) {
string err = "unknow reseaon";
if (!pResponseFuture->isSendRequestOK()) {
err = "send request failed";
} else if (pResponseFuture->isTimeOut()) {
// pResponseFuture->setAsyncResponseFlag();
err = "wait response timeout";
}
if (pCallback) {
MQException exception(err, -1, __FILE__, __LINE__);
pCallback->onException(exception);
}
LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
} else {
try {
SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
if (pCallback) {
LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
pCallback->onSuccess(ret);
}
} catch (MQException& e) {
LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
// broker may return exception, need consider retry send
int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
int retryTimes = pResponseFuture->getRetrySendTimes();
if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
int64 left_timeout_ms = pResponseFuture->leftTime();
string brokerAddr = pResponseFuture->getBrokerAddr();
const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
retryTimes += 1;
LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
bool exception_flag = false;
try {
m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg,
(RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes,
retryTimes);
} catch (MQClientException& e) {
LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque,
retryTimes, m_msg.toString().data());
exception_flag = true;
}
if (exception_flag == false) {
return; // send retry again, here need return
}
}
if (pCallback) {
MQException exception("process send response error", -1, __FILE__, __LINE__);
pCallback->onException(exception);
}
}
}
if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
deleteAndZero(pCallback);
}
}