void DrillClientImpl::handleRead()

in contrib/native/client/src/clientlib/drillClientImpl.cpp [1983:2128]


void DrillClientImpl::handleRead(ByteBuf_t inBuf,
        const boost::system::error_code& error,
        size_t bytes_transferred) {
    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
        <<  reinterpret_cast<int*>(inBuf) << std::endl;)
    if(DrillClientConfig::getQueryTimeout() > 0){
        // Cancel the timeout if handleRead is called
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
        m_deadlineTimer.cancel();
    }
    if (error) {
        // boost error
        Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
        boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
            "Boost Communication Error: " << error.message() << std::endl;)
        handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
        return;
    }

    rpc::InBoundRpcMessage msg;
    boost::lock_guard<boost::mutex> lockPR(this->m_prMutex);

    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
    AllocatedBufferPtr allocatedBuffer=NULL;

    if((this->*m_fpCurrentReadMsgHandler)(inBuf, &allocatedBuffer, msg)!=QRY_SUCCESS){
        delete allocatedBuffer;
        if(m_pendingRequests!=0){
            boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
            getNextResult();
        }
        return;
    }

    if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away
        m_pendingRequests--;
        delete allocatedBuffer;
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " <<  std::endl;)
        if(m_pendingRequests!=0){
            boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
            getNextResult();
        }else{
            boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " <<  std::endl;)
            m_cv.notify_one();
        }

        return;
    }

    if(msg.m_mode == exec::rpc::RESPONSE) {
        status_t s;
        switch(msg.m_rpc_type) {
        case exec::user::QUERY_HANDLE:
            s = processQueryId(allocatedBuffer, msg);
            break;

        case exec::user::PREPARED_STATEMENT:
            s = processPreparedStatement(allocatedBuffer, msg);
            break;

        case exec::user::CATALOGS:
            s = processCatalogsResult(allocatedBuffer, msg);
            break;

        case exec::user::SCHEMAS:
            s = processSchemasResult(allocatedBuffer, msg);
            break;

        case exec::user::TABLES:
            s = processTablesResult(allocatedBuffer, msg);
            break;

        case exec::user::COLUMNS:
            s = processColumnsResult(allocatedBuffer, msg);
            break;

        case exec::user::HANDSHAKE:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
            delete allocatedBuffer;
            break;

        case exec::user::SASL_MESSAGE:
            processSaslChallenge(allocatedBuffer, msg);
            break;

        case exec::user::SERVER_META:
        	processServerMetaResult(allocatedBuffer, msg);
        	break;

        case exec::user::ACK:
            // Cancel requests will result in an ACK sent back.
            // Consume silently
            s = QRY_CANCELED;
            delete allocatedBuffer;
            break;

        default:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
                    << "QueryResult returned " << msg.m_rpc_type << std::endl;)
            delete allocatedBuffer;
            handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
        }

        if (m_pendingRequests != 0) {
            boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
            getNextResult();
        }

        return;
    }

    if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) {
        status_t s;
        switch(msg.m_rpc_type) {
        case exec::user::QUERY_RESULT:
            s = processQueryResult(allocatedBuffer, msg);
            break;

        case exec::user::QUERY_DATA:
            s = processQueryData(allocatedBuffer, msg);
            break;

        default:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
                    << "QueryResult returned " << msg.m_rpc_type << std::endl;)
            delete allocatedBuffer;
            handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
        }

        if (m_pendingRequests != 0) {
            boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
            getNextResult();
        }

        return;
    }

    // If not QUERY_RESULT, then we think something serious has gone wrong?
    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
        << "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;)
    handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
    delete allocatedBuffer;

}