void PullCallbackWrap::operationComplete()

in src/common/AsyncCallbackWrap.cpp [152:189]


void PullCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
  unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
  if (m_pAsyncCallBack == NULL) {
    LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
    return;
  }
  PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
  if (!pResponse) {
    string err = "unknow reseaon";
    if (!pResponseFuture->isSendRequestOK()) {
      err = "send request failed";

    } else if (pResponseFuture->isTimeOut()) {
      // pResponseFuture->setAsyncResponseFlag();
      err = "wait response timeout";
    }
    MQException exception(err, -1, __FILE__, __LINE__);
    LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
    if (pCallback && bProducePullRequest)
      pCallback->onException(exception);
  } else {
    try {
      if (m_pArg.pPullWrapper) {
        unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
        PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
        if (pCallback)
          pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
      } else {
        LOG_ERROR("pPullWrapper had been destroyed with consumer");
      }
    } catch (MQException& e) {
      LOG_ERROR("%s", e.what());
      MQException exception("pullResult error", -1, __FILE__, __LINE__);
      if (pCallback && bProducePullRequest)
        pCallback->onException(exception);
    }
  }
}