in contrib/native/client/src/clientlib/drillClientImpl.cpp [1983:2128]
void DrillClientImpl::handleRead(ByteBuf_t inBuf,
const boost::system::error_code& error,
size_t bytes_transferred) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
<< reinterpret_cast<int*>(inBuf) << std::endl;)
if(DrillClientConfig::getQueryTimeout() > 0){
// Cancel the timeout if handleRead is called
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
m_deadlineTimer.cancel();
}
if (error) {
// boost error
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
"Boost Communication Error: " << error.message() << std::endl;)
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}
rpc::InBoundRpcMessage msg;
boost::lock_guard<boost::mutex> lockPR(this->m_prMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
AllocatedBufferPtr allocatedBuffer=NULL;
if((this->*m_fpCurrentReadMsgHandler)(inBuf, &allocatedBuffer, msg)!=QRY_SUCCESS){
delete allocatedBuffer;
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
}
if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away
m_pendingRequests--;
delete allocatedBuffer;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;)
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}else{
boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;)
m_cv.notify_one();
}
return;
}
if(msg.m_mode == exec::rpc::RESPONSE) {
status_t s;
switch(msg.m_rpc_type) {
case exec::user::QUERY_HANDLE:
s = processQueryId(allocatedBuffer, msg);
break;
case exec::user::PREPARED_STATEMENT:
s = processPreparedStatement(allocatedBuffer, msg);
break;
case exec::user::CATALOGS:
s = processCatalogsResult(allocatedBuffer, msg);
break;
case exec::user::SCHEMAS:
s = processSchemasResult(allocatedBuffer, msg);
break;
case exec::user::TABLES:
s = processTablesResult(allocatedBuffer, msg);
break;
case exec::user::COLUMNS:
s = processColumnsResult(allocatedBuffer, msg);
break;
case exec::user::HANDSHAKE:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
delete allocatedBuffer;
break;
case exec::user::SASL_MESSAGE:
processSaslChallenge(allocatedBuffer, msg);
break;
case exec::user::SERVER_META:
processServerMetaResult(allocatedBuffer, msg);
break;
case exec::user::ACK:
// Cancel requests will result in an ACK sent back.
// Consume silently
s = QRY_CANCELED;
delete allocatedBuffer;
break;
default:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
<< "QueryResult returned " << msg.m_rpc_type << std::endl;)
delete allocatedBuffer;
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
}
if (m_pendingRequests != 0) {
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
}
if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) {
status_t s;
switch(msg.m_rpc_type) {
case exec::user::QUERY_RESULT:
s = processQueryResult(allocatedBuffer, msg);
break;
case exec::user::QUERY_DATA:
s = processQueryData(allocatedBuffer, msg);
break;
default:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
<< "QueryResult returned " << msg.m_rpc_type << std::endl;)
delete allocatedBuffer;
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
}
if (m_pendingRequests != 0) {
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
}
// If not QUERY_RESULT, then we think something serious has gone wrong?
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
<< "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;)
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
delete allocatedBuffer;
}