void SendCallbackWrap::operationComplete()

in src/common/AsyncCallbackWrap.cpp [59:129]


void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
  unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());

  if (m_pAsyncCallBack == NULL) {
    return;
  }
  int opaque = pResponseFuture->getOpaque();
  SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);

  if (!pResponse) {
    string err = "unknow reseaon";
    if (!pResponseFuture->isSendRequestOK()) {
      err = "send request failed";

    } else if (pResponseFuture->isTimeOut()) {
      // pResponseFuture->setAsyncResponseFlag();
      err = "wait response timeout";
    }
    if (pCallback) {
      MQException exception(err, -1, __FILE__, __LINE__);
      pCallback->onException(exception);
    }
    LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
  } else {
    try {
      SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
      if (pCallback) {
        LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
                  opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
        pCallback->onSuccess(ret);
      }
    } catch (MQException& e) {
      LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());

      // broker may return exception, need consider retry send
      int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
      int retryTimes = pResponseFuture->getRetrySendTimes();
      if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
        int64 left_timeout_ms = pResponseFuture->leftTime();
        string brokerAddr = pResponseFuture->getBrokerAddr();
        const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
        retryTimes += 1;
        LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
                 opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());

        bool exception_flag = false;
        try {
          m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg,
                                         (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes,
                                         retryTimes);
        } catch (MQClientException& e) {
          LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque,
                    retryTimes, m_msg.toString().data());
          exception_flag = true;
        }

        if (exception_flag == false) {
          return;  // send retry again, here need return
        }
      }

      if (pCallback) {
        MQException exception("process send response error", -1, __FILE__, __LINE__);
        pCallback->onException(exception);
      }
    }
  }
  if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
    deleteAndZero(pCallback);
  }
}