in src/common/AsyncCallbackWrap.cpp [152:189]
// Completion handler for an asynchronous pull request.
//
// Takes ownership of the response command returned by
// pResponseFuture->getCommand() and reports the outcome to the registered
// callback (m_pAsyncCallBack, used as a PullCallback):
//   - no response command: onException() is called with a reason derived from
//     the future's isSendRequestOK() / isTimeOut() state;
//   - response present: it is decoded through m_pClientAPI and
//     m_pArg.pPullWrapper, and onSuccess() is called with the result;
//   - MQException thrown while decoding or inside onSuccess(): onException()
//     is called with a generic "pullResult error".
//
// pResponseFuture      future of the finished request; a null pointer is
//                      logged and ignored.
// bProducePullRequest  forwarded unchanged to onSuccess(); onException() is
//                      invoked only when it is true.
void PullCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
  if (pResponseFuture == nullptr) {
    LOG_ERROR("pResponseFuture is NULL, AsyncPull could not continue");
    return;
  }

  // Wrap the command before any other early return so that it is destroyed on
  // every exit path, including the missing-callback case just below.
  unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
  if (m_pAsyncCallBack == nullptr) {
    LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
    return;
  }
  // m_pAsyncCallBack was just checked, and static_cast of a non-null pointer
  // never produces null, so pCallback needs no further null checks.
  PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);

  if (!pResponse) {
    // No response command: pick the most specific failure reason available.
    string err = "unknow reseaon";
    if (!pResponseFuture->isSendRequestOK()) {
      err = "send request failed";
    } else if (pResponseFuture->isTimeOut()) {
      // pResponseFuture->setAsyncResponseFlag();
      err = "wait response timeout";
    }
    MQException exception(err, -1, __FILE__, __LINE__);
    LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
    if (bProducePullRequest)
      pCallback->onException(exception);
    return;
  }

  try {
    if (m_pArg.pPullWrapper) {
      unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
      PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
      pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
    } else {
      // Without a pull wrapper the response cannot be processed; the callback
      // is intentionally not notified in this case (original behaviour).
      LOG_ERROR("pPullWrapper had been destroyed with consumer");
    }
  } catch (const MQException& e) {
    // The original message is only logged; the callback receives a generic
    // "pullResult error" exception instead.
    LOG_ERROR("%s", e.what());
    MQException exception("pullResult error", -1, __FILE__, __LINE__);
    if (bProducePullRequest)
      pCallback->onException(exception);
  }
}